1 module echo_server.app;
2 
3 import during;
4 import mempooled.fixed;
5 
6 import core.stdc.stdio;
7 import core.stdc.stdlib;
8 import core.sys.linux.fcntl;
9 import core.sys.linux.errno;
10 import core.sys.linux.netinet.tcp;
11 import core.sys.posix.netinet.in_;
12 import core.sys.posix.sys.socket;
13 import core.sys.posix.unistd;
14 
15 import std.conv : emplace;
16 import std.typecons : BitFlags;
17 
18 nothrow @nogc:
19 
/// TCP port the echo server listens on.
enum ushort PORT = 12345;

enum
{
    /// Value OR-ed into the socket type to request a non-blocking socket.
    SOCK_NONBLOCK = 0x800,
    /// Upper bound on simultaneously served clients (sizes both pools).
    MAX_CLIENTS = 1000,
    /// Size in bytes of a single IO buffer.
    BUF_SIZE = 1024,
}

/// One fixed-size IO buffer.
alias IOBuffer = ubyte[BUF_SIZE];

/// Pool of IO buffers: two per client, separate buffers for the read and the write op.
FixedPool!(IOBuffer.sizeof, MAX_CLIENTS * 2, IOBuffer) bpool;

/// Pool of per-client contexts.
FixedPool!(ClientContext.sizeof, MAX_CLIENTS, ClientContext) cpool;

/// Number of currently connected clients.
int totalClients;
29 
/**
 * Program entry point.
 *
 * Sets up the io_uring instance, allocates and registers one contiguous
 * buffer that backs the IO buffer pool, starts the listening socket and then
 * runs the completion loop forever, dispatching each completion to the
 * handler selected by the `OP` stored in its `IOContext`.
 *
 * Returns: a negative error code on any fatal error; the loop itself never
 *          terminates normally.
 */
extern (C) int main()
{
    Uring io;
    auto ret = io.setup(2*MAX_CLIENTS);
    if (ret < 0)
    {
        fprintf(stderr, "Error initializing io_uring: %d\n", -ret);
        return ret;
    }

    // preallocate io buffer used for read/write operations
    enum total = MAX_CLIENTS * BUF_SIZE * 2;
    auto buf = cast(ubyte*)malloc(total);
    if (buf is null)
    {
        // capture errno first: fprintf is allowed to modify it
        const int err = errno;
        fprintf(stderr, "Failed to create io buffer: %d\n", err);
        return -err;
    }

    // register it to io_uring
    ret = io.registerBuffers(buf[0..total]);
    if (ret != 0)
    {
        fprintf(stderr, "Failed to register buffers: %d\n", -ret);
        free(buf); // never handed over to the pool, release it
        return ret;
    }

    // init memory pool over the registered buffer
    bpool = fixedPool!(IOBuffer, MAX_CLIENTS*2)(buf[0..total]);

    ret = io.initServer(PORT);
    if (ret != 0)
    {
        fprintf(stderr, "Failed to init server: %d\n", -ret);
        return ret;
    }

    printf("Server is listening on %d\n", PORT);

    // run event loop
    while (true)
    {
        ret = io.wait(1);
        if (ret == -EINTR) continue; // interrupted by a signal, just wait again
        if (ret < 0)
        {
            // print the positive errno value like every other message here
            fprintf(stderr, "Error waiting for completions: %d\n", -ret);
            return ret;
        }

        // handle completed operation; user_data carries the IOContext pointer
        auto ctx = cast(IOContext*)cast(void*)io.front.user_data;
        const int res = io.front.res;
        // debug printf("op %d done, res=%d, ctx=%p\n", ctx.op, res, cast(void*)io.front.user_data);
        final switch (ctx.op)
        {
            case OP.listen:
                if (res < 0)
                {
                    // error with poll
                    fprintf(stderr, "Error polling listening socket: %d\n", -res);
                    return res;
                }
                assert(res > 0);
                ret = io.onAccept(*ctx); // accept new client
                break;
            case OP.read:
                ret = io.onRead(*ctx, res);
                break;
            case OP.write:
                ret = io.onWrite(*ctx, res);
                break;
        }
        if (ret < 0)
        {
            fprintf(stderr, "Error handling op %d: %d\n", ctx.op, -ret);
            return ret;
        }
        io.popFront();
    }
}
104 
/**
 * Creates a non-blocking TCP listening socket bound to `port` on all
 * interfaces and submits the first poll for incoming connections.
 *
 * Params:
 *   ring = io_uring instance the poll operation is submitted to
 *   port = TCP port (host byte order) to listen on
 *
 * Returns: 0 on success, a non-zero error code otherwise (negative errno for
 *          socket/bind/listen/allocation failures, otherwise whatever
 *          `acceptNext` reported).
 */
