/**
 * Simple idiomatic dlang wrapper around linux io_uring
 * (see: https://kernel.dk/io_uring.pdf) asynchronous API.
 */
module during;

version (linux) {}
else static assert(0, "io_uring is available on linux only");

public import during.io_uring;
import during.openat2;

import core.atomic : MemoryOrder;
debug import core.stdc.stdio;
import core.stdc.stdlib;
import core.sys.linux.epoll;
import core.sys.linux.errno;
import core.sys.linux.sys.mman;
import core.sys.linux.unistd;
import core.sys.posix.signal;
import core.sys.posix.sys.socket;
import core.sys.posix.sys.uio;
import std.algorithm.comparison : among;

nothrow @nogc:

/**
 * Setup new instance of io_uring into provided `Uring` structure.
 *
 * Params:
 *     uring = `Uring` structure to be initialized (must not be already initialized)
 *     entries = Number of entries to initialize uring with
 *     flags = `SetupFlags` to use to initialize uring.
 *
 * Returns: On success it returns 0, `-errno` otherwise.
 */
int setup(ref Uring uring, uint entries = 128, SetupFlags flags = SetupFlags.NONE)
{
    assert(uring.payload is null, "Uring is already initialized");
    uring.payload = cast(UringDesc*)calloc(1, UringDesc.sizeof);
    if (uring.payload is null) return -errno;

    uring.payload.params.flags = flags;
    uring.payload.refs = 1;
    auto r = io_uring_setup(entries, uring.payload.params);
    if (r < 0)
    {
        // FIX: the original leaked the calloc'd payload here. We free it
        // directly instead of calling dispose() - at this point fd is still
        // the zero from calloc, and UringDesc's destructor would close(0)
        // (i.e. stdin). Snapshot errno first as free() may clobber it.
        immutable err = errno;
        free(cast(void*)uring.payload);
        uring.payload = null;
        return -err;
    }

    uring.payload.fd = r;

    if (uring.payload.mapRings() < 0)
    {
        // snapshot errno before dispose() - munmap/close inside could clobber it
        immutable err = errno;
        dispose(uring);
        return -err;
    }

    // debug printf("uring(%d): setup\n", uring.payload.fd);

    return 0;
}

