1 module grpc.stream.server.writer;
2 import interop.headers;
3 import grpc.logger;
4 import grpc.core.tag;
5 import google.rpc.status;
6 import grpc.common.cq;
7 import grpc.common.batchcall;
8 import grpc.common.call;
9 import core.atomic;
10 import automem;
11 import std.experimental.allocator : theAllocator, make, dispose, makeArray;
12 
/**
 * Server-side writer for a streaming call carrying messages of type `T`.
 *
 * Every operation is submitted as a single-op batch through
 * `BatchCall.runSingleOp` using the tag and completion queue supplied at
 * construction. The writer must be `start`ed before `write` will accept
 * messages, and refuses further writes once `finish` has been called.
 *
 * The struct cannot be default-constructed or copied.
 */
struct ServerWriter(T) {
    private {
        Tag* _tag;                    // tag passed to every batch operation
        CompletionQueue!"Next" _cq;   // completion queue passed to every batch operation
        bool started = false;         // set once start() has succeeded
        bool _closed;                 // set once finish() has run
    }

    /// Returns: `true` once `finish` has been called on this writer.
    @property bool closed() {
        return _closed;
    }

    /**
     * Runs a `SendInitialMetadataOp` and, on success, marks the writer as
     * started (and not closed).
     *
     * Returns: `true` if the batch reported `GRPC_CALL_OK`; `false`
     *          otherwise, in which case the writer state is left untouched
     *          and subsequent `write` calls keep returning `false`.
     */
    bool start() {
        SendInitialMetadataOp op = theAllocator.make!SendInitialMetadataOp();
        // Release the op even if running the batch throws.
        scope (exit) theAllocator.dispose(op);

        const err = BatchCall.runSingleOp(op, _cq, _tag);
        if (err != GRPC_CALL_OK) {
            // Do not pretend the stream is usable when the send failed.
            return false;
        }

        started = true;
        _closed = false;

        return true;
    }

    /**
     * Serializes `obj` with `toProtobuf` and runs a `SendMessageOp` with
     * the resulting bytes.
     *
     * Params:
     *   obj = message to serialize and send
     *
     * Returns: `false` if the writer is closed or was never started;
     *          otherwise whether the batch reported `GRPC_CALL_OK`.
     */
    bool write(T obj) {
        import std.array;
        import google.protobuf;

        if (closed) {
            return false;
        }

        if (!started) {
            return false;
        }

        ubyte[] _out = theAllocator.makeArray!ubyte(obj.toProtobuf.array);
        // Guards run in reverse order of declaration: the op is disposed
        // first, then the buffer it was built from.
        scope (exit) theAllocator.dispose(_out);
        DEBUG!"running";
        SendMessageOp op = theAllocator.make!SendMessageOp(_out);
        scope (exit) theAllocator.dispose(op);

        grpc_call_error err = BatchCall.runSingleOp(op, _cq, _tag);

        return err == GRPC_CALL_OK;
    }

    /**
     * Runs a `SendStatusFromServerOp` and marks the writer as closed.
     * Calling it again on a closed writer is a no-op.
     *
     * Params:
     *   _stat = status supplied by the caller.
     *           NOTE(review): this value is currently not forwarded to the
     *           op, which is default-constructed - confirm whether the
     *           status should be attached.
     *
     * Returns: always `true`; the result of the batch is not inspected and
     *          the writer is closed regardless of it.
     */
    bool finish(ref Status _stat) {
        if (closed) {
            return true;
        }

        DEBUG!"finish called";
        SendStatusFromServerOp op = theAllocator.make!SendStatusFromServerOp();
        // Release the op even if running the batch throws.
        scope (exit) theAllocator.dispose(op);

        BatchCall.runSingleOp(op, _cq, _tag);
        _closed = true;
        return true;
    }

    /**
     * Creates a writer bound to the given tag and completion queue.
     *
     * Params:
     *   tag = tag handed to every batch operation run by this writer
     *   cq  = completion queue handed to every batch operation
     */
    this(Tag* tag, CompletionQueue!"Next" cq) {
        _tag = tag;
        _cq = cq;
    }

    @disable this();
    @disable this(this);

    /// Nothing to release: the writer does not dispose `_tag` or `_cq`.
    ~this() {
    }
}