module grpc.common.batchcall;
import std.exception;
import interop.headers;
import automem;
import grpc.common.call;
import grpc.core.utils;
import grpc.common.metadata;
import grpc.common.byte_buffer;
import grpc.logger;
import std.exception : enforce;
import core.lifetime;

/**
 * A single operation that can be placed into a gRPC core batch.
 *
 * NOTE: `BatchCall.reset` discovers implementations by scanning this module
 * for symbols whose name ends in "Op" (excluding `RemoteOp`), so every
 * implementation declared here must follow that naming convention.
 */
interface RemoteOp {
    /// The gRPC core operation type represented by this object.
    grpc_op_type type();

    /**
     * Builds the `grpc_op` describing this operation.
     *
     * The returned struct refers to state held by the op object (handles,
     * pointers to fields), so the op object must stay alive for as long as
     * the `grpc_op` is in use.
     */
    grpc_op value();
}

/// Sends initial metadata, backed by a metadata array created in the constructor.
class SendInitialMetadataOp : RemoteOp {
    private {
        MetadataArray array;
    }

    grpc_op_type type() {
        return GRPC_OP_SEND_INITIAL_METADATA;
    }

    grpc_op value() {
        grpc_op ret;

        ret.op = type();
        ret.data.send_initial_metadata.metadata = array.handle.metadata;
        ret.data.send_initial_metadata.count = array.count;

        return ret;
    }

    this() {
        array = MetadataArray.create();
    }
}

/// Sends a single message; the payload is wrapped in a `ByteBuffer` at construction.
class SendMessageOp : RemoteOp {
    private {
        ByteBuffer _buf;
    }

    grpc_op_type type() {
        return GRPC_OP_SEND_MESSAGE;
    }

    /// Throws (via `enforce`) if the underlying byte buffer is not valid.
    grpc_op value() {
        grpc_op ret;
        enforce(_buf.valid, "expected byte buffer to be valid");
        ret.op = type();
        ret.data.send_message.send_message = _buf.handle;
        return ret;
    }

    this(ref ubyte[] message) {
        _buf = ByteBuffer.create(message);
    }
}

/// Sends the final status (code, details and trailing metadata) from the server.
class SendStatusFromServerOp : RemoteOp {
    private {
        MetadataArray _trailing_metadata;
        grpc_status_code _status;
        grpc_slice _details;
    }

    grpc_op_type type() {
        return GRPC_OP_SEND_STATUS_FROM_SERVER;
    }

    grpc_op value() {
        grpc_op ret;
        ret.op = type();
        // Points at a field of this object: the op must outlive the batch.
        ret.data.send_status_from_server.status_details = &_details;
        ret.data.send_status_from_server.status = _status;
        ret.data.send_status_from_server.trailing_metadata_count = _trailing_metadata.count;
        ret.data.send_status_from_server.trailing_metadata = _trailing_metadata.handle.metadata;

        return ret;
    }

    /**
     * Releases the details slice.
     *
     * Safe to call more than once: `BatchCall.reset` calls `free` explicitly
     * and then `destroy`s the object, whose destructor calls `free` again.
     * Replacing the slice with the empty slice after the unref keeps the
     * second call from unreferencing the same slice twice.
     */
    void free() {
        grpc_slice_unref(_details);
        _details = grpc_empty_slice();
    }

    this(grpc_status_code code, string details) {
        _details = string_to_slice(details);
        _status = code;
        _trailing_metadata = MetadataArray.create();
    }

    /// Creates an op with an empty details slice and status code 0.
    this() {
        _details = grpc_empty_slice();
        _status = cast(grpc_status_code)0;
        _trailing_metadata = MetadataArray.create();
    }

    ~this() {
        free;
    }
}

/// Receives initial metadata into a metadata array created in the constructor.
class RecvInitialMetadataOp : RemoteOp {
    private {
        MetadataArray _metadata;
    }

    grpc_op_type type() {
        return GRPC_OP_RECV_INITIAL_METADATA;
    }

    grpc_op value() {
        grpc_op ret;
        ret.op = type();
        ret.data.recv_initial_metadata.recv_initial_metadata = _metadata.handle;

        return ret;
    }

    this() {
        _metadata = MetadataArray.create();
    }
}

