/**
 * Simple idiomatic dlang wrapper around linux io_uring
 * (see: https://kernel.dk/io_uring.pdf) asynchronous API.
 */
module during;

version (linux) {}
else static assert(0, "io_uring is available on linux only");

public import during.io_uring;

import core.atomic : MemoryOrder;
debug import core.stdc.stdio;
import core.stdc.stdlib;
import core.sys.linux.errno;
import core.sys.linux.sys.mman;
import core.sys.linux.unistd;
import core.sys.posix.signal;
import core.sys.posix.sys.socket;
import core.sys.posix.sys.uio;
import std.algorithm.comparison : among;

@trusted: //TODO: remove this and use selectively where it makes sense
nothrow @nogc:

/**
 * Sets up a new io_uring instance in the provided `Uring` structure.
 *
 * Params:
 *     uring = `Uring` structure to be initialized (must not be already initialized)
 *     entries = Number of entries to initialize the uring with
 *     flags = `SetupFlags` to use to initialize the uring.
 *
 * Returns: On success it returns 0, `-errno` otherwise.
 */
int setup(ref Uring uring, uint entries = 128, SetupFlags flags = SetupFlags.NONE)
{
    assert(uring.payload is null, "Uring is already initialized");
    uring.payload = cast(UringDesc*)calloc(1, UringDesc.sizeof);
    if (uring.payload is null) return -errno;

    uring.payload.params.flags = flags;
    uring.payload.refs = 1;
    auto r = io_uring_setup(entries, uring.payload.params);
    if (r < 0) return -errno;

    uring.payload.fd = r;

    if (uring.payload.mapRings() < 0)
    {
        dispose(uring);
        return -errno;
    }

    // debug printf("uring(%d): setup\n", uring.payload.fd);

    return 0;
}
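
/*
 * Illustrative sketch only (assumes a kernel with io_uring support, i.e. Linux 5.1+):
 * basic initialization and automatic cleanup.
 */
unittest
{
    Uring io;
    if (io.setup(16) != 0) return; // io_uring not available on this kernel

    assert(io.fd >= 0); // native io_uring file descriptor is ready
    assert(io.empty);   // no completions yet
    assert(!io.full);   // free space in the submission queue
    // resources are released when the last copy of `io` goes out of scope
}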

/**
 * Main entry point to work with io_uring.
 *
 * It hides `SubmissionQueue` and `CompletionQueue` behind a standard range interface:
 * we put in `SubmissionEntry` entries and take out `CompletionEntry` entries.
 *
 * Use the predefined `prepXX` functions to fill the required fields of a `SubmissionEntry`
 * before `put` or during `putWith`.
 *
 * Note: `prepXX` functions don't touch the previous entry state, they just fill in the operation
 * properties. To make the interface less error prone, the entry is cleared automatically when it is
 * prepared using `putWith`. So when using your own `SubmissionEntry` (outside the submission queue)
 * that is later added with `put`, be sure it's cleared if it's reused for multiple operations.
 */
struct Uring
{
    nothrow @nogc:

    private UringDesc* payload;
    private void checkInitialized() const
    {
        assert(payload !is null, "Uring hasn't been initialized yet");
    }

    /// Copy constructor
    this(ref return scope Uring rhs)
    {
        assert(rhs.payload !is null, "rhs payload is null");
        // debug printf("uring(%d): copy\n", rhs.payload.fd);
        this.payload = rhs.payload;
        this.payload.refs++;
    }

    /// Destructor
    ~this()
    {
        dispose(this);
    }

    /// Native io_uring file descriptor
    auto fd() const
    {
        checkInitialized();
        return payload.fd;
    }

    /// io_uring parameters
    SetupParameters params() const return
    {
        checkInitialized();
        return payload.params;
    }

    /// Check if there is some `CompletionEntry` to process.
    bool empty() const
    {
        checkInitialized();
        return payload.cq.empty;
    }

    /// Check if there is space for another `SubmissionEntry` to submit.
    bool full() const
    {
        checkInitialized();
        return payload.sq.full;
    }

    /// Available space in submission queue before it becomes full
    size_t capacity() const
    {
        checkInitialized();
        return payload.sq.capacity;
    }

    /// Number of entries in completion queue
    size_t length() const
    {
        checkInitialized();
        return payload.cq.length;
    }

    /// Get first `CompletionEntry` from cq ring
    ref CompletionEntry front() return
    {
        checkInitialized();
        return payload.cq.front;
    }

    /// Move to next `CompletionEntry`
    void popFront()
    {
        checkInitialized();
        return payload.cq.popFront;
    }

    /**
     * Adds a new entry to the `SubmissionQueue`.
     *
     * Note that this just adds the entry to the queue and doesn't advance the tail marker the
     * kernel sees. The tail is advanced, and the new entries are actually passed to the kernel,
     * when `submit()` is called.
     *
     * Params:
     *     FN = Function to fill the next entry in the queue by `ref` (should be faster).
     *          It is expected to be in the form of `void function(ARGS)(ref SubmissionEntry, auto ref ARGS)`.
     *          Note that in this case the queue entry is cleared first, before the function is called.
     *     entry = Custom built `SubmissionEntry` to be posted as is.
     *             Note that in this case it is copied whole over the one in the `SubmissionQueue`.
     *     args = Optional arguments passed to the function
     *
     * Returns: reference to `Uring` structure so it's possible to chain multiple commands.
     */
    ref Uring put()(auto ref SubmissionEntry entry) return
    {
        checkInitialized();
        payload.sq.put(entry);
        return this;
    }

    /// ditto
    ref Uring putWith(alias FN, ARGS...)(auto ref ARGS args) return
    {
        import std.functional : forward;
        checkInitialized();
        payload.sq.putWith!FN(forward!args);
        return this;
    }

    /**
     * Similar to `put(SubmissionEntry)`, but here we can provide a custom type whose fields are
     * filled into the next `SubmissionEntry` in the queue.
     *
     * Fields in the provided type must use the same names as in `SubmissionEntry` to be automatically copied.
     *
     * Params:
     *     op = Custom operation definition.
     *
     * Returns: reference to `Uring` structure so it's possible to chain multiple commands.
     */
    ref Uring put(OP)(auto ref OP op) return
        if (!is(OP == SubmissionEntry))
    {
        checkInitialized();
        payload.sq.put(op);
        return this;
    }

    /**
     * If the completion queue is full, new events may be dropped.
     * This value records the number of dropped events.
     */
    uint overflow() const
    {
        checkInitialized();
        return payload.cq.overflow;
    }

    /// Counter of invalid submissions (out-of-bound index in submission array)
    uint dropped() const
    {
        checkInitialized();
        return payload.sq.dropped;
    }
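
    /*
     * Illustrative sketch only (assumes a kernel with io_uring support):
     * filling submission entries with `put`, `putWith` and the `prepXX` helpers.
     */
    unittest
    {
        Uring io;
        if (io.setup(4) != 0) return; // io_uring not available on this kernel

        SubmissionEntry entry;
        entry.clear();      // a reused entry should start from a clean state
        entry.prepNop();
        io.put(entry);      // copied verbatim into the submission queue

        io.putWith!((ref SubmissionEntry e) => e.prepNop())(); // entry is cleared before the lambda is called

        assert(io.capacity == io.params.sq_entries - 2); // two submission slots are now used
    }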

    /**
     * Submits queued `SubmissionEntry` entries to be processed by the kernel.
     *
     * Params:
     *     want = number of `CompletionEntry` entries to wait for.
     *            If 0, this just submits queued entries and returns.
     *            If > 0, it blocks until at least the wanted number of entries were completed.
     *     sig = See io_uring_enter(2) man page
     *
     * Returns: Number of submitted entries on success, `-errno` on error
     */
    auto submit(uint want = 0, const sigset_t* sig = null) @trusted
    {
        checkInitialized();

        auto len = cast(uint)payload.sq.length;
        if (len > 0) // anything to submit?
        {
            EnterFlags flags;
            if (want > 0) flags |= EnterFlags.GETEVENTS;

            payload.sq.flushTail(); // advance queue index

            if (payload.params.flags & SetupFlags.SQPOLL)
            {
                if (payload.sq.flags & SubmissionQueueFlags.NEED_WAKEUP)
                    flags |= EnterFlags.SQ_WAKEUP;
                else if (want == 0) return len; // fast poll
            }
            auto r = io_uring_enter(payload.fd, len, want, flags, sig);
            if (r < 0) return -errno;
            return r;
        }
        else if (want > 0) return wait(want); // just simple wait
        return 0;
    }

    /**
     * Similar to `submit`, but with this method we just wait for the required number
     * of `CompletionEntry` entries.
     *
     * Returns: `0` on success, `-errno` on error
     */
    auto wait(uint want = 1, const sigset_t* sig = null) @trusted
    {
        pragma(inline);
        checkInitialized();
        assert(want > 0, "Invalid want value");

        if (payload.cq.length >= want) return 0; // we don't need to syscall

        auto r = io_uring_enter(payload.fd, 0, want, EnterFlags.GETEVENTS, sig);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * Register a single buffer, or a group of buffers, to be mapped into the kernel for faster buffered operations.
     *
     * To use the registered buffers, the application must use the fixed variants of the operations,
     * `READ_FIXED` or `WRITE_FIXED`, in the `SubmissionEntry`, together with the `buf_index` set
     * in the entry's extra data.
     *
     * An application can increase or decrease the size or number of registered buffers by first
     * unregistering the existing buffers, and then issuing a new call to `io_uring_register()` with
     * the new buffers.
     *
     * Params:
     *     buffers = Buffers to be registered
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    auto registerBuffers(T)(T buffers) @trusted
        if (is(T == ubyte[]) || is(T == ubyte[][])) // TODO: something else?
    {
        checkInitialized();
        assert(buffers.length, "Empty buffer");

        if (payload.regBuffers !is null)
            return -EBUSY; // buffers were already registered

        static if (is(T == ubyte[]))
        {
            auto p = malloc(iovec.sizeof);
            if (p is null) return -errno;
            payload.regBuffers = (cast(iovec*)p)[0..1];
            payload.regBuffers[0].iov_base = cast(void*)&buffers[0];
            payload.regBuffers[0].iov_len = buffers.length;
        }
        else static if (is(T == ubyte[][]))
        {
            auto p = malloc(buffers.length * iovec.sizeof);
            if (p is null) return -errno;
            payload.regBuffers = (cast(iovec*)p)[0..buffers.length];

            foreach (i, b; buffers)
            {
                assert(b.length, "Empty buffer");
                payload.regBuffers[i].iov_base = cast(void*)&b[0];
                payload.regBuffers[i].iov_len = b.length;
            }
        }

        auto r = io_uring_register(
            payload.fd,
            RegisterOpCode.REGISTER_BUFFERS,
            cast(const(void)*)&payload.regBuffers[0], cast(uint)payload.regBuffers.length
        );

        if (r < 0) return -errno;
        return 0;
    }
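
    /*
     * Illustrative sketch only (assumes a kernel with io_uring support):
     * submit queued entries and consume completions through the range interface.
     */
    unittest
    {
        Uring io;
        if (io.setup(4) != 0) return; // io_uring not available on this kernel

        io.putWith!((ref SubmissionEntry e) => e.prepNop())();

        auto submitted = io.submit(1); // submit and wait for at least one completion
        assert(submitted == 1);

        assert(!io.empty);
        assert(io.front.res == 0);     // NOP always completes with result 0
        io.popFront();
        assert(io.empty);
    }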

    /**
     * Releases all previously registered buffers associated with the `io_uring` instance.
     *
     * An application need not unregister buffers explicitly before shutting down the io_uring instance.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    auto unregisterBuffers() @trusted
    {
        checkInitialized();

        if (payload.regBuffers is null)
            return -ENXIO; // no buffers were registered

        free(cast(void*)&payload.regBuffers[0]);
        payload.regBuffers = null;

        auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_BUFFERS, null, 0);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * Register files for I/O.
     *
     * To make use of the registered files, the `IOSQE_FIXED_FILE` flag must be set in the flags
     * member of the `SubmissionEntry`, and the `fd` member is set to the index of the file in the
     * file descriptor array.
     *
     * Files are automatically unregistered when the `io_uring` instance is torn down. An application
     * need only unregister if it wishes to register a new set of fds.
     *
     * Use `-1` as a file descriptor to mark it as reserved in the array.
     *
     * Params: fds = array of file descriptors to be registered
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    auto registerFiles(const(int)[] fds) @trusted
    {
        checkInitialized();
        assert(fds.length, "No file descriptors provided");
        assert(fds.length < uint.max, "Too many file descriptors");

        // arg contains a pointer to an array of nr_args file descriptors (signed 32 bit integers).
        auto r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_FILES, &fds[0], cast(uint)fds.length);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * Register an update for an existing file set. The updates will start at
     * `off` in the original array.
     *
     * Use `-1` as a file descriptor to mark it as reserved in the array.
     *
     * Params:
     *     off = offset to the original registered files to be updated
     *     fds = array of file descriptors to update with
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    auto registerFilesUpdate(uint off, const(int)[] fds) @trusted
    {
        struct Update // mirrors the kernel's `struct io_uring_files_update`
        {
            uint off;
            uint _resv;
            ulong fds;
        }

        checkInitialized();
        assert(fds.length, "No file descriptors provided to update");
        assert(fds.length < uint.max, "Too many file descriptors");

        Update u = Update(off, 0, cast(ulong)&fds[0]);
        auto r = io_uring_register(
            payload.fd,
            RegisterOpCode.REGISTER_FILES_UPDATE,
            &u, cast(uint)fds.length);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * All previously registered files associated with the `io_uring` instance will be unregistered.
     *
     * Files are automatically unregistered when the `io_uring` instance is torn down. An application
     * need only unregister if it wishes to register a new set of fds.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    auto unregisterFiles() @trusted
    {
        checkInitialized();
        auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_FILES, null, 0);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * Registers an event file descriptor that is notified when completions are posted to the
     * completion queue.
     *
     * Params: eventFD = file descriptor obtained from `eventfd(2)`
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    auto registerEventFD(int eventFD) @trusted
    {
        checkInitialized();
        auto r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_EVENTFD, &eventFD, 1);
        if (r < 0) return -errno;
        return 0;
    }
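
    /*
     * Illustrative sketch only (assumes a kernel with io_uring support and that
     * "/dev/null" can be opened): registering and unregistering a fixed file set.
     */
    unittest
    {
        import core.sys.posix.fcntl : open, O_WRONLY;
        import core.sys.posix.unistd : close;

        Uring io;
        if (io.setup(4) != 0) return; // io_uring not available on this kernel

        int fd = open("/dev/null", O_WRONLY);
        assert(fd >= 0);
        scope (exit) close(fd);

        int[1] fds;
        fds[0] = fd;
        assert(io.registerFiles(fds[]) == 0);
        // operations may now refer to the file by its index (0) together with IOSQE_FIXED_FILE
        assert(io.unregisterFiles() == 0);
    }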

    /**
     * Unregisters a previously registered event file descriptor.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    auto unregisterEventFD() @trusted
    {
        checkInitialized();
        auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_EVENTFD, null, 0);
        if (r < 0) return -errno;
        return 0;
    }
}

/**
 * Uses a custom operation definition to fill fields of a `SubmissionEntry`.
 * Can be used in cases when the builtin prep* functions aren't enough.
 *
 * Custom definition fields must correspond to fields of `SubmissionEntry` for this to work.
 *
 * Note: This doesn't touch the previous state of the entry, it just fills the corresponding fields.
 * So it might be needed to call `clear` first on the entry (depends on usage).
 *
 * Params:
 *     entry = entry to set parameters to
 *     op = operation to fill entry with (can be custom type)
 */
void fill(E)(ref SubmissionEntry entry, auto ref E op)
{
    pragma(inline);
    import std.traits : hasMember, FieldNameTuple;

    // fill entry from provided operation fields (they must have same name as in SubmissionEntry)
    foreach (m; FieldNameTuple!E)
    {
        static assert(hasMember!(SubmissionEntry, m), "unknown member: " ~ E.stringof ~ "." ~ m);
        __traits(getMember, entry, m) = __traits(getMember, op, m);
    }
}

/**
 * Template function to help set the `SubmissionEntry` `user_data` field.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     data = data to set to the `SubmissionEntry`
 *
 * Note: data is passed by ref and must stay alive for the whole duration of the operation.
 */
void setUserData(D)(ref SubmissionEntry entry, ref D data) @trusted
{
    entry.user_data = cast(ulong)(cast(void*)&data);
}

/**
 * Prepares `nop` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 */
void prepNop(ref SubmissionEntry entry) @safe
{
    entry.opcode = Operation.NOP;
    entry.fd = -1;
}

// TODO: check offset behavior, preadv2/pwritev2 should accept -1 to work on the current file offset,
// but it doesn't seem to work here.

/**
 * Prepares `readv` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor of file we are operating on
 *     offset = offset
 *     buffer = iovec buffers to be used by the operation
 */
void prepReadv(V)(ref SubmissionEntry entry, int fd, ref const V buffer, ulong offset) @trusted
    if (is(V == iovec[]) || is(V == iovec))
{
    entry.opcode = Operation.READV;
    entry.fd = fd;
    entry.off = offset;
    static if (is(V == iovec[]))
    {
        assert(buffer.length, "Empty buffer");
        assert(buffer.length < uint.max, "Too many iovec buffers");
        entry.addr = cast(ulong)(cast(void*)&buffer[0]);
        entry.len = cast(uint)buffer.length;
    }
    else
    {
        entry.addr = cast(ulong)(cast(void*)&buffer);
        entry.len = 1;
    }
}
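
/*
 * Illustrative sketch only (assumes a kernel with io_uring support and a readable
 * "/dev/zero"): queueing a vectored read with `prepReadv`.
 */
unittest
{
    import core.sys.posix.fcntl : open, O_RDONLY;
    import core.sys.posix.unistd : close;

    Uring io;
    if (io.setup(4) != 0) return; // io_uring not available on this kernel

    int fd = open("/dev/zero", O_RDONLY);
    assert(fd >= 0);
    scope (exit) close(fd);

    ubyte[64] data;
    iovec v;
    v.iov_base = cast(void*)&data[0];
    v.iov_len = data.length;

    SubmissionEntry entry;
    entry.clear();
    entry.prepReadv(fd, v, 0);
    io.put(entry);

    assert(io.submit(1) == 1);
    assert(io.front.res == data.length); // number of bytes read
    io.popFront();
}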

/**
 * Prepares `writev` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor of file we are operating on
 *     offset = offset
 *     buffer = iovec buffers to be used by the operation
 */
void prepWritev(V)(ref SubmissionEntry entry, int fd, ref const V buffer, ulong offset) @trusted
    if (is(V == iovec[]) || is(V == iovec))
{
    entry.opcode = Operation.WRITEV;
    entry.fd = fd;
    entry.off = offset;
    static if (is(V == iovec[]))
    {
        assert(buffer.length, "Empty buffer");
        assert(buffer.length < uint.max, "Too many iovec buffers");
        entry.addr = cast(ulong)(cast(void*)&buffer[0]);
        entry.len = cast(uint)buffer.length;
    }
    else
    {
        entry.addr = cast(ulong)(cast(void*)&buffer);
        entry.len = 1;
    }
}

/**
 * Prepares `read_fixed` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor of file we are operating on
 *     offset = offset
 *     buffer = slice of a preregistered buffer
 *     bufferIndex = index of the preregistered buffer this slice belongs to
 */
void prepReadFixed(ref SubmissionEntry entry, int fd, ulong offset, ubyte[] buffer, ushort bufferIndex) @trusted
{
    assert(buffer.length, "Empty buffer");
    assert(buffer.length < uint.max, "Buffer too large");
    entry.opcode = Operation.READ_FIXED;
    entry.fd = fd;
    entry.off = offset;
    entry.addr = cast(ulong)(cast(void*)&buffer[0]);
    entry.len = cast(uint)buffer.length;
    entry.buf_index = bufferIndex;
}

/**
 * Prepares `write_fixed` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor of file we are operating on
 *     offset = offset
 *     buffer = slice of a preregistered buffer
 *     bufferIndex = index of the preregistered buffer this slice belongs to
 */
void prepWriteFixed(ref SubmissionEntry entry, int fd, ulong offset, ubyte[] buffer, ushort bufferIndex) @trusted
{
    assert(buffer.length, "Empty buffer");
    assert(buffer.length < uint.max, "Buffer too large");
    entry.opcode = Operation.WRITE_FIXED;
    entry.fd = fd;
    entry.off = offset;
    entry.addr = cast(ulong)(cast(void*)&buffer[0]);
    entry.len = cast(uint)buffer.length;
    entry.buf_index = bufferIndex;
}

/**
 * Prepares `sendmsg(2)` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor of file we are operating on
 *     msg = message to operate with
 *     flags = `sendmsg` operation flags
 *
 * Note: Available from Linux 5.3
 *
 * See_Also: `sendmsg(2)` man page for details.
 */
void prepSendMsg(ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE) @trusted
{
    entry.opcode = Operation.SENDMSG;
    entry.fd = fd;
    entry.addr = cast(ulong)(cast(void*)&msg);
    entry.msg_flags = flags;
}
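
/*
 * Illustrative sketch only (assumes a kernel with io_uring support, a writable "/dev/null",
 * and enough RLIMIT_MEMLOCK headroom): registering a buffer and using it with `prepWriteFixed`.
 */
unittest
{
    import core.sys.posix.fcntl : open, O_WRONLY;
    import core.sys.posix.unistd : close;

    Uring io;
    if (io.setup(4) != 0) return; // io_uring not available on this kernel

    static ubyte[256] buf; // stays valid for as long as it is registered
    if (io.registerBuffers(buf[]) != 0) return; // registration may fail due to memlock limits

    int fd = open("/dev/null", O_WRONLY);
    assert(fd >= 0);
    scope (exit) close(fd);

    SubmissionEntry entry;
    entry.clear();
    entry.prepWriteFixed(fd, 0, buf[], 0); // buffer index 0 refers to the registered buffer
    io.put(entry);

    assert(io.submit(1) == 1);
    assert(io.front.res == buf.length); // number of bytes written
    io.popFront();
}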

/**
 * Prepares `recvmsg(2)` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor of file we are operating on
 *     msg = message to operate with
 *     flags = `recvmsg` operation flags
 *
 * Note: Available from Linux 5.3
 *
 * See_Also: `recvmsg(2)` man page for details.
 */
void prepRecvMsg(ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE) @trusted
{
    entry.opcode = Operation.RECVMSG;
    entry.fd = fd;
    entry.addr = cast(ulong)(cast(void*)&msg);
    entry.msg_flags = flags;
}

/**
 * Prepares `fsync` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor of a file to call `fsync` on
 *     flags = `fsync` operation flags
 */
void prepFsync(ref SubmissionEntry entry, int fd, FsyncFlags flags = FsyncFlags.NORMAL) @safe
{
    entry.opcode = Operation.FSYNC;
    entry.fd = fd;
    entry.fsync_flags = flags;
}

/**
 * Poll the fd specified in the submission queue entry for the events specified in the `poll_events`
 * field. Unlike poll or epoll without `EPOLLONESHOT`, this interface always works in one-shot mode.
 * That is, once the poll operation is completed, it has to be resubmitted.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor to poll
 *     events = events to poll on the FD
 */
void prepPollAdd(ref SubmissionEntry entry, int fd, PollEvents events) @safe
{
    entry.opcode = Operation.POLL_ADD;
    entry.fd = fd;
    entry.poll_events = events;
}

/**
 * Remove an existing poll request. If found, the `res` field of the `CompletionEntry` will contain
 * `0`. If not found, `res` will contain `-ENOENT`.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     userData = user data used with the previously issued poll operation
 */
void prepPollRemove(D)(ref SubmissionEntry entry, ref D userData) @trusted
{
    entry.opcode = Operation.POLL_REMOVE;
    entry.fd = -1;
    entry.addr = cast(ulong)(cast(void*)&userData);
}

/**
 * Prepares `sync_file_range(2)` operation.
 *
 * Sync a file segment with disk: permits fine control when synchronizing the open file referred to
 * by the file descriptor `fd` with disk.
 *
 * If `len` is 0, then all bytes from `offset` through to the end of file are synchronized.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = the file descriptor to sync
 *     offset = the starting byte of the file range to be synchronized
 *     len = the length of the range to be synchronized, in bytes
 *     flags = the flags for the command.
 *
 * See_Also: `sync_file_range(2)` for the general description of the related system call.
 *
 * Note: available from Linux 5.2
 */
void prepSyncFileRange(ref SubmissionEntry entry, int fd, ulong offset, uint len, SyncFileRangeFlags flags) @safe
{
    entry.opcode = Operation.SYNC_FILE_RANGE;
    entry.fd = fd;
    entry.off = offset;
    entry.len = len;
    entry.sync_range_flags = flags;
}
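
/*
 * Illustrative sketch only (assumes a kernel with io_uring support and that a
 * temporary file can be created): queueing an `fsync` with `prepFsync`.
 */
unittest
{
    import core.stdc.stdio : tmpfile, fclose, FILE;
    import core.sys.posix.stdio : fileno;

    Uring io;
    if (io.setup(4) != 0) return; // io_uring not available on this kernel

    FILE* f = tmpfile();
    assert(f !is null);
    scope (exit) fclose(f);

    io.putWith!((ref SubmissionEntry e, int fd) => e.prepFsync(fd))(fileno(f));

    assert(io.submit(1) == 1);
    assert(io.front.res == 0); // fsync completed successfully
    io.popFront();
}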

/**
 * This command will register a timeout operation.
 *
 * A timeout will trigger a wakeup event on the completion ring for anyone waiting for events. A
 * timeout condition is met when either the specified timeout expires, or the specified number of
 * events have completed. Either condition will trigger the event. The request will complete with
 * `-ETIME` if the timeout got completed through expiration of the timer, or `0` if the timeout got
 * completed through requests completing on their own. If the timeout was cancelled before it
 * expired, the request will complete with `-ECANCELED`.
 *
 * Applications may delete existing timeouts before they occur with the `TIMEOUT_REMOVE` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     time = reference to a `KernelTimespec` data structure
 *     count = completion event count
 *     flags = define if it's a relative or absolute time
 *
 * Note: Available from Linux 5.4
 */
void prepTimeout(ref SubmissionEntry entry, ref KernelTimespec time,
    ulong count = 0, TimeoutFlags flags = TimeoutFlags.REL) @trusted
{
    entry.opcode = Operation.TIMEOUT;
    entry.fd = -1;
    entry.addr = cast(ulong)(cast(void*)&time);
    entry.len = 1;
    entry.off = count;
    entry.timeout_flags = flags;
}

/**
 * Prepares an operation to remove an existing timeout registered using the `TIMEOUT` operation.
 *
 * Attempt to remove an existing timeout operation. If the specified timeout request is found and
 * cancelled successfully, this request will terminate with a result value of `-ECANCELED`. If the
 * timeout request was found but expiration was already in progress, this request will terminate
 * with a result value of `-EALREADY`. If the timeout request wasn't found, the request will
 * terminate with a result value of `-ENOENT`.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     userData = user data provided with the previously issued timeout operation
 *
 * Note: Available from Linux 5.5
 */
void prepTimeoutRemove(D)(ref SubmissionEntry entry, ref D userData) @trusted
{
    entry.opcode = Operation.TIMEOUT_REMOVE;
    entry.fd = -1;
    entry.addr = cast(ulong)(cast(void*)&userData);
}

/**
 * Prepares `accept4(2)` operation.
 *
 * See_Also: `accept4(2)` for the general description of the related system call.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = socket file descriptor
 *     addr = reference to one of the sockaddr structures to be filled with the accepted client address
 *     addrlen = reference to the addrlen field that will be filled with the accepted client address length
 *     flags = `accept4(2)` operation flags
 *
 * Note: Available from Linux 5.5
 */
void prepAccept(ADDR)(ref SubmissionEntry entry, int fd, ref ADDR addr, ref socklen_t addrlen,
    AcceptFlags flags = AcceptFlags.NONE) @trusted
{
    entry.opcode = Operation.ACCEPT;
    entry.fd = fd;
    entry.addr = cast(ulong)(cast(void*)&addr);
    entry.addr2 = cast(ulong)(cast(void*)&addrlen);
    entry.accept_flags = flags;
}
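
/*
 * Illustrative sketch only (assumes Linux 5.4+ for the TIMEOUT operation and that
 * `KernelTimespec` exposes `tv_sec`/`tv_nsec` fields): registering a short timeout.
 */
unittest
{
    Uring io;
    if (io.setup(4) != 0) return; // io_uring not available on this kernel

    KernelTimespec ts;
    ts.tv_sec = 0;
    ts.tv_nsec = 1_000_000; // 1 ms

    io.putWith!((ref SubmissionEntry e, ref KernelTimespec ts) => e.prepTimeout(ts))(ts);

    assert(io.submit(1) == 1);
    assert(!io.empty);
    // on Linux 5.4+ the request completes with `-ETIME` when the timer expires
    io.popFront();
}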

/**
 * Prepares an operation that cancels existing async work.
 *
 * This works with any read/write request, accept, send/recvmsg, etc. There’s an important
 * distinction to make here with the different kinds of commands. A read/write on a regular file
 * will generally be waiting for IO completion in an uninterruptible state. This means it’ll ignore
 * any signals or attempts to cancel it, as these operations are uncancellable. io_uring can cancel
 * these operations if they haven’t yet been started. If they have been started, cancellations on
 * these will fail. Network IO will generally be waiting interruptibly, and can hence be cancelled
 * at any time. The completion event for this request will have a result of 0 if done successfully,
 * `-EALREADY` if the operation is already in progress, and `-ENOENT` if the original request
 * specified cannot be found. For cancellation requests that return `-EALREADY`, io_uring may or may
 * not cause this request to be stopped sooner. For blocking IO, the original request will complete
 * as it originally would have. For IO that is cancellable, it will terminate sooner if at all
 * possible.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     userData = `user_data` field of the request that should be cancelled
 *
 * Note: Available from Linux 5.5
 */
void prepCancel(D)(ref SubmissionEntry entry, ref D userData/*, uint flags = 0*/) @trusted
{
    entry.opcode = Operation.ASYNC_CANCEL;
    entry.fd = -1;
    entry.addr = cast(ulong)(cast(void*)&userData);
    // entry.cancel_flags = flags; // TODO: there are none yet
}

/**
 * Prepares linked timeout operation.
 *
 * This request must be linked with another request through `IOSQE_IO_LINK`.
 * Unlike `IORING_OP_TIMEOUT`, `IORING_OP_LINK_TIMEOUT` acts on the linked request, not the completion
 * queue. The format of the command is otherwise like `IORING_OP_TIMEOUT`, except there's no
 * completion event count as it's tied to a specific request. If used, the timeout specified in the
 * command will cancel the linked command, unless the linked command completes before the
 * timeout. The timeout will complete with `-ETIME` if the timer expired and cancellation of the
 * linked request was attempted, or `-ECANCELED` if the timer got cancelled because of completion
 * of the linked request.
 *
 * Note: Available from Linux 5.5
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     time = time specification
 *     flags = define if it's a relative or absolute time
 */
void prepLinkTimeout(ref SubmissionEntry entry, ref KernelTimespec time, TimeoutFlags flags = TimeoutFlags.REL) @trusted
{
    entry.opcode = Operation.LINK_TIMEOUT;
    entry.fd = -1;
    entry.addr = cast(ulong)(cast(void*)&time);
    entry.len = 1;
    entry.timeout_flags = flags;
}

/**
 * Prepares `connect(2)` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = socket file descriptor
 *     addr = socket address to connect to
 *
 * Note: Available from Linux 5.5
 */
void prepConnect(ADDR)(ref SubmissionEntry entry, int fd, ref const(ADDR) addr) @trusted
{
    entry.opcode = Operation.CONNECT;
    entry.fd = fd;
    entry.addr = cast(ulong)(cast(void*)&addr);
    entry.off = ADDR.sizeof;
}
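
/*
 * Illustrative sketch only (assumes Linux 5.5+ for TIMEOUT_REMOVE and `tv_sec`/`tv_nsec`
 * fields on `KernelTimespec`): removing a pending timeout identified by its user data.
 */
unittest
{
    Uring io;
    if (io.setup(4) != 0) return; // io_uring not available on this kernel

    KernelTimespec ts;
    ts.tv_sec = 1; // short timeout; on 5.5+ it is removed before it can fire

    SubmissionEntry entry;
    entry.clear();
    entry.prepTimeout(ts);
    entry.setUserData(ts);          // identify the timeout by the address of `ts`
    io.put(entry);
    assert(io.submit() == 1);       // arm the timeout first

    entry.clear();
    entry.prepTimeoutRemove(ts);    // target the request carrying the same user data
    io.put(entry);
    assert(io.submit() == 1);

    // two completions arrive: one for the cancelled timeout and one for the remove request itself
    io.wait(2);
    while (!io.empty) io.popFront();
}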

private:

// uring cleanup
void dispose(ref Uring uring)
{
    if (uring.payload is null) return;
    // debug printf("uring(%d): dispose(%d)\n", uring.payload.fd, uring.payload.refs);
    if (--uring.payload.refs == 0)
    {
        import std.traits : hasElaborateDestructor;
        // debug printf("uring(%d): free\n", uring.payload.fd);
        static if (hasElaborateDestructor!UringDesc)
            destroy(*uring.payload); // call possible destructors
        free(cast(void*)uring.payload);
    }
    uring.payload = null;
}

// system fields descriptor
struct UringDesc
{
    nothrow @nogc:

    int fd;
    size_t refs;
    SetupParameters params;
    SubmissionQueue sq;
    CompletionQueue cq;

    iovec[] regBuffers;

    ~this()
    {
        if (regBuffers) free(cast(void*)&regBuffers[0]);
        if (sq.ring) munmap(sq.ring, sq.ringSize);
        if (sq.sqes) munmap(cast(void*)&sq.sqes[0], sq.sqes.length * SubmissionEntry.sizeof);
        if (cq.ring && cq.ring != sq.ring) munmap(cq.ring, cq.ringSize);
        close(fd);
    }

    private auto mapRings()
    {
        sq.ringSize = params.sq_off.array + params.sq_entries * uint.sizeof;
        cq.ringSize = params.cq_off.cqes + params.cq_entries * CompletionEntry.sizeof;

        if (params.features & SetupFeatures.SINGLE_MMAP)
        {
            if (cq.ringSize > sq.ringSize) sq.ringSize = cq.ringSize;
            cq.ringSize = sq.ringSize;
        }

        sq.ring = mmap(null, sq.ringSize,
            PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
            fd, SetupParameters.SUBMISSION_QUEUE_RING_OFFSET
        );

        if (sq.ring == MAP_FAILED)
        {
            sq.ring = null;
            return -errno;
        }

        if (params.features & SetupFeatures.SINGLE_MMAP)
            cq.ring = sq.ring;
        else
        {
            cq.ring = mmap(null, cq.ringSize,
                PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
                fd, SetupParameters.COMPLETION_QUEUE_RING_OFFSET
            );

            if (cq.ring == MAP_FAILED)
            {
                cq.ring = null;
                return -errno; // cleanup is done in struct destructors
            }
        }

        uint entries = *cast(uint*)(sq.ring + params.sq_off.ring_entries);
        sq.khead = cast(uint*)(sq.ring + params.sq_off.head);
        sq.ktail = cast(uint*)(sq.ring + params.sq_off.tail);
        sq.localTail = *sq.ktail;
        sq.ringMask = *cast(uint*)(sq.ring + params.sq_off.ring_mask);
        sq.kflags = cast(uint*)(sq.ring + params.sq_off.flags);
        sq.kdropped = cast(uint*)(sq.ring + params.sq_off.dropped);

        // Indirection array of indexes to the sqes array (head and tail are pointing to this array).
        // As we don't need some fancy mappings, just initialize it with constant indexes and forget about it.
        // That way, head and tail are actually indexes to our sqes array.
        foreach (i; 0..entries)
        {
            *((cast(uint*)(sq.ring + params.sq_off.array)) + i) = i;
        }

        auto psqes = mmap(
            null, entries * SubmissionEntry.sizeof,
            PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
            fd, SetupParameters.SUBMISSION_QUEUE_ENTRIES_OFFSET
        );

        if (psqes == MAP_FAILED) return -errno;
        sq.sqes = (cast(SubmissionEntry*)psqes)[0..entries];

        entries = *cast(uint*)(cq.ring + params.cq_off.ring_entries);
        cq.khead = cast(uint*)(cq.ring + params.cq_off.head);
        cq.localHead = *cq.khead;
        cq.ktail = cast(uint*)(cq.ring + params.cq_off.tail);
        cq.ringMask = *cast(uint*)(cq.ring + params.cq_off.ring_mask);
        cq.koverflow = cast(uint*)(cq.ring + params.cq_off.overflow);
        cq.cqes = (cast(CompletionEntry*)(cq.ring + params.cq_off.cqes))[0..entries];
        return 0;
    }
}

/// Wrapper for the `SubmissionEntry` queue
struct SubmissionQueue
{
    nothrow @nogc:

    // mmaped fields
    uint* khead; // controlled by kernel
    uint* ktail; // controlled by us
    uint* kflags; // controlled by kernel (ie IORING_SQ_NEED_WAKEUP)
    uint* kdropped; // counter of invalid submissions (out of bound index)
    uint ringMask; // constant mask used to determine array index from head/tail

    // mmap details (for cleanup)
    void* ring; // pointer to the mmaped region
    size_t ringSize; // size of mmaped memory block

    // mmapped list of entries (fixed length)
    SubmissionEntry[] sqes;

    uint localTail; // used for batch submission

    uint head() const { return atomicLoad!(MemoryOrder.acq)(*khead); }
    uint tail() const { return localTail; }

    void flushTail()
    {
        pragma(inline);
        // debug printf("SQ updating tail: %d\n", localTail);
        atomicStore!(MemoryOrder.rel)(*ktail, localTail);
    }

    SubmissionQueueFlags flags() const
    {
        return cast(SubmissionQueueFlags)atomicLoad!(MemoryOrder.raw)(*kflags);
    }

    bool full() const { return sqes.length == length; }

    size_t length() const { return tail - head; }

    size_t capacity() const { return sqes.length - length; }

    void put()(auto ref SubmissionEntry entry)
    {
        assert(!full, "SubmissionQueue is full");
        sqes[tail & ringMask] = entry;
        localTail++;
    }

    void put(OP)(auto ref OP op)
        if (!is(OP == SubmissionEntry))
    {
        assert(!full, "SubmissionQueue is full");
        sqes[tail & ringMask].clear();
        sqes[tail & ringMask].fill(op);
        localTail++;
    }

    private void putWith(alias FN, ARGS...)(auto ref ARGS args)
    {
        import std.traits : Parameters, ParameterStorageClass, ParameterStorageClassTuple;

        static assert(
            Parameters!FN.length >= 1
            && is(Parameters!FN[0] == SubmissionEntry)
            && ParameterStorageClassTuple!FN[0] == ParameterStorageClass.ref_,
            "Alias function must accept at least `ref SubmissionEntry`");

        static assert(
            is(typeof(FN(sqes[tail & ringMask], args))),
            "Provided function is not callable with " ~ (Parameters!((ref SubmissionEntry e, ARGS args) {})).stringof);

        assert(!full, "SubmissionQueue is full");
        sqes[tail & ringMask].clear();
        FN(sqes[tail & ringMask], args);
        localTail++;
    }

    uint dropped() const { return atomicLoad!(MemoryOrder.raw)(*kdropped); }
}

struct CompletionQueue
{
    nothrow @nogc:

    // mmaped fields
    uint* khead; // controlled by us (increment after entry at head was read)
    uint* ktail; // updated by kernel
    uint* koverflow;
    CompletionEntry[] cqes; // array of entries (fixed length)

    uint ringMask; // constant mask used to determine array index from head/tail

    // mmap details (for cleanup)
    void* ring;
    size_t ringSize;

    uint localHead; // used for bulk reading

    uint head() const { return localHead; }
    uint tail() const { return atomicLoad!(MemoryOrder.acq)(*ktail); }

    void flushHead()
    {
        pragma(inline);
        // debug printf("CQ updating head: %d\n", localHead);
        atomicStore!(MemoryOrder.rel)(*khead, localHead);
    }

    bool empty() const { return head == tail; }

    ref CompletionEntry front() return
    {
        assert(!empty, "CompletionQueue is empty");
        return cqes[localHead & ringMask];
    }

    void popFront()
    {
        pragma(inline);
        assert(!empty, "CompletionQueue is empty");
        localHead++;
        flushHead();
    }

    size_t length() const { return tail - localHead; }

    uint overflow() const { return atomicLoad!(MemoryOrder.raw)(*koverflow); }
}

// just a helper to use atomicStore more easily with older compilers
void atomicStore(MemoryOrder ms, T, V)(ref T val, V newVal) @trusted
{
    pragma(inline, true);
    import core.atomic : store = atomicStore;
    static if (__VERSION__ >= 2089) store!ms(val, newVal);
    else store!ms(*(cast(shared T*)&val), newVal);
}

// just a helper to use atomicLoad more easily with older compilers
T atomicLoad(MemoryOrder ms, T)(ref const T val) @trusted
{
    pragma(inline, true);
    import core.atomic : load = atomicLoad;
    static if (__VERSION__ >= 2089) return load!ms(val);
    else return load!ms(*(cast(const shared T*)&val));
}

version (assert)
{
    import std.range.primitives : ElementType, isInputRange, isOutputRange;
    static assert(isInputRange!Uring && is(ElementType!Uring == CompletionEntry));
    static assert(isOutputRange!(Uring, SubmissionEntry));
}
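
/*
 * Illustrative sketch only (assumes a kernel with io_uring support): the `user_data` set
 * on a submission comes back unchanged on the matching completion.
 */
unittest
{
    Uring io;
    if (io.setup(4) != 0) return; // io_uring not available on this kernel

    int token; // any object whose address identifies the request

    SubmissionEntry entry;
    entry.clear();
    entry.prepNop();
    entry.setUserData(token);
    io.put(entry);

    assert(io.submit(1) == 1);
    assert(io.front.user_data == cast(ulong)cast(void*)&token);
    io.popFront();
}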