/**
 * Simple idiomatic dlang wrapper around linux io_uring
 * (see: https://kernel.dk/io_uring.pdf) asynchronous API.
 */
module during;

version (linux) {}
else static assert(0, "io_uring is available on linux only");

public import during.io_uring;

import core.atomic : MemoryOrder;
debug import core.stdc.stdio;
import core.stdc.stdlib;
import core.sys.linux.errno;
import core.sys.linux.sys.mman;
import core.sys.linux.unistd;
import core.sys.posix.signal;
import core.sys.posix.sys.socket;
import core.sys.posix.sys.uio;
import std.algorithm.comparison : among;

nothrow @nogc:

/**
 * Setup new instance of io_uring into provided `Uring` structure.
 *
 * Params:
 *   uring = `Uring` structure to be initialized (must not be already initialized)
 *   entries = Number of entries to initialize uring with
 *   flags = `SetupFlags` to use to initialize uring.
 *
 * Returns: On success it returns 0, `-errno` otherwise.
 */
int setup(ref Uring uring, uint entries = 128, SetupFlags flags = SetupFlags.NONE)
{
    assert(uring.payload is null, "Uring is already initialized");
    uring.payload = cast(UringDesc*)calloc(1, UringDesc.sizeof);
    if (uring.payload is null) return -errno;

    uring.payload.params.flags = flags;
    uring.payload.refs = 1;
    auto r = io_uring_setup(entries, uring.payload.params);
    if (r < 0) return -errno;

    uring.payload.fd = r;

    r = uring.payload.mapRings();
    if (r < 0)
    {
        dispose(uring);
        return r; // mapRings already returns -errno; don't reread errno after cleanup
    }

    // debug printf("uring(%d): setup\n", uring.payload.fd);

    return 0;
}
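
/*
 * Example (illustrative sketch only, kept under `version (none)`): the intended flow is to
 * `setup` a ring, enqueue entries with `putWith`/`put`, `submit` them and drain completions
 * through the input range interface. `res` on `CompletionEntry` is assumed to mirror the raw
 * io_uring CQE result field.
 */
version (none) nothrow @nogc unittest
{
    Uring io;
    int rc = io.setup(16);          // ring with 16 submission entries
    assert(rc == 0, "io_uring setup failed");

    io.putWith!prepNop();           // enqueue a no-op just to exercise the loop
    auto submitted = io.submit(1);  // submit and wait for at least one completion
    assert(submitted >= 1);

    foreach (ref cqe; io)           // Uring is an input range of CompletionEntry
        assert(cqe.res == 0);       // NOP completes with res == 0
}   // the ring is released here by Uring's destructor (reference-counted payload)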

/**
 * Main entry point to work with io_uring.
 *
 * It hides `SubmissionQueue` and `CompletionQueue` behind a standard range interface.
 * We put in `SubmissionEntry` entries and take out `CompletionEntry` entries.
 *
 * Use predefined `prepXX` methods to fill required fields of `SubmissionEntry` before `put` or during `putWith`.
 *
 * Note: `prepXX` functions don't touch the previous entry state, they just fill in the operation properties.
 * When an entry is prepared using `putWith`, it is cleared automatically first (for a less error-prone interface).
 * So when using your own `SubmissionEntry` (outside the submission queue) that would be added to the submission
 * queue using `put`, be sure it's cleared if it's reused for multiple operations.
 */
struct Uring
{
    nothrow @nogc:

    private UringDesc* payload;
    private void checkInitialized() const
    {
        assert(payload !is null, "Uring hasn't been initialized yet");
    }

    /// Copy constructor
    this(ref return scope Uring rhs)
    {
        assert(rhs.payload !is null, "rhs payload is null");
        // debug printf("uring(%d): copy\n", rhs.payload.fd);
        this.payload = rhs.payload;
        this.payload.refs++;
    }

    /// Destructor
    ~this()
    {
        dispose(this);
    }

    /// Native io_uring file descriptor
    auto fd() const
    {
        checkInitialized();
        return payload.fd;
    }

    /// io_uring parameters
    SetupParameters params() const return
    {
        checkInitialized();
        return payload.params;
    }

    /// Check if there is some `CompletionEntry` to process.
    bool empty() const
    {
        checkInitialized();
        return payload.cq.empty;
    }

    /// Check if there is space for another `SubmissionEntry` to submit.
    bool full() const
    {
        checkInitialized();
        return payload.sq.full;
    }

    /// Available space in submission queue before it becomes full
    size_t capacity() const
    {
        checkInitialized();
        return payload.sq.capacity;
    }

    /// Number of entries in completion queue
    size_t length() const
    {
        checkInitialized();
        return payload.cq.length;
    }

    /// Get first `CompletionEntry` from cq ring
    ref CompletionEntry front() return
    {
        checkInitialized();
        return payload.cq.front;
    }

    /// Move to next `CompletionEntry`
    void popFront()
    {
        checkInitialized();
        return payload.cq.popFront;
    }

    /**
     * Adds new entry to the `SubmissionQueue`.
     *
     * Note that this just adds the entry to the queue and doesn't advance the tail
     * marker that the kernel sees, nor does it pass the new entries to the kernel.
     * Both happen when `submit()` is called.
     *
     * Params:
     *   FN = Function to fill next entry in queue by `ref` (should be faster).
     *        It is expected to be in a form of `void function(ARGS)(ref SubmissionEntry, auto ref ARGS)`.
     *        Note that in this case the queue entry is cleared first before the function is called.
     *   entry = Custom built `SubmissionEntry` to be posted as is.
     *           Note that in this case it is copied whole over the one in the `SubmissionQueue`.
     *   args = Optional arguments passed to the function
     *
     * Returns: reference to `Uring` structure so it's possible to chain multiple commands.
     */
    ref Uring put()(auto ref SubmissionEntry entry) return
    {
        checkInitialized();
        payload.sq.put(entry);
        return this;
    }

    /// ditto
    ref Uring putWith(alias FN, ARGS...)(auto ref ARGS args) return
    {
        import std.functional : forward;
        checkInitialized();
        payload.sq.putWith!FN(forward!args);
        return this;
    }

    /**
     * Similar to `put(SubmissionEntry)` but in this case we can provide our custom type (args) to be filled
     * to the next `SubmissionEntry` in queue.
     *
     * Fields in the provided type must use the same names as in `SubmissionEntry` to be automagically copied.
     *
     * Params:
     *   op = Custom operation definition.
     *
     * Returns: reference to `Uring` structure so it's possible to chain multiple commands.
     */
    ref Uring put(OP)(auto ref OP op) return
        if (!is(OP == SubmissionEntry))
    {
        checkInitialized();
        payload.sq.put(op);
        return this;
    }

    /**
     * If the completion queue is full, new events may be dropped.
     * This value records the number of dropped events.
     */
    uint overflow() const
    {
        checkInitialized();
        return payload.cq.overflow;
    }

    /// Counter of invalid submissions (out-of-bound index in submission array)
    uint dropped() const
    {
        checkInitialized();
        return payload.sq.dropped;
    }

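    /*
     * Example (sketch, not compiled): `put(OP)` copies any struct whose field names match
     * `SubmissionEntry` members into the next queue slot. This is handy when no builtin
     * `prepXX` helper exists for an operation. The struct below is hypothetical and mimics
     * what `prepFsync` does.
     */
    version (none) nothrow @nogc unittest
    {
        // field names must match SubmissionEntry members (see `fill` below)
        struct MyFsync
        {
            Operation opcode = Operation.FSYNC;
            int fd = -1;
            FsyncFlags fsync_flags = FsyncFlags.NORMAL;
        }

        Uring io;
        assert(io.setup() == 0);

        MyFsync op;
        op.fd = 0; // assumed: some open file descriptor
        io.put(op).submit(1); // each put returns `ref Uring`, so calls can be chained
    }
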
    /**
     * Submits queued `SubmissionEntry` entries to be processed by the kernel.
     *
     * Params:
     *   want = number of `CompletionEntries` to wait for.
     *          If 0, this just submits queued entries and returns.
     *          If > 0, it blocks until at least the wanted number of entries were completed.
     *   sig = See io_uring_enter(2) man page
     *
     * Returns: Number of submitted entries on success, `-errno` on error
     */
    auto submit(uint want = 0, const sigset_t* sig = null) @trusted
    {
        checkInitialized();

        auto len = cast(uint)payload.sq.length;
        if (len > 0) // anything to submit?
        {
            EnterFlags flags;
            if (want > 0) flags |= EnterFlags.GETEVENTS;

            payload.sq.flushTail(); // advance queue index

            if (payload.params.flags & SetupFlags.SQPOLL)
            {
                if (payload.sq.flags & SubmissionQueueFlags.NEED_WAKEUP)
                    flags |= EnterFlags.SQ_WAKEUP;
                else if (want == 0) return len; // fast poll
            }
            auto r = io_uring_enter(payload.fd, len, want, flags, sig);
            if (r < 0) return -errno;
            return r;
        }
        else if (want > 0) return wait(want); // just simple wait
        return 0;
    }

    /**
     * Similar to `submit` but with this method we just wait for the required number
     * of `CompletionEntries`.
     *
     * Returns: `0` on success, `-errno` on error
     */
    auto wait(uint want = 1, const sigset_t* sig = null) @trusted
    {
        pragma(inline);
        checkInitialized();
        assert(want > 0, "Invalid want value");

        if (payload.cq.length >= want) return 0; // we don't need to syscall

        auto r = io_uring_enter(payload.fd, 0, want, EnterFlags.GETEVENTS, sig);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * Register buffer(s) to be mapped into the kernel for faster buffered operations.
     *
     * To use the buffers, the application must use the fixed variants of the operations,
     * `READ_FIXED` or `WRITE_FIXED`, in the `SubmissionEntry`, together with the `buf_index`
     * set in the entry's extra data.
     *
     * An application can increase or decrease the size or number of registered buffers by first
     * unregistering the existing buffers, and then issuing a new call to io_uring_register() with
     * the new buffers.
     *
     * Params:
     *   buffers = Buffers to be registered
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    auto registerBuffers(T)(T buffers)
        if (is(T == ubyte[]) || is(T == ubyte[][])) // TODO: something else?
    {
        checkInitialized();
        assert(buffers.length, "Empty buffer");

        if (payload.regBuffers !is null)
            return -EBUSY; // buffers were already registered

        static if (is(T == ubyte[]))
        {
            auto p = malloc(iovec.sizeof);
            if (p is null) return -errno;
            payload.regBuffers = (cast(iovec*)p)[0..1];
            payload.regBuffers[0].iov_base = cast(void*)&buffers[0];
            payload.regBuffers[0].iov_len = buffers.length;
        }
        else static if (is(T == ubyte[][]))
        {
            auto p = malloc(buffers.length * iovec.sizeof);
            if (p is null) return -errno;
            payload.regBuffers = (cast(iovec*)p)[0..buffers.length];

            foreach (i, b; buffers)
            {
                assert(b.length, "Empty buffer");
                payload.regBuffers[i].iov_base = cast(void*)&b[0];
                payload.regBuffers[i].iov_len = b.length;
            }
        }

        auto r = io_uring_register(
            payload.fd,
            RegisterOpCode.REGISTER_BUFFERS,
            cast(const(void)*)&payload.regBuffers[0],
            cast(uint)payload.regBuffers.length // pass the real number of iovecs, not just 1
        );

        if (r < 0) return -errno;
        return 0;
    }

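    /*
     * Example (sketch, `version (none)` so it isn't compiled): fixed buffers are registered
     * once up front and then referenced by `buf_index` from `READ_FIXED`/`WRITE_FIXED`
     * entries. `theFd` stands for a file descriptor opened by the caller; `res` on the
     * completion is assumed to carry the byte count (or `-errno`) as in the raw CQE.
     */
    version (none) nothrow @nogc unittest
    {
        ubyte[4096] storage;                // must stay valid while registered

        Uring io;
        assert(io.setup() == 0);
        assert(io.registerBuffers(storage[]) == 0);

        int theFd = -1;                     // assumed: opened elsewhere with open(2)
        io.putWith!prepReadFixed(theFd, 0UL, storage[], cast(ushort)0); // buffer 0, offset 0
        io.submit(1);
        // io.front.res now holds the number of bytes read into storage (or -errno)
        io.popFront();

        assert(io.unregisterBuffers() == 0);
    }
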
    /**
     * Releases all previously registered buffers associated with the `io_uring` instance.
     *
     * An application need not unregister buffers explicitly before shutting down the io_uring instance.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    auto unregisterBuffers() @trusted
    {
        checkInitialized();

        if (payload.regBuffers is null)
            return -ENXIO; // no buffers were registered

        free(cast(void*)&payload.regBuffers[0]);
        payload.regBuffers = null;

        auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_BUFFERS, null, 0);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * Register files for I/O.
     *
     * To make use of the registered files, the `IOSQE_FIXED_FILE` flag must be set in the flags
     * member of the `SubmissionEntry`, and the `fd` member is set to the index of the file in the
     * file descriptor array.
     *
     * Files are automatically unregistered when the `io_uring` instance is torn down. An application
     * need only unregister if it wishes to register a new set of fds.
     *
     * Use `-1` as a file descriptor to mark it as reserved in the array.
     *
     * Params: fds = array of file descriptors to be registered
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    auto registerFiles(const(int)[] fds)
    {
        checkInitialized();
        assert(fds.length, "No file descriptors provided");
        assert(fds.length < uint.max, "Too many file descriptors");

        // arg contains a pointer to an array of nr_args file descriptors (signed 32 bit integers).
        auto r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_FILES, &fds[0], cast(uint)fds.length);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * Register an update for an existing file set. The updates will start at
     * `off` in the original array.
     *
     * Use `-1` as a file descriptor to mark it as reserved in the array.
     *
     * Params:
     *   off = offset to the original registered files to be updated
     *   fds = array of file descriptors to update with
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    auto registerFilesUpdate(uint off, const(int)[] fds) @trusted
    {
        struct Update
        {
            uint off;
            const(int)* fds;
        }

        checkInitialized();
        assert(fds.length, "No file descriptors provided to update");
        assert(fds.length < uint.max, "Too many file descriptors");

        Update u = Update(off, &fds[0]);
        auto r = io_uring_register(
            payload.fd,
            RegisterOpCode.REGISTER_FILES_UPDATE,
            &u, cast(uint)fds.length);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * All previously registered files associated with the `io_uring` instance will be unregistered.
     *
     * Files are automatically unregistered when the `io_uring` instance is torn down. An application
     * need only unregister if it wishes to register a new set of fds.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    auto unregisterFiles() @trusted
    {
        checkInitialized();
        auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_FILES, null, 0);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * Registers an `eventfd` file descriptor that gets signalled whenever a completion
     * event is posted to the `io_uring` completion queue.
     *
     * Params: eventFD = eventfd file descriptor to be notified on completions
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    auto registerEventFD(int eventFD) @trusted
    {
        checkInitialized();
        auto r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_EVENTFD, &eventFD, 1);
        if (r < 0) return -errno;
        return 0;
    }

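    /*
     * Example (sketch): a registered file set is addressed by index rather than by raw fd.
     * Only the registration calls shown in this struct are used here; actually submitting
     * against the set additionally needs the fixed-file flag (IOSQE_FIXED_FILE in kernel
     * terms) in the entry flags, whose D-side name is not shown in this module.
     */
    version (none) nothrow @nogc unittest
    {
        Uring io;
        assert(io.setup() == 0);

        int[2] slots = [-1, -1];            // -1 marks a reserved (sparse) slot
        assert(io.registerFiles(slots[]) == 0);

        int someFd = -1;                    // assumed: opened elsewhere
        int[1] update = [someFd];
        assert(io.registerFilesUpdate(1, update[]) == 0); // fill slot 1

        assert(io.unregisterFiles() == 0);
    }
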
    /**
     * Unregisters a previously registered eventfd from the `io_uring` instance.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    auto unregisterEventFD() @trusted
    {
        checkInitialized();
        auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_EVENTFD, null, 0);
        if (r < 0) return -errno;
        return 0;
    }
}

/**
 * Uses custom operation definition to fill fields of `SubmissionEntry`.
 * Can be used in cases when the builtin prep* functions aren't enough.
 *
 * Custom definition fields must correspond to fields of `SubmissionEntry` for this to work.
 *
 * Note: This doesn't touch the previous state of the entry, it just fills the corresponding fields.
 * So it might be needed to call `clear` first on the entry (depends on usage).
 *
 * Params:
 *   entry = entry to set parameters to
 *   op = operation to fill entry with (can be custom type)
 */
void fill(E)(ref SubmissionEntry entry, auto ref E op)
{
    pragma(inline);
    import std.traits : hasMember, FieldNameTuple;

    // fill entry from provided operation fields (they must have same name as in SubmissionEntry)
    foreach (m; FieldNameTuple!E)
    {
        static assert(hasMember!(SubmissionEntry, m), "unknown member: " ~ E.stringof ~ "." ~ m);
        __traits(getMember, entry, m) = __traits(getMember, op, m);
    }
}

/**
 * Template function to help set `SubmissionEntry` `user_data` field.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   data = data to set to the `SubmissionEntry`
 *
 * Note: data is passed by ref and must live through the whole operation.
 */
void setUserData(D)(ref SubmissionEntry entry, ref D data)
{
    entry.user_data = cast(ulong)(cast(void*)&data);
}

/**
 * Prepares `nop` operation.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 */
void prepNop(ref SubmissionEntry entry) @safe
{
    entry.opcode = Operation.NOP;
    entry.fd = -1;
}

// TODO: check offset behavior, preadv2/pwritev2 should accept -1 to work on the current file offset,
// but it doesn't seem to work here.

/**
 * Prepares `readv` operation.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   fd = file descriptor of file we are operating on
 *   buffer = iovec buffers to be used by the operation
 *   offset = offset
 */
void prepReadv(V)(ref SubmissionEntry entry, int fd, ref const V buffer, ulong offset)
    if (is(V == iovec[]) || is(V == iovec))
{
    entry.opcode = Operation.READV;
    entry.fd = fd;
    entry.off = offset;
    static if (is(V == iovec[]))
    {
        assert(buffer.length, "Empty buffer");
        assert(buffer.length < uint.max, "Too many iovec buffers");
        entry.addr = cast(ulong)(cast(void*)&buffer[0]);
        entry.len = cast(uint)buffer.length;
    }
    else
    {
        entry.addr = cast(ulong)(cast(void*)&buffer);
        entry.len = 1;
    }
}

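/*
 * Example (sketch, `version (none)` so it is never compiled in): a single `iovec` readv plus
 * `setUserData`. The iovec, the target buffer and the user data are only referenced by address
 * from the entry, so they must outlive the operation. `theFd` is assumed to be an open file;
 * `user_data`/`res` on `CompletionEntry` are assumed to mirror the raw CQE fields.
 */
version (none) nothrow @nogc unittest
{
    ubyte[512] buf;
    iovec v = iovec(cast(void*)buf.ptr, buf.length);
    ulong token = 42; // any object address can serve as a completion token

    Uring io;
    assert(io.setup() == 0);

    int theFd = -1;       // assumed: opened elsewhere with open(2)
    SubmissionEntry e;    // a default-initialized entry is already cleared
    e.prepReadv(theFd, v, 0);
    e.setUserData(token);
    io.put(e).submit(1);

    assert(io.front.user_data == cast(ulong)&token); // completion carries our token back
    io.popFront();
}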
/**
 * Prepares `writev` operation.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   fd = file descriptor of file we are operating on
 *   buffer = iovec buffers to be used by the operation
 *   offset = offset
 */
void prepWritev(V)(ref SubmissionEntry entry, int fd, ref const V buffer, ulong offset)
    if (is(V == iovec[]) || is(V == iovec))
{
    entry.opcode = Operation.WRITEV;
    entry.fd = fd;
    entry.off = offset;
    static if (is(V == iovec[]))
    {
        assert(buffer.length, "Empty buffer");
        assert(buffer.length < uint.max, "Too many iovec buffers");
        entry.addr = cast(ulong)(cast(void*)&buffer[0]);
        entry.len = cast(uint)buffer.length;
    }
    else
    {
        entry.addr = cast(ulong)(cast(void*)&buffer);
        entry.len = 1;
    }
}

/**
 * Prepares `read_fixed` operation.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   fd = file descriptor of file we are operating on
 *   offset = offset
 *   buffer = slice to preregistered buffer
 *   bufferIndex = index to the preregistered buffers array buffer belongs to
 */
void prepReadFixed(ref SubmissionEntry entry, int fd, ulong offset, ubyte[] buffer, ushort bufferIndex)
{
    assert(buffer.length, "Empty buffer");
    assert(buffer.length < uint.max, "Buffer too large");
    entry.opcode = Operation.READ_FIXED;
    entry.fd = fd;
    entry.off = offset;
    entry.addr = cast(ulong)(cast(void*)&buffer[0]);
    entry.len = cast(uint)buffer.length;
    entry.buf_index = bufferIndex;
}

/**
 * Prepares `write_fixed` operation.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   fd = file descriptor of file we are operating on
 *   offset = offset
 *   buffer = slice to preregistered buffer
 *   bufferIndex = index to the preregistered buffers array buffer belongs to
 */
void prepWriteFixed(ref SubmissionEntry entry, int fd, ulong offset, ubyte[] buffer, ushort bufferIndex)
{
    assert(buffer.length, "Empty buffer");
    assert(buffer.length < uint.max, "Buffer too large");
    entry.opcode = Operation.WRITE_FIXED;
    entry.fd = fd;
    entry.off = offset;
    entry.addr = cast(ulong)(cast(void*)&buffer[0]);
    entry.len = cast(uint)buffer.length;
    entry.buf_index = bufferIndex;
}

/**
 * Prepares `sendmsg(2)` operation.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   fd = file descriptor of file we are operating on
 *   msg = message to operate with
 *   flags = `sendmsg` operation flags
 *
 * Note: Available from Linux 5.3
 *
 * See_Also: `sendmsg(2)` man page for details.
 */
void prepSendMsg(ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE)
{
    entry.opcode = Operation.SENDMSG;
    entry.fd = fd;
    entry.addr = cast(ulong)(cast(void*)&msg);
    entry.msg_flags = flags;
}

/**
 * Prepares `recvmsg(2)` operation.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   fd = file descriptor of file we are operating on
 *   msg = message to operate with
 *   flags = `recvmsg` operation flags
 *
 * Note: Available from Linux 5.3
 *
 * See_Also: `recvmsg(2)` man page for details.
 */
void prepRecvMsg(ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE)
{
    entry.opcode = Operation.RECVMSG;
    entry.fd = fd;
    entry.addr = cast(ulong)(cast(void*)&msg);
    entry.msg_flags = flags;
}

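/*
 * Example (sketch, kept under `version (none)`): SENDMSG takes an ordinary `msghdr`, so code
 * written for the synchronous syscall translates almost mechanically. `sock` is assumed to be
 * a connected socket created by the caller; `res` in the completion is assumed to carry the
 * number of bytes sent (or `-errno`), mirroring the raw CQE.
 */
version (none) nothrow @nogc unittest
{
    ubyte[64] data = 0;
    iovec v = iovec(cast(void*)data.ptr, data.length);

    msghdr msg;
    msg.msg_iov = &v;
    msg.msg_iovlen = 1;

    Uring io;
    assert(io.setup() == 0);

    int sock = -1; // assumed: connected socket(2) descriptor
    SubmissionEntry e;
    e.prepSendMsg(sock, msg);
    io.put(e).submit(1);
    auto sent = io.front.res; // bytes sent, or -errno on failure
    io.popFront();
}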
/**
 * Prepares `fsync` operation.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   fd = file descriptor of a file to call `fsync` on
 *   flags = `fsync` operation flags
 */
void prepFsync(ref SubmissionEntry entry, int fd, FsyncFlags flags = FsyncFlags.NORMAL) @safe
{
    entry.opcode = Operation.FSYNC;
    entry.fd = fd;
    entry.fsync_flags = flags;
}

/**
 * Poll the fd specified in the submission queue entry for the events specified in the poll_events
 * field. Unlike poll or epoll without `EPOLLONESHOT`, this interface always works in one shot mode.
 * That is, once the poll operation is completed, it will have to be resubmitted.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   fd = file descriptor to poll
 *   events = events to poll on the FD
 */
void prepPollAdd(ref SubmissionEntry entry, int fd, PollEvents events) @safe
{
    entry.opcode = Operation.POLL_ADD;
    entry.fd = fd;
    entry.poll_events = events;
}

/**
 * Remove an existing poll request. If found, the res field of the `CompletionEntry` will contain
 * `0`. If not found, res will contain `-ENOENT`.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   userData = user data provided with the previously issued poll operation
 */
void prepPollRemove(D)(ref SubmissionEntry entry, ref D userData)
{
    entry.opcode = Operation.POLL_REMOVE;
    entry.fd = -1;
    entry.addr = cast(ulong)(cast(void*)&userData);
}

/**
 * Prepares `sync_file_range(2)` operation.
 *
 * Sync a file segment with disk, permits fine control when synchronizing the open file referred to
 * by the file descriptor fd with disk.
 *
 * If `len` is 0, then all bytes from `offset` through to the end of file are synchronized.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   fd = is the file descriptor to sync
 *   offset = the starting byte of the file range to be synchronized
 *   len = the length of the range to be synchronized, in bytes
 *   flags = the flags for the command.
 *
 * See_Also: `sync_file_range(2)` for the general description of the related system call.
 *
 * Note: available from Linux 5.2
 */
void prepSyncFileRange(ref SubmissionEntry entry, int fd, ulong offset, uint len, SyncFileRangeFlags flags) @safe
{
    entry.opcode = Operation.SYNC_FILE_RANGE;
    entry.fd = fd;
    entry.off = offset;
    entry.len = len;
    entry.sync_range_flags = flags;
}

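/*
 * Example (sketch): one-shot poll paired with its later removal. Both requests identify the
 * poll through the same user data, so the token must live at a stable address for the whole
 * time. `PollEvents.IN` is an assumed member name from during.io_uring.
 */
version (none) nothrow @nogc unittest
{
    Uring io;
    assert(io.setup() == 0);

    int watchedFd = 0;  // e.g. stdin
    ulong pollToken;    // identifies the poll request in both operations

    SubmissionEntry e;
    e.prepPollAdd(watchedFd, PollEvents.IN);
    e.setUserData(pollToken);
    io.put(e).submit();                 // submit without waiting

    // we are no longer interested: the removal completes with res 0 if the poll was found,
    // -ENOENT otherwise (see prepPollRemove above)
    SubmissionEntry rm;
    rm.prepPollRemove(pollToken);
    io.put(rm).submit(1);
}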
/**
 * This command will register a timeout operation.
 *
 * A timeout will trigger a wakeup event on the completion ring for anyone waiting for events. A
 * timeout condition is met when either the specified timeout expires, or the specified number of
 * events have completed. Either condition will trigger the event. The request will complete with
 * `-ETIME` if the timeout got completed through expiration of the timer, or `0` if the timeout got
 * completed through requests completing on their own. If the timeout was cancelled before it
 * expired, the request will complete with `-ECANCELED`.
 *
 * Applications may delete existing timeouts before they occur with the `TIMEOUT_REMOVE` operation.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   time = reference to `KernelTimespec` data structure
 *   count = completion event count
 *   flags = define if it's a relative or absolute time
 *
 * Note: Available from Linux 5.4
 */
void prepTimeout(ref SubmissionEntry entry, ref KernelTimespec time,
    ulong count = 0, TimeoutFlags flags = TimeoutFlags.REL)
{
    entry.opcode = Operation.TIMEOUT;
    entry.fd = -1;
    entry.addr = cast(ulong)(cast(void*)&time);
    entry.len = 1;
    entry.off = count;
    entry.timeout_flags = flags;
}

/**
 * Prepares operation to remove an existing timeout registered using the `TIMEOUT` operation.
 *
 * Attempt to remove an existing timeout operation. If the specified timeout request is found and
 * cancelled successfully, this request will terminate with a result value of `-ECANCELED`. If the
 * timeout request was found but expiration was already in progress, this request will terminate
 * with a result value of `-EALREADY`. If the timeout request wasn't found, the request will
 * terminate with a result value of `-ENOENT`.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   userData = user data provided with the previously issued timeout operation
 *
 * Note: Available from Linux 5.5
 */
void prepTimeoutRemove(D)(ref SubmissionEntry entry, ref D userData)
{
    entry.opcode = Operation.TIMEOUT_REMOVE;
    entry.fd = -1;
    entry.addr = cast(ulong)(cast(void*)&userData);
}

/**
 * Prepares `accept4(2)` operation.
 *
 * See_Also: `accept4(2)` for the general description of the related system call.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   fd = socket file descriptor
 *   addr = reference to one of the sockaddr structures to be filled with the accepted client address
 *   addrlen = reference to addrlen field that would be filled with the accepted client address length
 *   flags = `AcceptFlags` passed to the operation (see `accept4(2)`)
 *
 * Note: Available from Linux 5.5
 */
void prepAccept(ADDR)(ref SubmissionEntry entry, int fd, ref ADDR addr, ref socklen_t addrlen,
    AcceptFlags flags = AcceptFlags.NONE)
{
    entry.opcode = Operation.ACCEPT;
    entry.fd = fd;
    entry.addr = cast(ulong)(cast(void*)&addr);
    entry.addr2 = cast(ulong)(cast(void*)&addrlen);
    entry.accept_flags = flags;
}

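/*
 * Example (sketch): a standalone 1 second timeout. Per the description above, the completion
 * carries `-ETIME` when the timer expires before `count` other requests complete. Field names
 * `tv_sec`/`tv_nsec` on `KernelTimespec` are assumed to mirror the kernel timespec.
 */
version (none) nothrow @nogc unittest
{
    Uring io;
    assert(io.setup() == 0);

    KernelTimespec ts;
    ts.tv_sec = 1;
    ts.tv_nsec = 0;

    io.putWith!prepTimeout(ts, 0UL, TimeoutFlags.REL);
    io.submit(1);                   // blocks until the timeout fires
    assert(io.front.res == -ETIME); // ETIME comes from core.sys.linux.errno imported above
    io.popFront();
}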
/**
 * Prepares operation that cancels existing async work.
 *
 * This works with any read/write request, accept, send/recvmsg, etc. There’s an important
 * distinction to make here with the different kinds of commands. A read/write on a regular file
 * will generally be waiting for IO completion in an uninterruptible state. This means it’ll ignore
 * any signals or attempts to cancel it, as these operations are uncancellable. io_uring can cancel
 * these operations if they haven’t yet been started. If they have been started, cancellations on
 * these will fail. Network IO will generally be waiting interruptibly, and can hence be cancelled
 * at any time. The completion event for this request will have a result of 0 if done successfully,
 * `-EALREADY` if the operation is already in progress, and `-ENOENT` if the original request
 * specified cannot be found. For cancellation requests that return `-EALREADY`, io_uring may or may
 * not cause this request to be stopped sooner. For blocking IO, the original request will complete
 * as it originally would have. For IO that is cancellable, it will terminate sooner if at all
 * possible.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   userData = `user_data` field of the request that should be cancelled
 *
 * Note: Available from Linux 5.5
 */
void prepCancel(D)(ref SubmissionEntry entry, ref D userData/*, uint flags = 0*/)
{
    entry.opcode = Operation.ASYNC_CANCEL;
    entry.fd = -1;
    entry.addr = cast(ulong)(cast(void*)&userData);
    // entry.cancel_flags = flags; // TODO: there are none yet
}

/**
 * Prepares linked timeout operation.
 *
 * This request must be linked with another request through the `IOSQE_IO_LINK` flag.
 * Unlike `IORING_OP_TIMEOUT`, `IORING_OP_LINK_TIMEOUT` acts on the linked request, not the completion
 * queue. The format of the command is otherwise like `IORING_OP_TIMEOUT`, except there's no
 * completion event count as it's tied to a specific request. If used, the timeout specified in the
 * command will cancel the linked command, unless the linked command completes before the
 * timeout. The timeout will complete with `-ETIME` if the timer expired and cancellation of the
 * linked request was attempted, or `-ECANCELED` if the timer got cancelled because of completion
 * of the linked request.
 *
 * Note: Available from Linux 5.5
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   time = time specification
 *   flags = define if it's a relative or absolute time
 */
void prepLinkTimeout(ref SubmissionEntry entry, ref KernelTimespec time, TimeoutFlags flags = TimeoutFlags.REL)
{
    entry.opcode = Operation.LINK_TIMEOUT;
    entry.fd = -1;
    entry.addr = cast(ulong)(cast(void*)&time);
    entry.len = 1;
    entry.timeout_flags = flags;
}

/**
 * Prepares `connect(2)` operation on a socket.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   fd = socket file descriptor
 *   addr = socket address to connect to
 *
 * Note: Available from Linux 5.5
 */
void prepConnect(ADDR)(ref SubmissionEntry entry, int fd, ref const(ADDR) addr)
{
    entry.opcode = Operation.CONNECT;
    entry.fd = fd;
    entry.addr = cast(ulong)(cast(void*)&addr);
    entry.off = ADDR.sizeof;
}

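/*
 * Example (sketch): cancelling an in-flight request with ASYNC_CANCEL. The target is addressed
 * by its user data, so the very same token object must be used for both the original request
 * and the cancellation. `PollEvents.IN` is an assumed member name from during.io_uring.
 */
version (none) nothrow @nogc unittest
{
    Uring io;
    assert(io.setup() == 0);

    ulong token;
    SubmissionEntry poll;
    poll.prepPollAdd(0, PollEvents.IN); // wait for stdin to become readable
    poll.setUserData(token);
    io.put(poll).submit();              // fire and forget

    // change of plans: the cancel completion returns 0, -EALREADY or -ENOENT as described above
    SubmissionEntry cancel;
    cancel.prepCancel(token);
    io.put(cancel).submit(1);
}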
private:

// uring cleanup
void dispose(ref Uring uring)
{
    if (uring.payload is null) return;
    // debug printf("uring(%d): dispose(%d)\n", uring.payload.fd, uring.payload.refs);
    if (--uring.payload.refs == 0)
    {
        import std.traits : hasElaborateDestructor;
        // debug printf("uring(%d): free\n", uring.payload.fd);
        static if (hasElaborateDestructor!UringDesc)
            destroy(*uring.payload); // call possible destructors
        free(cast(void*)uring.payload);
    }
    uring.payload = null;
}

// system fields descriptor
struct UringDesc
{
    nothrow @nogc:

    int fd;
    size_t refs;
    SetupParameters params;
    SubmissionQueue sq;
    CompletionQueue cq;

    iovec[] regBuffers;

    ~this()
    {
        if (regBuffers) free(cast(void*)&regBuffers[0]);
        if (sq.ring) munmap(sq.ring, sq.ringSize);
        if (sq.sqes) munmap(cast(void*)&sq.sqes[0], sq.sqes.length * SubmissionEntry.sizeof);
        if (cq.ring && cq.ring != sq.ring) munmap(cq.ring, cq.ringSize);
        close(fd);
    }

    private auto mapRings()
    {
        sq.ringSize = params.sq_off.array + params.sq_entries * uint.sizeof;
        cq.ringSize = params.cq_off.cqes + params.cq_entries * CompletionEntry.sizeof;

        if (params.features & SetupFeatures.SINGLE_MMAP)
        {
            if (cq.ringSize > sq.ringSize) sq.ringSize = cq.ringSize;
            cq.ringSize = sq.ringSize;
        }

        sq.ring = mmap(null, sq.ringSize,
            PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
            fd, SetupParameters.SUBMISSION_QUEUE_RING_OFFSET
        );

        if (sq.ring == MAP_FAILED)
        {
            sq.ring = null;
            return -errno;
        }

        if (params.features & SetupFeatures.SINGLE_MMAP)
            cq.ring = sq.ring;
        else
        {
            cq.ring = mmap(null, cq.ringSize,
                PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
                fd, SetupParameters.COMPLETION_QUEUE_RING_OFFSET
            );

            if (cq.ring == MAP_FAILED)
            {
                cq.ring = null;
                return -errno; // cleanup is done in struct destructors
            }
        }

        uint entries = *cast(uint*)(sq.ring + params.sq_off.ring_entries);
        sq.khead = cast(uint*)(sq.ring + params.sq_off.head);
        sq.ktail = cast(uint*)(sq.ring + params.sq_off.tail);
        sq.localTail = *sq.ktail;
        sq.ringMask = *cast(uint*)(sq.ring + params.sq_off.ring_mask);
        sq.kflags = cast(uint*)(sq.ring + params.sq_off.flags);
        sq.kdropped = cast(uint*)(sq.ring + params.sq_off.dropped);

        // Indirection array of indexes to the sqes array (head and tail are pointing to this array).
        // As we don't need some fancy mappings, just initialize it with constant indexes and forget about it.
        // That way, head and tail are actually indexes to our sqes array.
        foreach (i; 0..entries)
        {
            *((cast(uint*)(sq.ring + params.sq_off.array)) + i) = i;
        }

        auto psqes = mmap(
            null, entries * SubmissionEntry.sizeof,
            PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
            fd, SetupParameters.SUBMISSION_QUEUE_ENTRIES_OFFSET
        );

        if (psqes == MAP_FAILED) return -errno;
        sq.sqes = (cast(SubmissionEntry*)psqes)[0..entries];

        entries = *cast(uint*)(cq.ring + params.cq_off.ring_entries);
        cq.khead = cast(uint*)(cq.ring + params.cq_off.head);
        cq.localHead = *cq.khead;
        cq.ktail = cast(uint*)(cq.ring + params.cq_off.tail);
        cq.ringMask = *cast(uint*)(cq.ring + params.cq_off.ring_mask);
        cq.koverflow = cast(uint*)(cq.ring + params.cq_off.overflow);
        cq.cqes = (cast(CompletionEntry*)(cq.ring + params.cq_off.cqes))[0..entries];
        return 0;
    }
}

/// Wrapper for `SubmissionEntry` queue
struct SubmissionQueue
{
    nothrow @nogc:

    // mmaped fields
    uint* khead; // controlled by kernel
    uint* ktail; // controlled by us
    uint* kflags; // controlled by kernel (ie IORING_SQ_NEED_WAKEUP)
    uint* kdropped; // counter of invalid submissions (out of bound index)
    uint ringMask; // constant mask used to determine array index from head/tail

    // mmap details (for cleanup)
    void* ring; // pointer to the mmaped region
    size_t ringSize; // size of mmaped memory block

    // mmapped list of entries (fixed length)
    SubmissionEntry[] sqes;

    uint localTail; // used for batch submission

    uint head() const { return atomicLoad!(MemoryOrder.acq)(*khead); }
    uint tail() const { return localTail; }

    void flushTail()
    {
        pragma(inline);
        // debug printf("SQ updating tail: %d\n", localTail);
        atomicStore!(MemoryOrder.rel)(*ktail, localTail);
    }

    SubmissionQueueFlags flags() const
    {
        return cast(SubmissionQueueFlags)atomicLoad!(MemoryOrder.raw)(*kflags);
    }

    bool full() const { return sqes.length == length; }

    size_t length() const { return tail - head; }

    size_t capacity() const { return sqes.length - length; }

    void put()(auto ref SubmissionEntry entry)
    {
        assert(!full, "SubmissionQueue is full");
        sqes[tail & ringMask] = entry;
        localTail++;
    }

    void put(OP)(auto ref OP op)
        if (!is(OP == SubmissionEntry))
    {
        assert(!full, "SubmissionQueue is full");
        sqes[tail & ringMask].clear();
        sqes[tail & ringMask].fill(op);
        localTail++;
    }

    private void putWith(alias FN, ARGS...)(auto ref ARGS args)
    {
        import std.traits : Parameters, ParameterStorageClass, ParameterStorageClassTuple;

        static assert(
            Parameters!FN.length >= 1
            && is(Parameters!FN[0] == SubmissionEntry)
            && ParameterStorageClassTuple!FN[0] == ParameterStorageClass.ref_,
            "Alias function must accept at least `ref SubmissionEntry`");

        static assert(
            is(typeof(FN(sqes[tail & ringMask], args))),
            "Provided function is not callable with " ~ (Parameters!((ref SubmissionEntry e, ARGS args) {})).stringof);

        assert(!full, "SubmissionQueue is full");
        sqes[tail & ringMask].clear();
        FN(sqes[tail & ringMask], args);
        localTail++;
    }

    uint dropped() const { return atomicLoad!(MemoryOrder.raw)(*kdropped); }
}

struct CompletionQueue
{
    nothrow @nogc:

    // mmaped fields
    uint* khead; // controlled by us (increment after entry at head was read)
    uint* ktail; // updated by kernel
    uint* koverflow;
    CompletionEntry[] cqes; // array of entries (fixed length)

    uint ringMask; // constant mask used to determine array index from head/tail

    // mmap details (for cleanup)
    void* ring;
    size_t ringSize;

    uint localHead; // used for bulk reading

    uint head() const { return localHead; }
    uint tail() const { return atomicLoad!(MemoryOrder.acq)(*ktail); }

    void flushHead()
    {
        pragma(inline);
        // debug printf("CQ updating head: %d\n", localHead);
        atomicStore!(MemoryOrder.rel)(*khead, localHead);
    }

    bool empty() const { return head == tail; }

    ref CompletionEntry front() return
    {
        assert(!empty, "CompletionQueue is empty");
        return cqes[localHead & ringMask];
    }

    void popFront()
    {
        pragma(inline);
        assert(!empty, "CompletionQueue is empty");
        localHead++;
        flushHead();
    }

    size_t length() const { return tail - localHead; }

    uint overflow() const { return atomicLoad!(MemoryOrder.raw)(*koverflow); }
}

// just a helper to use atomicStore more easily with older compilers
void atomicStore(MemoryOrder ms, T, V)(ref T val, V newVal) @trusted
{
    pragma(inline, true);
    import core.atomic : store = atomicStore;
    static if (__VERSION__ >= 2089) store!ms(val, newVal);
    else store!ms(*(cast(shared T*)&val), newVal);
}

// just a helper to use atomicLoad more easily with older compilers
T atomicLoad(MemoryOrder ms, T)(ref const T val) @trusted
{
    pragma(inline, true);
    import core.atomic : load = atomicLoad;
    static if (__VERSION__ >= 2089) return load!ms(val);
    else return load!ms(*(cast(const shared T*)&val));
}

version (assert)
{
    import std.range.primitives : ElementType, isInputRange, isOutputRange;
    static assert(isInputRange!Uring && is(ElementType!Uring == CompletionEntry));
    static assert(isOutputRange!(Uring, SubmissionEntry));
}