/**
 * Simple idiomatic dlang wrapper around linux io_uring
 * (see: https://kernel.dk/io_uring.pdf) asynchronous API.
 */
module during;

version(linux) {}
else {
    pragma(msg, "!!!!!!! during/io_uring is available ONLY on Linux systems (5.1+). This package is useless on your system. !!!!!!!!!");
    static assert(0, "during is not available on your system");
}

public import during.io_uring;
import during.openat2;

import core.atomic : MemoryOrder;
debug import core.stdc.stdio;
import core.stdc.stdlib;
import core.sys.linux.epoll;
import core.sys.linux.errno;
import core.sys.linux.sys.mman;
import core.sys.linux.unistd;
import core.sys.posix.signal;
import core.sys.posix.sys.socket;
import core.sys.posix.sys.uio;
import std.algorithm.comparison : among;

nothrow @nogc:

/**
 * Setup new instance of io_uring into provided `Uring` structure.
 *
 * Params:
 *     uring = `Uring` structure to be initialized (must not be already initialized)
 *     entries = Number of entries to initialize uring with
 *     flags = `SetupFlags` to use to initialize uring.
 *
 * Returns: On success it returns 0, `-errno` otherwise.
 */
int setup(ref Uring uring, uint entries = 128, SetupFlags flags = SetupFlags.NONE)
{
    assert(uring.payload is null, "Uring is already initialized");
    uring.payload = cast(UringDesc*)calloc(1, UringDesc.sizeof);
    if (uring.payload is null) return -errno;

    uring.payload.params.flags = flags;
    uring.payload.refs = 1;
    auto r = io_uring_setup(entries, uring.payload.params);
    if (r < 0)
    {
        // The syscall failed before any ring fd or mappings existed. Release
        // the already allocated descriptor and reset the payload so `uring`
        // stays uninitialized and `setup` can be retried on it later.
        // (Capture errno before free() which may clobber it.)
        immutable err = -errno;
        free(cast(void*)uring.payload);
        uring.payload = null;
        return err;
    }

    uring.payload.fd = r;

    r = uring.payload.mapRings();
    if (r < 0)
    {
        dispose(uring);
        return r;
    }

    // debug printf("uring(%d): setup\n", uring.payload.fd);

    return 0;
}

/**
 * Main entry point to work with io_uring.
 *
 * It hides `SubmissionQueue` and `CompletionQueue` behind standard range interface.
 * We put in `SubmissionEntry` entries and take out `CompletionEntry` entries.
 *
 * Use predefined `prepXX` methods to fill required fields of `SubmissionEntry` before `put` or during `putWith`.
 *
 * Note: `prepXX` functions don't touch previous entry state, just fill in operation properties. This is because for
 * less error prone interface it is cleared automatically when prepared using `putWith`. So when using on own `SubmissionEntry`
 * (outside submission queue), that would be added to the submission queue using `put`, be sure it's cleared if it's
 * reused for multiple operations.
 */
struct Uring
{
    nothrow @nogc:

    // Reference counted shared state: ring fd, mapped queues and setup params.
    private UringDesc* payload;

    // Guard used by all public members - the struct is unusable before `setup()`.
    private void checkInitialized() const @safe pure
    {
        assert(payload !is null, "Uring hasn't been initialized yet");
    }

    /// Copy constructor (shares the underlying rings, bumps the reference count)
    this(ref return scope Uring rhs) @safe pure
    {
        assert(rhs.payload !is null, "rhs payload is null");
        // debug printf("uring(%d): copy\n", rhs.payload.fd);
        this.payload = rhs.payload;
        this.payload.refs++;
    }

    /// Destructor (releases one reference to the shared state)
    ~this() @safe
    {
        dispose(this);
    }

    /// Native io_uring file descriptor
    int fd() const @safe pure
    {
        checkInitialized();
        return payload.fd;
    }

    /// io_uring parameters
    SetupParameters params() const @safe pure return
    {
        checkInitialized();
        return payload.params;
    }

    /// Check if there is some `CompletionEntry` to process.
    bool empty() const @safe pure
    {
        checkInitialized();
        return payload.cq.empty;
    }

    /// Check if there is space for another `SubmissionEntry` to submit.
    bool full() const @safe pure
    {
        checkInitialized();
        return payload.sq.full;
    }

    /// Available space in submission queue before it becomes full
    size_t capacity() const @safe pure
    {
        checkInitialized();
        return payload.sq.capacity;
    }

    /// Number of entries in completion queue
    size_t length() const @safe pure
    {
        checkInitialized();
        return payload.cq.length;
    }

    /// Get first `CompletionEntry` from cq ring
    ref CompletionEntry front() @safe pure return
    {
        checkInitialized();
        return payload.cq.front;
    }

    /// Move to next `CompletionEntry`
    void popFront() @safe pure
    {
        checkInitialized();
        return payload.cq.popFront;
    }

    /**
     * Adds new entry to the `SubmissionQueue`.
     *
     * Note that this just adds entry to the queue and doesn't advance the tail
     * marker kernel sees. For that `finishSq()` is needed to be called next.
     *
     * Also note that to actually enter new entries to kernel,
     * it's needed to call `submit()`.
     *
     * Params:
     *     FN = Function to fill next entry in queue by `ref` (should be faster).
     *          It is expected to be in a form of `void function(ARGS)(ref SubmissionEntry, auto ref ARGS)`.
     *          Note that in this case queue entry is cleaned first before function is called.
     *     entry = Custom built `SubmissionEntry` to be posted as is.
     *             Note that in this case it is copied whole over one in the `SubmissionQueue`.
     *     args = Optional arguments passed to the function
     *
     * Returns: reference to `Uring` structure so it's possible to chain multiple commands.
     */
    ref Uring put()(auto ref SubmissionEntry entry) @safe pure return
    {
        checkInitialized();
        payload.sq.put(entry);
        return this;
    }

    /// ditto
    ref Uring putWith(alias FN, ARGS...)(auto ref ARGS args) return
    {
        import std.functional : forward;
        checkInitialized();
        payload.sq.putWith!FN(forward!args);
        return this;
    }

    /**
     * Similar to `put(SubmissionEntry)` but in this case we can provide our custom type (args) to be filled
     * to next `SubmissionEntry` in queue.
     *
     * Fields in the provided type must use the same names as in `SubmissionEntry` to be automagically copied.
     *
     * Params:
     *     op = Custom operation definition.
     * Returns:
     */
    ref Uring put(OP)(auto ref OP op) return
        if (!is(OP == SubmissionEntry))
    {
        checkInitialized();
        payload.sq.put(op);
        return this;
    }

    /**
     * Advances the userspace submission queue and returns last `SubmissionEntry`.
     */
    ref SubmissionEntry next()() @safe pure
    {
        checkInitialized();
        return payload.sq.next();
    }

    /**
     * If completion queue is full, the new event maybe dropped.
     * This value records number of dropped events.
     */
    uint overflow() const @safe pure
    {
        checkInitialized();
        return payload.cq.overflow;
    }

    /// Counter of invalid submissions (out-of-bound index in submission array)
    uint dropped() const @safe pure
    {
        checkInitialized();
        return payload.sq.dropped;
    }

