module grpc.common.cq;
import grpc.logger;
import interop.headers;
import std.typecons;
import grpc.core.tag;
import grpc.core;
import grpc.core.sync.mutex;
import grpc.core.resource;
import grpc.core.utils;
import core.lifetime;
import std.experimental.allocator : theAllocator, make, dispose;

/// Pair of status flags for a queue poll (queue ok / event ok).
alias NextStatus = Tuple!(bool, bool);


// TODO: add mutexes

import core.thread;
import std.parallelism;

import std.traits;

/**
 * Wrapper around a gRPC core completion queue.
 *
 * Only the "Next" polling flavour is accepted by the template constraint.
 * The underlying `grpc_completion_queue*` is held in a `SharedResource`
 * together with a release callback, and a mutex is exposed through
 * `lock`/`unlock`.
 */
class CompletionQueue(string T)
    if(T == "Next")
{
@safe:
    private {
        shared(Mutex) mutex;
        SharedResource _cq;
        bool _inShutdownPath;
    }

    /// Returns the current value of the shutdown-path flag.
    bool inShutdownPath() shared {
        return _inShutdownPath;
    }

    /// Sets the shutdown-path flag to `val` and returns `val`.
    bool inShutdownPath(bool val) shared {
        _inShutdownPath = val;
        return val;
    }

    /// Raw completion queue pointer stored in the resource (shared receiver).
    inout(grpc_completion_queue)* handle() inout @trusted nothrow shared {
        return cast(typeof(return)) _cq.handle;
    }

    /// Raw completion queue pointer stored in the resource (thread-local receiver).
    inout(grpc_completion_queue)* handle() inout @trusted nothrow {
        return cast(typeof(return)) _cq.handle;
    }

    /* Preserved for compatibility: forwards to handle(); `file` is unused. */
    auto ptr(string file = __FILE__) @trusted {
        return handle;
    }

    /// Acquires this queue's mutex.
    void lock() {
        mutex.lock;
    }

    /// ditto
    void lock() shared {
        mutex.lock;
    }

    /// Releases this queue's mutex.
    void unlock() shared {
        mutex.unlock;
    }

    /// ditto
    void unlock() {
        mutex.unlock;
    }

    static if(T == "Pluck") {
        // TODO: add Pluck/Callback types
    }
    static if(T == "Next") {
        /**
         * Polls the queue through `grpc_completion_queue_next`.
         *
         * Params:
         *   time = converted with `durtotimespec` and passed as the deadline argument
         * Returns: the `grpc_event` produced by the core call
         */
        grpc_event next(Duration time) @trusted shared {
            gpr_timespec t = durtotimespec(time);
            grpc_event _evt = grpc_completion_queue_next(handle, t, null);

            return _evt;
        }

        /// ditto
        grpc_event next(Duration time) @trusted {
            gpr_timespec t = durtotimespec(time);
            grpc_event _evt = grpc_completion_queue_next(handle, t, null);

            return _evt;
        }
    }

    import grpc.server;

    /**
     * Registers `tag` with the server for the next incoming call on a
     * registered method, via `grpc_server_request_registered_call`.
     *
     * The server lock, the tag context's mutex and this queue's mutex are
     * held for the duration of the call.
     *
     * Params:
     *   method      = registered-method handle forwarded to gRPC core
     *   tag         = tag whose context supplies call, details, metadata and data; must not be null
     *   _server     = server whose handle is used for the request
     *   boundToCall = queue whose handle is passed as the notification queue;
     *                 this queue's own handle is passed as the call's queue
     * Returns: the `grpc_call_error` reported by gRPC core
     */
    grpc_call_error requestCall(void* method, Tag* tag, shared(Server) _server, shared(CompletionQueue!"Next") boundToCall) @trusted {
        assert(tag != null, "tag null");
        DEBUG!"hmm"();

        DEBUG!"locking context mutex";

        // Each lock gets its own scope guard immediately after acquisition so
        // that a failure while taking a later lock (or while releasing one)
        // cannot leave an earlier lock held. Scope guards run in reverse
        // order, so the locks are released in reverse order of acquisition.
        _server.lock;
        scope(exit) _server.unlock;

        tag.ctx.mutex.lock;
        scope(exit) tag.ctx.mutex.unlock;

        mutex.lock;
        scope(exit) mutex.unlock;

        DEBUG!"1";
        auto server_ptr = _server.handle();
        DEBUG!"2";
        auto method_cq = handle();

        DEBUG!"3";
        auto server_cq = boundToCall.handle();

        auto ctx = &tag.ctx;
        assert(ctx != null, "context null");
        DEBUG!"4";
        auto details = ctx.details.handle();

        DEBUG!"5";
        auto metadata = ctx.metadata.handle();
        DEBUG!"6";
        auto data = ctx.data.safeHandle();

        DEBUG!"call: %x"(ctx.call);

        grpc_call_error error = grpc_server_request_registered_call(server_ptr,
                method, ctx.call, &details.deadline, metadata,
                data, method_cq, server_cq, tag);

        // NOTE(review): this message is emitted whatever `error` holds;
        // callers must inspect the returned value themselves.
        DEBUG!"successfully reregistered"();

        return error;

    }


    /**
     * Creates the underlying queue with `grpc_completion_queue_create_for_next`
     * and stores it in a `SharedResource` together with its release callback;
     * also creates the mutex.
     */
    this() @trusted {
        grpc_completion_queue* cq = null;

        static if (T == "Next") {
            cq = grpc_completion_queue_create_for_next(null);
        } else {
        }

        assert(cq != null, "CQ creation error");

        // Release callback handed to SharedResource: shuts the queue down and
        // then destroys it.
        // NOTE(review): gRPC core expects a queue to be drained between
        // shutdown and destroy - confirm the owner guarantees that.
        static bool release(shared(void)* ptr) @trusted nothrow {
            grpc_completion_queue_shutdown(cast(grpc_completion_queue*)ptr);
            grpc_completion_queue_destroy(cast(grpc_completion_queue*)ptr);

            return true;
        }

        _cq = SharedResource(cast(shared)cq, &release);
        mutex = cast(shared)Mutex.create();
    }


    /// Factory: builds an instance through `theAllocator.make`.
    static CompletionQueue!T opCall() @trusted {
        CompletionQueue!T obj = theAllocator.make!(CompletionQueue!T)();
        return obj;
    }

    /// Calls `grpc_completion_queue_shutdown` on the queue and logs it.
    void shutdown() @trusted {
        grpc_completion_queue_shutdown(handle);
        INFO!"shutting down CQ";
    }

}