/**
 * Receives a message into a caller-supplied `ByteBuffer`.
 *
 * Only the pointer is stored; `free` does nothing to the buffer.
 */
class RecvMessageOp : RemoteOp {
    private {
        ByteBuffer* _buf;
    }

    grpc_op_type type() {
        return GRPC_OP_RECV_MESSAGE;
    }

    grpc_op value() {
        grpc_op ret;
        ret.op = type();
        ret.data.recv_message.recv_message = _buf.safeHandle;
        return ret;
    }

    /// Intentionally empty; present so `BatchCall.reset` treats all ops uniformly.
    void free() {
    }

    this(ByteBuffer* buf) {
        _buf = buf;
    }

    ~this() {
        free;
    }
}

/*

class RecvStatusOnClientOp : RemoteOp {
    private {

    }

    grpc_op_type type() {
        return GRPC_OP_RECV_STATUS_ON_CLIENT;
    }

}
*/

/// Receives the close notification on the server; gRPC core writes the cancelled flag.
class RecvCloseOnServerOp : RemoteOp {
    private {
        shared(int) _cancelled;
    }

    /// The flag written through the pointer handed out by `value()`.
    int cancelled() {
        return _cancelled;
    }

    grpc_op_type type() {
        return GRPC_OP_RECV_CLOSE_ON_SERVER;
    }

    grpc_op value() {
        grpc_op ret;
        ret.op = type();
        // Points at a field of this object: the op must outlive the batch.
        ret.data.recv_close_on_server.cancelled = cast(int*)&_cancelled;

        return ret;
    }

    this() {
    }
}

/**
 * Collects `RemoteOp`s and submits them to gRPC core as one batch, then
 * waits on a completion queue for the batch to finish.
 */
class BatchCall {
    import grpc.core.tag;
    import core.time;
    import grpc.common.cq;

    private {
        Vector!(RemoteOp) ops;
    }

    /// Appends an operation to the pending batch.
    void addOp(RemoteOp _op) {
        // NOTE(review): this import looks unused here; kept in case the
        // append relies on it - confirm before removing.
        import std.algorithm.mutation;
        ops ~= _op;
    }

    /**
     * Starts an empty batch on the tag's call and, if that succeeds, polls
     * the completion queue once with timeout `d`.
     *
     * Returns: the result of `grpc_call_start_batch`.
     */
    static grpc_call_error kick(CompletionQueue!"Next" cq, Tag* _tag, Duration d = 1.seconds) {
        assert(*_tag.ctx.call, "call should never be null");
        DEBUG!"kicking cq with tag (%x)"(_tag);
        auto status = grpc_call_start_batch(*_tag.ctx.call, null, 0, _tag, null);
        if (status == GRPC_CALL_OK) {
            cq.next(d);
        }
        return status;
    }

    /**
     * Submits a single operation as its own batch and waits for it.
     *
     * Params:
     *   _op  = operation to submit; must stay alive until this returns
     *   cq   = completion queue to poll
     *   _tag = tag identifying the call; `_tag.ctx.call` must be non-null
     *   d    = timeout for each completion-queue poll
     *
     * Returns: `GRPC_CALL_ERROR` if the call is already over its deadline,
     * otherwise the result of `grpc_call_start_batch`.
     */
    static grpc_call_error runSingleOp(RemoteOp _op, CompletionQueue!"Next" cq, Tag* _tag, Duration d = 1.seconds) {
        assert(*_tag.ctx.call, "call should never be null");

        if (callOverDeadline(_tag)) {
            DEBUG!"call exceeded deadline (initial check)";
            return GRPC_CALL_ERROR;
        }

        grpc_op[1] op;
        op[0] = _op.value();
        DEBUG!"starting batch on tag (%x, ops: %d)"(_tag, 1);
        grpc_call_error status = grpc_call_start_batch(*_tag.ctx.call, op.ptr, 1, _tag, null);
        if (status == GRPC_CALL_OK) {
            awaitCompletion(cq, _tag, d);
        } else {
            reportStartFailure(_tag, status);
        }

        return status;
    }

