module grpc.stream.server.reader;
import grpc.logger;
import grpc.core.tag;
import interop.headers;
import grpc.common.cq;
import grpc.common.call;
import grpc.core.utils;
import grpc.common.batchcall;
import automem;
// Explicit import for the `Duration` / `seconds` names used in `readOne`'s
// signature. NOTE(review): these were presumably reachable through a
// transitive public import before; importing them here is harmless either way.
import core.time : Duration, seconds;
import std.experimental.allocator : theAllocator, make, makeArray, dispose;

/**
 * Server-side reader that pulls messages of type `T` off a call.
 *
 * The reader works on the receive buffer stored in the tag's context
 * (`_tag.ctx.data`) and uses the supplied completion queue to run receive
 * operations through `BatchCall.runSingleOp`.
 *
 * Default construction and copying are disabled.
 */
struct ServerReader(T) {
    private {
        bool _closed;               // set once `finish` has completed
        Tag* _tag;                  // tag whose context holds the receive buffer
        CompletionQueue!"Next" _cq; // queue the receive operations are run on
    }

    import grpc.common.byte_buffer;
    import google.protobuf;

    /// Returns: `true` once `finish` has run to completion.
    @property bool closed() {
        return _closed;
    }

    /**
     * Reads a single message.
     *
     * When the tag's receive buffer is not valid, a `RecvMessageOp` is run
     * first, with `d` passed to `BatchCall.runSingleOp`. The buffer content
     * is then decoded with `fromProtobuf!T` and the buffer is cleaned up.
     *
     * Params:
     *     d = duration handed to `BatchCall.runSingleOp` when a new message
     *         has to be requested (defaults to one second).
     *
     * Returns: the decoded message, or `T.init` when the buffer is empty.
     */
    auto readOne(Duration d = 1.seconds) {
        assert(_tag !is null, "tag shouldn't be null");

        T protobuf = T.init;

        if (!_tag.ctx.data.valid) {
            DEBUG!"ctx data is invalid, asking for a new message";
            RecvMessageOp op = theAllocator.make!(RecvMessageOp)(&_tag.ctx.data);
            // Release the op even if running the batch throws; previously an
            // exception from runSingleOp leaked the allocation.
            scope (exit) theAllocator.dispose(op);
            // NOTE(review): the result of runSingleOp is not inspected here —
            // confirm whether a failed/timed-out receive needs handling.
            BatchCall.runSingleOp(op, _cq, _tag, d);
        }

        ulong len = _tag.ctx.data.length;
        DEBUG!"bf.length: %d"(len);
        if (len != 0) {
            auto data = _tag.ctx.data.readAll();
            if (data.length != 0) {
                // Kept as a named ubyte[] local: the decoder is handed an
                // lvalue slice (presumably it consumes its input by reference
                // — confirm against the protobuf library).
                ubyte[] dat = data;
                DEBUG!"%s"(dat);
                protobuf = dat.fromProtobuf!(T);
            }

            // Only a non-empty buffer is cleaned up, as in the original flow.
            _tag.ctx.data.cleanup();
        }
        return protobuf;
    }

    /**
     * Runs a `RecvCloseOnServerOp` on the call and marks the reader closed.
     *
     * `closed` becomes `true` only if the operation call returns normally.
     */
    void finish() {
        DEBUG!"finishing";
        RecvCloseOnServerOp op = theAllocator.make!RecvCloseOnServerOp();
        // Release the op even if running the batch throws.
        scope (exit) theAllocator.dispose(op);
        BatchCall.runSingleOp(op, _cq, _tag);
        _closed = true;
    }

    /**
     * Params:
     *     tag = tag whose context carries the receive buffer.
     *     cq  = completion queue used to run receive operations.
     */
    this(Tag* tag, CompletionQueue!"Next" cq) {
        _tag = tag;
        _cq = cq;
    }

    @disable this();
    @disable this(this);
}