int initServer(ref Uring ring, ushort port)
{
    sockaddr_in serverAddr;
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serverAddr.sin_port = htons(port);

    int listenFd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (listenFd == -1)
    {
        // capture errno first: fprintf is allowed to modify it
        const int err = errno;
        fprintf(stderr, "socket error : %d ...\n", err);
        return -err;
    }

    // best effort socket options, failures are deliberately not fatal
    int flags = 1;
    setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, cast(void*)&flags, int.sizeof);
    setsockopt(listenFd, IPPROTO_TCP, TCP_NODELAY, cast(void*)&flags, int.sizeof);

    if (bind(listenFd, cast(sockaddr*)&serverAddr, serverAddr.sizeof) == -1)
    {
        const int err = errno;
        fprintf(stderr, "bind error: %d\n", err);
        close(listenFd); // do not leak the descriptor on failure
        return -err;
    }

    if (listen(listenFd, 32) == -1)
    {
        const int err = errno;
        fprintf(stderr, "listen error: %d\n", err);
        close(listenFd);
        return -err;
    }

    // context of the listening socket; it lives for the rest of the process
    auto ctx = cast(IOContext*)malloc(IOContext.sizeof);
    if (ctx is null)
    {
        fprintf(stderr, "Failed to allocate listen context\n");
        close(listenFd);
        return -ENOMEM;
    }
    ctx.emplace(OP.listen, listenFd);

    auto ret = ring.acceptNext(*ctx);
    if (ret != 0)
    {
        // the server cannot start, release what was acquired here
        free(ctx);
        close(listenFd);
    }
    return ret;
}
141 
/**
 * (Re)submits a poll for readability on the listening socket.
 *
 * The completion carries `ctx` as its user data, so the event loop can route
 * it to the accept handler. `onAccept` calls this again after every handled
 * connection.
 *
 * Params:
 *   ring = io_uring instance to submit to
 *   ctx  = context of the listening socket (`ctx.fd` is polled)
 *
 * Returns: 0 on success, a negative error code otherwise.
 */
int acceptNext(ref Uring ring, ref IOContext ctx)
{
    // poll for clients - TODO: use accept operation in Linux 5.5
    auto ret = ring.putWith!((ref SubmissionEntry e, ref IOContext ctx)
        {
            e.prepPollAdd(ctx.fd, PollEvents.IN);
            e.setUserData(ctx);
        })(ctx)
        .submit();

    if (ret != 1)
    {
        fprintf(stderr, "accept(): submit error: %d\n", ret);
        // Callers only treat negative values as failures, so never hand back
        // a non-negative count (e.g. 0 submitted entries) as the error code.
        return ret < 0 ? ret : -EIO;
    }
    return 0;
}
160 
/**
 * Submits a fixed-buffer read for the client into its read buffer
 * (`ctx.buffers[0]`) and marks the client as waiting for a read completion.
 *
 * Params:
 *   ring = io_uring instance to submit to
 *   ctx  = client whose socket should be read
 *
 * Returns: 0 on success, a negative error code otherwise.
 */
int readNext(ref Uring ring, ref ClientContext ctx)
{
    ctx.readCtx.buffer = ctx.buffers[0];
    ctx.state |= ClientState.waitRead;
    auto ret = ring.putWith!((ref SubmissionEntry e, ref ClientContext ctx)
        {
            e.prepReadFixed(ctx.readCtx.fd, 0, ctx.readCtx.buffer, 0);
            e.setUserData(ctx.readCtx);
        })(ctx)
        .submit();

    if (ret != 1)
    {
        fprintf(stderr, "read(): submit error: %d\n", ret);
        // Callers only treat negative values as failures, so never hand back
        // a non-negative count (e.g. 0 submitted entries) as the error code.
        return ret < 0 ? ret : -EIO;
    }
    return 0;
}
180 
/**
 * Submits a fixed-buffer write that echoes the first `ctx.lastRead` bytes of
 * the client's write buffer (`ctx.buffers[1]`) back to the client and marks
 * the client as waiting for a write completion.
 *
 * Params:
 *   ring = io_uring instance to submit to
 *   ctx  = client to echo to; `lastRead` must hold the byte count to send
 *
 * Returns: 0 on success, a negative error code otherwise.
 */
