module grpc.stream.server.writer;

import interop.headers;
import grpc.logger;
import grpc.core.tag;
import google.rpc.status;
import grpc.common.cq;
import grpc.common.batchcall;
import grpc.common.call;
import core.atomic;
import automem;
import std.experimental.allocator : theAllocator, make, dispose, makeArray;

/**
 * Server-side writer for a stream of messages of type `T`.
 *
 * Every operation is submitted as a single-op batch through
 * `BatchCall.runSingleOp`, using the tag and completion queue handed to the
 * constructor. The expected life cycle is `start()`, any number of `write()`
 * calls, then `finish()`.
 *
 * The struct is neither default-constructible nor copyable.
 */
struct ServerWriter(T) {
    private {
        Tag* _tag;
        CompletionQueue!"Next" _cq;
        bool started = false;
        bool _closed;
    }

    /// `true` once `finish()` has been called (and `start()` has not succeeded since).
    @property bool closed() {
        return _closed;
    }

    /**
     * Runs a `SendInitialMetadataOp` batch and, on success, marks the writer
     * as started and open.
     *
     * Returns: `true` if the batch call returned `GRPC_CALL_OK`; `false`
     *          otherwise, in which case the writer state is left untouched
     *          and subsequent `write()` calls keep being rejected.
     */
    bool start() {
        auto op = theAllocator.make!SendInitialMetadataOp();
        // Release the op even if runSingleOp throws.
        scope (exit) theAllocator.dispose(op);

        immutable ok = BatchCall.runSingleOp(op, _cq, _tag) == GRPC_CALL_OK;
        if (!ok) {
            return false;
        }

        started = true;
        _closed = false;
        return true;
    }

    /**
     * Serializes `obj` with `toProtobuf` and sends it as one message.
     *
     * Params:
     *   obj = the message to send; must support `toProtobuf`.
     *
     * Returns: `false` if the writer is closed or has not been started;
     *          otherwise `true` exactly when the batch call returned
     *          `GRPC_CALL_OK`.
     */
    bool write(T obj) {
        import std.array;
        import google.protobuf;

        if (closed || !started) {
            return false;
        }

        // Copy the encoded bytes into a buffer owned by theAllocator for the
        // duration of the call.
        ubyte[] _out = theAllocator.makeArray!ubyte(obj.toProtobuf.array);
        scope (exit) theAllocator.dispose(_out);

        DEBUG!"running";
        auto op = theAllocator.make!SendMessageOp(_out);
        // Scope guards run in reverse order: the op is disposed before the
        // buffer it was constructed from, matching the original sequence.
        scope (exit) theAllocator.dispose(op);

        grpc_call_error err = BatchCall.runSingleOp(op, _cq, _tag);
        return err == GRPC_CALL_OK;
    }

    /**
     * Runs a `SendStatusFromServerOp` batch and marks the writer as closed.
     *
     * Params:
     *   _stat = status supplied by the caller.
     *           NOTE(review): this value is currently not passed to the
     *           operation; confirm how SendStatusFromServerOp is meant to
     *           receive the status and wire it through.
     *
     * Returns: `true` if the writer was already closed, or if the batch call
     *          returned `GRPC_CALL_OK`; `false` if the batch call reported an
     *          error.
     */
    bool finish(ref Status _stat) {
        if (closed) {
            return true;
        }

        DEBUG!"finish called";
        auto op = theAllocator.make!SendStatusFromServerOp();
        // Release the op even if runSingleOp throws.
        scope (exit) theAllocator.dispose(op);

        immutable ok = BatchCall.runSingleOp(op, _cq, _tag) == GRPC_CALL_OK;

        // The writer is marked closed whatever the outcome, as before, so no
        // further writes are attempted after a finish attempt.
        _closed = true;
        return ok;
    }

    /**
     * Creates a writer bound to the given tag and completion queue.
     *
     * Params:
     *   tag = tag passed to every batch call issued by this writer.
     *   cq  = completion queue passed to every batch call issued by this writer.
     */
    this(Tag* tag, CompletionQueue!"Next" cq) {
        _tag = tag;
        _cq = cq;
    }

    @disable this();
    @disable this(this);

    ~this() {
    }
}