/**
 * Main entry point to work with io_uring.
 *
 * It hides `SubmissionQueue` and `CompletionQueue` behind standard range interface.
 * We put in `SubmissionEntry` entries and take out `CompletionEntry` entries.
66 * 67 * Use predefined `prepXX` methods to fill required fields of `SubmissionEntry` before `put` or during `putWith`. 68 * 69 * Note: `prepXX` functions doesn't touch previous entry state, just fills in operation properties. This is because for 70 * less error prone interface it is cleared automatically when prepared using `putWith`. So when using on own `SubmissionEntry` 71 * (outside submission queue), that would be added to the submission queue using `put`, be sure its cleared if it's 72 * reused for multiple operations. 73 */ 74 struct Uring 75 { 76 nothrow @nogc: 77 78 private UringDesc* payload; 79 private void checkInitialized() const 80 { 81 assert(payload !is null, "Uring hasn't been initialized yet"); 82 } 83 84 /// Copy constructor 85 this(ref return scope Uring rhs) 86 { 87 assert(rhs.payload !is null, "rhs payload is null"); 88 // debug printf("uring(%d): copy\n", rhs.payload.fd); 89 this.payload = rhs.payload; 90 this.payload.refs++; 91 } 92 93 /// Destructor 94 ~this() 95 { 96 dispose(this); 97 } 98 99 /// Native io_uring file descriptor 100 auto fd() const 101 { 102 checkInitialized(); 103 return payload.fd; 104 } 105 106 /// io_uring parameters 107 SetupParameters params() const return 108 { 109 checkInitialized(); 110 return payload.params; 111 } 112 113 /// Check if there is some `CompletionEntry` to process. 114 bool empty() const 115 { 116 checkInitialized(); 117 return payload.cq.empty; 118 } 119 120 /// Check if there is space for another `SubmissionEntry` to submit. 
121 bool full() const 122 { 123 checkInitialized(); 124 return payload.sq.full; 125 } 126 127 /// Available space in submission queue before it becomes full 128 size_t capacity() const 129 { 130 checkInitialized(); 131 return payload.sq.capacity; 132 } 133 134 /// Number of entries in completion queue 135 size_t length() const 136 { 137 checkInitialized(); 138 return payload.cq.length; 139 } 140 141 /// Get first `CompletionEntry` from cq ring 142 ref CompletionEntry front() return 143 { 144 checkInitialized(); 145 return payload.cq.front; 146 } 147 148 /// Move to next `CompletionEntry` 149 void popFront() 150 { 151 checkInitialized(); 152 return payload.cq.popFront; 153 } 154 155 /** 156 * Adds new entry to the `SubmissionQueue`. 157 * 158 * Note that this just adds entry to the queue and doesn't advance the tail 159 * marker kernel sees. For that `finishSq()` is needed to be called next. 160 * 161 * Also note that to actually enter new entries to kernel, 162 * it's needed to call `submit()`. 163 * 164 * Params: 165 * FN = Function to fill next entry in queue by `ref` (should be faster). 166 * It is expected to be in a form of `void function(ARGS)(ref SubmissionEntry, auto ref ARGS)`. 167 * Note that in this case queue entry is cleaned first before function is called. 168 * entry = Custom built `SubmissionEntry` to be posted as is. 169 * Note that in this case it is copied whole over one in the `SubmissionQueue`. 170 * args = Optional arguments passed to the function 171 * 172 * Returns: reference to `Uring` structure so it's possible to chain multiple commands. 
173 */ 174 ref Uring put()(auto ref SubmissionEntry entry) return 175 { 176 checkInitialized(); 177 payload.sq.put(entry); 178 return this; 179 } 180 181 /// ditto 182 ref Uring putWith(alias FN, ARGS...)(auto ref ARGS args) return 183 { 184 import std.functional : forward; 185 checkInitialized(); 186 payload.sq.putWith!FN(forward!args); 187 return this; 188 } 189 190 /** 191 * Similar to `put(SubmissionEntry)` but in this case we can provide our custom type (args) to be filled 192 * to next `SubmissionEntry` in queue. 193 * 194 * Fields in the provided type must use the same names as in `SubmissionEntry` to be automagically copied. 195 * 196 * Params: 197 * op = Custom operation definition. 198 * Returns: 199 */ 200 ref Uring put(OP)(auto ref OP op) return 201 if (!is(OP == SubmissionEntry)) 202 { 203 checkInitialized(); 204 payload.sq.put(op); 205 return this; 206 } 207 208 /** 209 * If completion queue is full, the new event maybe dropped. 210 * This value records number of dropped events. 211 */ 212 uint overflow() const 213 { 214 checkInitialized(); 215 return payload.cq.overflow; 216 } 217 218 /// Counter of invalid submissions (out-of-bound index in submission array) 219 uint dropped() const 220 { 221 checkInitialized(); 222 return payload.sq.dropped; 223 } 224 225 /** 226 * Submits qued `SubmissionEntry` to be processed by kernel. 227 * 228 * Params: 229 * want = number of `CompletionEntries` to wait for. 230 * If 0, this just submits queued entries and returns. 231 * If > 0, it blocks until at least wanted number of entries were completed. 232 * sig = See io_uring_enter(2) man page 233 * 234 * Returns: Number of submitted entries on success, `-errno` on error 235 */ 236 auto submit(uint want = 0, const sigset_t* sig = null) @trusted 237 { 238 checkInitialized(); 239 240 auto len = cast(uint)payload.sq.length; 241 if (len > 0) // anything to submit? 
242 { 243 EnterFlags flags; 244 if (want > 0) flags |= EnterFlags.GETEVENTS; 245 246 payload.sq.flushTail(); // advance queue index 247 248 if (payload.params.flags & SetupFlags.SQPOLL) 249 { 250 if (payload.sq.flags & SubmissionQueueFlags.NEED_WAKEUP) 251 flags |= EnterFlags.SQ_WAKEUP; 252 else if (want == 0) return len; // fast poll 253 } 254 auto r = io_uring_enter(payload.fd, len, want, flags, sig); 255 if (r < 0) return -errno; 256 return r; 257 } 258 else if (want > 0) return wait(want); // just simple wait 259 return 0; 260 } 261 262 /** 263 * Simmilar to `submit` but with this method we just wait for required number 264 * of `CompletionEntries`. 265 * 266 * Returns: `0` on success, `-errno` on error 267 */ 268 auto wait(uint want = 1, const sigset_t* sig = null) @trusted 269 { 270 pragma(inline); 271 checkInitialized(); 272 assert(want > 0, "Invalid want value"); 273 274 if (payload.cq.length >= want) return 0; // we don't need to syscall 275 276 auto r = io_uring_enter(payload.fd, 0, want, EnterFlags.GETEVENTS, sig); 277 if (r < 0) return -errno; 278 return 0; 279 } 280 281 /** 282 * Register single buffer to be mapped into the kernel for faster buffered operations. 283 * 284 * To use the buffers, the application must specify the fixed variants for of operations, 285 * `READ_FIXED` or `WRITE_FIXED` in the `SubmissionEntry` also with used `buf_index` set 286 * in entry extra data. 287 * 288 * An application can increase or decrease the size or number of registered buffers by first 289 * unregistering the existing buffers, and then issuing a new call to io_uring_register() with 290 * the new buffers. 291 * 292 * Params: 293 * buffer = Buffers to be registered 294 * 295 * Returns: On success, returns 0. On error, `-errno` is returned. 296 */ 297 auto registerBuffers(T)(T buffers) 298 if (is(T == ubyte[]) || is(T == ubyte[][])) // TODO: something else? 
299 { 300 checkInitialized(); 301 assert(buffers.length, "Empty buffer"); 302 303 if (payload.regBuffers !is null) 304 return -EBUSY; // buffers were already registered 305 306 static if (is(T == ubyte[])) 307 { 308 auto p = malloc(iovec.sizeof); 309 if (p is null) return -errno; 310 payload.regBuffers = (cast(iovec*)p)[0..1]; 311 payload.regBuffers[0].iov_base = cast(void*)&buffers[0]; 312 payload.regBuffers[0].iov_len = buffers.length; 313 } 314 else static if (is(T == ubyte[][])) 315 { 316 auto p = malloc(buffers.length * iovec.sizeof); 317 if (p is null) return -errno; 318 payload.regBuffers = (cast(iovec*)p)[0..buffers.length]; 319 320 foreach (i, b; buffers) 321 { 322 assert(b.length, "Empty buffer"); 323 payload.regBuffers[i].iov_base = cast(void*)&b[0]; 324 payload.regBuffers[i].iov_len = b.length; 325 } 326 } 327 328 auto r = io_uring_register( 329 payload.fd, 330 RegisterOpCode.REGISTER_BUFFERS, 331 cast(const(void)*)&payload.regBuffers[0], 1 332 ); 333 334 if (r < 0) return -errno; 335 return 0; 336 } 337 338 /** 339 * Releases all previously registered buffers associated with the `io_uring` instance. 340 * 341 * An application need not unregister buffers explicitly before shutting down the io_uring instance. 342 * 343 * Returns: On success, returns 0. On error, `-errno` is returned. 344 */ 345 auto unregisterBuffers() @trusted 346 { 347 checkInitialized(); 348 349 if (payload.regBuffers is null) 350 return -ENXIO; // no buffers were registered 351 352 free(cast(void*)&payload.regBuffers[0]); 353 payload.regBuffers = null; 354 355 auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_BUFFERS, null, 0); 356 if (r < 0) return -errno; 357 return 0; 358 } 359 360 /** 361 * Register files for I/O. 362 * 363 * To make use of the registered files, the `IOSQE_FIXED_FILE` flag must be set in the flags 364 * member of the `SubmissionEntry`, and the `fd` member is set to the index of the file in the 365 * file descriptor array. 
366 * 367 * Files are automatically unregistered when the `io_uring` instance is torn down. An application 368 * need only unregister if it wishes to register a new set of fds. 369 * 370 * Use `-1` as a file descriptor to mark it as reserved in the array.* 371 * Params: fds = array of file descriptors to be registered 372 * 373 * Returns: On success, returns 0. On error, `-errno` is returned. 374 */ 375 auto registerFiles(const(int)[] fds) 376 { 377 checkInitialized(); 378 assert(fds.length, "No file descriptors provided"); 379 assert(fds.length < uint.max, "Too many file descriptors"); 380 381 // arg contains a pointer to an array of nr_args file descriptors (signed 32 bit integers). 382 auto r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_FILES, &fds[0], cast(uint)fds.length); 383 if (r < 0) return -errno; 384 return 0; 385 } 386 387 /* 388 * Register an update for an existing file set. The updates will start at 389 * `off` in the original array. 390 * 391 * Use `-1` as a file descriptor to mark it as reserved in the array. 392 * 393 * Params: 394 * off = offset to the original registered files to be updated 395 * files = array of file descriptors to update with 396 * 397 * Returns: number of files updated on success, -errno on failure. 398 */ 399 auto registerFilesUpdate(uint off, const(int)[] fds) @trusted 400 { 401 struct Update 402 { 403 uint offset; 404 uint _resv; 405 ulong pfds; 406 } 407 408 static assert (Update.sizeof == 16); 409 410 checkInitialized(); 411 assert(fds.length, "No file descriptors provided to update"); 412 assert(fds.length < uint.max, "Too many file descriptors"); 413 414 Update u = { offset: off, pfds: cast(ulong)&fds[0] }; 415 auto r = io_uring_register( 416 payload.fd, 417 RegisterOpCode.REGISTER_FILES_UPDATE, 418 &u, cast(uint)fds.length); 419 if (r < 0) return -errno; 420 return 0; 421 } 422 423 /** 424 * All previously registered files associated with the `io_uring` instance will be unregistered. 
425 * 426 * Files are automatically unregistered when the `io_uring` instance is torn down. An application 427 * need only unregister if it wishes to register a new set of fds. 428 * 429 * Returns: On success, returns 0. On error, `-errno` is returned. 430 */ 431 auto unregisterFiles() @trusted 432 { 433 checkInitialized(); 434 auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_FILES, null, 0); 435 if (r < 0) return -errno; 436 return 0; 437 } 438 439 /** 440 * Registers event file descriptor that would be used as a notification mechanism on completion 441 * queue change. 442 * 443 * Params: eventFD = event filedescriptor to be notified about change 444 * 445 * Returns: On success, returns 0. On error, `-errno` is returned. 446 */ 447 auto registerEventFD(int eventFD) @trusted 448 { 449 checkInitialized(); 450 auto r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_EVENTFD, &eventFD, 1); 451 if (r < 0) return -errno; 452 return 0; 453 } 454 455 /** 456 * Unregister previously registered notification event file descriptor. 457 * 458 * Returns: On success, returns 0. On error, `-errno` is returned. 459 */ 460 auto unregisterEventFD() @trusted 461 { 462 checkInitialized(); 463 auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_EVENTFD, null, 0); 464 if (r < 0) return -errno; 465 return 0; 466 } 467 } 468 469 /** 470 * Uses custom operation definition to fill fields of `SubmissionEntry`. 471 * Can be used in cases, when builtin prep* functions aren't enough. 472 * 473 * Custom definition fields must correspond to fields of `SubmissionEntry` for this to work. 474 * 475 * Note: This doesn't touch previous state of the entry, just fills the corresponding fields. 476 * So it might be needed to call `clear` first on the entry (depends on usage). 
477 * 478 * Params: 479 * entry = entry to set parameters to 480 * op = operation to fill entry with (can be custom type) 481 */ 482 void fill(E)(ref SubmissionEntry entry, auto ref E op) 483 { 484 pragma(inline); 485 import std.traits : hasMember, FieldNameTuple; 486 487 // fill entry from provided operation fields (they must have same name as in SubmissionEntry) 488 foreach (m; FieldNameTuple!E) 489 { 490 static assert(hasMember!(SubmissionEntry, m), "unknown member: " ~ E.stringof ~ "." ~ m); 491 __traits(getMember, entry, m) = __traits(getMember, op, m); 492 } 493 } 494 495 /** 496 * Template function to help set `SubmissionEntry` `user_data` field. 497 * 498 * Params: 499 * entry = `SubmissionEntry` to prepare 500 * data = data to set to the `SubmissionEntry` 501 * 502 * Note: data are passed by ref and must live during whole operation. 503 */ 504 void setUserData(D)(ref SubmissionEntry entry, ref D data) 505 { 506 entry.user_data = cast(ulong)(cast(void*)&data); 507 } 508 509 private void prepRW(ref SubmissionEntry entry, Operation op, 510 int fd = -1, const void* addr = null, uint len = 0, ulong offset = 0) @safe 511 { 512 pragma(inline, true); 513 entry.opcode = op; 514 entry.fd = fd; 515 entry.off = offset; 516 entry.flags = SubmissionEntryFlags.NONE; 517 entry.ioprio = 0; 518 entry.addr = cast(ulong)addr; 519 entry.len = len; 520 entry.rw_flags = ReadWriteFlags.NONE; 521 entry.user_data = 0; 522 entry.__pad2[0] = entry.__pad2[1] = entry.__pad2[2] = 0; 523 } 524 525 /** 526 * Prepares `nop` operation. 527 * 528 * Params: 529 * entry = `SubmissionEntry` to prepare 530 */ 531 void prepNop(ref SubmissionEntry entry) @safe 532 { 533 entry.prepRW(Operation.NOP); 534 } 535 536 /** 537 * Prepares `readv` operation. 
 *
 * Params:
 *    entry = `SubmissionEntry` to prepare
 *    fd = file descriptor of file we are operating on
 *    offset = offset
 *    buffer = iovec buffers to be used by the operation
 *
 * Note: only the address of the iovec(s) goes into the entry, so they must stay
 * alive until the operation is submitted - TODO(review): confirm whether they
 * must outlive completion as well.
 */
void prepReadv(V)(ref SubmissionEntry entry, int fd, ref const V buffer, long offset)
    if (is(V == iovec[]) || is(V == iovec))
{
    static if (is(V == iovec[]))
    {
        assert(buffer.length, "Empty buffer");
        assert(buffer.length < uint.max, "Too many iovec buffers");
        entry.prepRW(Operation.READV, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    }
    // single iovec: pass its address directly with a count of 1
    else entry.prepRW(Operation.READV, fd, cast(void*)&buffer, 1, offset);
}

/**
 * Prepares `writev` operation.
 *
 * Params:
 *    entry = `SubmissionEntry` to prepare
 *    fd = file descriptor of file we are operating on
 *    offset = offset
 *    buffer = iovec buffers to be used by the operation
 */
void prepWritev(V)(ref SubmissionEntry entry, int fd, ref const V buffer, long offset)
    if (is(V == iovec[]) || is(V == iovec))
{
    static if (is(V == iovec[]))
    {
        assert(buffer.length, "Empty buffer");
        assert(buffer.length < uint.max, "Too many iovec buffers");
        entry.prepRW(Operation.WRITEV, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    }
    // single iovec: pass its address directly with a count of 1
    else entry.prepRW(Operation.WRITEV, fd, cast(void*)&buffer, 1, offset);
}

/**
 * Prepares `read_fixed` operation.
 *
 * Params:
 *    entry = `SubmissionEntry` to prepare
 *    fd = file descriptor of file we are operating on
 *    offset = offset
 *    buffer = slice to preregistered buffer
 *    bufferIndex = index to the preregistered buffers array buffer belongs to
 */
void prepReadFixed(ref SubmissionEntry entry, int fd, long offset, ubyte[] buffer, ushort bufferIndex)
{
    assert(buffer.length, "Empty buffer");
    assert(buffer.length < uint.max, "Buffer too large");
    entry.prepRW(Operation.READ_FIXED, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    entry.buf_index = bufferIndex; // which registered buffer the slice belongs to
}

/**
 * Prepares `write_fixed` operation.
 *
 * Params:
 *    entry = `SubmissionEntry` to prepare
 *    fd = file descriptor of file we are operating on
 *    offset = offset
 *    buffer = slice to preregistered buffer
 *    bufferIndex = index to the preregistered buffers array buffer belongs to
 */
void prepWriteFixed(ref SubmissionEntry entry, int fd, long offset, ubyte[] buffer, ushort bufferIndex)
{
    assert(buffer.length, "Empty buffer");
    assert(buffer.length < uint.max, "Buffer too large");
    entry.prepRW(Operation.WRITE_FIXED, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    entry.buf_index = bufferIndex; // which registered buffer the slice belongs to
}

/**
 * Prepares `recvmsg(2)` operation.
 *
 * Params:
 *    entry = `SubmissionEntry` to prepare
 *    fd = file descriptor of file we are operating on
 *    msg = message to operate with
 *    flags = `recvmsg` operation flags
 *
 * Note: Available from Linux 5.3
 *
 * See_Also: `recvmsg(2)` man page for details.
 */
void prepRecvMsg(ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE)
{
    // len is fixed to 1: a single msghdr is passed by address
    entry.prepRW(Operation.RECVMSG, fd, cast(void*)&msg, 1, 0);
    entry.msg_flags = flags;
}

/**
 * Prepares `sendmsg(2)` operation.
 *
 * Params:
 *    entry = `SubmissionEntry` to prepare
 *    fd = file descriptor of file we are operating on
 *    msg = message to operate with
 *    flags = `sendmsg` operation flags
 *
 * Note: Available from Linux 5.3
 *
 * See_Also: `sendmsg(2)` man page for details.
 */
void prepSendMsg(ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE)
{
    // len is fixed to 1: a single msghdr is passed by address
    entry.prepRW(Operation.SENDMSG, fd, cast(void*)&msg, 1, 0);
    entry.msg_flags = flags;
}

/**
 * Prepares `fsync` operation.
 *
 * Params:
 *    entry = `SubmissionEntry` to prepare
 *    fd = file descriptor of a file to call `fsync` on
 *    flags = `fsync` operation flags
 */
void prepFsync(ref SubmissionEntry entry, int fd, FsyncFlags flags = FsyncFlags.NORMAL) @safe
{
    entry.prepRW(Operation.FSYNC, fd);
    entry.fsync_flags = flags;
}

/**
 * Poll the fd specified in the submission queue entry for the events specified in the poll_events
 * field. Unlike poll or epoll without `EPOLLONESHOT`, this interface always works in one shot mode.
 * That is, once the poll operation is completed, it will have to be resubmitted.
 *
 * Params:
 *    entry = `SubmissionEntry` to prepare
 *    fd = file descriptor to poll
 *    events = events to poll on the FD
 */
void prepPollAdd(ref SubmissionEntry entry, int fd, PollEvents events) @safe
{
    import std.system : endian, Endian;

    entry.prepRW(Operation.POLL_ADD, fd);
    // the kernel reads the events mask from the low 16 bits of the 32bit
    // field; on big-endian targets the halves must be swapped into place
    static if (endian == Endian.bigEndian)
        entry.poll_events32 = (events & 0x0000ffffUL) << 16 | (events & 0xffff0000) >> 16;
    else
        entry.poll_events32 = events;
}

/**
 * Remove an existing poll request. If found, the res field of the `CompletionEntry` will contain
 * `0`. If not found, res will contain `-ENOENT`.
690 * 691 * Params: 692 * entry = `SubmissionEntry` to prepare 693 * userData = data with the previously issued poll operation 694 */ 695 void prepPollRemove(D)(ref SubmissionEntry entry, ref D userData) 696 { 697 entry.prepRW(Operation.POLL_REMOVE, -1, cast(void*)&userData); 698 } 699 700 /** 701 * Prepares `sync_file_range(2)` operation. 702 * 703 * Sync a file segment with disk, permits fine control when synchronizing the open file referred to 704 * by the file descriptor fd with disk. 705 * 706 * If `len` is 0, then all bytes from `offset` through to the end of file are synchronized. 707 * 708 * Params: 709 * entry = `SubmissionEntry` to prepare 710 * fd = is the file descriptor to sync 711 * offset = the starting byte of the file range to be synchronized 712 * len = the length of the range to be synchronized, in bytes 713 * flags = the flags for the command. 714 * 715 * See_Also: `sync_file_range(2)` for the general description of the related system call. 716 * 717 * Note: available from Linux 5.2 718 */ 719 void prepSyncFileRange(ref SubmissionEntry entry, int fd, ulong offset, uint len, 720 SyncFileRangeFlags flags = SyncFileRangeFlags.WRITE_AND_WAIT) @safe 721 { 722 entry.opcode = Operation.SYNC_FILE_RANGE; 723 entry.fd = fd; 724 entry.off = offset; 725 entry.len = len; 726 entry.sync_range_flags = flags; 727 } 728 729 /** 730 * This command will register a timeout operation. 731 * 732 * A timeout will trigger a wakeup event on the completion ring for anyone waiting for events. A 733 * timeout condition is met when either the specified timeout expires, or the specified number of 734 * events have completed. Either condition will trigger the event. The request will complete with 735 * `-ETIME` if the timeout got completed through expiration of the timer, or `0` if the timeout got 736 * completed through requests completing on their own. If the timeout was cancelled before it 737 * expired, the request will complete with `-ECANCELED`. 
 *
 * Applications may delete existing timeouts before they occur with `TIMEOUT_REMOVE` operation.
 *
 * Params:
 *    entry = `SubmissionEntry` to prepare
 *    time = reference to `time64` data structure
 *    count = completion event count
 *    flags = define if it's a relative or absolute time
 *
 * Note: Available from Linux 5.4
 */
void prepTimeout(ref SubmissionEntry entry, ref KernelTimespec time,
    ulong count = 0, TimeoutFlags flags = TimeoutFlags.REL)
{
    // addr points at the caller-owned timespec (it must outlive the request);
    // the completion event count travels in the offset field
    entry.prepRW(Operation.TIMEOUT, -1, cast(void*)&time, 1, count);
    entry.timeout_flags = flags;
}

/**
 * Prepares operations to remove existing timeout registered using `TIMEOUT`operation.
 *
 * Attempt to remove an existing timeout operation. If the specified timeout request is found and
 * cancelled successfully, this request will terminate with a result value of `-ECANCELED`. If the
 * timeout request was found but expiration was already in progress, this request will terminate
 * with a result value of `-EALREADY`. If the timeout request wasn't found, the request will
 * terminate with a result value of `-ENOENT`.
 *
 * Params:
 *    entry = `SubmissionEntry` to prepare
 *    userData = user data provided with the previously issued timeout operation
 *
 * Note: Available from Linux 5.5
 */
void prepTimeoutRemove(D)(ref SubmissionEntry entry, ref D userData)
{
    // the timeout to remove is identified by the address of its user data
    entry.prepRW(Operation.TIMEOUT_REMOVE, -1, cast(void*)&userData);
}

/**
 * Prepares `accept4(2)` operation.
 *
 * See_Also: `accept4(2)`` for the general description of the related system call.
 *
 * Params:
 *    entry = `SubmissionEntry` to prepare
 *    fd = socket file descriptor
 *    addr = reference to one of sockaddr structures to be filled with accepted client address
 *    addrlen = reference to addrlen field that would be filled with accepted client address length
 *
 * Note: Available from Linux 5.5
 */
void prepAccept(ADDR)(ref SubmissionEntry entry, int fd, ref ADDR addr, ref socklen_t addrlen,
    AcceptFlags flags = AcceptFlags.NONE)
{
    // accept4 needs two out-pointers: addr goes in the addr field, the
    // addrlen pointer is smuggled through the offset field
    entry.prepRW(Operation.ACCEPT, fd, cast(void*)&addr, 0, cast(ulong)(cast(void*)&addrlen));
    entry.accept_flags = flags;
}

/**
 * Prepares operation that cancels existing async work.
 *
 * This works with any read/write request, accept,send/recvmsg, etc. There’s an important
 * distinction to make here with the different kinds of commands. A read/write on a regular file
 * will generally be waiting for IO completion in an uninterruptible state. This means it’ll ignore
 * any signals or attempts to cancel it, as these operations are uncancellable. io_uring can cancel
 * these operations if they haven’t yet been started. If they have been started, cancellations on
 * these will fail. Network IO will generally be waiting interruptibly, and can hence be cancelled
 * at any time. The completion event for this request will have a result of 0 if done successfully,
 * `-EALREADY` if the operation is already in progress, and `-ENOENT` if the original request
 * specified cannot be found. For cancellation requests that return `-EALREADY`, io_uring may or may
 * not cause this request to be stopped sooner. For blocking IO, the original request will complete
 * as it originally would have. For IO that is cancellable, it will terminate sooner if at all
 * possible.
 *
 * Params:
 *    entry = `SubmissionEntry` to prepare
 *    userData = `user_data` field of the request that should be cancelled
 *
 * Note: Available from Linux 5.5
 */
void prepCancel(D)(ref SubmissionEntry entry, ref D userData, uint flags = 0)
{
    // the request to cancel is identified by the address of its user data
    entry.prepRW(Operation.ASYNC_CANCEL, -1, cast(void*)&userData);
    entry.cancel_flags = flags;
}

/**
 * Prepares linked timeout operation.
 *
 * This request must be linked with another request through `IOSQE_IO_LINK` which is described below.
 * Unlike `IORING_OP_TIMEOUT`, `IORING_OP_LINK_TIMEOUT` acts on the linked request, not the completion
 * queue. The format of the command is otherwise like `IORING_OP_TIMEOUT`, except there's no
 * completion event count as it's tied to a specific request. If used, the timeout specified in the
 * command will cancel the linked command, unless the linked command completes before the
 * timeout. The timeout will complete with `-ETIME` if the timer expired and the linked request was
 * attempted cancelled, or `-ECANCELED` if the timer got cancelled because of completion of the linked
 * request.
 *
 * Note: Available from Linux 5.5
 *
 * Params:
 *    entry = `SubmissionEntry` to prepare
 *    time = time specification
 *    flags = define if it's a relative or absolute time
 */
void prepLinkTimeout(ref SubmissionEntry entry, ref KernelTimespec time, TimeoutFlags flags = TimeoutFlags.REL)
{
    // caller-owned timespec passed by address; must outlive the request
    entry.prepRW(Operation.LINK_TIMEOUT, -1, cast(void*)&time, 1, 0);
    entry.timeout_flags = flags;
}

/**
 * Prepares `connect(2)` operation on the socket.
 *
 * Note: Available from Linux 5.5
 */
void prepConnect(ADDR)(ref SubmissionEntry entry, int fd, ref const(ADDR) addr)
{
    // the sockaddr length travels in the offset field
    entry.prepRW(Operation.CONNECT, fd, cast(void*)&addr, 0, ADDR.sizeof)
;
}

/**
 * Updates previously registered file set directly from the ring.
 *
 * Note: Available from Linux 5.6
 */
void prepFilesUpdate(ref SubmissionEntry entry, int[] fds, int offset)
{
    entry.prepRW(Operation.FILES_UPDATE, -1, cast(void*)&fds[0], cast(uint)fds.length, offset);
}

/**
 * Note: Available from Linux 5.6
 */
void prepFallocate(ref SubmissionEntry entry, int fd, int mode, long offset, long len)
{
    // mirrors liburing's prep helper: length travels in addr, mode in len
    entry.prepRW(Operation.FALLOCATE, fd, cast(void*)len, mode, offset);
}

/**
 * Note: Available from Linux 5.6
 */
void prepOpenat(ref SubmissionEntry entry, int fd, const char* path, int flags, uint mode)
{
    // mode travels in the len field, open flags in the dedicated union member
    entry.prepRW(Operation.OPENAT, fd, cast(void*)path, mode, 0);
    entry.open_flags = flags;
}

/**
 * Note: Available from Linux 5.6
 */
void prepClose(ref SubmissionEntry entry, int fd)
{
    entry.prepRW(Operation.CLOSE, fd);
}

/**
 * Note: Available from Linux 5.6
 */
void prepRead(ref SubmissionEntry entry, int fd, ubyte[] buffer, long offset)
{
    // NOTE(review): &buffer[0] asserts/raises on an empty slice; callers are
    // expected to pass a non-empty buffer - confirm whether a guard is wanted
    entry.prepRW(Operation.READ, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
}

/**
 * Note: Available from Linux 5.6
 */
void prepWrite(ref SubmissionEntry entry, int fd, const(ubyte)[] buffer, long offset)
{
    entry.prepRW(Operation.WRITE, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
}

/**
 * Note: Available from Linux 5.6
 */
void prepStatx(Statx)(ref SubmissionEntry entry, int fd, const char* path, int flags, uint mask, ref Statx statxbuf)
{
    // mask travels in len; the output statx buffer pointer in the offset field
    entry.prepRW(Operation.STATX, fd, cast(void*)path, mask, cast(ulong)(cast(void*)&statxbuf));
    entry.statx_flags = flags;
}

/**
 * Note: Available from Linux 5.6
 */
void prepFadvise(ref SubmissionEntry entry, int fd, long offset, uint len, int advice)
{
    entry.prepRW(Operation.FADVISE, fd, null, len, offset);
    entry.fadvise_advice = advice;
}

/**
 * Note: Available from Linux 5.6
 */
void prepMadvise(ref SubmissionEntry entry, const(ubyte)[] block, int advice)
{
    entry.prepRW(Operation.MADVISE, -1, cast(void*)&block[0], cast(uint)block.length, 0);
    entry.fadvise_advice = advice; // madvise shares the union slot with fadvise
}

/**
 * Note: Available from Linux 5.6
 */
void prepSend(ref SubmissionEntry entry, int sockfd, const(ubyte)[] buf, MsgFlags flags)
{
    entry.prepRW(Operation.SEND, sockfd, cast(void*)&buf[0], cast(uint)buf.length, 0);
    entry.msg_flags = flags;
}

/**
 * Note: Available from Linux 5.6
 */
void prepRecv(ref SubmissionEntry entry, int sockfd, ubyte[] buf, MsgFlags flags)
{
    entry.prepRW(Operation.RECV, sockfd, cast(void*)&buf[0], cast(uint)buf.length, 0);
    entry.msg_flags = flags;
}

/**
 * Note: Available from Linux 5.6
 */
void prepOpenat2(ref SubmissionEntry entry, int fd, const char *path, ref OpenHow how)
{
    // openat2 takes a struct open_how by pointer (in off) plus its size (in len)
    entry.prepRW(Operation.OPENAT2, fd, cast(void*)path, cast(uint)OpenHow.sizeof, cast(ulong)(cast(void*)&how));
}

/**
 * Note: Available from Linux 5.6
 */
void prepEpollCtl(ref SubmissionEntry entry, int epfd, int fd, int op, ref epoll_event ev)
{
    // off carries the target fd, len the epoll op (ADD/MOD/DEL)
    entry.prepRW(Operation.EPOLL_CTL, epfd, cast(void*)&ev, op, fd);
}

/**
 * Note: Available from
Linux 5.7 969 */ 970 void prepSplice(ref SubmissionEntry entry, 971 int fd_in, ulong off_in, 972 int fd_out, ulong off_out, 973 uint nbytes, uint splice_flags) 974 { 975 entry.prepRW(Operation.SPLICE, fd_out, null, nbytes, off_out); 976 entry.splice_off_in = off_in; 977 entry.splice_fd_in = fd_in; 978 entry.splice_flags = splice_flags; 979 } 980 981 /** 982 * Note: Available from Linux 5.7 983 */ 984 void prepProvideBuffers(ref SubmissionEntry entry, ubyte[] buf, int nr, ushort bgid, int bid) 985 { 986 entry.prepRW(Operation.PROVIDE_BUFFERS, nr, cast(void*)&buf[0], cast(uint)buf.length, bid); 987 entry.buf_group = bgid; 988 } 989 990 /** 991 * Note: Available from Linux 5.7 992 */ 993 void prepRemoveBuffers(ref SubmissionEntry entry, int nr, ushort bgid) 994 { 995 entry.prepRW(Operation.REMOVE_BUFFERS, nr); 996 entry.buf_group = bgid; 997 } 998 999 /** 1000 * Note: Available from Linux 5.8 1001 */ 1002 void prepTee(ref SubmissionEntry entry, int fd_in, int fd_out, uint nbytes, uint flags) 1003 { 1004 entry.prepRW(Operation.TEE, fd_out, null, nbytes, 0); 1005 entry.splice_off_in = 0; 1006 entry.splice_fd_in = fd_in; 1007 entry.splice_flags = flags; 1008 } 1009 1010 private: 1011 1012 // uring cleanup 1013 void dispose(ref Uring uring) 1014 { 1015 if (uring.payload is null) return; 1016 // debug printf("uring(%d): dispose(%d)\n", uring.payload.fd, uring.payload.refs); 1017 if (--uring.payload.refs == 0) 1018 { 1019 import std.traits : hasElaborateDestructor; 1020 // debug printf("uring(%d): free\n", uring.payload.fd); 1021 static if (hasElaborateDestructor!UringDesc) 1022 destroy(*uring.payload); // call possible destructors 1023 free(cast(void*)uring.payload); 1024 } 1025 uring.payload = null; 1026 } 1027 1028 // system fields descriptor 1029 struct UringDesc 1030 { 1031 nothrow @nogc: 1032 1033 int fd; 1034 size_t refs; 1035 SetupParameters params; 1036 SubmissionQueue sq; 1037 CompletionQueue cq; 1038 1039 iovec[] regBuffers; 1040 1041 ~this() 1042 { 1043 if 
(regBuffers) free(cast(void*)®Buffers[0]); 1044 if (sq.ring) munmap(sq.ring, sq.ringSize); 1045 if (sq.sqes) munmap(cast(void*)&sq.sqes[0], sq.sqes.length * SubmissionEntry.sizeof); 1046 if (cq.ring && cq.ring != sq.ring) munmap(cq.ring, cq.ringSize); 1047 close(fd); 1048 } 1049 1050 private auto mapRings() 1051 { 1052 sq.ringSize = params.sq_off.array + params.sq_entries * uint.sizeof; 1053 cq.ringSize = params.cq_off.cqes + params.cq_entries * CompletionEntry.sizeof; 1054 1055 if (params.features & SetupFeatures.SINGLE_MMAP) 1056 { 1057 if (cq.ringSize > sq.ringSize) sq.ringSize = cq.ringSize; 1058 cq.ringSize = sq.ringSize; 1059 } 1060 1061 sq.ring = mmap(null, sq.ringSize, 1062 PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, 1063 fd, SetupParameters.SUBMISSION_QUEUE_RING_OFFSET 1064 ); 1065 1066 if (sq.ring == MAP_FAILED) 1067 { 1068 sq.ring = null; 1069 return -errno; 1070 } 1071 1072 if (params.features & SetupFeatures.SINGLE_MMAP) 1073 cq.ring = sq.ring; 1074 else 1075 { 1076 cq.ring = mmap(null, cq.ringSize, 1077 PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, 1078 fd, SetupParameters.COMPLETION_QUEUE_RING_OFFSET 1079 ); 1080 1081 if (cq.ring == MAP_FAILED) 1082 { 1083 cq.ring = null; 1084 return -errno; // cleanup is done in struct destructors 1085 } 1086 } 1087 1088 uint entries = *cast(uint*)(sq.ring + params.sq_off.ring_entries); 1089 sq.khead = cast(uint*)(sq.ring + params.sq_off.head); 1090 sq.ktail = cast(uint*)(sq.ring + params.sq_off.tail); 1091 sq.localTail = *sq.ktail; 1092 sq.ringMask = *cast(uint*)(sq.ring + params.sq_off.ring_mask); 1093 sq.kflags = cast(uint*)(sq.ring + params.sq_off.flags); 1094 sq.kdropped = cast(uint*)(sq.ring + params.sq_off.dropped); 1095 1096 // Indirection array of indexes to the sqes array (head and tail are pointing to this array). 1097 // As we don't need some fancy mappings, just initialize it with constant indexes and forget about it. 1098 // That way, head and tail are actually indexes to our sqes array. 
1099 foreach (i; 0..entries) 1100 { 1101 *((cast(uint*)(sq.ring + params.sq_off.array)) + i) = i; 1102 } 1103 1104 auto psqes = mmap( 1105 null, entries * SubmissionEntry.sizeof, 1106 PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, 1107 fd, SetupParameters.SUBMISSION_QUEUE_ENTRIES_OFFSET 1108 ); 1109 1110 if (psqes == MAP_FAILED) return -errno; 1111 sq.sqes = (cast(SubmissionEntry*)psqes)[0..entries]; 1112 1113 entries = *cast(uint*)(cq.ring + params.cq_off.ring_entries); 1114 cq.khead = cast(uint*)(cq.ring + params.cq_off.head); 1115 cq.localHead = *cq.khead; 1116 cq.ktail = cast(uint*)(cq.ring + params.cq_off.tail); 1117 cq.ringMask = *cast(uint*)(cq.ring + params.cq_off.ring_mask); 1118 cq.koverflow = cast(uint*)(cq.ring + params.cq_off.overflow); 1119 cq.cqes = (cast(CompletionEntry*)(cq.ring + params.cq_off.cqes))[0..entries]; 1120 cq.kflags = cast(uint*)(cq.ring + params.cq_off.flags); 1121 return 0; 1122 } 1123 } 1124 1125 /// Wraper for `SubmissionEntry` queue 1126 struct SubmissionQueue 1127 { 1128 nothrow @nogc: 1129 1130 // mmaped fields 1131 uint* khead; // controlled by kernel 1132 uint* ktail; // controlled by us 1133 uint* kflags; // controlled by kernel (ie IORING_SQ_NEED_WAKEUP) 1134 uint* kdropped; // counter of invalid submissions (out of bound index) 1135 uint ringMask; // constant mask used to determine array index from head/tail 1136 1137 // mmap details (for cleanup) 1138 void* ring; // pointer to the mmaped region 1139 size_t ringSize; // size of mmaped memory block 1140 1141 // mmapped list of entries (fixed length) 1142 SubmissionEntry[] sqes; 1143 1144 uint localTail; // used for batch submission 1145 1146 uint head() const { return atomicLoad!(MemoryOrder.acq)(*khead); } 1147 uint tail() const { return localTail; } 1148 1149 void flushTail() 1150 { 1151 pragma(inline); 1152 // debug printf("SQ updating tail: %d\n", localTail); 1153 atomicStore!(MemoryOrder.rel)(*ktail, localTail); 1154 } 1155 1156 SubmissionQueueFlags flags() const 
1157 { 1158 return cast(SubmissionQueueFlags)atomicLoad!(MemoryOrder.raw)(*kflags); 1159 } 1160 1161 bool full() const { return sqes.length == length; } 1162 1163 size_t length() const { return tail - head; } 1164 1165 size_t capacity() const { return sqes.length - length; } 1166 1167 void put()(auto ref SubmissionEntry entry) 1168 { 1169 assert(!full, "SumbissionQueue is full"); 1170 sqes[tail & ringMask] = entry; 1171 localTail++; 1172 } 1173 1174 void put(OP)(auto ref OP op) 1175 if (!is(OP == SubmissionEntry)) 1176 { 1177 assert(!full, "SumbissionQueue is full"); 1178 sqes[tail & ringMask].clear(); 1179 sqes[tail & ringMask].fill(op); 1180 localTail++; 1181 } 1182 1183 private void putWith(alias FN, ARGS...)(auto ref ARGS args) 1184 { 1185 import std.traits : Parameters, ParameterStorageClass, ParameterStorageClassTuple; 1186 1187 static assert( 1188 Parameters!FN.length >= 1 1189 && is(Parameters!FN[0] == SubmissionEntry) 1190 && ParameterStorageClassTuple!FN[0] == ParameterStorageClass.ref_, 1191 "Alias function must accept at least `ref SubmissionEntry`"); 1192 1193 static assert( 1194 is(typeof(FN(sqes[tail & ringMask], args))), 1195 "Provided function is not callable with " ~ (Parameters!((ref SubmissionEntry e, ARGS args) {})).stringof); 1196 1197 assert(!full, "SumbissionQueue is full"); 1198 sqes[tail & ringMask].clear(); 1199 FN(sqes[tail & ringMask], args); 1200 localTail++; 1201 } 1202 1203 uint dropped() const { return atomicLoad!(MemoryOrder.raw)(*kdropped); } 1204 } 1205 1206 struct CompletionQueue 1207 { 1208 nothrow @nogc: 1209 1210 // mmaped fields 1211 uint* khead; // controlled by us (increment after entry at head was read) 1212 uint* ktail; // updated by kernel 1213 uint* koverflow; 1214 uint* kflags; 1215 CompletionEntry[] cqes; // array of entries (fixed length) 1216 1217 uint ringMask; // constant mask used to determine array index from head/tail 1218 1219 // mmap details (for cleanup) 1220 void* ring; 1221 size_t ringSize; 1222 1223 uint 
localHead; // used for bulk reading 1224 1225 uint head() const { return localHead; } 1226 uint tail() const { return atomicLoad!(MemoryOrder.acq)(*ktail); } 1227 1228 void flushHead() 1229 { 1230 pragma(inline); 1231 // debug printf("CQ updating head: %d\n", localHead); 1232 atomicStore!(MemoryOrder.rel)(*khead, localHead); 1233 } 1234 1235 bool empty() const { return head == tail; } 1236 1237 ref CompletionEntry front() return 1238 { 1239 assert(!empty, "CompletionQueue is empty"); 1240 return cqes[localHead & ringMask]; 1241 } 1242 1243 void popFront() 1244 { 1245 pragma(inline); 1246 assert(!empty, "CompletionQueue is empty"); 1247 localHead++; 1248 flushHead(); 1249 } 1250 1251 size_t length() const { return tail - localHead; } 1252 1253 uint overflow() const { return atomicLoad!(MemoryOrder.raw)(*koverflow); } 1254 1255 /// Runtime CQ flags - written by the application, shouldn't be modified by the kernel. 1256 void flags(CQRingFlags flags) { atomicStore!(MemoryOrder.raw)(*kflags, flags); } 1257 } 1258 1259 // just a helper to use atomicStore more easily with older compilers 1260 void atomicStore(MemoryOrder ms, T, V)(ref T val, V newVal) @trusted 1261 { 1262 pragma(inline, true); 1263 import core.atomic : store = atomicStore; 1264 static if (__VERSION__ >= 2089) store!ms(val, newVal); 1265 else store!ms(*(cast(shared T*)&val), newVal); 1266 } 1267 1268 // just a helper to use atomicLoad more easily with older compilers 1269 T atomicLoad(MemoryOrder ms, T)(ref const T val) @trusted 1270 { 1271 pragma(inline, true); 1272 import core.atomic : load = atomicLoad; 1273 static if (__VERSION__ >= 2089) return load!ms(val); 1274 else return load!ms(*(cast(const shared T*)&val)); 1275 } 1276 1277 version (assert) 1278 { 1279 import std.range.primitives : ElementType, isInputRange, isOutputRange; 1280 static assert(isInputRange!Uring && is(ElementType!Uring == CompletionEntry)); 1281 static assert(isOutputRange!(Uring, SubmissionEntry)); 1282 }