1 module grpc.stream.server.reader;
2 import grpc.logger;
3 import grpc.core.tag;
4 import interop.headers;
5 import grpc.common.cq; 
6 import grpc.common.call;
7 import grpc.core.utils;
8 import grpc.common.batchcall;
9 import automem;
10 import std.experimental.allocator : theAllocator, make, makeArray, dispose;
11 
/**
 * Server-side reader that yields messages of type `T` for one call.
 *
 * The reader holds a pointer to the call's `Tag` and a "Next" completion
 * queue. Messages are taken from the tag's context buffer (`_tag.ctx.data`)
 * and decoded with `fromProtobuf!T`.
 *
 * Default construction and copying are disabled; build one with
 * `ServerReader!T(tag, cq)`.
 */
struct ServerReader(T) {
    import grpc.common.byte_buffer;
    import google.protobuf;

    private {
        bool _closed;                // becomes true once finish() has completed
        Tag* _tag;                   // not allocated or freed by this struct
        CompletionQueue!"Next" _cq;  // queue handed to BatchCall.runSingleOp
    }

    @disable this();
    @disable this(this);

    /**
     * Binds the reader to a call tag and a completion queue.
     *
     * Params:
     *   tag = tag of the call to read from; readOne() and finish() assert
     *         that it is non-null
     *   cq  = completion queue passed to every batch operation
     */
    this(Tag* tag, CompletionQueue!"Next" cq) {
        _tag = tag;
        _cq = cq;
    }

    /// True after finish() has run to completion.
    @property bool closed() const {
        return _closed;
    }

    /**
     * Reads and decodes a single message.
     *
     * If the context buffer is not marked valid, a `RecvMessageOp` is run
     * first. A non-empty buffer is then drained with `readAll()`, decoded,
     * and cleaned up.
     *
     * Params:
     *   d = duration forwarded to `BatchCall.runSingleOp`
     *       (presumably the wait timeout -- confirm against BatchCall)
     *
     * Returns: the decoded message, or `T.init` when the buffer reports a
     *          length of zero or `readAll()` yields no bytes.
     */
    auto readOne(Duration d = 1.seconds) {
        assert(_tag !is null, "tag shouldn't be null");

        T protobuf = T.init;

        if (!_tag.ctx.data.valid) {
            DEBUG!"ctx data is invalid, asking for a new message";
            RecvMessageOp op = theAllocator.make!(RecvMessageOp)(&_tag.ctx.data);
            // Release the op even if the batch call throws.
            scope (exit) theAllocator.dispose(op);
            // NOTE(review): any result of runSingleOp is not inspected here;
            // a failed receive is only visible through the buffer length below.
            BatchCall.runSingleOp(op, _cq, _tag, d);
        }

        ulong len = _tag.ctx.data.length;
        DEBUG!"bf.length: %d"(len);
        if (len != 0) {
            auto data = _tag.ctx.data.readAll();
            if (data.length != 0) {
                ubyte[] dat = data;
                DEBUG!"%s"(dat);
                protobuf = dat.fromProtobuf!(T);
            }

            _tag.ctx.data.cleanup();
        }
        return protobuf;
    }

    /**
     * Runs a `RecvCloseOnServerOp` on the call and marks the reader closed.
     *
     * `_closed` is set only after the batch call returns normally.
     */
    void finish() {
        assert(_tag !is null, "tag shouldn't be null");

        DEBUG!"finishing";
        RecvCloseOnServerOp op = theAllocator.make!RecvCloseOnServerOp();
        // Release the op even if the batch call throws.
        scope (exit) theAllocator.dispose(op);
        BatchCall.runSingleOp(op, _cq, _tag);
        _closed = true;
    }
}