1 module grpc.server;
2 import interop.headers;
3 import grpc.core.sync.mutex;
4 import grpc.core.resource;
5 import grpc.core.tag;
6 import grpc.core;
7 import grpc.common.call;
8 import grpc.common.cq;
9 import grpc.service;
10 import grpc.logger;
11 import google.rpc.status;
12 import core.thread;
13 import core.lifetime;
14 
/**
 * Thin object wrapper around a gRPC core `grpc_server`.
 *
 * The instance keeps the native server handle, the completion queues that
 * were registered with it and the service handlers created through
 * `registerService`. Calls into the native server that mutate it are
 * serialized through `mutex`.
 */
class Server
{
    import core.atomic;

    private {
        shared(Mutex) mutex;                              // guards calls into the native server
        SharedResource _server;                           // owns the grpc_server pointer
        shared(CompletionQueue!"Next")[] _registeredCqs;  // queues handed to registerQueue
        ServiceHandlerInterface[string] services;         // handlers keyed by fully qualified service name
        bool started;                                     // set once run() has started the native server
        shared bool _run;                                 // cleared by shutdown() to make wait() return

        /**
         * Flags every registered completion queue as shutting down, asks the
         * native server to shut down (notifying each queue with a null tag)
         * and then cancels the calls that are still in flight.
         */
        void handleShutdown() {
            foreach (cq; _registeredCqs) {
                {
                    cq.lock();
                    // release the queue lock even if inShutdownPath throws
                    scope(exit) cq.unlock();
                    cq.inShutdownPath(true);
                }

                grpc_server_shutdown_and_notify(handle, cq.handle, null);
            }

            // gRPC core documents grpc_server_cancel_all_calls as only usable
            // after shutdown has been requested, so it has to come last.
            grpc_server_cancel_all_calls(handle);
        }
    }

    /// Returns the raw native server pointer (shared overload).
    inout(grpc_server)* handle() inout @trusted nothrow shared {
        return cast(typeof(return)) _server.handle;
    }

    /// Returns the raw native server pointer.
    inout(grpc_server)* handle() inout @trusted nothrow {
        return cast(typeof(return)) _server.handle;
    }

    /// Acquires the server mutex (shared overload).
    void lock() shared {
        mutex.lock;
    }

    /// Acquires the server mutex.
    void lock() {
        mutex.lock;
    }

    /// Releases the server mutex (shared overload).
    void unlock() shared {
        mutex.unlock;
    }

    /// Releases the server mutex.
    void unlock() {
        mutex.unlock;
    }

    /**
     * Adds an insecure HTTP/2 listening port to the server.
     *
     * Params:
     *   host = host name or address to listen on
     *   port = port to listen on
     *
     * Returns: `true` when the native call reports a bound port, `false` otherwise.
     */
    bool bind(string host, ushort port) {
        import std.format;
        import std.string : toStringz;

        string address = format!"%s:%d"(host, port);

        lock;
        scope(exit) unlock;

        // The native call returns the bound port number, or 0 on failure.
        // Comparing against the requested port (as was done before) inverted
        // the result for port 0, where the bound port never equals the request.
        immutable boundPort = grpc_server_add_insecure_http2_port(handle, address.toStringz);
        if (boundPort == 0) {
            return false;
        }

        INFO!"server binded to %s"(address);
        return true;
    }

    /**
     * Remembers `queue` and registers it with the native server.
     *
     * Params:
     *   queue = completion queue to attach to this server
     */
    void registerQueue(shared(CompletionQueue!"Next") queue) shared {
        lock;
        scope(exit) unlock;

        _registeredCqs ~= queue;
        grpc_server_register_completion_queue(handle, queue.handle, null);
    }

    /**
     * Kickstarts every registered service, then blocks the calling thread
     * until `shutdown` clears the run flag, polling once per second. While
     * waiting, an error is logged for each service that reports zero runners.
     * When the loop ends the server shutdown sequence is executed.
     */
    void wait() {
        foreach (service; services) {
            service.kickstart();
        }

        while (atomicLoad(_run)) {
            foreach (service; services) {
                if (service.runners == 0) {
                    ERROR!"service is DEAD!";
                }
            }

            Thread.sleep(1.seconds);
        }

        handleShutdown();
    }