int writeNext(ref Uring ring, ref ClientContext ctx)
{
    ctx.writeCtx.buffer = ctx.buffers[1][0..ctx.lastRead];
    ctx.state |= ClientState.waitWrite;

    auto ret = ring.putWith!((ref SubmissionEntry e, ref ClientContext ctx)
        {
            e.prepWriteFixed(ctx.writeCtx.fd, 0, ctx.writeCtx.buffer, 0);
            e.setUserData(ctx.writeCtx);
        })(ctx)
        .submit();

    if (ret != 1)
    {
        fprintf(stderr, "write(): submit error: %d\n", ret);
        // Callers only treat negative values as failures, so never hand back
        // a non-negative count (e.g. 0 submitted entries) as the error code.
        return ret < 0 ? ret : -EIO;
    }
    return 0;
}
201 
/**
 * Handles a completed poll on the listening socket: accepts one client,
 * prepares its context and buffers, starts the first read and re-arms the
 * poll for the next connection.
 *
 * Params:
 *   ring  = io_uring instance used for the follow-up submissions
 *   ioctx = context of the listening socket
 *
 * Returns: 0 on success, a negative error code on a fatal error.
 */
int onAccept(ref Uring ring, ref IOContext ioctx)
{
    sockaddr_in addr;
    // accept() expects the size of the address buffer on input
    socklen_t len = cast(socklen_t)addr.sizeof;
    int cfd = accept(ioctx.fd, cast(sockaddr*)&addr, &len);

    if (cfd == -1)
    {
        // capture errno first: fprintf is allowed to modify it
        const int err = errno;

        // The listening socket is non-blocking: the connection may be gone
        // again by the time we get here. That is not a server failure, just
        // keep waiting for the next client.
        if (err == EAGAIN || err == EWOULDBLOCK || err == ECONNABORTED || err == EINTR)
            return ring.acceptNext(ioctx);

        fprintf(stderr, "accept(): %d\n", err);
        return -err;
    }

    // prepare client context
    auto ctx = cpool.alloc();
    if (ctx is null)
    {
        fprintf(stderr, "Clients limit reached\n");
        close(cfd);
        return ring.acceptNext(ioctx);
    }

    // one buffer for the read op, one for the write op
    auto rbuf = bpool.alloc();
    auto wbuf = bpool.alloc();
    if (rbuf is null || wbuf is null)
    {
        // check before dereferencing and reject the client instead of crashing
        fprintf(stderr, "IO buffers exhausted\n");
        if (rbuf !is null) bpool.dealloc(rbuf);
        if (wbuf !is null) bpool.dealloc(wbuf);
        cpool.dealloc(ctx);
        close(cfd);
        return ring.acceptNext(ioctx);
    }

    totalClients++;
    printf("accepted clientfd %d, total=%d\n", cfd, totalClients);

    // setup read operation
    ctx.buffers[0] = (*rbuf)[];
    ctx.buffers[1] = (*wbuf)[];
    ctx.readCtx.fd = ctx.writeCtx.fd = cfd;
    ctx.readCtx.op = OP.read;
    ctx.writeCtx.op = OP.write;

    auto ret = ring.readNext(*ctx);
    if (ret < 0) return ret;

    return ring.acceptNext(ioctx);
}
239 
/**
 * Handles a completed read.
 *
 * Params:
 *   ring  = io_uring instance used for the follow-up submissions
 *   ioctx = the `readCtx` member of the owning `ClientContext`
 *   len   = completion result: bytes read, 0 on end of stream, negative
 *           error code on failure
 *
 * Returns: 0 on success, a non-zero error code if a follow-up submission failed.
 */
int onRead(ref Uring ring, ref IOContext ioctx, int len)
{
    // ioctx is embedded in the ClientContext, step back to the owner
    auto ctx = cast(ClientContext*)(cast(void*)&ioctx - ClientContext.readCtx.offsetof);
    ctx.state &= ~ClientState.waitRead;

    if (len < 0) fprintf(stderr, "%d: read error: %d\n", ioctx.fd, -len);

    // A failed read is handled like end of stream. Storing 0 (never a
    // negative value) keeps lastRead usable as a slice length and tells the
    // write completion handler that the client has to be closed.
    ctx.lastRead = len > 0 ? len : 0;

    if (len <= 0)
    {
        // with a write still in flight the close is left to its completion
        if (!(ctx.state & ClientState.waitWrite)) closeClient(ctx);
    }
    else if (!(ctx.state & ClientState.waitWrite))
    {
        // we can echo back what was read and start reading new batch
        ctx.swapBuffers();
        auto ret = ring.readNext(*ctx);
        if (ret != 0) return ret;
        ret = ring.writeNext(*ctx);
        if (ret != 0) return ret;
    }
    // otherwise the data waits in the read buffer until the write completes
    return 0;
}
261 
/**
 * Handles a completed write.
 *
 * Params:
 *   ring  = io_uring instance used for the follow-up submissions
 *   ioctx = the `writeCtx` member of the owning `ClientContext`
 *   len   = completion result: bytes written, or a negative error code
 *
 * Returns: 0 on success, a non-zero error code if a follow-up submission failed.
 *
 * NOTE(review): a short write (0 <= len < requested length) is not retried,
 * the remaining bytes are dropped.
 */
