module echo_server.app;

import during;
import mempooled.fixed;

import core.stdc.stdio;
import core.stdc.stdlib;
import core.sys.linux.fcntl;
import core.sys.linux.errno;
import core.sys.linux.netinet.tcp;
import core.sys.posix.netinet.in_;
import core.sys.posix.sys.socket;
import core.sys.posix.unistd;

import std.conv : emplace;
import std.typecons : BitFlags;

nothrow @nogc:

enum ushort PORT = 12345;
enum SOCK_NONBLOCK = 0x800;
enum MAX_CLIENTS = 1000;
enum BUF_SIZE = 1024;

alias IOBuffer = ubyte[BUF_SIZE];
FixedPool!(IOBuffer.sizeof, MAX_CLIENTS * 2, IOBuffer) bpool; // separate buffers for read/write op
FixedPool!(ClientContext.sizeof, MAX_CLIENTS, ClientContext) cpool;
int totalClients;

extern (C) int main()
{
    Uring io;
    auto ret = io.setup(2*MAX_CLIENTS);
    if (ret < 0)
    {
        fprintf(stderr, "Error initializing io_uring: %d\n", -ret);
        return ret;
    }

    // preallocate io buffer used for read/write operations
    enum total = MAX_CLIENTS * BUF_SIZE * 2;
    auto buf = cast(ubyte*)malloc(total);
    if (buf is null)
    {
        fprintf(stderr, "Failed to create io buffer: %d\n", errno);
        return -errno;
    }

    // register it to io_uring
    ret = io.registerBuffers(buf[0..total]);
    if (ret != 0)
    {
        fprintf(stderr, "Failed to register buffers: %d\n", -ret);
        return ret;
    }

    // init memory pool over the registered buffer
    bpool = fixedPool!(IOBuffer, MAX_CLIENTS*2)(buf[0..MAX_CLIENTS * BUF_SIZE * 2]);

    ret = io.initServer(PORT);
    if (ret != 0)
    {
        fprintf(stderr, "Failed to init server: %d\n", -ret);
        return ret;
    }

    printf("Server is listening on %d\n", PORT);

    // run event loop
    while (true)
    {
        ret = io.wait(1);
        if (ret < 0)
        {
            fprintf(stderr, "Error waiting for completions: %d\n", ret);
            return ret;
        }

        // handle completed operation
        auto ctx = cast(IOContext*)cast(void*)io.front.user_data;
        // debug printf("op %d done, res=%d, ctx=%p\n", ctx.op, io.front.res, cast(void*)io.front.user_data);
        final switch (ctx.op)
        {
            case OP.listen:
                if (io.front.res < 0) return io.front.res; // error with poll
                assert(io.front.res > 0);
                ret = io.onAccept(*ctx); // accept new client
                break;
            case OP.read:
                ret = io.onRead(*ctx, io.front.res);
                break;
            case OP.write:
                ret = io.onWrite(*ctx, io.front.res);
                break;
        }
        if (ret < 0)
        {
            fprintf(stderr, "Error handling op %d: %d\n", ctx.op, -ret);
            return ret;
        }
        io.popFront();
    }
}

int initServer(ref Uring ring, ushort port)
{
    int listenFd;
    sockaddr_in serverAddr;

    serverAddr.sin_family = AF_INET;
    serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serverAddr.sin_port = htons(port);

    if ((listenFd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) == -1)
    {
        fprintf(stderr, "socket error: %d\n", errno);
        return -errno;
    }

    int flags = 1;
    setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, cast(void*)&flags, int.sizeof);
    setsockopt(listenFd, IPPROTO_TCP, TCP_NODELAY, cast(void*)&flags, int.sizeof);

    if (bind(listenFd, cast(sockaddr*)&serverAddr, sockaddr.sizeof) == -1)
    {
        fprintf(stderr, "bind error: %d\n", errno);
        return -errno;
    }

    if (listen(listenFd, 32) == -1)
    {
        fprintf(stderr, "listen error: %d\n", errno);
        return -errno;
    }

    auto ctx = cast(IOContext*)malloc(IOContext.sizeof);
    ctx.emplace(OP.listen, listenFd);
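    // hand the listening socket to the ring: acceptNext() arms a POLL_ADD on it,
    // and main() calls accept(2) each time that poll completes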
    return ring.acceptNext(*ctx);
}

// resubmit poll on listening socket
int acceptNext(ref Uring ring, ref IOContext ctx)
{
    // poll for clients - TODO: use accept operation in Linux 5.5
    auto ret = ring.putWith!((ref SubmissionEntry e, ref IOContext ctx)
        {
            e.prepPollAdd(ctx.fd, PollEvents.IN);
            e.setUserData(ctx);
        })(ctx)
        .submit();

    if (ret != 1)
    {
        fprintf(stderr, "accept(): submit error: %d\n", ret);
        return ret;
    }
    return 0;
}

// read next batch from the client
int readNext(ref Uring ring, ref ClientContext ctx)
{
    ctx.readCtx.buffer = ctx.buffers[0];
    ctx.state |= ClientState.waitRead;
    auto ret = ring.putWith!((ref SubmissionEntry e, ref ClientContext ctx)
        {
            e.prepReadFixed(ctx.readCtx.fd, 0, ctx.readCtx.buffer, 0);
            e.setUserData(ctx.readCtx);
        })(ctx)
        .submit();

    if (ret != 1)
    {
        fprintf(stderr, "read(): submit error: %d\n", ret);
        return ret;
    }
    return 0;
}

// echo back what was read
int writeNext(ref Uring ring, ref ClientContext ctx)
{
    ctx.writeCtx.buffer = ctx.buffers[1][0..ctx.lastRead];
    ctx.state |= ClientState.waitWrite;

    auto ret = ring.putWith!((ref SubmissionEntry e, ref ClientContext ctx)
        {
            e.prepWriteFixed(ctx.writeCtx.fd, 0, ctx.writeCtx.buffer, 0);
            e.setUserData(ctx.writeCtx);
        })(ctx)
        .submit();

    if (ret != 1)
    {
        fprintf(stderr, "write(): submit error: %d\n", ret);
        return ret;
    }
    return 0;
}

// accept new client
int onAccept(ref Uring ring, ref IOContext ioctx)
{
    sockaddr_in addr;
    socklen_t len = addr.sizeof; // accept(2) expects the buffer size on input
    int cfd = accept(ioctx.fd, cast(sockaddr*)&addr, &len);

    if (cfd == -1)
    {
        fprintf(stderr, "accept(): %d\n", errno);
        return -errno;
    }

    // prepare client context
    auto ctx = cpool.alloc();
    if (ctx is null)
    {
        fprintf(stderr, "Clients limit reached\n");
        close(cfd);
    }
    else
    {
        totalClients++;
        printf("accepted clientfd %d, total=%d\n", cfd, totalClients);
        // setup read operation
        ctx.buffers[0] = (*bpool.alloc())[];
        ctx.buffers[1] = (*bpool.alloc())[];
        ctx.readCtx.fd = ctx.writeCtx.fd = cfd;
        ctx.readCtx.op = OP.read;
        ctx.writeCtx.op = OP.write;
        assert(ctx.buffers[0] !is null && ctx.buffers[1] !is null);
        auto ret = ring.readNext(*ctx);
        if (ret < 0) return ret;
    }

    return ring.acceptNext(ioctx);
}

int onRead(ref Uring ring, ref IOContext ioctx, int len)
{
    auto ctx = cast(ClientContext*)(cast(void*)&ioctx - ClientContext.readCtx.offsetof);
    ctx.state &= ~ClientState.waitRead;
    ctx.lastRead = len;

    if (len == 0)
    {
        if (!(ctx.state & ClientState.waitWrite)) closeClient(ctx);
    }
    else if (!(ctx.state & ClientState.waitWrite))
    {
        // we can echo back what was read and start reading new batch
        ctx.swapBuffers();
        auto ret = ring.readNext(*ctx);
        if (ret != 0) return ret;
        ret = ring.writeNext(*ctx);
        if (ret != 0) return ret;
    }
    return 0;
}

int onWrite(ref Uring ring, ref IOContext ioctx, int len)
{
    auto ctx = cast(ClientContext*)(cast(void*)&ioctx - ClientContext.writeCtx.offsetof);
    ctx.state &= ~ClientState.waitWrite;

    if (!(ctx.state & ClientState.waitRead))
    {
        if (ctx.lastRead == 0)
        {
            closeClient(ctx);
        }
        else
        {
            // we can echo back what was read and start reading new batch
            ctx.swapBuffers();
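            // after the swap, buffers[0] is free to receive the next read while
            // buffers[1] holds the lastRead bytes that are echoed back below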
            auto ret = ring.readNext(*ctx);
            if (ret != 0) return ret;
            ret = ring.writeNext(*ctx);
            if (ret != 0) return ret;
        }
    }

    return 0;
}

// cleanup client resources
void closeClient(ClientContext* ctx)
{
    totalClients--;
    printf("%d: closing, total=%d\n", ctx.readCtx.fd, totalClients);
    close(ctx.readCtx.fd);
    bpool.dealloc(cast(IOBuffer*)&ctx.buffers[0][0]);
    bpool.dealloc(cast(IOBuffer*)&ctx.buffers[1][0]);
    cpool.dealloc(ctx);
}

enum OP
{
    listen = 1,
    read = 2,
    write = 3
}

struct IOContext
{
    OP op;
    int fd;
    ubyte[] buffer;
}

enum ClientState
{
    init_ = 0,
    waitRead = 1,
    waitWrite = 2,
}

struct ClientContext
{
    BitFlags!ClientState state;
    ubyte[][2] buffers; // read/write buffers
    IOContext readCtx;
    IOContext writeCtx;
    int lastRead;

    void swapBuffers() nothrow @nogc
    {
        ubyte* tmp = &buffers[0][0];
        buffers[0] = (&buffers[1][0])[0..BUF_SIZE];
        buffers[1] = tmp[0..BUF_SIZE];
    }
}
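
/+
A minimal blocking client sketch to exercise the server (not part of the
original example; purely illustrative and without error handling). It has to
live in a separate file, e.g. echo_client.d, because this module already
defines main().

    import core.stdc.stdio;
    import core.sys.posix.arpa.inet;
    import core.sys.posix.netinet.in_;
    import core.sys.posix.sys.socket;
    import core.sys.posix.unistd;

    void main()
    {
        // connect to the echo server on 127.0.0.1:12345
        int fd = socket(AF_INET, SOCK_STREAM, 0);
        sockaddr_in addr;
        addr.sin_family = AF_INET;
        addr.sin_port = htons(12345);
        addr.sin_addr.s_addr = htonl(0x7F000001); // 127.0.0.1
        connect(fd, cast(sockaddr*)&addr, addr.sizeof);

        // send a message and print the echoed reply
        auto msg = "hello";
        write(fd, msg.ptr, msg.length);
        ubyte[1024] buf;
        auto n = read(fd, buf.ptr, buf.length);
        printf("echoed: %.*s\n", cast(int)n, cast(char*)buf.ptr);
        close(fd);
    }

Alternatively, a quick manual test with netcat: echo hello | nc localhost 12345
+/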