    /**
     * Submits queued `SubmissionEntry` to be processed by kernel.
     *
     * Params:
     *     want = number of `CompletionEntries` to wait for.
     *            If 0, this just submits queued entries and returns.
     *            If > 0, it blocks until at least wanted number of entries were completed.
     *     sig = See io_uring_enter(2) man page
     *
     * Returns: Number of submitted entries on success, `-errno` on error
     */
    int submit(S)(uint want, const S* args) @trusted
        if (is(S == sigset_t) || is(S == io_uring_getevents_arg))
    {
        checkInitialized();

        immutable len = cast(int)payload.sq.length;
        if (len > 0) // anything to submit?
        {
            EnterFlags flags;
            if (want > 0) flags |= EnterFlags.GETEVENTS;

            payload.sq.flushTail(); // advance queue index

            if (payload.params.flags & SetupFlags.SQPOLL)
            {
                // With SQPOLL the kernel thread consumes entries on its own; we
                // only need the syscall when it went to sleep (NEED_WAKEUP) or
                // when the caller wants to block for completions.
                if (_expect(payload.sq.flags & SubmissionQueueFlags.NEED_WAKEUP, false))
                    flags |= EnterFlags.SQ_WAKEUP;
                else if (want == 0) return len; // fast poll
            }
            immutable r = io_uring_enter(payload.fd, len, want, flags, args);
            if (r < 0) return -errno;
            return r;
        }
        else if (want > 0) return wait(want); // just simple wait
        return 0;
    }

    /// ditto
    int submit(uint want = 0) @safe
    {
        pragma(inline, true)
        return submit(want, cast(sigset_t*)null);
    }