int onWrite(ref Uring ring, ref IOContext ioctx, int len)
{
    // ioctx is embedded in the ClientContext, step back to the owner
    auto ctx = cast(ClientContext*)(cast(void*)&ioctx - ClientContext.writeCtx.offsetof);
    ctx.state &= ~ClientState.waitWrite;

    const bool readIdle = !(ctx.state & ClientState.waitRead);

    if (len < 0)
    {
        // The connection is not usable for echoing anymore.
        fprintf(stderr, "%d: write error: %d\n", ioctx.fd, -len);
        if (readIdle)
        {
            closeClient(ctx);
        }
        else
        {
            // A read is still in flight and uses the client's buffer, so the
            // client cannot be released yet. Shut down the receiving side so
            // that the pending read completes and the read handler can close.
            shutdown(ioctx.fd, SHUT_RD);
        }
        return 0;
    }

    if (readIdle)
    {
        if (ctx.lastRead == 0)
        {
            // the read side already saw end of stream, nothing more to echo
            closeClient(ctx);
        }
        else
        {
            // we can echo back what was read and start reading new batch
            ctx.swapBuffers();
            auto ret = ring.readNext(*ctx);
            if (ret != 0) return ret;
            ret = ring.writeNext(*ctx);
            if (ret != 0) return ret;
        }
    }

    return 0;
}
286 
/**
 * Releases everything owned by a client: closes its socket, hands both IO
 * buffers back to the buffer pool and the context back to the client pool.
 *
 * Params:
 *   ctx = client to dispose of; must not be used afterwards
 */
void closeClient(ClientContext* ctx)
{
    const int fd = ctx.readCtx.fd;

    --totalClients;
    printf("%d: closing, total=%d\n", fd, totalClients);
    close(fd);

    // each slice starts at the beginning of a pooled IOBuffer
    foreach (ref slot; ctx.buffers)
        bpool.dealloc(cast(IOBuffer*)&slot[0]);

    cpool.dealloc(ctx);
}
297 
/// Kind of operation an `IOContext` was submitted for.
enum OP
{
    listen = 1, /// poll on the listening socket
    read,       /// read from a client (value 2)
    write,      /// write to a client (value 3)
}
304 
/**
 * State attached to a submitted operation; its address is stored as the user
 * data of the submission and read back from the completion.
 *
 * The field order is part of the interface: the listening context is
 * constructed positionally as `(op, fd)`.
 */
struct IOContext
{
    /// operation this context was submitted for
    OP op;
    /// file descriptor the operation works on
    int fd;
    /// buffer slice used by the read/write operation
    ubyte[] buffer;
}
311 
/// Flags describing which operations of a client are currently in flight.
enum ClientState
{
    init_     = 0,      /// nothing pending
    waitRead  = 1 << 0, /// a read has been submitted and not completed yet
    waitWrite = 1 << 1, /// a write has been submitted and not completed yet
}
318 
/**
 * Everything the server tracks per connected client.
 *
 * `readCtx` and `writeCtx` are embedded so that the completion handlers can
 * get back to the owning `ClientContext` from the `IOContext` address.
 */
struct ClientContext
{
    /// operations currently in flight
    BitFlags!ClientState state;
    /// [0] is the buffer being read into, [1] the buffer being written out
    ubyte[][2] buffers;
    /// context submitted with read operations
    IOContext readCtx;
    /// context submitted with write operations
    IOContext writeCtx;
    /// result stored by the last completed read
    int lastRead;

    /// Exchanges the read and the write buffer; both end up BUF_SIZE long.
    void swapBuffers() nothrow @nogc
    {
        ubyte* readPtr = &buffers[0][0];
        ubyte* writePtr = &buffers[1][0];

        buffers[0] = writePtr[0..BUF_SIZE];
        buffers[1] = readPtr[0..BUF_SIZE];
    }
}