1 module grpc.server; 2 import interop.headers; 3 import grpc.core.sync.mutex; 4 import grpc.core.resource; 5 import grpc.core.tag; 6 import grpc.core; 7 import grpc.common.call; 8 import grpc.common.cq; 9 import grpc.service; 10 import grpc.logger; 11 import google.rpc.status; 12 import core.thread; 13 import core.lifetime; 14 15 class Server 16 { 17 private { 18 shared(Mutex) mutex; 19 SharedResource _server; 20 shared(CompletionQueue!"Next")[] _registeredCqs; 21 ServiceHandlerInterface[string] services; 22 bool started; 23 shared bool _run; 24 25 void handleShutdown() { 26 grpc_server_cancel_all_calls(handle); 27 foreach(cq; _registeredCqs) { 28 cq.lock(); 29 cq.inShutdownPath(true); 30 cq.unlock(); 31 32 grpc_server_shutdown_and_notify(handle, cq.handle, null); 33 } 34 } 35 } 36 37 inout(grpc_server)* handle() inout @trusted nothrow shared { 38 return cast(typeof(return)) _server.handle; 39 } 40 41 inout(grpc_server)* handle() inout @trusted nothrow { 42 return cast(typeof(return)) _server.handle; 43 } 44 45 void lock() shared { 46 mutex.lock; 47 } 48 49 void lock() { 50 mutex.lock; 51 } 52 53 void unlock() shared { 54 mutex.unlock; 55 } 56 57 void unlock() { 58 mutex.unlock; 59 } 60 61 bool bind(string host, ushort port) { 62 import std.format; 63 import std.string : toStringz; 64 string fmt = format!"%s:%d"(host, port); 65 66 lock; 67 scope(exit) unlock; 68 69 auto status = grpc_server_add_insecure_http2_port(handle, fmt.toStringz); 70 if(status == port) { 71 INFO!"server binded to %s"(fmt); 72 return true; 73 } 74 75 return false; 76 } 77 78 void registerQueue(shared(CompletionQueue!"Next") queue) shared { 79 lock; 80 scope(exit) unlock; 81 _registeredCqs ~= queue; 82 grpc_server_register_completion_queue(handle, queue.handle, null); 83 } 84 85 import core.atomic; 86 void wait() { 87 foreach(service; services.keys) { 88 services[service].kickstart(); 89 } 90 91 while (atomicLoad(_run)) { 92 93 foreach(service; services) { 94 if (service.runners == 0) { 95 ERROR!"service is DEAD!"; 96 } 97 } 98 99 Thread.sleep(1.seconds); 100 } 101 102 handleShutdown(); 103 } 104 105 // this is expected to be called from an ISR (interrupt service routine, Ctrl+C whatever) 106 107 void shutdown() @trusted @nogc nothrow { 108 try { 109 atomicStore(_run, false); 110 } catch(Exception e) { 111 // basically, unless the world burns down this shouldn't *ever* throw 112 } 113 } 114 115 void* registerMethod(const(char)[] remoteName, const(char)[] host, grpc_server_register_method_payload_handling payload_handle, uint flags) 116 { 117 import std.string : toStringz; 118 debug import std.stdio; 119 debug writefln("hello"); 120 DEBUG!"lock"; 121 lock; 122 scope(exit) unlock; 123 DEBUG!"register method %s"(remoteName); 124 void* ptr = grpc_server_register_method(handle, remoteName.toStringz, null, payload_handle, flags); 125 return ptr; 126 } 127 128 129 void registerService(T)() { 130 assert(!started, "Cannot register a new service after Server.start() has been called."); 131 import std.typecons; 132 import std.traits; 133 134 alias parent = BaseTypeTuple!T[1]; 135 alias serviceName = fullyQualifiedName!T; 136 void*[string] registeredMethods; 137 pragma(msg, "gRPC (" ~ fullyQualifiedName!T ~ ")"); 138 static foreach(i, val; getSymbolsByUDA!(parent, RPC)) {{ 139 enum remoteName = getUDAs!(val, RPC)[0].methodName; 140 import std.conv : to; 141 pragma(msg, "RPC (" ~ to!string(i) ~ "): " ~ fullyQualifiedName!(val)); 142 pragma(msg, "\tRemote: " ~ remoteName); 143 144 mixin("import " ~ moduleName!val ~ ";"); 145 146 static if(hasUDA!(val, ClientStreaming) && hasUDA!(val, ServerStreaming)) { 147 pragma(msg, "\tClient <- (stream) -> Server"); 148 149 } 150 else static if(hasUDA!(val, ClientStreaming)) { 151 pragma(msg, "\tClient (stream) -> Server"); 152 } 153 else static if(hasUDA!(val, ServerStreaming)) { 154 pragma(msg, "\tClient <- (stream) Server"); 155 } 156 else { 157 pragma(msg, "\tClient -> Server"); 158 } 159 DEBUG!"register %s"(remoteName); 160 registeredMethods[remoteName] = registerMethod(remoteName, "", GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER, 0); 161 } 162 } 163 164 services[serviceName] = new Service!T(services.keys.length, registeredMethods); 165 } 166 167 void run() { 168 foreach(service; services.keys) { 169 services[service].register(this); 170 } 171 172 lock; 173 scope(exit) unlock; 174 175 INFO!"server is starting"; 176 grpc_server_start(handle); 177 started = true; 178 } 179 180 this(grpc_channel_args args) @trusted { 181 grpc_server* srv = grpc_server_create(&args, null); 182 if (srv != null) { 183 static bool release(shared(void)* ptr) @trusted nothrow { 184 grpc_server_destroy(cast(grpc_server*)ptr); 185 return true; 186 } 187 188 _run = true; 189 DEBUG!"creating server resource"; 190 _server = SharedResource(cast(shared)srv, &release); 191 DEBUG!"creating mutex resource"; 192 mutex = cast(shared)Mutex.create(); 193 DEBUG!"done"; 194 } else { 195 assert(0, "creation failed"); 196 } 197 } 198 199 ~this() { 200 debug import std.stdio; 201 debug writefln("running"); 202 } 203 }