    /**
     * Similar to `submit` but with this method we just wait for required number
     * of `CompletionEntries`.
     *
     * Returns: `0` on success, `-errno` on error
     */
    int wait(uint want = 1, const sigset_t* sig = null) @trusted
    {
        pragma(inline);
        checkInitialized();
        assert(want > 0, "Invalid want value");

        if (payload.cq.length >= want) return 0; // we don't need to syscall

        immutable r = io_uring_enter(payload.fd, 0, want, EnterFlags.GETEVENTS, sig);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * Register single buffer to be mapped into the kernel for faster buffered operations.
     *
     * To use the buffers, the application must specify the fixed variants of operations,
     * `READ_FIXED` or `WRITE_FIXED` in the `SubmissionEntry` also with used `buf_index` set
     * in entry extra data.
308 * 309 * An application can increase or decrease the size or number of registered buffers by first 310 * unregistering the existing buffers, and then issuing a new call to io_uring_register() with 311 * the new buffers. 312 * 313 * Params: 314 * buffer = Buffers to be registered 315 * 316 * Returns: On success, returns 0. On error, `-errno` is returned. 317 */ 318 int registerBuffers(T)(T buffers) 319 if (is(T == ubyte[]) || is(T == ubyte[][])) // TODO: something else? 320 { 321 checkInitialized(); 322 assert(buffers.length, "Empty buffer"); 323 324 if (payload.regBuffers !is null) 325 return -EBUSY; // buffers were already registered 326 327 static if (is(T == ubyte[])) 328 { 329 auto p = malloc(iovec.sizeof); 330 if (p is null) return -errno; 331 payload.regBuffers = (cast(iovec*)p)[0..1]; 332 payload.regBuffers[0].iov_base = cast(void*)&buffers[0]; 333 payload.regBuffers[0].iov_len = buffers.length; 334 } 335 else static if (is(T == ubyte[][])) 336 { 337 auto p = malloc(buffers.length * iovec.sizeof); 338 if (p is null) return -errno; 339 payload.regBuffers = (cast(iovec*)p)[0..buffers.length]; 340 341 foreach (i, b; buffers) 342 { 343 assert(b.length, "Empty buffer"); 344 payload.regBuffers[i].iov_base = cast(void*)&b[0]; 345 payload.regBuffers[i].iov_len = b.length; 346 } 347 } 348 349 immutable r = io_uring_register( 350 payload.fd, 351 RegisterOpCode.REGISTER_BUFFERS, 352 cast(const(void)*)&payload.regBuffers[0], 1 353 ); 354 355 if (r < 0) return -errno; 356 return 0; 357 } 358 359 /** 360 * Releases all previously registered buffers associated with the `io_uring` instance. 361 * 362 * An application need not unregister buffers explicitly before shutting down the io_uring instance. 363 * 364 * Returns: On success, returns 0. On error, `-errno` is returned. 
     */
    int unregisterBuffers() @trusted
    {
        checkInitialized();

        if (payload.regBuffers is null)
            return -ENXIO; // no buffers were registered

        // release the iovec array allocated in registerBuffers
        free(cast(void*)&payload.regBuffers[0]);
        payload.regBuffers = null;

        immutable r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_BUFFERS, null, 0);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * Register files for I/O.
     *
     * To make use of the registered files, the `IOSQE_FIXED_FILE` flag must be set in the flags
     * member of the `SubmissionEntry`, and the `fd` member is set to the index of the file in the
     * file descriptor array.
     *
     * Files are automatically unregistered when the `io_uring` instance is torn down. An application
     * need only unregister if it wishes to register a new set of fds.
     *
     * Use `-1` as a file descriptor to mark it as reserved in the array.
     *
     * Params: fds = array of file descriptors to be registered
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int registerFiles(const(int)[] fds)
    {
        checkInitialized();
        assert(fds.length, "No file descriptors provided");
        assert(fds.length < uint.max, "Too many file descriptors");

        // arg contains a pointer to an array of nr_args file descriptors (signed 32 bit integers).
        immutable r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_FILES, &fds[0], cast(uint)fds.length);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * Register an update for an existing file set. The updates will start at
     * `off` in the original array.
     *
     * Use `-1` as a file descriptor to mark it as reserved in the array.
     *
     * Params:
     *     off = offset to the original registered files to be updated
     *     fds = array of file descriptors to update with
     *
     * Returns: number of files updated on success, -errno on failure.
     */
    int registerFilesUpdate(uint off, const(int)[] fds) @trusted
    {
        struct Update // represents io_uring_files_update (obsolete) or io_uring_rsrc_update
        {
            uint offset;
            uint _resv;
            ulong data;
        }

        static assert (Update.sizeof == 16);

        checkInitialized();
        assert(fds.length, "No file descriptors provided to update");
        assert(fds.length < uint.max, "Too many file descriptors");

        Update u = { offset: off, data: cast(ulong)&fds[0] };
        immutable r = io_uring_register(
            payload.fd,
            RegisterOpCode.REGISTER_FILES_UPDATE,
            &u, cast(uint)fds.length);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * All previously registered files associated with the `io_uring` instance will be unregistered.
     *
     * Files are automatically unregistered when the `io_uring` instance is torn down. An application
     * need only unregister if it wishes to register a new set of fds.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int unregisterFiles() @trusted
    {
        checkInitialized();
        immutable r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_FILES, null, 0);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * Registers event file descriptor that would be used as a notification mechanism on completion
     * queue change.
     *
     * Params: eventFD = event filedescriptor to be notified about change
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int registerEventFD(int eventFD) @trusted
    {
        checkInitialized();
        immutable r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_EVENTFD, &eventFD, 1);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * Unregister previously registered notification event file descriptor.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
480 */ 481 int unregisterEventFD() @trusted 482 { 483 checkInitialized(); 484 immutable r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_EVENTFD, null, 0); 485 if (r < 0) return -errno; 486 return 0; 487 } 488 489 /** 490 * Generic means to register resources. 491 * 492 * Note: Available from Linux 5.13 493 */ 494 int registerRsrc(R)(RegisterOpCode type, const(R)[] data, const(ulong)[] tags) 495 { 496 struct Register // represents io_uring_rsrc_register 497 { 498 uint nr; 499 uint _resv; 500 ulong _resv2; 501 ulong data; 502 ulong tags; 503 } 504 505 static assert (Register.sizeof == 32); 506 507 checkInitialized(); 508 assert(data.length == tags.length, "Different array lengths"); 509 assert(data.length < uint.max, "Too many resources"); 510 511 Register r = { 512 nr: cast(uint)data.length, 513 data: cast(ulong)&data[0], 514 tags: cast(ulong)&tags[0] 515 }; 516 immutable ret = io_uring_register( 517 payload.fd, 518 type, 519 &r, sizeof(r)); 520 if (ret < 0) return -errno; 521 return 0; 522 } 523 524 /** 525 * Generic means to update registered resources. 526 * 527 * Note: Available from Linux 5.13 528 */ 529 int registerRsrcUpdate(R)(RegisterOpCode type, uint off, const(R)[] data, const(ulong)[] tags) @trusted 530 { 531 struct Update // represents io_uring_rsrc_update2 532 { 533 uint offset; 534 uint _resv; 535 ulong data; 536 ulong tags; 537 uint nr; 538 uint _resv2; 539 } 540 541 static assert (Update.sizeof == 32); 542 543 checkInitialized(); 544 assert(data.length == tags.length, "Different array lengths"); 545 assert(data.length < uint.max, "Too many file descriptors"); 546 547 Update u = { 548 offset: off, 549 data: cast(ulong)&data[0], 550 tags: cast(ulong)&tags[0], 551 nr: cast(uint)data.length, 552 }; 553 immutable r = io_uring_register(payload.fd, type, &u, sizeof(u)); 554 if (r < 0) return -errno; 555 return 0; 556 } 557 558 /** 559 * Register a feature whitelist. 
Attempting to call any operations which are not whitelisted
     * will result in an error.
     *
     * Note: Can only be called once to prevent other code from bypassing the whitelist.
     *
     * Params: res = the struct containing the restriction info.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     *
     * Note: Available from Linux 5.10
     */
    int registerRestrictions(scope ref io_uring_restriction res) @trusted
    {
        checkInitialized();
        immutable r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_RESTRICTIONS, &res, 1);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * Enable the "rings" of this Uring if they were previously disabled with
     * `IORING_SETUP_R_DISABLED`.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     *
     * Note: Available from Linux 5.10
     */
    int enableRings() @trusted
    {
        checkInitialized();
        immutable r = io_uring_register(payload.fd, RegisterOpCode.ENABLE_RINGS, null, 0);
        if (r < 0) return -errno;
        return 0;
    }
}

/**
 * Uses custom operation definition to fill fields of `SubmissionEntry`.
 * Can be used in cases, when builtin prep* functions aren't enough.
 *
 * Custom definition fields must correspond to fields of `SubmissionEntry` for this to work.
 *
 * Note: This doesn't touch previous state of the entry, just fills the corresponding fields.
 *       So it might be needed to call `clear` first on the entry (depends on usage).
 *
 * Params:
 *     entry = entry to set parameters to
 *     op = operation to fill entry with (can be custom type)
 */
ref SubmissionEntry fill(E)(return ref SubmissionEntry entry, auto ref E op)
{
    pragma(inline);
    import std.traits : hasMember, FieldNameTuple;

    // fill entry from provided operation fields (they must have same name as in SubmissionEntry)
    foreach (m; FieldNameTuple!E)
    {
        static assert(hasMember!(SubmissionEntry, m), "unknown member: " ~ E.stringof ~ "." ~ m);
        __traits(getMember, entry, m) = __traits(getMember, op, m);
    }

    return entry;
}

/**
 * Template function to help set `SubmissionEntry` `user_data` field.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     data = data to set to the `SubmissionEntry`
 *
 * Note: data are passed by ref and must live during whole operation.
 */
ref SubmissionEntry setUserData(D)(return ref SubmissionEntry entry, ref D data) @trusted
{
    pragma(inline);
    entry.user_data = cast(ulong)(cast(void*)&data);
    return entry;
}

/**
 * Template function to help set `SubmissionEntry` `user_data` field. This differs to `setUserData`
 * in that it emplaces the provided data directly into SQE `user_data` field and not the pointer to
 * the data.
 *
 * Because of that, data must be of `ulong.sizeof`.
 */
ref SubmissionEntry setUserDataRaw(D)(return ref SubmissionEntry entry, auto ref D data) @trusted
    if (D.sizeof == ulong.sizeof)
{
    pragma(inline);
    entry.user_data = *(cast(ulong*)(cast(void*)&data));
    return entry;
}

/**
 * Helper function to retrieve data set directly to the `CompletionEntry` user_data (set by `setUserDataRaw`).
 */
D userDataAs(D)(ref CompletionEntry entry) @trusted
    if (D.sizeof == ulong.sizeof)
{
    pragma(inline);
    return *(cast(D*)(cast(void*)&entry.user_data));
}

/**
 * Resets the whole `SubmissionEntry` and fills the fields common to all operations.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     op = operation code to set
 *     fd = file descriptor the operation works with (`-1` when unused)
 *     addr = operation specific address value (stored in `entry.addr`)
 *     len = operation specific length value
 *     offset = operation specific offset value (stored in `entry.off`)
 */
ref SubmissionEntry prepRW(return ref SubmissionEntry entry, Operation op,
    int fd = -1, const void* addr = null, uint len = 0, ulong offset = 0) @safe
{
    pragma(inline);
    entry.opcode = op;
    entry.fd = fd;
    entry.off = offset;
    entry.flags = SubmissionEntryFlags.NONE;
    entry.ioprio = 0;
    entry.addr = cast(ulong)addr;
    entry.len = len;
    entry.rw_flags = ReadWriteFlags.NONE;
    entry.user_data = 0;
    entry.buf_index = 0;
    entry.personality = 0;
    entry.splice_fd_in = 0;
    entry.__pad2[0] = entry.__pad2[1] = 0;
    return entry;
}

/**
 * Prepares `nop` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 */
ref SubmissionEntry prepNop(return ref SubmissionEntry entry) @safe
{
    entry.prepRW(Operation.NOP);
    return entry;
}

/**
 * Prepares `readv` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor of file we are operating on
 *     offset = offset
 *     buffer = iovec buffers to be used by the operation
 */
ref SubmissionEntry prepReadv(V)(return ref SubmissionEntry entry, int fd, ref const V buffer, long offset) @trusted
    if (is(V == iovec[]) || is(V == iovec))
{
    static if (is(V == iovec[]))
    {
        assert(buffer.length, "Empty buffer");
        assert(buffer.length < uint.max, "Too many iovec buffers");
        return entry.prepRW(Operation.READV, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    }
    else return entry.prepRW(Operation.READV, fd, cast(void*)&buffer, 1, offset);
}

/**
 * Prepares `writev` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor of file we are operating on
 *     offset = offset
 *     buffer = iovec buffers to be used by the operation
 */
ref SubmissionEntry prepWritev(V)(return ref SubmissionEntry entry, int fd, ref const V buffer, long offset) @trusted
    if (is(V == iovec[]) || is(V == iovec))
{
    static if (is(V == iovec[]))
    {
        assert(buffer.length, "Empty buffer");
        assert(buffer.length < uint.max, "Too many iovec buffers");
        return entry.prepRW(Operation.WRITEV, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    }
    else return entry.prepRW(Operation.WRITEV, fd, cast(void*)&buffer, 1, offset);
}

/**
 * Prepares `read_fixed` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor of file we are operating on
 *     offset = offset
 *     buffer = slice to preregistered buffer
 *     bufferIndex = index to the preregistered buffers array buffer belongs to
 */
ref SubmissionEntry prepReadFixed(return ref SubmissionEntry entry, int fd, long offset, ubyte[] buffer, ushort bufferIndex) @safe
{
    assert(buffer.length, "Empty buffer");
    assert(buffer.length < uint.max, "Buffer too large");
    entry.prepRW(Operation.READ_FIXED, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    entry.buf_index = bufferIndex;
    return entry;
}

/**
 * Prepares `write_fixed` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor of file we are operating on
 *     offset = offset
 *     buffer = slice to preregistered buffer
 *     bufferIndex = index to the preregistered buffers array buffer belongs to
 */
ref SubmissionEntry prepWriteFixed(return ref SubmissionEntry entry, int fd, long offset, ubyte[] buffer, ushort bufferIndex) @safe
{
    assert(buffer.length, "Empty buffer");
    assert(buffer.length < uint.max, "Buffer too large");
    entry.prepRW(Operation.WRITE_FIXED, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    entry.buf_index = bufferIndex;
    return entry;
}

/**
 * Prepares `recvmsg(2)` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor of file we are operating on
 *     msg = message to operate with
 *     flags = `recvmsg` operation flags
 *
 * Note: Available from Linux 5.3
 *
 * See_Also: `recvmsg(2)` man page for details.
 */
ref SubmissionEntry prepRecvMsg(return ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE) @trusted
{
    entry.prepRW(Operation.RECVMSG, fd, cast(void*)&msg, 1, 0);
    entry.msg_flags = flags;
    return entry;
}

/**
 * Prepares `sendmsg(2)` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor of file we are operating on
 *     msg = message to operate with
 *     flags = `sendmsg` operation flags
 *
 * Note: Available from Linux 5.3
 *
 * See_Also: `sendmsg(2)` man page for details.
 */
ref SubmissionEntry prepSendMsg(return ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE) @trusted
{
    entry.prepRW(Operation.SENDMSG, fd, cast(void*)&msg, 1, 0);
    entry.msg_flags = flags;
    return entry;
}

/**
 * Prepares `fsync` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor of a file to call `fsync` on
 *     flags = `fsync` operation flags
 */
ref SubmissionEntry prepFsync(return ref SubmissionEntry entry, int fd, FsyncFlags flags = FsyncFlags.NORMAL) @safe
{
    entry.prepRW(Operation.FSYNC, fd);
    entry.fsync_flags = flags;
    return entry;
}

/**
 * Poll the fd specified in the submission queue entry for the events specified in the poll_events
 * field. Unlike poll or epoll without `EPOLLONESHOT`, this interface always works in one shot mode.
 * That is, once the poll operation is completed, it will have to be resubmitted.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor to poll
 *     events = events to poll on the FD
 *     flags = poll operation flags
 */
ref SubmissionEntry prepPollAdd(return ref SubmissionEntry entry,
    int fd, PollEvents events, PollFlags flags = PollFlags.NONE) @safe
{
    import std.system : endian, Endian;

    // NOTE: the poll flags travel in the SQE `len` field (4th prepRW argument).
    entry.prepRW(Operation.POLL_ADD, fd, null, flags);
    static if (endian == Endian.bigEndian)
        entry.poll_events32 = (events & 0x0000ffffUL) << 16 | (events & 0xffff0000) >> 16;
    else
        entry.poll_events32 = events;
    return entry;
}

/**
 * Remove an existing poll request. If found, the res field of the `CompletionEntry` will contain
 * `0`. If not found, res will contain `-ENOENT`.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     userData = data with the previously issued poll operation
 */
ref SubmissionEntry prepPollRemove(D)(return ref SubmissionEntry entry, ref D userData) @trusted
{
    return entry.prepRW(Operation.POLL_REMOVE, -1, cast(void*)&userData);
}

/**
 * Allow events and user_data update of running poll requests.
870 * 871 * Note: available from Linux 5.13 872 */ 873 ref SubmissionEntry prepPollUpdate(U, V)(return ref SubmissionEntry entry, 874 ref U oldUserData, ref V newUserData, PollEvents events = PollEvents.NONE) @trusted 875 { 876 import std.system : endian, Endian; 877 878 PollFlags flags; 879 if (events != PollEvents.NONE) flags |= PollFlags.UPDATE_EVENTS; 880 if (cast(void*)&oldUserData !is cast(void*)&newUserData) flags |= PollFlags.UPDATE_USER_DATA; 881 882 entry.prepRW( 883 Operation.POLL_REMOVE, 884 -1, 885 cast(void*)&oldUserData, 886 flags, 887 cast(ulong)cast(void*)&newUserData 888 ); 889 static if (endian == Endian.bigEndian) 890 entry.poll_events32 = (events & 0x0000ffffUL) << 16 | (events & 0xffff0000) >> 16; 891 else 892 entry.poll_events32 = events; 893 return entry; 894 } 895 896 /** 897 * Prepares `sync_file_range(2)` operation. 898 * 899 * Sync a file segment with disk, permits fine control when synchronizing the open file referred to 900 * by the file descriptor fd with disk. 901 * 902 * If `len` is 0, then all bytes from `offset` through to the end of file are synchronized. 903 * 904 * Params: 905 * entry = `SubmissionEntry` to prepare 906 * fd = is the file descriptor to sync 907 * offset = the starting byte of the file range to be synchronized 908 * len = the length of the range to be synchronized, in bytes 909 * flags = the flags for the command. 910 * 911 * See_Also: `sync_file_range(2)` for the general description of the related system call. 912 * 913 * Note: available from Linux 5.2 914 */ 915 ref SubmissionEntry prepSyncFileRange(return ref SubmissionEntry entry, int fd, ulong offset, uint len, 916 SyncFileRangeFlags flags = SyncFileRangeFlags.WRITE_AND_WAIT) @safe 917 { 918 entry.prepRW(Operation.SYNC_FILE_RANGE, fd, null, len, offset); 919 entry.sync_range_flags = flags; 920 return entry; 921 } 922 923 /** 924 * This command will register a timeout operation. 
925 * 926 * A timeout will trigger a wakeup event on the completion ring for anyone waiting for events. A 927 * timeout condition is met when either the specified timeout expires, or the specified number of 928 * events have completed. Either condition will trigger the event. The request will complete with 929 * `-ETIME` if the timeout got completed through expiration of the timer, or `0` if the timeout got 930 * completed through requests completing on their own. If the timeout was cancelled before it 931 * expired, the request will complete with `-ECANCELED`. 932 * 933 * Applications may delete existing timeouts before they occur with `TIMEOUT_REMOVE` operation. 934 * 935 * Params: 936 * entry = `SubmissionEntry` to prepare 937 * time = reference to `time64` data structure 938 * count = completion event count 939 * flags = define if it's a relative or absolute time 940 * 941 * Note: Available from Linux 5.4 942 */ 943 ref SubmissionEntry prepTimeout(return ref SubmissionEntry entry, ref KernelTimespec time, 944 ulong count = 0, TimeoutFlags flags = TimeoutFlags.REL) @trusted 945 { 946 entry.prepRW(Operation.TIMEOUT, -1, cast(void*)&time, 1, count); 947 entry.timeout_flags = flags; 948 return entry; 949 } 950 951 /** 952 * Prepares operations to remove existing timeout registered using `TIMEOUT`operation. 953 * 954 * Attempt to remove an existing timeout operation. If the specified timeout request is found and 955 * cancelled successfully, this request will terminate with a result value of `-ECANCELED`. If the 956 * timeout request was found but expiration was already in progress, this request will terminate 957 * with a result value of `-EALREADY`. If the timeout request wasn't found, the request will 958 * terminate with a result value of `-ENOENT`. 
959 * 960 * Params: 961 * entry = `SubmissionEntry` to prepare 962 * userData = user data provided with the previously issued timeout operation 963 * 964 * Note: Available from Linux 5.5 965 */ 966 ref SubmissionEntry prepTimeoutRemove(D)(return ref SubmissionEntry entry, ref D userData) @trusted 967 { 968 return entry.prepRW(Operation.TIMEOUT_REMOVE, -1, cast(void*)&userData); 969 } 970 971 /** 972 * Prepares operations to update existing timeout registered using `TIMEOUT`operation. 973 * 974 * Params: 975 * entry = `SubmissionEntry` to prepare 976 * userData = user data provided with the previously issued timeout operation 977 * time = reference to `time64` data structure with a new time spec 978 * flags = define if it's a relative or absolute time 979 * 980 * Note: Available from Linux 5.11 981 */ 982 ref SubmissionEntry prepTimeoutUpdate(D)(return ref SubmissionEntry entry, 983 ref KernelTimespec time, ref D userData, TimeoutFlags flags) @trusted 984 { 985 entry.prepRW(Operation.TIMEOUT_REMOVE, -1, cast(void*)&userData, 0, cast(ulong)(cast(void*)&time)); 986 entry.timeout_flags = flags | TimeoutFlags.UPDATE; 987 return entry; 988 } 989 990 /** 991 * Prepares `accept4(2)` operation. 992 * 993 * See_Also: `accept4(2)`` for the general description of the related system call. 
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = socket file descriptor
 *      addr = reference to one of sockaddr structures to be filled with accepted client address
 *      addrlen = reference to addrlen field that would be filled with accepted client address length
 *      flags = `AcceptFlags` passed through to the kernel (see `accept4(2)`)
 *
 * Note: Available from Linux 5.5
 */
ref SubmissionEntry prepAccept(ADDR)(return ref SubmissionEntry entry, int fd, ref ADDR addr, ref socklen_t addrlen,
    AcceptFlags flags = AcceptFlags.NONE) @trusted
{
    // Kernel ABI: the pointer to addrlen is carried in the off/addr2 field.
    entry.prepRW(Operation.ACCEPT, fd, cast(void*)&addr, 0, cast(ulong)(cast(void*)&addrlen));
    entry.accept_flags = flags;
    return entry;
}

/**
 * Prepares operation that cancels existing async work.
 *
 * This works with any read/write request, accept, send/recvmsg, etc. There's an important
 * distinction to make here with the different kinds of commands. A read/write on a regular file
 * will generally be waiting for IO completion in an uninterruptible state. This means it'll ignore
 * any signals or attempts to cancel it, as these operations are uncancellable. io_uring can cancel
 * these operations if they haven't yet been started. If they have been started, cancellations on
 * these will fail. Network IO will generally be waiting interruptibly, and can hence be cancelled
 * at any time. The completion event for this request will have a result of 0 if done successfully,
 * `-EALREADY` if the operation is already in progress, and `-ENOENT` if the original request
 * specified cannot be found. For cancellation requests that return `-EALREADY`, io_uring may or may
 * not cause this request to be stopped sooner. For blocking IO, the original request will complete
 * as it originally would have. For IO that is cancellable, it will terminate sooner if at all
 * possible.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      userData = `user_data` field of the request that should be cancelled
 *      flags = cancellation flags, stored verbatim in the entry's `cancel_flags`
 *
 * Note: Available from Linux 5.5
 */
ref SubmissionEntry prepCancel(D)(return ref SubmissionEntry entry, ref D userData, uint flags = 0) @trusted
{
    entry.prepRW(Operation.ASYNC_CANCEL, -1, cast(void*)&userData);
    entry.cancel_flags = flags;
    return entry;
}

/**
 * Prepares linked timeout operation.
 *
 * This request must be linked with another request through `IOSQE_IO_LINK` which is described below.
 * Unlike `IORING_OP_TIMEOUT`, `IORING_OP_LINK_TIMEOUT` acts on the linked request, not the completion
 * queue. The format of the command is otherwise like `IORING_OP_TIMEOUT`, except there's no
 * completion event count as it's tied to a specific request. If used, the timeout specified in the
 * command will cancel the linked command, unless the linked command completes before the
 * timeout. The timeout will complete with `-ETIME` if the timer expired and the linked request was
 * attempted cancelled, or `-ECANCELED` if the timer got cancelled because of completion of the linked
 * request.
 *
 * Note: Available from Linux 5.5
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      time = time specification
 *      flags = define if it's a relative or absolute time
 */
ref SubmissionEntry prepLinkTimeout(return ref SubmissionEntry entry, ref KernelTimespec time, TimeoutFlags flags = TimeoutFlags.REL) @trusted
{
    // Same layout as TIMEOUT (len must be 1) but no event count (off = 0).
    entry.prepRW(Operation.LINK_TIMEOUT, -1, cast(void*)&time, 1, 0);
    entry.timeout_flags = flags;
    return entry;
}

/**
 * Prepares `connect(2)` operation.
 *
 * Note: Available from Linux 5.5
 */
ref SubmissionEntry prepConnect(ADDR)(return ref SubmissionEntry entry, int fd, ref const(ADDR) addr) @trusted
{
    // Kernel ABI: sockaddr length travels in the off field.
    return entry.prepRW(Operation.CONNECT, fd, cast(void*)&addr, 0, ADDR.sizeof);
}

/**
 * Prepares operation to update previously registered files.
 *
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepFilesUpdate(return ref SubmissionEntry entry, int[] fds, int offset) @safe
{
    // NOTE(review): assumes fds is non-empty — &fds[0] would fail bounds check otherwise; confirm callers.
    return entry.prepRW(Operation.FILES_UPDATE, -1, cast(void*)&fds[0], cast(uint)fds.length, offset);
}

/**
 * Prepares `fallocate(2)` operation.
 *
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepFallocate(return ref SubmissionEntry entry, int fd, int mode, long offset, long len) @trusted
{
    // Kernel ABI quirk: the length is smuggled through the addr (pointer) field.
    return entry.prepRW(Operation.FALLOCATE, fd, cast(void*)len, mode, offset);
}

/**
 * Prepares `openat(2)` operation.
 *
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepOpenat(return ref SubmissionEntry entry, int fd, const char* path, int flags, uint mode) @trusted
{
    // Kernel ABI: mode goes in len, open flags in the dedicated open_flags field.
    entry.prepRW(Operation.OPENAT, fd, cast(void*)path, mode, 0);
    entry.open_flags = flags;
    return entry;
}

/**
 * Prepares `close(2)` operation.
 *
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepClose(return ref SubmissionEntry entry, int fd) @safe
{
    return entry.prepRW(Operation.CLOSE, fd);
}

/**
 * Prepares `read` operation into the provided buffer.
 *
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepRead(return ref SubmissionEntry entry, int fd, ubyte[]
buffer, long offset) @safe
{
    return entry.prepRW(Operation.READ, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
}

/**
 * Prepares `write` operation from the provided buffer.
 *
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepWrite(return ref SubmissionEntry entry, int fd, const(ubyte)[] buffer, long offset) @trusted
{
    return entry.prepRW(Operation.WRITE, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
}

/**
 * Prepares `statx(2)` operation.
 *
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepStatx(Statx)(return ref SubmissionEntry entry, int fd, const char* path, int flags, uint mask, ref Statx statxbuf) @trusted
{
    // Kernel ABI: mask goes in len, pointer to the statx buffer in off, flags in statx_flags.
    entry.prepRW(Operation.STATX, fd, cast(void*)path, mask, cast(ulong)(cast(void*)&statxbuf));
    entry.statx_flags = flags;
    return entry;
}

/**
 * Prepares `posix_fadvise(2)` operation.
 *
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepFadvise(return ref SubmissionEntry entry, int fd, long offset, uint len, int advice) @safe
{
    entry.prepRW(Operation.FADVISE, fd, null, len, offset);
    entry.fadvise_advice = advice;
    return entry;
}

/**
 * Prepares `madvise(2)` operation over the given memory block.
 *
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepMadvise(return ref SubmissionEntry entry, const(ubyte)[] block, int advice) @trusted
{
    entry.prepRW(Operation.MADVISE, -1, cast(void*)&block[0], cast(uint)block.length, 0);
    // madvise shares the fadvise_advice field in the SQE.
    entry.fadvise_advice = advice;
    return entry;
}

/**
 * Prepares `send(2)` operation.
 *
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepSend(return ref SubmissionEntry entry,
    int sockfd, const(ubyte)[] buf, MsgFlags flags = MsgFlags.NONE) @trusted
{
    entry.prepRW(Operation.SEND, sockfd, cast(void*)&buf[0], cast(uint)buf.length, 0);
    entry.msg_flags = flags;
    return entry;
}

/**
 * Prepares `recv(2)` operation.
 *
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepRecv(return ref SubmissionEntry entry,
    int sockfd, ubyte[] buf, MsgFlags flags = MsgFlags.NONE) @trusted
{
    entry.prepRW(Operation.RECV, sockfd, cast(void*)&buf[0], cast(uint)buf.length, 0);
    entry.msg_flags = flags;
    return entry;
}

/**
 * Variant that uses registered buffers group.
 *
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepRecv(return ref SubmissionEntry entry,
    int sockfd, ushort gid, uint len, MsgFlags flags = MsgFlags.NONE) @safe
{
    // No buffer pointer: BUFFER_SELECT tells the kernel to pick one from group gid.
    entry.prepRW(Operation.RECV, sockfd, null, len, 0);
    entry.msg_flags = flags;
    entry.buf_group = gid;
    entry.flags |= SubmissionEntryFlags.BUFFER_SELECT;
    return entry;
}

/**
 * Prepares `openat2(2)` operation.
 *
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepOpenat2(return ref SubmissionEntry entry, int fd, const char *path, ref OpenHow how) @trusted
{
    // Kernel ABI: len holds sizeof(OpenHow), off holds the pointer to it.
    return entry.prepRW(Operation.OPENAT2, fd, cast(void*)path, cast(uint)OpenHow.sizeof, cast(ulong)(cast(void*)&how));
}

/**
 * Prepares `epoll_ctl(2)` operation.
 *
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepEpollCtl(return ref SubmissionEntry entry, int epfd, int fd, int op, ref epoll_event ev) @trusted
{
    // Kernel ABI: op goes in len, the target fd in off.
    return entry.prepRW(Operation.EPOLL_CTL, epfd, cast(void*)&ev, op, fd);
}

/**
 * Note: Available from Linux 5.7
 *
 * This splice operation can be used to implement sendfile by splicing to an intermediate pipe
 * first, then splice to the final destination. In fact, the implementation of sendfile in kernel
 * uses splice internally.
 *
 * NOTE that even if fd_in or fd_out refers to a pipe, the splice operation can still fail with
 * EINVAL if one of the fd doesn't explicitly support splice operation, e.g. reading from terminal
 * is unsupported from kernel 5.7 to 5.11. Check issue #291 for more information.
 *
 * Either fd_in or fd_out must be a pipe.
 *
 * Params:
 *      fd_in = input file descriptor
 *      off_in = If fd_in refers to a pipe, off_in must be -1.
 *               If fd_in does not refer to a pipe and off_in is -1, then bytes are read from
 *               fd_in starting from the file offset and it is adjusted appropriately;
 *               If fd_in does not refer to a pipe and off_in is not -1, then the starting
 *               offset of fd_in will be off_in.
 *      fd_out = output file descriptor
 *      off_out = The description of off_in also applies to off_out.
 *      len = Up to len bytes would be transferred between file descriptors.
 *      splice_flags = see man splice(2) for description of flags.
 */
ref SubmissionEntry prepSplice(return ref SubmissionEntry entry,
    int fd_in, ulong off_in,
    int fd_out, ulong off_out,
    uint len, uint splice_flags) @safe
{
    // fd/off of prepRW describe the output side; the input side uses dedicated splice_* fields.
    entry.prepRW(Operation.SPLICE, fd_out, null, len, off_out);
    entry.splice_off_in = off_in;
    entry.splice_fd_in = fd_in;
    entry.splice_flags = splice_flags;
    return entry;
}

/**
 * Note: Available from Linux 5.7
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      buf = buffers to provide
 *      len = length of each buffer to add
 *      bgid = buffers group id
 *      bid = starting buffer id
 */
ref SubmissionEntry prepProvideBuffers(return ref SubmissionEntry entry, ubyte[][] buf, uint len, ushort bgid, int bid) @safe
{
    assert(buf.length < int.max, "Too many buffers");
    // NOTE(review): assumes the outer slices form one contiguous region starting at buf[0][0] — confirm callers.
    entry.prepRW(Operation.PROVIDE_BUFFERS, cast(int)buf.length, cast(void*)&buf[0][0], len, bid);
    entry.buf_group = bgid;
    return entry;
}

/// ditto
ref SubmissionEntry prepProvideBuffers(size_t M, size_t N)(return ref SubmissionEntry entry, ref ubyte[M][N] buf, ushort bgid, int bid) @safe
{
    static assert(N < int.max, "Too many buffers");
    static assert(M < uint.max, "Buffer too large");
    // Static array of N buffers of M bytes each; contiguity is guaranteed by the type.
    entry.prepRW(Operation.PROVIDE_BUFFERS, cast(int)N, cast(void*)&buf[0][0], cast(uint)M, bid);
    entry.buf_group = bgid;
    return entry;
}

/// ditto
ref SubmissionEntry prepProvideBuffer(size_t N)(return ref SubmissionEntry entry, ref ubyte[N] buf, ushort bgid, int bid) @safe
{
    static assert(N < uint.max, "Buffer too large");
    entry.prepRW(Operation.PROVIDE_BUFFERS, 1, cast(void*)&buf[0], cast(uint)N, bid);
    entry.buf_group = bgid;
    return entry;
}

/// ditto
ref SubmissionEntry prepProvideBuffer(return ref SubmissionEntry entry, ref ubyte[] buf, ushort bgid, int bid) @safe
{
    assert(buf.length < uint.max, "Buffer too large");
    entry.prepRW(Operation.PROVIDE_BUFFERS, 1, cast(void*)&buf[0], cast(uint)buf.length, bid);
    entry.buf_group = bgid;
    return entry;
}

/**
 * Removes `nr` buffers from buffer group `bgid`.
 *
 * Note: Available from Linux 5.7
 */
ref SubmissionEntry prepRemoveBuffers(return ref SubmissionEntry entry, int nr, ushort bgid) @safe
{
    entry.prepRW(Operation.REMOVE_BUFFERS, nr);
    entry.buf_group = bgid;
    return entry;
}

/**
 * Prepares `tee(2)` operation.
 *
 * Note: Available from Linux 5.8
 */
ref SubmissionEntry prepTee(return ref SubmissionEntry entry, int fd_in, int fd_out, uint nbytes, uint flags) @safe
{
    entry.prepRW(Operation.TEE, fd_out, null, nbytes, 0);
    entry.splice_off_in = 0;
    entry.splice_fd_in = fd_in;
    entry.splice_flags = flags;
    return entry;
}

/**
 * Prepares `shutdown(2)` operation.
 *
 * Note: Available from Linux 5.11
 */
ref SubmissionEntry prepShutdown(return ref SubmissionEntry entry, int fd, int how) @safe
{
    return entry.prepRW(Operation.SHUTDOWN, fd, null, how, 0);
}

/**
 * Prepares `renameat2(2)` operation.
 *
 * Note: Available from Linux 5.11
 */
ref SubmissionEntry prepRenameAt(return ref SubmissionEntry entry,
    int olddfd, const char* oldpath, int newfd, const char* newpath, int flags) @trusted
{
    // Kernel ABI: newfd travels in len, newpath pointer in off.
    entry.prepRW(Operation.RENAMEAT, olddfd, cast(void*)oldpath, newfd, cast(ulong)cast(void*)newpath);
    entry.rename_flags = flags;
    return entry;
}

/**
 * Note: Available from
   Linux 5.11
 */
ref SubmissionEntry prepUnlinkAt(return ref SubmissionEntry entry, int dirfd, const char* path, int flags) @trusted
{
    entry.prepRW(Operation.UNLINKAT, dirfd, cast(void*)path, 0, 0);
    entry.unlink_flags = flags;
    return entry;
}

private:

// uring cleanup: drops one reference, frees and unmaps everything when the last one goes away
void dispose(ref Uring uring) @trusted
{
    if (uring.payload is null) return;
    // debug printf("uring(%d): dispose(%d)\n", uring.payload.fd, uring.payload.refs);
    if (--uring.payload.refs == 0)
    {
        import std.traits : hasElaborateDestructor;
        // debug printf("uring(%d): free\n", uring.payload.fd);
        static if (hasElaborateDestructor!UringDesc)
            destroy(*uring.payload); // call possible destructors (unmaps rings, closes fd)
        free(cast(void*)uring.payload);
    }
    uring.payload = null; // always detach this handle, even if other refs remain
}

// system fields descriptor — the reference-counted shared state behind `Uring`
struct UringDesc
{
    nothrow @nogc:

    int fd;                  // io_uring file descriptor returned by io_uring_setup
    size_t refs;             // manual reference count (managed by Uring copy ctor/dtor)
    SetupParameters params;  // parameters filled in by the kernel during setup
    SubmissionQueue sq;
    CompletionQueue cq;

    iovec[] regBuffers;      // buffers registered with the kernel (malloc'd by us)

    ~this() @trusted
    {
        if (regBuffers) free(cast(void*)&regBuffers[0]);
        if (sq.ring) munmap(sq.ring, sq.ringSize);
        if (sq.sqes) munmap(cast(void*)&sq.sqes[0], sq.sqes.length * SubmissionEntry.sizeof);
        // with SINGLE_MMAP the CQ shares the SQ mapping; don't unmap it twice
        if (cq.ring && cq.ring != sq.ring) munmap(cq.ring, cq.ringSize);
        close(fd);
    }

    // mmaps the SQ/CQ rings and the SQE array; returns 0 or -errno
    private auto mapRings() @trusted
    {
        // ring sizes derive from the offsets/counts the kernel reported in params
        sq.ringSize = params.sq_off.array + params.sq_entries * uint.sizeof;
        cq.ringSize = params.cq_off.cqes + params.cq_entries * CompletionEntry.sizeof;

        if (params.features & SetupFeatures.SINGLE_MMAP)
        {
            // single mapping serves both rings — size it for the bigger of the two
            if (cq.ringSize > sq.ringSize) sq.ringSize = cq.ringSize;
            cq.ringSize = sq.ringSize;
        }

        sq.ring = mmap(null, sq.ringSize,
            PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
            fd, SetupParameters.SUBMISSION_QUEUE_RING_OFFSET
        );

        if (sq.ring == MAP_FAILED)
        {
            sq.ring = null;
            return -errno;
        }

        if (params.features & SetupFeatures.SINGLE_MMAP)
            cq.ring = sq.ring;
        else
        {
            cq.ring = mmap(null, cq.ringSize,
                PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
                fd, SetupParameters.COMPLETION_QUEUE_RING_OFFSET
            );

            if (cq.ring == MAP_FAILED)
            {
                cq.ring = null;
                return -errno; // cleanup is done in struct destructors
            }
        }

        // resolve kernel-shared SQ fields from their reported offsets into the mapping
        uint entries = *cast(uint*)(sq.ring + params.sq_off.ring_entries);
        sq.khead = cast(uint*)(sq.ring + params.sq_off.head);
        sq.ktail = cast(uint*)(sq.ring + params.sq_off.tail);
        sq.localTail = *sq.ktail;
        sq.ringMask = *cast(uint*)(sq.ring + params.sq_off.ring_mask);
        sq.kflags = cast(uint*)(sq.ring + params.sq_off.flags);
        sq.kdropped = cast(uint*)(sq.ring + params.sq_off.dropped);

        // Indirection array of indexes to the sqes array (head and tail are pointing to this array).
        // As we don't need some fancy mappings, just initialize it with constant indexes and forget about it.
        // That way, head and tail are actually indexes to our sqes array.
1430 foreach (i; 0..entries) 1431 { 1432 *((cast(uint*)(sq.ring + params.sq_off.array)) + i) = i; 1433 } 1434 1435 auto psqes = mmap( 1436 null, entries * SubmissionEntry.sizeof, 1437 PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, 1438 fd, SetupParameters.SUBMISSION_QUEUE_ENTRIES_OFFSET 1439 ); 1440 1441 if (psqes == MAP_FAILED) return -errno; 1442 sq.sqes = (cast(SubmissionEntry*)psqes)[0..entries]; 1443 1444 entries = *cast(uint*)(cq.ring + params.cq_off.ring_entries); 1445 cq.khead = cast(uint*)(cq.ring + params.cq_off.head); 1446 cq.localHead = *cq.khead; 1447 cq.ktail = cast(uint*)(cq.ring + params.cq_off.tail); 1448 cq.ringMask = *cast(uint*)(cq.ring + params.cq_off.ring_mask); 1449 cq.koverflow = cast(uint*)(cq.ring + params.cq_off.overflow); 1450 cq.cqes = (cast(CompletionEntry*)(cq.ring + params.cq_off.cqes))[0..entries]; 1451 cq.kflags = cast(uint*)(cq.ring + params.cq_off.flags); 1452 return 0; 1453 } 1454 } 1455 1456 /// Wraper for `SubmissionEntry` queue 1457 struct SubmissionQueue 1458 { 1459 nothrow @nogc: 1460 1461 // mmaped fields 1462 uint* khead; // controlled by kernel 1463 uint* ktail; // controlled by us 1464 uint* kflags; // controlled by kernel (ie IORING_SQ_NEED_WAKEUP) 1465 uint* kdropped; // counter of invalid submissions (out of bound index) 1466 uint ringMask; // constant mask used to determine array index from head/tail 1467 1468 // mmap details (for cleanup) 1469 void* ring; // pointer to the mmaped region 1470 size_t ringSize; // size of mmaped memory block 1471 1472 // mmapped list of entries (fixed length) 1473 SubmissionEntry[] sqes; 1474 1475 uint localTail; // used for batch submission 1476 1477 uint head() const @safe pure { return atomicLoad!(MemoryOrder.acq)(*khead); } 1478 uint tail() const @safe pure { return localTail; } 1479 1480 void flushTail() @safe pure 1481 { 1482 pragma(inline, true); 1483 // debug printf("SQ updating tail: %d\n", localTail); 1484 atomicStore!(MemoryOrder.rel)(*ktail, localTail); 1485 } 1486 
1487 SubmissionQueueFlags flags() const @safe pure 1488 { 1489 return cast(SubmissionQueueFlags)atomicLoad!(MemoryOrder.raw)(*kflags); 1490 } 1491 1492 bool full() const @safe pure { return sqes.length == length; } 1493 1494 size_t length() const @safe pure { return tail - head; } 1495 1496 size_t capacity() const @safe pure { return sqes.length - length; } 1497 1498 ref SubmissionEntry next()() @safe pure return 1499 { 1500 return sqes[localTail++ & ringMask]; 1501 } 1502 1503 void put()(auto ref SubmissionEntry entry) @safe pure 1504 { 1505 assert(!full, "SumbissionQueue is full"); 1506 sqes[localTail++ & ringMask] = entry; 1507 } 1508 1509 void put(OP)(auto ref OP op) 1510 if (!is(OP == SubmissionEntry)) 1511 { 1512 assert(!full, "SumbissionQueue is full"); 1513 sqes[localTail++ & ringMask].fill(op); 1514 } 1515 1516 private void putWith(alias FN, ARGS...)(auto ref ARGS args) 1517 { 1518 import std.traits : Parameters, ParameterStorageClass, ParameterStorageClassTuple; 1519 1520 static assert( 1521 Parameters!FN.length >= 1 1522 && is(Parameters!FN[0] == SubmissionEntry) 1523 && ParameterStorageClassTuple!FN[0] == ParameterStorageClass.ref_, 1524 "Alias function must accept at least `ref SubmissionEntry`"); 1525 1526 static assert( 1527 is(typeof(FN(sqes[localTail & ringMask], args))), 1528 "Provided function is not callable with " ~ (Parameters!((ref SubmissionEntry e, ARGS args) {})).stringof); 1529 1530 assert(!full, "SumbissionQueue is full"); 1531 FN(sqes[localTail++ & ringMask], args); 1532 } 1533 1534 uint dropped() const @safe pure { return atomicLoad!(MemoryOrder.raw)(*kdropped); } 1535 } 1536 1537 struct CompletionQueue 1538 { 1539 nothrow @nogc: 1540 1541 // mmaped fields 1542 uint* khead; // controlled by us (increment after entry at head was read) 1543 uint* ktail; // updated by kernel 1544 uint* koverflow; 1545 uint* kflags; 1546 CompletionEntry[] cqes; // array of entries (fixed length) 1547 1548 uint ringMask; // constant mask used to determine 
array index from head/tail 1549 1550 // mmap details (for cleanup) 1551 void* ring; 1552 size_t ringSize; 1553 1554 uint localHead; // used for bulk reading 1555 1556 uint head() const @safe pure { return localHead; } 1557 uint tail() const @safe pure { return atomicLoad!(MemoryOrder.acq)(*ktail); } 1558 1559 void flushHead() @safe pure 1560 { 1561 pragma(inline, true); 1562 // debug printf("CQ updating head: %d\n", localHead); 1563 atomicStore!(MemoryOrder.rel)(*khead, localHead); 1564 } 1565 1566 bool empty() const @safe pure { return head == tail; } 1567 1568 ref CompletionEntry front() @safe pure return 1569 { 1570 assert(!empty, "CompletionQueue is empty"); 1571 return cqes[localHead & ringMask]; 1572 } 1573 1574 void popFront() @safe pure 1575 { 1576 pragma(inline); 1577 assert(!empty, "CompletionQueue is empty"); 1578 localHead++; 1579 flushHead(); 1580 } 1581 1582 size_t length() const @safe pure { return tail - localHead; } 1583 1584 uint overflow() const @safe pure { return atomicLoad!(MemoryOrder.raw)(*koverflow); } 1585 1586 /// Runtime CQ flags - written by the application, shouldn't be modified by the kernel. 
1587 void flags(CQRingFlags flags) @safe pure { atomicStore!(MemoryOrder.raw)(*kflags, flags); } 1588 } 1589 1590 // just a helper to use atomicStore more easily with older compilers 1591 void atomicStore(MemoryOrder ms, T, V)(ref T val, V newVal) @trusted 1592 { 1593 pragma(inline, true); 1594 import core.atomic : store = atomicStore; 1595 static if (__VERSION__ >= 2089) store!ms(val, newVal); 1596 else store!ms(*(cast(shared T*)&val), newVal); 1597 } 1598 1599 // just a helper to use atomicLoad more easily with older compilers 1600 T atomicLoad(MemoryOrder ms, T)(ref const T val) @trusted 1601 { 1602 pragma(inline, true); 1603 import core.atomic : load = atomicLoad; 1604 static if (__VERSION__ >= 2089) return load!ms(val); 1605 else return load!ms(*(cast(const shared T*)&val)); 1606 } 1607 1608 version (assert) 1609 { 1610 import std.range.primitives : ElementType, isInputRange, isOutputRange; 1611 static assert(isInputRange!Uring && is(ElementType!Uring == CompletionEntry)); 1612 static assert(isOutputRange!(Uring, SubmissionEntry)); 1613 } 1614 1615 version (LDC) 1616 { 1617 import ldc.intrinsics : llvm_expect; 1618 alias _expect = llvm_expect; 1619 } 1620 else 1621 { 1622 T _expect(T)(T val, T expected_val) if (__traits(isIntegral, T)) 1623 { 1624 pragma(inline, true); 1625 return val; 1626 } 1627 }