1 /++ dub.sdl:
2 	name "http-server-fibers-example"
3 	description "Simple fiber based pseudo HTTP server suitable for benchmarking"
4 	dependency "eventcore" path=".."
5 +/
6 module http_server_fibers_example;
7 
8 import eventcore.core;
9 import eventcore.internal.utils;
10 import std.functional : toDelegate;
11 import std.socket : InternetAddress;
12 import std.exception : enforce;
13 import std.typecons : Rebindable, RefCounted;
14 import core.time : Duration;
15 import core.thread : Fiber;
16 
17 
18 Fiber[] store = new Fiber[20000];
19 size_t storeSize = 0;
20 Fiber getFiber()
21 nothrow {
22 
23 	if (storeSize > 0) return store[--storeSize];
24 	return new Fiber({});
25 }
26 void done(Fiber f)
27 nothrow {
28 	if (storeSize < store.length)
29 		store[storeSize++] = f;
30 }
31 
32 
33 
34 struct AsyncBlocker {
35 	@safe:
36 
37 	bool done;
38 	Rebindable!(const(Exception)) exception;
39 	Fiber owner;
40 
41 	void start()
42 	nothrow {
43 		assert(owner is null);
44 		done = false;
45 		exception = null;
46 		() @trusted { owner = Fiber.getThis(); } ();
47 	}
48 
49 	void wait()
50 	{
51 		() @trusted { while (!done) Fiber.yield(); } ();
52 		auto ex = cast(const(Exception))exception;
53 		owner = null;
54 		done = false;
55 		exception = null;
56 		if (ex) throw ex;
57 	}
58 
59 	void finish(const(Exception) e = null)
60 	nothrow {
61 		assert(!done && owner !is null);
62 		exception = e;
63 		done = true;
64 		() @trusted { scope (failure) assert(false); if (owner.state == Fiber.State.HOLD) owner.call(); } ();
65 	}
66 }
67 
68 alias StreamConnection = RefCounted!StreamConnectionImpl;
69 
70 struct StreamConnectionImpl {
71 	@safe: /*@nogc:*/
72 	private {
73 		StreamSocketFD m_socket;
74 		bool m_empty = false;
75 
76 		AsyncBlocker writer;
77 		AsyncBlocker reader;
78 		ubyte[] m_readBuffer;
79 		size_t m_readBufferFill;
80 
81 		ubyte[] m_line;
82 	}
83 
84 	this(StreamSocketFD sock, ubyte[] buffer)
85 	nothrow {
86 		m_socket = sock;
87 		m_readBuffer = buffer;
88 	}
89 
90 	~this()
91 	nothrow {
92 		if (m_socket != StreamSocketFD.invalid)
93 			eventDriver.sockets.releaseRef(m_socket);
94 	}
95 
96 	@property bool empty()
97 	{
98 		reader.start();
99 		eventDriver.sockets.waitForData(m_socket, &onData);
100 		reader.wait();
101 		return m_empty;
102 	}
103 
104 	ubyte[] readLine()
105 	{
106 		reader.start();
107 		if (m_readBufferFill >= 2) onReadLineData(m_socket, IOStatus.ok, 0);
108 		else eventDriver.sockets.read(m_socket, m_readBuffer[m_readBufferFill .. $], IOMode.once, &onReadLineData);
109 		reader.wait();
110 		auto ln = m_line;
111 		m_line = null;
112 		return ln;
113 	}
114 
115 	void write(const(ubyte)[] data)
116 	{
117 		writer.start();
118 		eventDriver.sockets.write(m_socket, data, IOMode.all, &onWrite);
119 		writer.wait();
120 	}
121 
122 	void close()
123 	nothrow {
124 		eventDriver.sockets.releaseRef(m_socket);
125 		m_socket = StreamSocketFD.invalid;
126 		m_readBuffer = null;
127 	}
128 
129 	private void onWrite(StreamSocketFD fd, IOStatus status, size_t len)
130 	@safe nothrow {
131 		static const ex = new Exception("Failed to write data!");
132 		writer.finish(status == IOStatus.ok ? null : ex);
133 	}
134 
135 	private void onData(StreamSocketFD, IOStatus status, size_t bytes_read)
136 	@safe nothrow {
137 		if (status != IOStatus.ok)
138 			m_empty = true;
139 		reader.finish();
140 	}
141 
142 	private void onReadLineData(StreamSocketFD, IOStatus status, size_t bytes_read)
143 	@safe nothrow {
144 		static const ex = new Exception("Failed to read data!");
145 		static const exh = new Exception("Header line too long.");
146 
147 		import std.algorithm : countUntil;
148 
149 		if (status != IOStatus.ok) {
150 			reader.finish(ex);
151 			return;
152 		}
153 
154 		m_readBufferFill += bytes_read;
155 
156 		assert(m_readBufferFill <= m_readBuffer.length);
157 
158 		auto idx = m_readBuffer[0 .. m_readBufferFill].countUntil(cast(const(ubyte)[])"\r\n");
159 		if (idx >= 0) {
160 			assert(m_readBufferFill + idx <= m_readBuffer.length, "Not enough space to buffer the incoming line.");
161 			m_readBuffer[m_readBufferFill .. m_readBufferFill + idx] = m_readBuffer[0 .. idx];
162 			foreach (i; 0 .. m_readBufferFill - idx - 2)
163 				m_readBuffer[i] = m_readBuffer[idx+2+i];
164 			m_readBufferFill -= idx + 2;
165 
166 			m_line = m_readBuffer[m_readBufferFill + idx + 2 .. m_readBufferFill + idx + 2 + idx];
167 
168 			reader.finish();
169 		} else if (m_readBuffer.length - m_readBufferFill > 0) {
170 			eventDriver.sockets.read(m_socket, m_readBuffer[m_readBufferFill .. $], IOMode.once, &onReadLineData);
171 		} else {
172 			reader.finish(exh);
173 		}
174 	}
175 }
176 
177 
178 void main()
179 {
180 	print("Starting up...");
181 	auto addr = new InternetAddress("127.0.0.1", 8080);
182 	auto listener = eventDriver.sockets.listenStream(addr, toDelegate(&onClientConnect));
183 	enforce(listener != StreamListenSocketFD.invalid, "Failed to listen for connections.");
184 
185 	/*import core.time : msecs;
186 	eventDriver.setTimer(eventDriver.timers.createTimer((tm) { print("timer 1"); }), 1000.msecs, 1000.msecs);
187 	eventDriver.setTimer(eventDriver.timers.createTimer((tm) { print("timer 2"); }), 250.msecs, 500.msecs);*/
188 
189 	print("Listening for requests on port 8080...");
190 	while (eventDriver.core.waiterCount)
191 		eventDriver.core.processEvents(Duration.max);
192 }
193 
194 void onClientConnect(StreamListenSocketFD listener, StreamSocketFD client, scope RefAddress)
195 @trusted /*@nogc*/ nothrow {
196 	import core.stdc.stdlib;
197 	auto handler = cast(ClientHandler*)calloc(1, ClientHandler.sizeof);
198 	handler.client = client;
199 	auto f = getFiber();
200 	f.reset(&handler.handleConnection);
201 	scope (failure) assert(false);
202 	f.call();
203 
204 }
205 
206 struct ClientHandler {
207 	@safe: /*@nogc:*/ nothrow:
208 
209 	StreamSocketFD client;
210 
211 	@disable this(this);
212 
213 	void handleConnection()
214 	@trusted {
215 		ubyte[1024] linebuf = void;
216 		auto reply = cast(const(ubyte)[])"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 13\r\nKeep-Alive: timeout=10\r\n\r\nHello, World!";
217 
218 		auto conn = StreamConnection(client, linebuf);
219 		try {
220 			while (true) {
221 				conn.readLine();
222 
223 				ubyte[] ln;
224 				do ln = conn.readLine();
225 				while (ln.length > 0);
226 
227 				conn.write(reply);
228 			}
229 			//print("close %s", cast(int)client);
230 		} catch (Exception e) {
231 			print("close %s: %s", cast(int)client, e.msg);
232 		}
233 		conn.close();
234 
235 		done(Fiber.getThis());
236 	}
237 }