1 module grpc.common.batchcall;
2 import std.exception;
3 import interop.headers;
4 import automem;
5 import grpc.common.call;
6 import grpc.core.utils;
7 import grpc.common.metadata;
8 import grpc.common.byte_buffer;
9 import grpc.logger;
10 import std.exception : enforce;
11 import core.lifetime;
12 
13 interface RemoteOp {
14     grpc_op_type type();
15     grpc_op value();
16 }
17 
18 class SendInitialMetadataOp : RemoteOp {
19     private {
20         MetadataArray array;
21     }
22 
23     grpc_op_type type() {
24         return GRPC_OP_SEND_INITIAL_METADATA;
25     }
26 
27     grpc_op value() {
28         grpc_op ret;
29 
30         ret.op = type();
31         ret.data.send_initial_metadata.metadata = array.handle.metadata;
32         ret.data.send_initial_metadata.count = array.count;
33 
34         return ret;
35     }
36     
37     this() {
38         array = MetadataArray.create();
39     }
40 }
41 
42 class SendMessageOp : RemoteOp {
43     private {
44         ByteBuffer _buf;
45     }
46 
47     grpc_op_type type() {
48         return GRPC_OP_SEND_MESSAGE;
49     }
50 
51     grpc_op value() {
52         grpc_op ret;
53         enforce(_buf.valid, "expected byte buffer to be valid");
54         ret.op = type();
55         ret.data.send_message.send_message = _buf.handle;
56         return ret;
57     }
58 
59     this(ref ubyte[] message) {
60         _buf = ByteBuffer.create(message);
61     }
62 }
63 
64 class SendStatusFromServerOp : RemoteOp {
65     private {
66         MetadataArray _trailing_metadata;
67         grpc_status_code _status;
68         grpc_slice _details;
69     }
70 
71     grpc_op_type type() {
72         return GRPC_OP_SEND_STATUS_FROM_SERVER;
73     }
74 
75     grpc_op value() {
76         grpc_op ret;
77         ret.op = type();
78         ret.data.send_status_from_server.status_details = &_details;
79         ret.data.send_status_from_server.status = _status;
80         ret.data.send_status_from_server.trailing_metadata_count = _trailing_metadata.count;
81         ret.data.send_status_from_server.trailing_metadata = _trailing_metadata.handle.metadata;
82         
83         return ret;
84     }
85     
86     void free() {
87         grpc_slice_unref(_details);
88     }
89         
90     this(grpc_status_code code, string details) {
91         _details = string_to_slice(details);
92         _status = code;
93         _trailing_metadata = MetadataArray.create();
94     }
95     
96     this() {
97         _details = grpc_empty_slice();
98         _status = cast(grpc_status_code)0;
99         _trailing_metadata = MetadataArray.create();
100     }
101 
102     ~this() {
103         free;
104     }
105 }
106 
107 class RecvInitialMetadataOp : RemoteOp {
108     private {
109         MetadataArray _metadata;
110     }
111 
112     grpc_op_type type() {
113         return GRPC_OP_RECV_INITIAL_METADATA;
114     }
115 
116     grpc_op value() {
117         grpc_op ret;
118         ret.op = type();
119         ret.data.recv_initial_metadata.recv_initial_metadata = _metadata.handle;
120 
121         return ret;
122     }
123 
124     this() {
125         _metadata = MetadataArray.create();
126     }
127 }
128 
129 class RecvMessageOp : RemoteOp {
130     private {
131         ByteBuffer* _buf;
132     }
133 
134     grpc_op_type type() {
135         return GRPC_OP_RECV_MESSAGE;
136     }
137 
138     grpc_op value() {
139         grpc_op ret;
140         ret.op = type();
141         ret.data.recv_message.recv_message = _buf.safeHandle;
142         return ret;
143     }
144     
145     void free() {
146     }
147     
148     this(ByteBuffer* buf) {
149         _buf = buf;
150     }
151     
152     ~this() {
153         free;
154     }
155 }
156 
157 /*
158 
159 class RecvStatusOnClientOp : RemoteOp {
160     private {
161 
162     }
163 
164     grpc_op_type type() {
165         return GRPC_OP_RECV_STATUS_ON_CLIENT;
166     }
167 
168 }
169 */
170 
171 class RecvCloseOnServerOp : RemoteOp {
172     private {
173         shared(int) _cancelled;
174     }
175 
176     int cancelled() {
177         return _cancelled;
178     }
179 
180     grpc_op_type type() {
181         return GRPC_OP_RECV_CLOSE_ON_SERVER;
182     }
183 
184     grpc_op value() {
185         grpc_op ret;
186         ret.op = type();
187         ret.data.recv_close_on_server.cancelled = cast(int*)&_cancelled;
188 
189         return ret;
190     }
191 
192     this() {
193     }
194 }
195 
196 class BatchCall {
197     private {
198         Vector!(RemoteOp) ops;
199     }
200 
201     void addOp(RemoteOp _op) {
202         import std.algorithm.mutation;
203         ops ~= _op;
204     }
205 
206     static grpc_call_error kick(CompletionQueue!"Next" cq, Tag* _tag, Duration d = 1.seconds) {
207         assert(*_tag.ctx.call, "call should never be null");
208         DEBUG!"kicking cq with tag (%x)"(_tag);
209         auto status = grpc_call_start_batch(*_tag.ctx.call, null, 0, _tag, null);
210         if (status == GRPC_CALL_OK) {
211             import core.time;
212             cq.next(d);
213         }
214         return status;
215     }
216     
217     static grpc_call_error runSingleOp(RemoteOp _op, CompletionQueue!"Next" cq, Tag* _tag, Duration d = 1.seconds) {
218         assert(*_tag.ctx.call, "call should never be null");
219     
220         if (callOverDeadline(_tag)) {
221             DEBUG!"call exceeded deadline (initial check)";
222             return GRPC_CALL_ERROR;
223         }
224 
225         grpc_op[1] op;
226         op[0] = _op.value();
227         DEBUG!"starting batch on tag (%x, ops: %d)"(_tag, 1);
228         grpc_call_error status = grpc_call_start_batch(*_tag.ctx.call, op.ptr, 1, _tag, null);  
229         grpc_event evt = cq.next(d); 
230         while (evt.type != GRPC_OP_COMPLETE) {
231             if (callOverDeadline(_tag)) {
232                 DEBUG!"call exceeded deadline, cancel batch";
233                 grpc_call_cancel(*_tag.ctx.call, null);
234                 break;
235             }
236 
237             if (evt.type == GRPC_QUEUE_SHUTDOWN) {
238                 break;
239             }
240 
241             if (status == GRPC_CALL_OK) {
242                 DEBUG!"waiting for op to complete";
243             } else if (status == GRPC_CALL_ERROR_TOO_MANY_OPERATIONS) {
244                 grpc_call_cancel(*_tag.ctx.call, null);
245                 break;
246             } else {
247                 ERROR!"STATUS: %s"(status);
248                 break;
249             }
250             evt = cq.next(d);
251             DEBUG!"finished batch on tag: %x"(_tag);
252        }
253 
254         destroy(op);
255         return status;
256     }
257         
258     import grpc.core.tag;
259     import core.time;
260     import grpc.common.cq;
261 
262     /* requires the caller to have a lock on the CallContext */
263     // For tuning, you may assume that changing the duration MAY be optimal, however
264     // if you reduce the duration down to 1 millisecond, if there is ever a collection cycle,
265     // the library can't adequately catch the event (and may result in odd cancellations)
266 
267     grpc_call_error run(CompletionQueue!"Next" cq, Tag* _tag, Duration d = 1.seconds) { 
268         //assert(sanityCheck(), "failed sanity check");
269         assert(_tag != null, "tag should never be null");
270 
271         if (callOverDeadline(_tag)) {
272             DEBUG!"call over deadline, refusing to process";
273             return GRPC_CALL_ERROR;
274         }
275 
276         Vector!(grpc_op) _ops;
277 
278         foreach(op; ops) {
279             _ops ~= op.value();
280         }
281 
282         DEBUG!"starting batch on tag (%x, ops: %d)"(_tag, _ops.length);
283         grpc_call_error status = grpc_call_start_batch(*_tag.ctx.call, _ops.ptr, _ops.length, _tag, null);  
284         grpc_event evt = cq.next(d); 
285         while (evt.type != GRPC_OP_COMPLETE) {
286             import core.time;
287             if (callOverDeadline(_tag)) {
288                 DEBUG!"call over deadline";
289                 grpc_call_cancel(*_tag.ctx.call, null);
290                 break;
291             }
292 
293             if (evt.type == GRPC_QUEUE_SHUTDOWN) {
294                 break;
295             }
296 
297             if (status == GRPC_CALL_OK) {
298                 DEBUG!"waiting for op to complete";
299             } else if (status == GRPC_CALL_ERROR_TOO_MANY_OPERATIONS) {
300                 grpc_call_cancel(*_tag.ctx.call, null);
301                 break;
302             } else {
303                 ERROR!"STATUS: %s"(status);
304                 break;
305             }
306             evt = cq.next(d);
307             DEBUG!"finished batch on tag: %x"(_tag);
308        }
309         
310         reset;
311 
312         return status;
313     }
314     
315     void reset() {
316         DEBUG!"Resetting operations (length: %d)"(ops.length);
317         for(int i = 0; i < ops.length; i++) {
318             if (ops[i] is null) continue;
319             
320             Object obj = cast(Object)ops[i];
321             auto type = typeid(obj);
322             // Workaround compiler bug where classinfo fails to retrieve the actual type 
323             // of an object (and the subsequent size of it)
324             static foreach(sym; __traits(allMembers, mixin(__MODULE__))) {{
325                 static if(sym[$ - 2 .. $] == "Op" && sym != "RemoteOp") {
326                     // pragma(msg, sym);
327                     mixin("alias T = " ~ sym ~ ";");
328                     if (obj !is null && type == typeid(T)) {
329                         T realObj = cast(T)obj;
330                         static if (__traits(compiles, realObj.free)) {
331                             realObj.free;
332                         }
333                         DEBUG!"Freeing %s (%x) (index %d/%d)"(sym, cast(void*)realObj, i, ops.length);
334                         destroy(realObj);
335                         DEBUG!"OK!";
336                         obj = null;
337                     }
338                 }
339             }}
340         }
341         
342         ops.free;
343     }
344 
345     this() {
346         ops = Vector!(RemoteOp)();
347     }
348 
349     ~this() {
350     }
351 }