module grpc.service.queue;
import core.sync.semaphore;
import interop.headers;
debug import std.stdio;
import core.atomic;
import grpc.core.utils;

// Class adapted from an answer on StackOverflow / the D forums.

/**
    Singly linked FIFO queue guarded by a gRPC core mutex (`gpr_mu`) and
    condition variable (`gpr_cv`).

    The list always ends in an empty sentinel node: `_last` points at the
    sentinel and `_first` at the oldest stored element (or at the sentinel
    when the queue is empty).

    Locking protocol, as implemented by the members below:
    $(UL
        $(LI `put`, `popFront` and `count` acquire the mutex themselves.)
        $(LI `empty`, `front` and `pop` do NOT acquire the mutex.)
        $(LI `notify` acquires the mutex and returns with it still held;
             the caller releases it with `unlock`.)
    )
*/
class Queue(T) {
    private struct Node {
        T payload;
        Node* next;
    }

    private Node* _first;
    // Allocated in the constructor. A `new` expression used as a field
    // initializer is evaluated once at compile time, which would make every
    // Queue instance start out sharing the same sentinel node.
    private Node* _last;

    private shared int _count = 0;
    private gpr_cv cv;
    private gpr_mu mutex;

    /**
        Number of elements currently stored.

        Acquires the mutex for the duration of the read.
    */
    @property int count() {
        gpr_mu_lock(&mutex);
        scope(exit) gpr_mu_unlock(&mutex);
        return atomicLoad(_count);
    }

    /// Acquire the queue mutex.
    void lock() {
        gpr_mu_lock(&mutex);
    }

    /// Release the queue mutex.
    void unlock() {
        gpr_mu_unlock(&mutex);
    }

    /// Initialise the synchronisation primitives and the per-instance sentinel node.
    this() {
        gpr_cv_init(&cv);
        gpr_mu_init(&mutex);

        this._last = new Node(T.init, null);
        this._first = this._last;
    }

    /// Destroy the condition variable and the mutex.
    ~this() {
        gpr_cv_destroy(&cv);
        gpr_mu_destroy(&mutex);
    }

    /// Signal the condition variable (`gpr_cv_signal`).
    void signal() {
        gpr_cv_signal(&cv);
    }

    /// Broadcast on the condition variable (`gpr_cv_broadcast`).
    void notifyAll() {
        gpr_cv_broadcast(&cv);
    }

    /**
        Add to the Queue (to the end).

        The value is stored in the current sentinel node and a fresh sentinel
        is appended behind it. The mutex is held while the list is modified
        and released before the condition variable is signalled.

        Params:
            value = element to append
    */
    void put(T value) {
        {
            gpr_mu_lock(&mutex);
            // Release the mutex even if allocation or the payload assignment throws.
            scope(exit) gpr_mu_unlock(&mutex);

            // T.init rather than null, so payload types that are not
            // nullable (integers, structs, ...) can be stored as well.
            Node* newLast = new Node(T.init, null);
            this._last.payload = value;
            this._last.next = newLast;
            this._last = newLast;
            atomicOp!"+="(_count, 1);
        }
        gpr_cv_signal(&cv);
    }

    /**
        Acquire the mutex and, if the queue is empty, wait on the condition
        variable.

        The mutex is NOT released by this function: it returns with the mutex
        held, and the caller is responsible for calling `unlock`. The result
        of `gpr_cv_wait` is ignored, so the queue may still be empty on
        return; check `empty` before reading.

        NOTE(review): gRPC documents the third argument of `gpr_cv_wait` as an
        absolute deadline - confirm that `durtotimespec` produces one.

        Params:
            timeout = passed unchanged to `gpr_cv_wait`
    */
    void notify(gpr_timespec timeout = durtotimespec(10.seconds)) {
        gpr_mu_lock(&mutex);
        // Single atomic read, consistent with every other access to _count.
        immutable pending = atomicLoad(_count);
        if (pending == 0) {
            gpr_cv_wait(&cv, &mutex, timeout);
        } else if (pending < 0) {
            assert(0, "count should never be this");
        }
        // pending > 0: elements are already queued, no need to wait.
    }

    /**
        Whether the queue holds no elements.

        ASSUMES YOU ARE LOCKED: this function does not acquire the mutex.
    */
    bool empty() {
        return atomicLoad(_count) == 0;
    }

    /**
        Remove the first element and return it.

        Acquires the mutex itself. gRPC documents that `gpr_mu_lock` may
        block indefinitely or crash when the calling thread already holds
        the mutex, so do not call this between `lock`/`notify` and `unlock`;
        use `front` and `pop` there instead.

        Returns: the removed element
    */
    T popFront() in {
        assert (!this.empty);
    } do {
        gpr_mu_lock(&mutex);
        scope(exit) gpr_mu_unlock(&mutex);
        T obj;
        if (this._first != null) {
            obj = cast(T)(this._first.payload);

            this._first = this._first.next;
        }
        atomicOp!"-="(_count, 1);
        return obj;
    }

    /**
        Remove the first element without returning it.

        Does not acquire the mutex; callers are expected to hold it already.
    */
    void pop() in {
        assert (!this.empty);
    } do {
        if (this._first != null) {
            this._first = this._first.next;
        }
        atomicOp!"-="(_count, 1);
    }

    /**
        Return the first element without removing it.

        Does not acquire the mutex; callers are expected to hold it already.
    */
    T front() in {
        assert (!this.empty);
    } do {
        return cast(T)(this._first.payload);
    }

}