    /**
     * Submits every queued operation as one batch, waits for it, and then
     * releases the queued operations via `reset`.
     *
     * Requires the caller to hold a lock on the CallContext.
     *
     * Tuning note: a shorter poll duration may look attractive, but if it is
     * reduced to around 1 millisecond and a GC collection cycle occurs, the
     * library cannot reliably catch the event, which may show up as odd
     * cancellations.
     *
     * Params:
     *   cq   = completion queue to poll
     *   _tag = tag identifying the call; must be non-null
     *   d    = timeout for each completion-queue poll
     *
     * Returns: `GRPC_CALL_ERROR` if the call is already over its deadline
     * (in which case the queued ops are left untouched), otherwise the
     * result of `grpc_call_start_batch`.
     */
    grpc_call_error run(CompletionQueue!"Next" cq, Tag* _tag, Duration d = 1.seconds) {
        //assert(sanityCheck(), "failed sanity check");
        assert(_tag != null, "tag should never be null");

        if (callOverDeadline(_tag)) {
            DEBUG!"call over deadline, refusing to process";
            return GRPC_CALL_ERROR;
        }

        Vector!(grpc_op) _ops;

        foreach(op; ops) {
            _ops ~= op.value();
        }

        DEBUG!"starting batch on tag (%x, ops: %d)"(_tag, _ops.length);
        grpc_call_error status = grpc_call_start_batch(*_tag.ctx.call, _ops.ptr, _ops.length, _tag, null);
        if (status == GRPC_CALL_OK) {
            awaitCompletion(cq, _tag, d);
        } else {
            reportStartFailure(_tag, status);
        }

        reset;

        return status;
    }

    /**
     * Polls `cq` until the started batch completes, the call goes over its
     * deadline (the call is then cancelled), or the queue shuts down.
     *
     * Must only be called after `grpc_call_start_batch` returned
     * `GRPC_CALL_OK`; a failed start never posts a completion event.
     */
    private static void awaitCompletion(CompletionQueue!"Next" cq, Tag* _tag, Duration d) {
        grpc_event evt = cq.next(d);
        while (evt.type != GRPC_OP_COMPLETE) {
            if (callOverDeadline(_tag)) {
                DEBUG!"call exceeded deadline, cancel batch";
                grpc_call_cancel(*_tag.ctx.call, null);
                break;
            }

            if (evt.type == GRPC_QUEUE_SHUTDOWN) {
                break;
            }

            DEBUG!"waiting for op to complete";
            evt = cq.next(d);
        }

        if (evt.type == GRPC_OP_COMPLETE) {
            DEBUG!"finished batch on tag: %x"(_tag);
        }
    }

    /// Handles a failed `grpc_call_start_batch`: cancels on too-many-operations, logs otherwise.
    private static void reportStartFailure(Tag* _tag, grpc_call_error status) {
        if (status == GRPC_CALL_ERROR_TOO_MANY_OPERATIONS) {
            grpc_call_cancel(*_tag.ctx.call, null);
        } else {
            ERROR!"STATUS: %s"(status);
        }
    }

    /**
     * Frees and destroys every queued operation, then releases the vector.
     *
     * Each op's `free` (when it has one) is called before `destroy`; ops
     * whose destructor also calls `free` must therefore tolerate it running
     * twice.
     */
    void reset() {
        DEBUG!"Resetting operations (length: %d)"(ops.length);
        for(int i = 0; i < ops.length; i++) {
            if (ops[i] is null) continue;

            Object obj = cast(Object)ops[i];
            auto type = typeid(obj);
            // Workaround compiler bug where classinfo fails to retrieve the actual type
            // of an object (and the subsequent size of it): compare against every
            // "*Op" class declared in this module and destroy through the static type.
            static foreach(sym; __traits(allMembers, mixin(__MODULE__))) {{
                static if(sym[$ - 2 .. $] == "Op" && sym != "RemoteOp") {
                    // pragma(msg, sym);
                    mixin("alias T = " ~ sym ~ ";");
                    if (obj !is null && type == typeid(T)) {
                        T realObj = cast(T)obj;
                        static if (__traits(compiles, realObj.free)) {
                            realObj.free;
                        }
                        DEBUG!"Freeing %s (%x) (index %d/%d)"(sym, cast(void*)realObj, i, ops.length);
                        destroy(realObj);
                        DEBUG!"OK!";
                        // Prevents any later candidate type from matching this op again.
                        obj = null;
                    }
                }
            }}
        }

        ops.free;
    }

    this() {
        ops = Vector!(RemoteOp)();
    }

    ~this() {
    }
}