1 module grpc.common.cq;
2 import grpc.logger;
3 import interop.headers;
4 import std.typecons;
5 import grpc.core.tag;
6 import grpc.core;
7 import grpc.core.sync.mutex;
8 import grpc.core.resource;
9 import grpc.core.utils;
10 import core.lifetime;
11 import std.experimental.allocator : theAllocator, make, dispose;
12 
13 //queue ok/ok type
14 alias NextStatus = Tuple!(bool, bool);
15 
16 
17 // TODO: add mutexes 
18 
19 import core.thread;
20 import std.parallelism;
21 
22 import std.traits;
23 
24 class CompletionQueue(string T) 
25     if(T == "Next") 
26 {
27 @safe:
28     private { 
29         shared(Mutex) mutex;
30         SharedResource _cq;
31         bool _inShutdownPath;
32     }
33 
34     bool inShutdownPath() shared {
35         return _inShutdownPath;
36     }
37 
38     bool inShutdownPath(bool val) shared {
39         _inShutdownPath = val;
40         return val;
41     }
42 
43     inout(grpc_completion_queue)* handle() inout @trusted nothrow shared {
44         return cast(typeof(return)) _cq.handle;
45     }
46 
47     inout(grpc_completion_queue)* handle() inout @trusted nothrow {
48         return cast(typeof(return)) _cq.handle;
49     }
50 
51     /* Preserved for compatibility */
52     auto ptr(string file = __FILE__) @trusted {
53         return handle;
54     }
55 
56     void lock() {
57         mutex.lock;
58     }
59 
60     void lock() shared {
61         mutex.lock;
62     }
63 
64     void unlock() shared {
65         mutex.unlock;
66     }
67 
68     void unlock() {
69         mutex.unlock;
70     }
71     
72     static if(T == "Pluck") {
73         // TODO: add Pluck/Callback types
74     }
75     static if(T == "Next") {
76         grpc_event next(Duration time) @trusted shared {
77             gpr_timespec t = durtotimespec(time);
78             grpc_event _evt = grpc_completion_queue_next(handle, t, null);
79 
80             return _evt;
81         }
82 
83         grpc_event next(Duration time) @trusted {
84             gpr_timespec t = durtotimespec(time);
85             grpc_event _evt = grpc_completion_queue_next(handle, t, null);
86 
87             return _evt;
88         }
89     }
90 
91     import grpc.server;
92 
93     grpc_call_error requestCall(void* method, Tag* tag, shared(Server) _server, shared(CompletionQueue!"Next") boundToCall) @trusted {
94         assert(tag != null, "tag null");
95         DEBUG!"hmm"();
96 
97         DEBUG!"locking context mutex";
98         
99         _server.lock;
100         tag.ctx.mutex.lock;
101         mutex.lock;
102 
103         scope(exit) {
104             _server.unlock;
105             tag.ctx.mutex.unlock;
106             mutex.unlock;
107         }
108 
109         DEBUG!"1";
110         auto server_ptr = _server.handle();
111         DEBUG!"2";
112         auto method_cq = handle();
113 
114         DEBUG!"3";
115         auto server_cq = boundToCall.handle();
116         
117         auto ctx = &tag.ctx;
118         assert(ctx != null, "context null");
119         DEBUG!"4";
120         auto details = ctx.details.handle();
121         
122         DEBUG!"5";
123         auto metadata = ctx.metadata.handle();
124         DEBUG!"6";
125         auto data = ctx.data.safeHandle();
126 
127         DEBUG!"call: %x"(ctx.call);
128 
129         grpc_call_error error = grpc_server_request_registered_call(server_ptr,
130                 method, ctx.call, &details.deadline, metadata,
131                 data, method_cq, server_cq, tag);
132 
133         DEBUG!"successfully reregistered"();
134 
135         return error;
136 
137     }
138 
139 
140     this() @trusted {
141         grpc_completion_queue* cq = null;
142 
143         static if (T == "Next") {
144             cq = grpc_completion_queue_create_for_next(null);
145         } else {
146         }
147 
148         assert(cq != null, "CQ creation error");
149 
150         static bool release(shared(void)* ptr) @trusted nothrow {
151             grpc_completion_queue_shutdown(cast(grpc_completion_queue*)ptr);
152             grpc_completion_queue_destroy(cast(grpc_completion_queue*)ptr);
153 
154             return true;
155         }
156 
157         _cq = SharedResource(cast(shared)cq, &release);
158         mutex = cast(shared)Mutex.create();
159     }
160 
161 
162     static CompletionQueue!T opCall() @trusted {
163         CompletionQueue!T obj = theAllocator.make!(CompletionQueue!T)();
164         return obj;
165     }
166 
167     void shutdown() @trusted {
168         grpc_completion_queue_shutdown(handle);
169         INFO!"shutting down CQ";
170     }
171 
172 }
173