1 /++ dub.sdl: 2 name "http-server-fibers-example" 3 description "Simple fiber based pseudo HTTP server suitable for benchmarking" 4 dependency "eventcore" path=".." 5 +/ 6 module http_server_fibers_example; 7 8 import eventcore.core; 9 import eventcore.internal.utils; 10 import std.functional : toDelegate; 11 import std.socket : InternetAddress; 12 import std.exception : enforce; 13 import std.typecons : Rebindable, RefCounted; 14 import core.time : Duration; 15 import core.thread : Fiber; 16 17 18 Fiber[] store = new Fiber[20000]; 19 size_t storeSize = 0; 20 Fiber getFiber() 21 nothrow { 22 23 if (storeSize > 0) return store[--storeSize]; 24 return new Fiber({}); 25 } 26 void done(Fiber f) 27 nothrow { 28 if (storeSize < store.length) 29 store[storeSize++] = f; 30 } 31 32 33 34 struct AsyncBlocker { 35 @safe: 36 37 bool done; 38 Rebindable!(const(Exception)) exception; 39 Fiber owner; 40 41 void start() 42 nothrow { 43 assert(owner is null); 44 done = false; 45 exception = null; 46 () @trusted { owner = Fiber.getThis(); } (); 47 } 48 49 void wait() 50 { 51 () @trusted { while (!done) Fiber.yield(); } (); 52 auto ex = cast(const(Exception))exception; 53 owner = null; 54 done = false; 55 exception = null; 56 if (ex) throw ex; 57 } 58 59 void finish(const(Exception) e = null) 60 nothrow { 61 assert(!done && owner !is null); 62 exception = e; 63 done = true; 64 () @trusted { scope (failure) assert(false); if (owner.state == Fiber.State.HOLD) owner.call(); } (); 65 } 66 } 67 68 alias StreamConnection = RefCounted!StreamConnectionImpl; 69 70 struct StreamConnectionImpl { 71 @safe: /*@nogc:*/ 72 private { 73 StreamSocketFD m_socket; 74 bool m_empty = false; 75 76 AsyncBlocker writer; 77 AsyncBlocker reader; 78 ubyte[] m_readBuffer; 79 size_t m_readBufferFill; 80 81 ubyte[] m_line; 82 } 83 84 this(StreamSocketFD sock, ubyte[] buffer) 85 nothrow { 86 m_socket = sock; 87 m_readBuffer = buffer; 88 } 89 90 ~this() 91 nothrow { 92 if (m_socket != StreamSocketFD.invalid) 93 eventDriver.sockets.releaseRef(m_socket); 94 } 95 96 @property bool empty() 97 { 98 reader.start(); 99 eventDriver.sockets.waitForData(m_socket, &onData); 100 reader.wait(); 101 return m_empty; 102 } 103 104 ubyte[] readLine() 105 { 106 reader.start(); 107 if (m_readBufferFill >= 2) onReadLineData(m_socket, IOStatus.ok, 0); 108 else eventDriver.sockets.read(m_socket, m_readBuffer[m_readBufferFill .. $], IOMode.once, &onReadLineData); 109 reader.wait(); 110 auto ln = m_line; 111 m_line = null; 112 return ln; 113 } 114 115 void write(const(ubyte)[] data) 116 { 117 writer.start(); 118 eventDriver.sockets.write(m_socket, data, IOMode.all, &onWrite); 119 writer.wait(); 120 } 121 122 void close() 123 nothrow { 124 eventDriver.sockets.releaseRef(m_socket); 125 m_socket = StreamSocketFD.invalid; 126 m_readBuffer = null; 127 } 128 129 private void onWrite(StreamSocketFD fd, IOStatus status, size_t len) 130 @safe nothrow { 131 static const ex = new Exception("Failed to write data!"); 132 writer.finish(status == IOStatus.ok ? null : ex); 133 } 134 135 private void onData(StreamSocketFD, IOStatus status, size_t bytes_read) 136 @safe nothrow { 137 if (status != IOStatus.ok) 138 m_empty = true; 139 reader.finish(); 140 } 141 142 private void onReadLineData(StreamSocketFD, IOStatus status, size_t bytes_read) 143 @safe nothrow { 144 static const ex = new Exception("Failed to read data!"); 145 static const exh = new Exception("Header line too long."); 146 147 import std.algorithm : countUntil; 148 149 if (status != IOStatus.ok) { 150 reader.finish(ex); 151 return; 152 } 153 154 m_readBufferFill += bytes_read; 155 156 assert(m_readBufferFill <= m_readBuffer.length); 157 158 auto idx = m_readBuffer[0 .. m_readBufferFill].countUntil(cast(const(ubyte)[])"\r\n"); 159 if (idx >= 0) { 160 assert(m_readBufferFill + idx <= m_readBuffer.length, "Not enough space to buffer the incoming line."); 161 m_readBuffer[m_readBufferFill .. m_readBufferFill + idx] = m_readBuffer[0 .. idx]; 162 foreach (i; 0 .. m_readBufferFill - idx - 2) 163 m_readBuffer[i] = m_readBuffer[idx+2+i]; 164 m_readBufferFill -= idx + 2; 165 166 m_line = m_readBuffer[m_readBufferFill + idx + 2 .. m_readBufferFill + idx + 2 + idx]; 167 168 reader.finish(); 169 } else if (m_readBuffer.length - m_readBufferFill > 0) { 170 eventDriver.sockets.read(m_socket, m_readBuffer[m_readBufferFill .. $], IOMode.once, &onReadLineData); 171 } else { 172 reader.finish(exh); 173 } 174 } 175 } 176 177 178 void main() 179 { 180 print("Starting up..."); 181 auto addr = new InternetAddress("127.0.0.1", 8080); 182 auto listener = eventDriver.sockets.listenStream(addr, toDelegate(&onClientConnect)); 183 enforce(listener != StreamListenSocketFD.invalid, "Failed to listen for connections."); 184 185 /*import core.time : msecs; 186 eventDriver.setTimer(eventDriver.timers.createTimer((tm) { print("timer 1"); }), 1000.msecs, 1000.msecs); 187 eventDriver.setTimer(eventDriver.timers.createTimer((tm) { print("timer 2"); }), 250.msecs, 500.msecs);*/ 188 189 print("Listening for requests on port 8080..."); 190 while (eventDriver.core.waiterCount) 191 eventDriver.core.processEvents(Duration.max); 192 } 193 194 void onClientConnect(StreamListenSocketFD listener, StreamSocketFD client, scope RefAddress) 195 @trusted /*@nogc*/ nothrow { 196 import core.stdc.stdlib; 197 auto handler = cast(ClientHandler*)calloc(1, ClientHandler.sizeof); 198 handler.client = client; 199 auto f = getFiber(); 200 f.reset(&handler.handleConnection); 201 scope (failure) assert(false); 202 f.call(); 203 204 } 205 206 struct ClientHandler { 207 @safe: /*@nogc:*/ nothrow: 208 209 StreamSocketFD client; 210 211 @disable this(this); 212 213 void handleConnection() 214 @trusted { 215 ubyte[1024] linebuf = void; 216 auto reply = cast(const(ubyte)[])"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 13\r\nKeep-Alive: timeout=10\r\n\r\nHello, World!"; 217 218 auto conn = StreamConnection(client, linebuf); 219 try { 220 while (true) { 221 conn.readLine(); 222 223 ubyte[] ln; 224 do ln = conn.readLine(); 225 while (ln.length > 0); 226 227 conn.write(reply); 228 } 229 //print("close %s", cast(int)client); 230 } catch (Exception e) { 231 print("close %s: %s", cast(int)client, e.msg); 232 } 233 conn.close(); 234 235 done(Fiber.getThis()); 236 } 237 }