    /**
     * Requests that `wait` returns and the server shuts down.
     *
     * This is expected to be called from an interrupt/signal handler
     * (e.g. Ctrl+C), so it only performs a single atomic store.
     */
    void shutdown() @trusted @nogc nothrow {
        atomicStore(_run, false);
    }

    /**
     * Registers a method with the native server.
     *
     * Params:
     *   remoteName     = method name passed to the native server
     *   host           = host to register the method for; an empty value is
     *                    passed to the native call as `null`
     *   payload_handle = payload handling mode for the method
     *   flags          = flags forwarded to the native call
     *
     * Returns: the opaque pointer returned by `grpc_server_register_method`.
     */
    void* registerMethod(const(char)[] remoteName, const(char)[] host, grpc_server_register_method_payload_handling payload_handle, uint flags)
    {
        import std.string : toStringz;

        DEBUG!"lock";
        lock;
        scope(exit) unlock;

        DEBUG!"register method %s"(remoteName);

        // Previously `host` was ignored and null was always passed; forward
        // it when the caller actually supplied one.
        const(char)* hostPtr = host.length ? host.toStringz : null;
        return grpc_server_register_method(handle, remoteName.toStringz, hostPtr, payload_handle, flags);
    }

    /**
     * Registers every `@RPC` annotated member found on the second entry of
     * `BaseTypeTuple!T` and stores a `Service!T` handler for it.
     *
     * Must be called before `run`; this is enforced with an assert.
     */
    void registerService(T)() {
        assert(!started, "Cannot register a new service after Server.start() has been called.");
        import std.typecons;
        import std.traits;

        alias parent = BaseTypeTuple!T[1];
        alias serviceName = fullyQualifiedName!T;
        void*[string] registeredMethods;

        pragma(msg, "gRPC (" ~ fullyQualifiedName!T ~ ")");
        static foreach (i, val; getSymbolsByUDA!(parent, RPC)) {{
            enum remoteName = getUDAs!(val, RPC)[0].methodName;
            import std.conv : to;
            pragma(msg, "RPC (" ~ to!string(i) ~ "): " ~ fullyQualifiedName!(val));
            pragma(msg, "\tRemote: " ~ remoteName);

            mixin("import " ~ moduleName!val ~ ";");

            // Compile-time report of the streaming direction of the method.
            static if (hasUDA!(val, ClientStreaming) && hasUDA!(val, ServerStreaming)) {
                pragma(msg, "\tClient <- (stream) -> Server");
            }
            else static if (hasUDA!(val, ClientStreaming)) {
                pragma(msg, "\tClient (stream) -> Server");
            }
            else static if (hasUDA!(val, ServerStreaming)) {
                pragma(msg, "\tClient <- (stream) Server");
            }
            else {
                pragma(msg, "\tClient -> Server");
            }

            DEBUG!"register %s"(remoteName);
            registeredMethods[remoteName] = registerMethod(remoteName, "", GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER, 0);
        }}

        // The first constructor argument is the number of services registered so far.
        services[serviceName] = new Service!T(services.length, registeredMethods);
    }

    /**
     * Lets every service register itself with this server, then starts the
     * native server and marks the instance as started.
     */
    void run() {
        foreach (service; services) {
            service.register(this);
        }

        lock;
        scope(exit) unlock;

        INFO!"server is starting";
        grpc_server_start(handle);
        started = true;
    }

    /**
     * Creates the native server from `args` and the mutex guarding it.
     * Halts with `assert(0)` when the native server cannot be created.
     */
    this(grpc_channel_args args) @trusted {
        // Releases the native server when the shared resource lets go of it.
        static bool release(shared(void)* ptr) @trusted nothrow {
            grpc_server_destroy(cast(grpc_server*)ptr);
            return true;
        }

        grpc_server* srv = grpc_server_create(&args, null);
        if (srv == null) {
            assert(0, "creation failed");
        }

        _run = true;
        DEBUG!"creating server resource";
        _server = SharedResource(cast(shared)srv, &release);
        DEBUG!"creating mutex resource";
        mutex = cast(shared)Mutex.create();
        DEBUG!"done";
    }

    /// Only emits a trace line in debug builds.
    ~this() {
        debug import std.stdio;
        debug writefln("running");
    }
}