1 module grpc.service.queue;
2 import core.sync.semaphore;
3 import interop.headers;
4 debug import std.stdio;
5 import core.atomic;
6 import grpc.core.utils;
7 
// Queue class adapted from an answer on Stack Overflow / the D forums.
9 
/**
    FIFO queue implemented as a singly linked list whose access is
    coordinated through a gRPC core mutex (`gpr_mu`) and condition
    variable (`gpr_cv`).

    The list always ends in an empty sentinel node: `put` stores the value
    in the current sentinel and appends a fresh one, so `_first` and
    `_last` are never null once the constructor has run.

    Locking contract, as implemented below:
    $(UL
        $(LI `count`, `put` and `popFront` acquire and release the mutex
             themselves, so they must NOT be called while the calling
             thread already holds it via `lock`/`notify`
             (NOTE(review): assumes `gpr_mu` is non-recursive - confirm).)
        $(LI `notify` acquires the mutex and returns with it still held;
             the caller is responsible for calling `unlock`.)
        $(LI `empty`, `front` and `pop` take no lock; they are meant to be
             called while the caller holds the mutex.)
    )
*/
class Queue(T) {
    private struct Node {
        T     payload;
        Node* next;
    }

    private Node* _first;
    // Deliberately NOT initialized here: a `new` expression in a class
    // field initializer is evaluated once at compile time and the resulting
    // node would be shared by every Queue!T instance. The sentinel is
    // allocated per instance in the constructor instead.
    private Node* _last;

    /**
        Number of elements currently stored.

        Acquires the mutex for the duration of the read.
    */
    @property int count() {
        gpr_mu_lock(&mutex);
        scope(exit) gpr_mu_unlock(&mutex);
        return atomicLoad(_count);
    }

    private shared int   _count = 0;
    private gpr_cv cv;
    private gpr_mu mutex;

    /// Acquires the queue mutex. Pair with `unlock`.
    void lock() {
        gpr_mu_lock(&mutex);
    }

    /// Releases the queue mutex (acquired by `lock` or left held by `notify`).
    void unlock() {
        gpr_mu_unlock(&mutex);
    }

    /// Initializes the synchronization primitives and the per-instance sentinel node.
    this() {
        gpr_cv_init(&cv);
        gpr_mu_init(&mutex);

        this._last = new Node(T.init, null);
        this._first = this._last;
    }

    /// Destroys the condition variable and the mutex.
    ~this() {
        gpr_cv_destroy(&cv);
        gpr_mu_destroy(&mutex);
    }

    /// Signals the condition variable (`gpr_cv_signal`).
    void signal() {
        gpr_cv_signal(&cv);
    }

    /// Broadcasts on the condition variable (`gpr_cv_broadcast`).
    void notifyAll() {
        gpr_cv_broadcast(&cv);
    }

    /**
        Add to the Queue (to the end).

        The value is written into the current sentinel node and a new empty
        sentinel is appended. The mutex is held while the list is modified;
        the condition variable is signalled after the mutex is released.

        Params:
            value = element to append
    */
    void put(T value) {
        gpr_mu_lock(&mutex);
        {
            // T.init (rather than null) keeps the template usable for
            // element types that cannot be initialized from null.
            Node* newLast = new Node(T.init, null);
            this._last.payload = value;
            this._last.next = newLast;
            this._last = newLast;
            atomicOp!"+="(_count, 1);
        }
        gpr_mu_unlock(&mutex);
        gpr_cv_signal(&cv);
    }

    /**
        Acquires the mutex and, if the queue is empty, waits on the
        condition variable.

        IMPORTANT: this method returns with the mutex STILL HELD. The caller
        must call `unlock` when done. The queue may still be empty when this
        returns, so check `empty` before calling `front`/`pop`.

        Params:
            timeout = passed unchanged to `gpr_cv_wait`
                      (NOTE(review): gRPC documents this argument as an
                      absolute deadline - confirm `durtotimespec` produces one)
    */
    void notify(gpr_timespec timeout = durtotimespec(10.seconds)) {
        gpr_mu_lock(&mutex);

        // Single atomic read so both checks below see the same value.
        immutable pending = atomicLoad(_count);
        if (pending == 0) {
            gpr_cv_wait(&cv, &mutex, timeout);
        } else if (pending < 0) {
            assert(0, "count should never be this");
        }
        // pending > 0: items are already available, no need to wait.
    }

    /**
        Whether the queue currently holds no elements.

        Takes no lock; intended to be called while the caller holds the mutex.
    */
    bool empty() {
        return atomicLoad(_count) == 0;
    }

    /**
        Removes the first element and returns it.

        Acquires the mutex for the duration of the removal. The `in`
        contract checks `empty` before the mutex is taken.

        Returns: the payload of the removed node
    */
    T popFront() in {
        assert (!this.empty);
    } do {
        gpr_mu_lock(&mutex);
        scope(exit) gpr_mu_unlock(&mutex);
        T obj;
        if (this._first != null) {
            obj = this._first.payload;

            this._first = this._first.next;
        }
        atomicOp!"-="(_count, 1);
        return obj;
    }

    /**
        Removes the first element without returning it.

        Takes no lock; intended to be called while the caller holds the mutex.
    */
    void pop() in {
        assert (!this.empty);
    } do {
        if (this._first != null) {
            this._first = this._first.next;
        }
        atomicOp!"-="(_count, 1);
    }

    /**
        Returns the first element without removing it.

        Takes no lock; intended to be called while the caller holds the mutex.
    */
    T front() in {
        assert (!this.empty);
    } do {
        return this._first.payload;
    }

}