/**
 * Simple idiomatic dlang wrapper around linux io_uring
 * (see: https://kernel.dk/io_uring.pdf) asynchronous API.
 */
module during;

version(linux) {}
else {
    pragma(msg, "!!!!!!! during/io_uring is available ONLY on Linux systems (5.1+). This package is useless on your system. !!!!!!!!!");
    static assert(0, "during is not available on your system");
}

public import during.io_uring;
import during.openat2;

import core.atomic : MemoryOrder;
debug import core.stdc.stdio;
import core.stdc.stdlib;
import core.sys.linux.epoll;
import core.sys.linux.errno;
import core.sys.linux.sched;
import core.sys.linux.sys.mman;
import core.sys.linux.unistd;
import core.sys.posix.signal;
import core.sys.posix.sys.socket;
import core.sys.posix.sys.types;
import core.sys.posix.sys.uio;
import std.algorithm.comparison : among;

nothrow @nogc:

/**
 * Setup new instance of io_uring into provided `Uring` structure.
 *
 * Params:
 *     uring = `Uring` structure to be initialized (must not be already initialized)
 *     entries = Number of entries to initialize uring with
 *     flags = `SetupFlags` to use to initialize uring.
 *
 * Returns: On success it returns 0, `-errno` otherwise.
 */
int setup(ref Uring uring, uint entries = 128, SetupFlags flags = SetupFlags.NONE)
{
    assert(uring.payload is null, "Uring is already initialized");
    uring.payload = cast(UringDesc*)calloc(1, UringDesc.sizeof);
    if (uring.payload is null) return -errno;

    uring.payload.params.flags = flags;
    uring.payload.refs = 1;
    auto r = io_uring_setup(entries, uring.payload.params);
    if (r < 0) return -errno;

    uring.payload.fd = r;

    r = uring.payload.mapRings();
    if (r < 0)
    {
        dispose(uring);
        return r;
    }

    // debug printf("uring(%d): setup\n", uring.payload.fd);

    return 0;
}
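// Example (illustrative sketch, not part of the original sources): the basic
// lifecycle of an `Uring` - initialize it with `setup`, prepare a NOP operation,
// submit it and consume its completion. Needs a real Linux 5.1+ kernel to run.
unittest
{
    Uring io;
    immutable res = io.setup(16); // ring with 16 SQEs
    assert(res == 0, "io_uring setup failed");

    // prepare a no-op entry and submit it, waiting for 1 completion
    io.putWith!((ref SubmissionEntry e) => e.prepNop())();
    assert(io.submitAndWait(1) == 1);

    assert(!io.empty);
    assert(io.front.res == 0); // NOP completes with res == 0
    io.popFront();
} // Uring's destructor tears the ring down when the last copy goes out of scope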
/**
 * Main entry point to work with io_uring.
 *
 * It hides `SubmissionQueue` and `CompletionQueue` behind standard range interface.
 * We put in `SubmissionEntry` entries and take out `CompletionEntry` entries.
 *
 * Use predefined `prepXX` methods to fill required fields of `SubmissionEntry` before `put` or during `putWith`.
 *
 * Note: `prepXX` functions don't touch the previous entry state, they just fill in the operation properties.
 * Entries prepared with `putWith` are cleared automatically first, which makes that interface less error
 * prone. So when reusing your own `SubmissionEntry` (outside the submission queue) that is later added to
 * the submission queue using `put`, be sure it's cleared if it's reused for multiple operations.
 */
struct Uring
{
    nothrow @nogc:

    private UringDesc* payload;

    /// Copy constructor
    this(ref return scope Uring rhs) @safe pure
    {
        assert(rhs.payload !is null, "rhs payload is null");
        // debug printf("uring(%d): copy\n", rhs.payload.fd);
        this.payload = rhs.payload;
        this.payload.refs++;
    }

    /// Destructor
    ~this() @safe
    {
        dispose(this);
    }

    /// Native io_uring file descriptor
    int fd() const @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.fd;
    }

    /// io_uring parameters
    SetupParameters params() const @safe pure return
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.params;
    }

    /// Check if there is some `CompletionEntry` to process.
    bool empty() const @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.cq.empty;
    }

    /// Check if there is space for another `SubmissionEntry` to submit.
    bool full() const @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.sq.full;
    }

    /// Available space in submission queue before it becomes full
    size_t capacity() const @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.sq.capacity;
    }

    /// Number of entries in completion queue
    size_t length() const @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.cq.length;
    }

    /// Get first `CompletionEntry` from cq ring
    ref CompletionEntry front() @safe pure return
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.cq.front;
    }

    /// Move to next `CompletionEntry`
    void popFront() @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.cq.popFront;
    }

    /**
     * Adds new entry to the `SubmissionQueue`.
     *
     * Note that this just adds entry to the queue and doesn't advance the tail
     * marker kernel sees. For that `flush()` needs to be called next.
     *
     * Also note that to actually submit new entries to the kernel,
     * `submit()` must be called.
     *
     * Params:
     *     FN = Function to fill next entry in queue by `ref` (should be faster).
     *         It is expected to be in a form of `void function(ARGS)(ref SubmissionEntry, auto ref ARGS)`.
     *         Note that in this case queue entry is cleaned first before function is called.
     *     entry = Custom built `SubmissionEntry` to be posted as is.
     *         Note that in this case it is copied whole over one in the `SubmissionQueue`.
     *     args = Optional arguments passed to the function
     *
     * Returns: reference to `Uring` structure so it's possible to chain multiple commands.
     */
    ref Uring put()(auto ref SubmissionEntry entry) @safe pure return
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        payload.sq.put(entry);
        return this;
    }

    /// ditto
    ref Uring putWith(alias FN, ARGS...)(auto ref ARGS args) return
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        import std.functional : forward;
        payload.sq.putWith!FN(forward!args);
        return this;
    }

    /**
     * Similar to `put(SubmissionEntry)` but in this case we can provide our custom type (args) to be filled
     * to next `SubmissionEntry` in queue.
     *
     * Fields in the provided type must use the same names as in `SubmissionEntry` to be automagically copied.
     *
     * Params:
     *     op = Custom operation definition.
     */
    ref Uring put(OP)(auto ref OP op) return
    if (!is(OP == SubmissionEntry))
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        payload.sq.put(op);
        return this;
    }

    /**
     * Advances the userspace submission queue and returns last `SubmissionEntry`.
     */
    ref SubmissionEntry next()() @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.sq.next();
    }
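    // Example (sketch): filling the next SQE from a custom struct whose fields
    // mirror `SubmissionEntry` member names (here a NOP with custom user_data).
    unittest
    {
        struct MyOp { Operation opcode; ulong user_data; }

        Uring io;
        assert(io.setup(4) == 0);
        io.put(MyOp(Operation.NOP, 42)); // matching fields are copied to the SQE
        assert(io.submitAndWait(1) == 1);
        assert(io.front.user_data == 42);
        io.popFront();
    }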
    /**
     * If completion queue is full, the new event may be dropped.
     * This value records number of dropped events.
     */
    uint overflow() const @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.cq.overflow;
    }

    /// Counter of invalid submissions (out-of-bound index in submission array)
    uint dropped() const @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.sq.dropped;
    }

    /**
     * Submits queued `SubmissionEntry` to be processed by kernel.
     *
     * Params:
     *     want = number of `CompletionEntries` to wait for.
     *         If 0, this just submits queued entries and returns.
     *         If > 0, it blocks until at least wanted number of entries were completed.
     *     sig = See io_uring_enter(2) man page
     *
     * Returns: Number of submitted entries on success, `-errno` on error
     */
    int submit(S)(uint want, const S* args)
    if (is(S == sigset_t) || is(S == io_uring_getevents_arg))
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        if (_expect(want > 0, false)) return submitAndWait(want, args);
        return submit(args);
    }

    /// ditto
    int submit(uint want) @safe
    {
        pragma(inline, true)
        if (_expect(want > 0, true)) return submitAndWait(want, cast(sigset_t*)null);
        return submit(cast(sigset_t*)null);
    }

    /// ditto
    int submit(S)(const S* args) @trusted
    if (is(S == sigset_t) || is(S == io_uring_getevents_arg))
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        immutable len = cast(int)payload.sq.length;
        if (_expect(len > 0, true)) // anything to submit?
        {
            payload.sq.flushTail(); // advance queue index

            EnterFlags flags;
            if (payload.params.flags & SetupFlags.SQPOLL)
            {
                if (_expect(payload.sq.flags & SubmissionQueueFlags.NEED_WAKEUP, false))
                    flags |= EnterFlags.SQ_WAKEUP;
                else return len; // fast poll
            }
            immutable r = io_uring_enter(payload.fd, len, 0, flags, args);
            if (_expect(r < 0, false)) return -errno;
            return r;
        }
        return 0;
    }

    /// ditto
    int submit() @safe
    {
        pragma(inline, true)
        return submit(cast(sigset_t*)null);
    }

    /**
     * Flushes submission queue index to the kernel.
     * Doesn't call any syscall, it just advances the SQE queue for kernel.
     * This can be used with `IORING_SETUP_SQPOLL` when kernel polls the submission queue.
     */
    void flush() @safe
    {
        payload.sq.flushTail(); // advance queue index
    }

    /**
     * Similar to `submit` but with this method we just wait for required number
     * of `CompletionEntries`.
     *
     * Returns: `0` on success, `-errno` on error
     */
    int wait(S)(uint want = 1, const S* args = null) @trusted
    if (is(S == sigset_t) || is(S == io_uring_getevents_arg))
    in (payload !is null, "Uring hasn't been initialized yet")
    in (want > 0, "Invalid want value")
    {
        pragma(inline);
        if (payload.cq.length >= want) return 0; // we don't need to syscall
        immutable r = io_uring_enter(payload.fd, 0, want, EnterFlags.GETEVENTS, args);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /// ditto
    int wait(uint want = 1)
    {
        pragma(inline, true)
        return wait(want, cast(sigset_t*)null);
    }
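    // Example (sketch): batching - queue several NOPs, submit them with a single
    // io_uring_enter call and drain the completions via the range interface.
    unittest
    {
        Uring io;
        assert(io.setup(8) == 0);

        foreach (i; 0 .. 4)
            io.putWith!((ref SubmissionEntry e) => e.prepNop())();
        assert(io.capacity == 4); // half of the ring is still free

        assert(io.submit(4) == 4); // submits all 4 and waits for 4 completions
        foreach (ref cqe; io) assert(cqe.res == 0);
    }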
    /**
     * Special case of a `submit` that can be used when we know beforehand that we want to wait for
     * some amount of CQEs.
     */
    int submitAndWait(S)(uint want, const S* args) @trusted
    if (is(S == sigset_t) || is(S == io_uring_getevents_arg))
    in (payload !is null, "Uring hasn't been initialized yet")
    in (want > 0, "Invalid want value")
    {
        immutable len = cast(int)payload.sq.length;
        if (_expect(len > 0, true)) // anything to submit?
        {
            payload.sq.flushTail(); // advance queue index

            EnterFlags flags = EnterFlags.GETEVENTS;
            if (payload.params.flags & SetupFlags.SQPOLL)
            {
                if (_expect(payload.sq.flags & SubmissionQueueFlags.NEED_WAKEUP, false))
                    flags |= EnterFlags.SQ_WAKEUP;
            }
            immutable r = io_uring_enter(payload.fd, len, want, flags, args);
            if (_expect(r < 0, false)) return -errno;
            return r;
        }
        return wait(want); // just simple wait
    }

    /// ditto
    int submitAndWait(uint want) @safe
    {
        pragma(inline, true)
        return submitAndWait(want, cast(sigset_t*)null);
    }

    /**
     * Register single buffer to be mapped into the kernel for faster buffered operations.
     *
     * To use the buffers, the application must use the fixed variants of the operations,
     * `READ_FIXED` or `WRITE_FIXED`, in the `SubmissionEntry`, with the used `buf_index` set
     * in entry extra data.
     *
     * An application can increase or decrease the size or number of registered buffers by first
     * unregistering the existing buffers, and then issuing a new call to io_uring_register() with
     * the new buffers.
     *
     * Params:
     *     buffers = Buffers to be registered
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int registerBuffers(T)(T buffers)
    if (is(T == ubyte[]) || is(T == ubyte[][])) // TODO: something else?
    in (payload !is null, "Uring hasn't been initialized yet")
    in (buffers.length, "Empty buffer")
    {
        if (payload.regBuffers !is null)
            return -EBUSY; // buffers were already registered

        static if (is(T == ubyte[]))
        {
            auto p = malloc(iovec.sizeof);
            if (_expect(p is null, false)) return -errno;
            payload.regBuffers = (cast(iovec*)p)[0..1];
            payload.regBuffers[0].iov_base = cast(void*)&buffers[0];
            payload.regBuffers[0].iov_len = buffers.length;
        }
        else static if (is(T == ubyte[][]))
        {
            auto p = malloc(buffers.length * iovec.sizeof);
            if (_expect(p is null, false)) return -errno;
            payload.regBuffers = (cast(iovec*)p)[0..buffers.length];

            foreach (i, b; buffers)
            {
                assert(b.length, "Empty buffer");
                payload.regBuffers[i].iov_base = cast(void*)&b[0];
                payload.regBuffers[i].iov_len = b.length;
            }
        }

        // nr_args must be the number of iovecs, otherwise only the first one would be registered
        immutable r = io_uring_register(
            payload.fd,
            RegisterOpCode.REGISTER_BUFFERS,
            cast(const(void)*)&payload.regBuffers[0], cast(uint)payload.regBuffers.length
        );

        if (_expect(r < 0, false)) return -errno;
        return 0;
    }
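    // Example (sketch, assumes "/dev/zero" is readable): register one fixed
    // buffer and fill it with a READ_FIXED operation (see prepReadFixed below).
    unittest
    {
        import core.sys.posix.fcntl : open, O_RDONLY;

        Uring io;
        assert(io.setup(4) == 0);

        ubyte[64] data;
        ubyte[] buf = data[];
        assert(io.registerBuffers(buf) == 0);

        immutable fd = open("/dev/zero", O_RDONLY);
        assert(fd >= 0);

        io.putWith!((ref SubmissionEntry e, int f, ubyte[] b)
            => e.prepReadFixed(f, 0, b, 0))(fd, buf);
        assert(io.submitAndWait(1) == 1);
        assert(io.front.res == data.length); // whole buffer was filled with zeroes
        io.popFront();

        assert(io.unregisterBuffers() == 0);
        close(fd);
    }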
    /**
     * Releases all previously registered buffers associated with the `io_uring` instance.
     *
     * An application need not unregister buffers explicitly before shutting down the io_uring instance.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int unregisterBuffers() @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        if (payload.regBuffers is null)
            return -ENXIO; // no buffers were registered

        free(cast(void*)&payload.regBuffers[0]);
        payload.regBuffers = null;

        immutable r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_BUFFERS, null, 0);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * Register files for I/O.
     *
     * To make use of the registered files, the `IOSQE_FIXED_FILE` flag must be set in the flags
     * member of the `SubmissionEntry`, and the `fd` member is set to the index of the file in the
     * file descriptor array.
     *
     * Files are automatically unregistered when the `io_uring` instance is torn down. An application
     * need only unregister if it wishes to register a new set of fds.
     *
     * Use `-1` as a file descriptor to mark it as reserved in the array.
     *
     * Params: fds = array of file descriptors to be registered
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int registerFiles(const(int)[] fds)
    in (payload !is null, "Uring hasn't been initialized yet")
    in (fds.length, "No file descriptors provided")
    in (fds.length < uint.max, "Too many file descriptors")
    {
        // arg contains a pointer to an array of nr_args file descriptors (signed 32 bit integers).
        immutable r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_FILES, &fds[0], cast(uint)fds.length);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * Register an update for an existing file set. The updates will start at
     * `off` in the original array.
     *
     * Use `-1` as a file descriptor to mark it as reserved in the array.
     *
     * Params:
     *     off = offset to the original registered files to be updated
     *     fds = array of file descriptors to update with
     *
     * Returns: number of files updated on success, -errno on failure.
     */
    int registerFilesUpdate(uint off, const(int)[] fds) @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    in (fds.length, "No file descriptors provided to update")
    in (fds.length < uint.max, "Too many file descriptors")
    {
        struct Update // represents io_uring_files_update (obsolete) or io_uring_rsrc_update
        {
            uint offset;
            uint _resv;
            ulong data;
        }

        static assert (Update.sizeof == 16);

        Update u = { offset: off, data: cast(ulong)&fds[0] };
        immutable r = io_uring_register(
            payload.fd,
            RegisterOpCode.REGISTER_FILES_UPDATE,
            &u, cast(uint)fds.length);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * All previously registered files associated with the `io_uring` instance will be unregistered.
     *
     * Files are automatically unregistered when the `io_uring` instance is torn down. An application
     * need only unregister if it wishes to register a new set of fds.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int unregisterFiles() @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_FILES, null, 0);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }
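    // Example (sketch, assumes `SubmissionEntryFlags.FIXED_FILE` maps to
    // IOSQE_FIXED_FILE): after registration the SQE's `fd` becomes an index into
    // the registered file table instead of a regular descriptor.
    unittest
    {
        import core.sys.posix.fcntl : open, O_RDONLY;

        Uring io;
        assert(io.setup(4) == 0);

        immutable fd = open("/dev/zero", O_RDONLY);
        assert(fd >= 0);
        int[1] fds = [fd];
        assert(io.registerFiles(fds[]) == 0);

        ubyte[16] buf;
        io.putWith!((ref SubmissionEntry e, ubyte[] b)
        {
            e.prepRead(0, b, 0); // fd 0 here means index 0 in the file table
            e.flags = SubmissionEntryFlags.FIXED_FILE;
        })(buf[]);
        assert(io.submitAndWait(1) == 1);
        assert(io.front.res == buf.length);
        io.popFront();

        assert(io.unregisterFiles() == 0);
        close(fd);
    }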
    /**
     * Registers event file descriptor that would be used as a notification mechanism on completion
     * queue change.
     *
     * Params: eventFD = event file descriptor to be notified about change
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int registerEventFD(int eventFD) @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_EVENTFD, &eventFD, 1);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * Unregister previously registered notification event file descriptor.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int unregisterEventFD() @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_EVENTFD, null, 0);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * Generic means to register resources.
     *
     * Note: Available from Linux 5.13
     */
    int registerRsrc(R)(RegisterOpCode type, const(R)[] data, const(ulong)[] tags)
    in (payload !is null, "Uring hasn't been initialized yet")
    in (data.length == tags.length, "Different array lengths")
    in (data.length < uint.max, "Too many resources")
    {
        struct Register // represents io_uring_rsrc_register
        {
            uint nr;
            uint _resv;
            ulong _resv2;
            ulong data;
            ulong tags;
        }

        static assert (Register.sizeof == 32);

        Register r = {
            nr: cast(uint)data.length,
            data: cast(ulong)&data[0],
            tags: cast(ulong)&tags[0]
        };
        // the kernel expects nr_args to be the size of io_uring_rsrc_register
        immutable ret = io_uring_register(
            payload.fd,
            type,
            &r, cast(uint)Register.sizeof);
        if (_expect(ret < 0, false)) return -errno;
        return 0;
    }

    /**
     * Generic means to update registered resources.
     *
     * Note: Available from Linux 5.13
     */
    int registerRsrcUpdate(R)(RegisterOpCode type, uint off, const(R)[] data, const(ulong)[] tags) @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    in (data.length == tags.length, "Different array lengths")
    in (data.length < uint.max, "Too many resources")
    {
        struct Update // represents io_uring_rsrc_update2
        {
            uint offset;
            uint _resv;
            ulong data;
            ulong tags;
            uint nr;
            uint _resv2;
        }

        static assert (Update.sizeof == 32);

        Update u = {
            offset: off,
            data: cast(ulong)&data[0],
            tags: cast(ulong)&tags[0],
            nr: cast(uint)data.length,
        };
        // the kernel expects nr_args to be the size of io_uring_rsrc_update2
        immutable r = io_uring_register(payload.fd, type, &u, cast(uint)Update.sizeof);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * Register a feature whitelist. Attempting to call any operations which are not whitelisted
     * will result in an error.
     *
     * Note: Can only be called once to prevent other code from bypassing the whitelist.
     *
     * Params: res = the struct containing the restriction info.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     *
     * Note: Available from Linux 5.10
     */
    int registerRestrictions(scope ref io_uring_restriction res) @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_RESTRICTIONS, &res, 1);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }
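    // Example (sketch): completion notifications through an eventfd, useful for
    // plugging io_uring into an existing epoll/event-loop based design.
    unittest
    {
        import core.sys.linux.sys.eventfd : eventfd;

        Uring io;
        assert(io.setup(4) == 0);

        immutable efd = eventfd(0, 0); // blocking eventfd
        assert(efd >= 0);
        assert(io.registerEventFD(efd) == 0);

        io.putWith!((ref SubmissionEntry e) => e.prepNop())();
        assert(io.submit(0) == 1); // submit without waiting

        ulong cnt;
        read(efd, &cnt, cnt.sizeof); // blocks until a completion is posted
        assert(cnt == 1);

        assert(io.unregisterEventFD() == 0);
        close(efd);
    }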
    /**
     * Enable the "rings" of this Uring if they were previously disabled with
     * `IORING_SETUP_R_DISABLED`.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     *
     * Note: Available from Linux 5.10
     */
    int enableRings() @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.ENABLE_RINGS, null, 0);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * By default, async workers created by io_uring will inherit the CPU mask of its parent. This
     * is usually all the CPUs in the system, unless the parent is being run with a limited set. If
     * this isn't the desired outcome, the application may explicitly tell io_uring what CPUs the
     * async workers may run on.
     *
     * Note: Available since 5.14.
     */
    int registerIOWQAffinity(cpu_set_t[] cpus) @trusted
    in (cpus.length)
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_IOWQ_AFF, cpus.ptr, cast(uint)cpus.length);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * Undoes a CPU mask previously set with `registerIOWQAffinity`.
     *
     * Note: Available since 5.14
     */
    int unregisterIOWQAffinity() @trusted
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_IOWQ_AFF, null, 0);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * By default, io_uring limits the unbounded workers created to the maximum processor count set
     * by `RLIMIT_NPROC`, and the count of bounded workers is a function of the SQ ring size and the
     * number of CPUs in the system. Sometimes this can be excessive (or too little, for bounded),
     * and this command provides a way to change the count per ring (per NUMA node) instead.
     *
     * `val` must be set to an `uint` pointer to an array of two values, with the values in the
     * array being set to the maximum count of workers per NUMA node. Index 0 holds the bounded
     * worker count, and index 1 holds the unbounded worker count. On successful return, the passed
     * in array will contain the previous maximum values for each type. If the count being passed in
     * is 0, then this command returns the current maximum values and doesn't modify the current
     * setting.
     *
     * Note: Available since 5.15
     */
    int registerIOWQMaxWorkers(ref uint[2] workers) @trusted
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_IOWQ_MAX_WORKERS, &workers, 2);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }
}
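// Example (sketch, requires Linux 5.15): query the current io-wq worker limits
// by passing zeros - the kernel replaces them with the previous maximum values.
unittest
{
    Uring io;
    assert(io.setup(4) == 0);

    uint[2] workers; // [0] = bounded, [1] = unbounded; zeros = just query
    if (io.registerIOWQMaxWorkers(workers) == 0)
        assert(workers[0] && workers[1]); // previous limits are filled in
}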
/**
 * Uses custom operation definition to fill fields of `SubmissionEntry`.
 * Can be used in cases, when builtin prep* functions aren't enough.
 *
 * Custom definition fields must correspond to fields of `SubmissionEntry` for this to work.
 *
 * Note: This doesn't touch previous state of the entry, just fills the corresponding fields.
 *     So it might be needed to call `clear` first on the entry (depends on usage).
 *
 * Params:
 *     entry = entry to set parameters to
 *     op = operation to fill entry with (can be custom type)
 */
ref SubmissionEntry fill(E)(return ref SubmissionEntry entry, auto ref E op)
{
    pragma(inline);
    import std.traits : hasMember, FieldNameTuple;

    // fill entry from provided operation fields (they must have same name as in SubmissionEntry)
    foreach (m; FieldNameTuple!E)
    {
        static assert(hasMember!(SubmissionEntry, m), "unknown member: " ~ E.stringof ~ "." ~ m);
        __traits(getMember, entry, m) = __traits(getMember, op, m);
    }

    return entry;
}

/**
 * Template function to help set `SubmissionEntry` `user_data` field.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     data = data to set to the `SubmissionEntry`
 *
 * Note: data is passed by ref and must live for the duration of the whole operation.
 */
ref SubmissionEntry setUserData(D)(return ref SubmissionEntry entry, ref D data) @trusted
{
    pragma(inline);
    entry.user_data = cast(ulong)(cast(void*)&data);
    return entry;
}

/**
 * Template function to help set `SubmissionEntry` `user_data` field. This differs from `setUserData`
 * in that it emplaces the provided data directly into the SQE `user_data` field, not a pointer to
 * the data.
 *
 * Because of that, the data must have a size of `ulong.sizeof`.
 */
ref SubmissionEntry setUserDataRaw(D)(return ref SubmissionEntry entry, auto ref D data) @trusted
if (D.sizeof == ulong.sizeof)
{
    pragma(inline);
    entry.user_data = *(cast(ulong*)(cast(void*)&data));
    return entry;
}

/**
 * Helper function to retrieve data set directly to the `CompletionEntry` user_data (set by `setUserDataRaw`).
 */
D userDataAs(D)(ref CompletionEntry entry) @trusted
if (D.sizeof == ulong.sizeof)
{
    pragma(inline);
    return *(cast(D*)(cast(void*)&entry.user_data));
}

ref SubmissionEntry prepRW(return ref SubmissionEntry entry, Operation op,
    int fd = -1, const void* addr = null, uint len = 0, ulong offset = 0) @safe
{
    pragma(inline);
    entry.opcode = op;
    entry.fd = fd;
    entry.off = offset;
    entry.flags = SubmissionEntryFlags.NONE;
    entry.ioprio = 0;
    entry.addr = cast(ulong)addr;
    entry.len = len;
    entry.rw_flags = ReadWriteFlags.NONE;
    entry.user_data = 0;
    entry.buf_index = 0;
    entry.personality = 0;
    entry.file_index = 0;
    entry.__pad2[0] = entry.__pad2[1] = 0;
    return entry;
}

/**
 * Prepares `nop` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 */
ref SubmissionEntry prepNop(return ref SubmissionEntry entry) @safe
{
    entry.prepRW(Operation.NOP);
    return entry;
}

/**
 * Prepares `readv` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor of file we are operating on
 *     offset = offset
 *     buffer = iovec buffers to be used by the operation
 */
ref SubmissionEntry prepReadv(V)(return ref SubmissionEntry entry, int fd, ref const V buffer, long offset) @trusted
if (is(V == iovec[]) || is(V == iovec))
{
    static if (is(V == iovec[]))
    {
        assert(buffer.length, "Empty buffer");
        assert(buffer.length < uint.max, "Too many iovec buffers");
        return entry.prepRW(Operation.READV, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    }
    else return entry.prepRW(Operation.READV, fd, cast(void*)&buffer, 1, offset);
}
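// Example (sketch): packing a small token directly into `user_data` with
// `setUserDataRaw` and reading it back from the completion with `userDataAs`.
unittest
{
    static struct Token { uint id; uint kind; }
    static assert(Token.sizeof == ulong.sizeof);

    Uring io;
    assert(io.setup(4) == 0);

    io.putWith!((ref SubmissionEntry e)
    {
        e.prepNop();
        e.setUserDataRaw(Token(7, 42));
    })();
    assert(io.submitAndWait(1) == 1);

    auto tok = io.front.userDataAs!Token;
    assert(tok.id == 7 && tok.kind == 42);
    io.popFront();
}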
/**
 * Prepares `writev` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor of file we are operating on
 *     offset = offset
 *     buffer = iovec buffers to be used by the operation
 */
ref SubmissionEntry prepWritev(V)(return ref SubmissionEntry entry, int fd, ref const V buffer, long offset) @trusted
if (is(V == iovec[]) || is(V == iovec))
{
    static if (is(V == iovec[]))
    {
        assert(buffer.length, "Empty buffer");
        assert(buffer.length < uint.max, "Too many iovec buffers");
        return entry.prepRW(Operation.WRITEV, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    }
    else return entry.prepRW(Operation.WRITEV, fd, cast(void*)&buffer, 1, offset);
}

/**
 * Prepares `read_fixed` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor of file we are operating on
 *     offset = offset
 *     buffer = slice to preregistered buffer
 *     bufferIndex = index to the preregistered buffers array buffer belongs to
 */
ref SubmissionEntry prepReadFixed(return ref SubmissionEntry entry, int fd, long offset, ubyte[] buffer, ushort bufferIndex) @safe
{
    assert(buffer.length, "Empty buffer");
    assert(buffer.length < uint.max, "Buffer too large");
    entry.prepRW(Operation.READ_FIXED, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    entry.buf_index = bufferIndex;
    return entry;
}

/**
 * Prepares `write_fixed` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor of file we are operating on
 *     offset = offset
 *     buffer = slice to preregistered buffer
 *     bufferIndex = index to the preregistered buffers array buffer belongs to
 */
ref SubmissionEntry prepWriteFixed(return ref SubmissionEntry entry, int fd, long offset, ubyte[] buffer, ushort bufferIndex) @safe
{
    assert(buffer.length, "Empty buffer");
    assert(buffer.length < uint.max, "Buffer too large");
    entry.prepRW(Operation.WRITE_FIXED, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    entry.buf_index = bufferIndex;
    return entry;
}

/**
 * Prepares `recvmsg(2)` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor of file we are operating on
 *     msg = message to operate with
 *     flags = `recvmsg` operation flags
 *
 * Note: Available from Linux 5.3
 *
 * See_Also: `recvmsg(2)` man page for details.
 */
ref SubmissionEntry prepRecvMsg(return ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE) @trusted
{
    entry.prepRW(Operation.RECVMSG, fd, cast(void*)&msg, 1, 0);
    entry.msg_flags = flags;
    return entry;
}

/**
 * Prepares `sendmsg(2)` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor of file we are operating on
 *     msg = message to operate with
 *     flags = `sendmsg` operation flags
 *
 * Note: Available from Linux 5.3
 *
 * See_Also: `sendmsg(2)` man page for details.
 */
ref SubmissionEntry prepSendMsg(return ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE) @trusted
{
    entry.prepRW(Operation.SENDMSG, fd, cast(void*)&msg, 1, 0);
    entry.msg_flags = flags;
    return entry;
}
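// Example (sketch, assumes "/dev/zero" is readable): vectored read through
// prepReadv with a single iovec.
unittest
{
    import core.sys.posix.fcntl : open, O_RDONLY;

    Uring io;
    assert(io.setup(4) == 0);

    immutable fd = open("/dev/zero", O_RDONLY);
    assert(fd >= 0);

    ubyte[32] storage;
    iovec v = iovec(cast(void*)&storage[0], storage.length);
    io.putWith!((ref SubmissionEntry e, int f, ref iovec vec)
        => e.prepReadv(f, vec, 0))(fd, v);
    assert(io.submitAndWait(1) == 1);
    assert(io.front.res == storage.length);
    io.popFront();
    close(fd);
}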
/**
 * Prepares `fsync` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor of a file to call `fsync` on
 *     flags = `fsync` operation flags
 */
ref SubmissionEntry prepFsync(return ref SubmissionEntry entry, int fd, FsyncFlags flags = FsyncFlags.NORMAL) @safe
{
    entry.prepRW(Operation.FSYNC, fd);
    entry.fsync_flags = flags;
    return entry;
}

/**
 * Poll the fd specified in the submission queue entry for the events specified in the poll_events
 * field. Unlike poll or epoll without `EPOLLONESHOT`, this interface always works in one shot mode.
 * That is, once the poll operation is completed, it will have to be resubmitted.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = file descriptor to poll
 *     events = events to poll on the FD
 *     flags = poll operation flags
 */
ref SubmissionEntry prepPollAdd(return ref SubmissionEntry entry,
    int fd, PollEvents events, PollFlags flags = PollFlags.NONE) @safe
{
    import std.system : endian, Endian;

    entry.prepRW(Operation.POLL_ADD, fd, null, flags);
    static if (endian == Endian.bigEndian)
        entry.poll_events32 = (events & 0x0000ffffUL) << 16 | (events & 0xffff0000) >> 16;
    else
        entry.poll_events32 = events;
    return entry;
}

/**
 * Remove an existing poll request. If found, the res field of the `CompletionEntry` will contain
 * `0`. If not found, res will contain `-ENOENT`.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     userData = data with the previously issued poll operation
 */
ref SubmissionEntry prepPollRemove(D)(return ref SubmissionEntry entry, ref D userData) @trusted
{
    return entry.prepRW(Operation.POLL_REMOVE, -1, cast(void*)&userData);
}

/**
 * Allow events and user_data update of running poll requests.
 *
 * Note: available from Linux 5.13
 */
ref SubmissionEntry prepPollUpdate(U, V)(return ref SubmissionEntry entry,
    ref U oldUserData, ref V newUserData, PollEvents events = PollEvents.NONE) @trusted
{
    import std.system : endian, Endian;

    PollFlags flags;
    if (events != PollEvents.NONE) flags |= PollFlags.UPDATE_EVENTS;
    if (cast(void*)&oldUserData !is cast(void*)&newUserData) flags |= PollFlags.UPDATE_USER_DATA;

    entry.prepRW(
        Operation.POLL_REMOVE,
        -1,
        cast(void*)&oldUserData,
        flags,
        cast(ulong)cast(void*)&newUserData
    );
    static if (endian == Endian.bigEndian)
        entry.poll_events32 = (events & 0x0000ffffUL) << 16 | (events & 0xffff0000) >> 16;
    else
        entry.poll_events32 = events;
    return entry;
}
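// Example (sketch, assumes `PollEvents.IN` maps to POLLIN): one-shot poll on the
// read end of a pipe; the completion's `res` carries the signalled event mask.
unittest
{
    int[2] fds;
    assert(pipe(fds) == 0);

    Uring io;
    assert(io.setup(4) == 0);

    io.putWith!((ref SubmissionEntry e, int fd) => e.prepPollAdd(fd, PollEvents.IN))(fds[0]);
    assert(io.submit(0) == 1); // submit only - nothing is readable yet

    ubyte b = 42;
    write(fds[1], &b, 1); // make the read end readable
    assert(io.wait(1) == 0);
    assert(io.front.res & PollEvents.IN);
    io.popFront();

    close(fds[0]); close(fds[1]);
}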
/**
 * Prepares `sync_file_range(2)` operation.
 *
 * Sync a file segment with disk, permits fine control when synchronizing the open file referred to
 * by the file descriptor fd with disk.
 *
 * If `len` is 0, then all bytes from `offset` through to the end of file are synchronized.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = is the file descriptor to sync
 *     offset = the starting byte of the file range to be synchronized
 *     len = the length of the range to be synchronized, in bytes
 *     flags = the flags for the command.
 *
 * See_Also: `sync_file_range(2)` for the general description of the related system call.
 *
 * Note: available from Linux 5.2
 */
ref SubmissionEntry prepSyncFileRange(return ref SubmissionEntry entry, int fd, ulong offset, uint len,
    SyncFileRangeFlags flags = SyncFileRangeFlags.WRITE_AND_WAIT) @safe
{
    entry.prepRW(Operation.SYNC_FILE_RANGE, fd, null, len, offset);
    entry.sync_range_flags = flags;
    return entry;
}

/**
 * This command will register a timeout operation.
 *
 * A timeout will trigger a wakeup event on the completion ring for anyone waiting for events. A
 * timeout condition is met when either the specified timeout expires, or the specified number of
 * events have completed. Either condition will trigger the event. The request will complete with
 * `-ETIME` if the timeout got completed through expiration of the timer, or `0` if the timeout got
 * completed through requests completing on their own. If the timeout was cancelled before it
 * expired, the request will complete with `-ECANCELED`.
 *
 * Applications may delete existing timeouts before they occur with `TIMEOUT_REMOVE` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     time = reference to `time64` data structure
 *     count = completion event count
 *     flags = define if it's a relative or absolute time
 *
 * Note: Available from Linux 5.4
 */
ref SubmissionEntry prepTimeout(return ref SubmissionEntry entry, ref KernelTimespec time,
    ulong count = 0, TimeoutFlags flags = TimeoutFlags.REL) @trusted
{
    entry.prepRW(Operation.TIMEOUT, -1, cast(void*)&time, 1, count);
    entry.timeout_flags = flags;
    return entry;
}

/**
 * Prepares operations to remove existing timeout registered using `TIMEOUT` operation.
 *
 * Attempt to remove an existing timeout operation. If the specified timeout request is found and
 * cancelled successfully, this request will terminate with a result value of `-ECANCELED`. If the
 * timeout request was found but expiration was already in progress, this request will terminate
 * with a result value of `-EALREADY`. If the timeout request wasn't found, the request will
 * terminate with a result value of `-ENOENT`.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     userData = user data provided with the previously issued timeout operation
 *
 * Note: Available from Linux 5.5
 */
ref SubmissionEntry prepTimeoutRemove(D)(return ref SubmissionEntry entry, ref D userData) @trusted
{
    return entry.prepRW(Operation.TIMEOUT_REMOVE, -1, cast(void*)&userData);
}

/**
 * Prepares operations to update existing timeout registered using `TIMEOUT` operation.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     userData = user data provided with the previously issued timeout operation
 *     time = reference to `time64` data structure with a new time spec
 *     flags = define if it's a relative or absolute time
 *
 * Note: Available from Linux 5.11
 */
ref SubmissionEntry prepTimeoutUpdate(D)(return ref SubmissionEntry entry,
    ref KernelTimespec time, ref D userData, TimeoutFlags flags) @trusted
{
    entry.prepRW(Operation.TIMEOUT_REMOVE, -1, cast(void*)&userData, 0, cast(ulong)(cast(void*)&time));
    entry.timeout_flags = flags | TimeoutFlags.UPDATE;
    return entry;
}
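// Example (sketch, assumes `KernelTimespec` has tv_sec/tv_nsec fields): a 10ms
// relative timeout; with no other completions it finishes with -ETIME.
unittest
{
    Uring io;
    assert(io.setup(4) == 0);

    KernelTimespec ts;
    ts.tv_sec = 0;
    ts.tv_nsec = 10_000_000; // 10ms
    io.putWith!((ref SubmissionEntry e, ref KernelTimespec t) => e.prepTimeout(t))(ts);
    assert(io.submitAndWait(1) == 1);
    assert(io.front.res == -ETIME);
    io.popFront();
}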
/**
 * Prepares `accept4(2)` operation.
 *
 * See_Also: `accept4(2)` for the general description of the related system call.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     fd = socket file descriptor
 *     addr = reference to one of sockaddr structures to be filled with accepted client address
 *     addrlen = reference to addrlen field that would be filled with accepted client address length
 *
 * Note: Available from Linux 5.5
 */
ref SubmissionEntry prepAccept(ADDR)(return ref SubmissionEntry entry, int fd, ref ADDR addr, ref socklen_t addrlen,
    AcceptFlags flags = AcceptFlags.NONE) @trusted
{
    entry.prepRW(Operation.ACCEPT, fd, cast(void*)&addr, 0, cast(ulong)(cast(void*)&addrlen));
    entry.accept_flags = flags;
    return entry;
}

/**
 * Same as `prepAccept`, but fd is put directly into fixed file table on `fileIndex`.
 * Note: available from Linux 5.15
 */
ref SubmissionEntry prepAcceptDirect(ADDR)(return ref SubmissionEntry entry, int fd, ref ADDR addr, ref socklen_t addrlen,
    uint fileIndex, AcceptFlags flags = AcceptFlags.NONE) @trusted
{
    entry.prepRW(Operation.ACCEPT, fd, cast(void*)&addr, 0, cast(ulong)(cast(void*)&addrlen));
    entry.accept_flags = flags;
    entry.file_index = fileIndex+1;
    return entry;
}

/**
 * Prepares operation that cancels existing async work.
 *
 * This works with any read/write request, accept, send/recvmsg, etc. There’s an important
 * distinction to make here with the different kinds of commands. A read/write on a regular file
 * will generally be waiting for IO completion in an uninterruptible state. This means it’ll ignore
 * any signals or attempts to cancel it, as these operations are uncancellable. io_uring can cancel
 * these operations if they haven’t yet been started. If they have been started, cancellations on
 * these will fail. Network IO will generally be waiting interruptibly, and can hence be cancelled
 * at any time. The completion event for this request will have a result of 0 if done successfully,
 * `-EALREADY` if the operation is already in progress, and `-ENOENT` if the original request
 * specified cannot be found. For cancellation requests that return `-EALREADY`, io_uring may or may
 * not cause this request to be stopped sooner. For blocking IO, the original request will complete
 * as it originally would have. For IO that is cancellable, it will terminate sooner if at all
 * possible.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     userData = `user_data` field of the request that should be cancelled
 *
 * Note: Available from Linux 5.5
 */
ref SubmissionEntry prepCancel(D)(return ref SubmissionEntry entry, ref D userData, uint flags = 0) @trusted
{
    entry.prepRW(Operation.ASYNC_CANCEL, -1, cast(void*)&userData);
    entry.cancel_flags = flags;
    return entry;
}
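// Example (sketch): cancelling an in-flight poll using its user_data address as
// the cancellation key.
unittest
{
    int[2] fds;
    assert(pipe(fds) == 0);

    Uring io;
    assert(io.setup(4) == 0);

    int token; // anything with a stable address works as a key
    io.putWith!((ref SubmissionEntry e, int fd, ref int t)
    {
        e.prepPollAdd(fd, PollEvents.IN);
        e.setUserData(t);
    })(fds[0], token);
    assert(io.submit(0) == 1);

    io.putWith!((ref SubmissionEntry e, ref int t) => e.prepCancel(t))(token);
    assert(io.submit(2) == 1); // submit the cancel, wait for both completions

    foreach (ref cqe; io)
    {
        if (cqe.user_data == cast(ulong)cast(void*)&token)
            assert(cqe.res == -ECANCELED); // the poll got cancelled
        else
            assert(cqe.res == 0); // the cancel request itself succeeded
    }
    close(fds[0]); close(fds[1]);
}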
/**
 * Prepares linked timeout operation.
 *
 * This request must be linked with another request through `IOSQE_IO_LINK`.
 * Unlike `IORING_OP_TIMEOUT`, `IORING_OP_LINK_TIMEOUT` acts on the linked request, not the completion
 * queue. The format of the command is otherwise like `IORING_OP_TIMEOUT`, except there's no
 * completion event count as it's tied to a specific request. If used, the timeout specified in the
 * command will cancel the linked command, unless the linked command completes before the
 * timeout. The timeout will complete with `-ETIME` if the timer expired and cancellation of the
 * linked request was attempted, or `-ECANCELED` if the timer got cancelled because of completion
 * of the linked request.
 *
 * Note: Available from Linux 5.5
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     time = time specification
 *     flags = define if it's a relative or absolute time
 */
ref SubmissionEntry prepLinkTimeout(return ref SubmissionEntry entry, ref KernelTimespec time, TimeoutFlags flags = TimeoutFlags.REL) @trusted
{
    entry.prepRW(Operation.LINK_TIMEOUT, -1, cast(void*)&time, 1, 0);
    entry.timeout_flags = flags;
    return entry;
}

/**
 * Note: Available from Linux 5.5
 */
ref SubmissionEntry prepConnect(ADDR)(return ref SubmissionEntry entry, int fd, ref const(ADDR) addr) @trusted
{
    return entry.prepRW(Operation.CONNECT, fd, cast(void*)&addr, 0, ADDR.sizeof);
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepFilesUpdate(return ref SubmissionEntry entry, int[] fds, int offset) @safe
{
    return entry.prepRW(Operation.FILES_UPDATE, -1, cast(void*)&fds[0], cast(uint)fds.length, offset);
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepFallocate(return ref SubmissionEntry entry, int fd, int mode, long offset, long len) @trusted
{
    return entry.prepRW(Operation.FALLOCATE, fd, cast(void*)len, mode, offset);
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepOpenat(return ref SubmissionEntry entry, int fd, const(char)* path, int flags, uint mode) @trusted
{
    entry.prepRW(Operation.OPENAT, fd, cast(void*)path, mode, 0);
    entry.open_flags = flags;
    return entry;
}

/**
 * Same as `prepOpenat`, but fd is put directly into fixed file table on `fileIndex`.
 * Note: available from Linux 5.15
 */
ref SubmissionEntry prepOpenatDirect(return ref SubmissionEntry entry, int fd, const(char)* path, int flags, uint mode, uint fileIndex) @trusted
{
    entry.prepRW(Operation.OPENAT, fd, cast(void*)path, mode, 0);
    entry.open_flags = flags;
    entry.file_index = fileIndex+1;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepClose(return ref SubmissionEntry entry, int fd) @safe
{
    return entry.prepRW(Operation.CLOSE, fd);
}
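// Example (sketch, assumes `SubmissionEntryFlags.IO_LINK` maps to IOSQE_IO_LINK):
// linking a 1ms timeout to a poll, so the poll is cancelled when it doesn't
// complete in time.
unittest
{
    int[2] fds;
    assert(pipe(fds) == 0);

    Uring io;
    assert(io.setup(4) == 0);

    KernelTimespec ts;
    ts.tv_sec = 0;
    ts.tv_nsec = 1_000_000; // 1ms

    io.putWith!((ref SubmissionEntry e, int fd)
    {
        e.prepPollAdd(fd, PollEvents.IN);
        e.flags = SubmissionEntryFlags.IO_LINK; // next SQE is linked to this one
    })(fds[0]);
    io.putWith!((ref SubmissionEntry e, ref KernelTimespec t) => e.prepLinkTimeout(t))(ts);

    assert(io.submit(2) == 2); // wait until both the poll and the timeout complete
    foreach (ref cqe; io)
        assert(cqe.res == -ECANCELED || cqe.res == -ETIME); // poll cancelled, timer expired
    close(fds[0]); close(fds[1]);
}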
/**
 * Same as `prepClose` but operation works directly with fd registered in fixed file table on index `fileIndex`.
 * Note: Available from Linux 5.15
 */
ref SubmissionEntry prepCloseDirect(return ref SubmissionEntry entry, int fd, uint fileIndex) @safe
{
    entry.prepRW(Operation.CLOSE, fd);
    entry.file_index = fileIndex+1;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepRead(return ref SubmissionEntry entry, int fd, ubyte[] buffer, long offset) @safe
{
    return entry.prepRW(Operation.READ, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepWrite(return ref SubmissionEntry entry, int fd, const(ubyte)[] buffer, long offset) @trusted
{
    return entry.prepRW(Operation.WRITE, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepStatx(Statx)(return ref SubmissionEntry entry, int fd, const(char)* path, int flags, uint mask, ref Statx statxbuf) @trusted
{
    entry.prepRW(Operation.STATX, fd, cast(void*)path, mask, cast(ulong)(cast(void*)&statxbuf));
    entry.statx_flags = flags;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepFadvise(return ref SubmissionEntry entry, int fd, long offset, uint len, int advice) @safe
{
    entry.prepRW(Operation.FADVISE, fd, null, len, offset);
    entry.fadvise_advice = advice;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepMadvise(return ref SubmissionEntry entry, const(ubyte)[] block, int advice) @trusted
{
    entry.prepRW(Operation.MADVISE, -1, cast(void*)&block[0], cast(uint)block.length, 0);
    entry.fadvise_advice = advice;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepSend(return ref SubmissionEntry entry,
    int sockfd, const(ubyte)[] buf, MsgFlags flags = MsgFlags.NONE) @trusted
{
    entry.prepRW(Operation.SEND, sockfd, cast(void*)&buf[0], cast(uint)buf.length, 0);
    entry.msg_flags = flags;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepRecv(return ref SubmissionEntry entry,
    int sockfd, ubyte[] buf, MsgFlags flags = MsgFlags.NONE) @trusted
{
    entry.prepRW(Operation.RECV, sockfd, cast(void*)&buf[0], cast(uint)buf.length, 0);
    entry.msg_flags = flags;
    return entry;
}

/**
 * Variant that uses registered buffers group.
 *
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepRecv(return ref SubmissionEntry entry,
    int sockfd, ushort gid, uint len, MsgFlags flags = MsgFlags.NONE) @safe
{
    entry.prepRW(Operation.RECV, sockfd, null, len, 0);
    entry.msg_flags = flags;
    entry.buf_group = gid;
    entry.flags |= SubmissionEntryFlags.BUFFER_SELECT;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepOpenat2(return ref SubmissionEntry entry, int fd, const char *path, ref OpenHow how) @trusted
{
    return entry.prepRW(Operation.OPENAT2, fd, cast(void*)path, cast(uint)OpenHow.sizeof, cast(ulong)(cast(void*)&how));
}
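// Example (sketch, assumes "/dev/null" is writable): plain (non-vectored) write
// with prepWrite; prepRead works the same way in the other direction.
unittest
{
    import core.sys.posix.fcntl : open, O_WRONLY;

    Uring io;
    assert(io.setup(4) == 0);

    immutable fd = open("/dev/null", O_WRONLY);
    assert(fd >= 0);

    static immutable ubyte[4] msg = [1, 2, 3, 4];
    io.putWith!((ref SubmissionEntry e, int f) => e.prepWrite(f, msg[], 0))(fd);
    assert(io.submitAndWait(1) == 1);
    assert(io.front.res == msg.length); // all bytes were "written"
    io.popFront();
    close(fd);
}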
/**
 * Same as `prepOpenat2`, but fd is put directly into fixed file table on `fileIndex`.
 * Note: available from Linux 5.15
 */
ref SubmissionEntry prepOpenat2Direct(return ref SubmissionEntry entry, int fd, const char *path, ref OpenHow how, uint fileIndex) @trusted
{
    entry.prepRW(Operation.OPENAT2, fd, cast(void*)path, cast(uint)OpenHow.sizeof, cast(ulong)(cast(void*)&how));
    entry.file_index = fileIndex+1;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepEpollCtl(return ref SubmissionEntry entry, int epfd, int fd, int op, ref epoll_event ev) @trusted
{
    return entry.prepRW(Operation.EPOLL_CTL, epfd, cast(void*)&ev, op, fd);
}
/**
 * Note: Available from Linux 5.7
 *
 * This splice operation can be used to implement sendfile by splicing to an intermediate pipe
 * first, then splice to the final destination. In fact, the implementation of sendfile in kernel
 * uses splice internally.
 *
 * NOTE that even if fd_in or fd_out refers to a pipe, the splice operation can still fail with
 * EINVAL if one of the fd doesn't explicitly support splice operation, e.g. reading from terminal
 * is unsupported from kernel 5.7 to 5.11. Check issue #291 for more information.
 *
 * Either fd_in or fd_out must be a pipe.
 *
 * Params:
 *     fd_in = input file descriptor
 *     off_in = If fd_in refers to a pipe, off_in must be -1.
 *         If fd_in does not refer to a pipe and off_in is -1, then bytes are read from
 *         fd_in starting from the file offset and it is adjusted appropriately;
 *         If fd_in does not refer to a pipe and off_in is not -1, then the starting
 *         offset of fd_in will be off_in.
 *     fd_out = output file descriptor
 *     off_out = The description of off_in also applies to off_out.
 *     len = Up to len bytes would be transferred between file descriptors.
 *     splice_flags = see man splice(2) for description of flags.
 */
ref SubmissionEntry prepSplice(return ref SubmissionEntry entry,
    int fd_in, ulong off_in,
    int fd_out, ulong off_out,
    uint len, uint splice_flags) @safe
{
    entry.prepRW(Operation.SPLICE, fd_out, null, len, off_out);
    entry.splice_off_in = off_in;
    entry.splice_fd_in = fd_in;
    entry.splice_flags = splice_flags;
    return entry;
}

/**
 * Note: Available from Linux 5.7
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     buf = buffers to provide
 *     len = length of each buffer to add
 *     bgid = buffers group id
 *     bid = starting buffer id
 */
ref SubmissionEntry prepProvideBuffers(return ref SubmissionEntry entry, ubyte[][] buf, uint len, ushort bgid, int bid) @safe
{
    assert(buf.length <= int.max, "Too many buffers");
    assert(len <= uint.max, "Buffer too large");
    version (assert) {
        foreach (b; buf) assert(b.length <= len, "Invalid buffer length");
    }
    entry.prepRW(Operation.PROVIDE_BUFFERS, cast(int)buf.length, cast(void*)&buf[0][0], len, bid);
    entry.buf_group = bgid;
    return entry;
}

/// ditto
ref SubmissionEntry prepProvideBuffers(size_t M, size_t N)(return ref SubmissionEntry entry, ref ubyte[M][N] buf, ushort bgid, int bid) @safe
{
    static assert(N <= int.max, "Too many buffers");
    static assert(M <= uint.max, "Buffer too large");
    entry.prepRW(Operation.PROVIDE_BUFFERS, cast(int)N, cast(void*)&buf[0][0], cast(uint)M, bid);
    entry.buf_group = bgid;
    return entry;
}

/// ditto
ref SubmissionEntry prepProvideBuffer(size_t N)(return ref SubmissionEntry entry, ref ubyte[N] buf, ushort bgid, int bid) @safe
{
    static assert(N <= uint.max, "Buffer too large");
    entry.prepRW(Operation.PROVIDE_BUFFERS, 1, cast(void*)&buf[0], cast(uint)N, bid);
    entry.buf_group = bgid;
    return entry;
}

/// ditto
ref SubmissionEntry prepProvideBuffer(return ref SubmissionEntry entry, ref ubyte[] buf, ushort bgid, int bid) @safe
{
    assert(buf.length <= uint.max, "Buffer too large");
    entry.prepRW(Operation.PROVIDE_BUFFERS, 1, cast(void*)&buf[0], cast(uint)buf.length, bid);
    entry.buf_group = bgid;
    return entry;
}

/**
 * Note: Available from Linux 5.7
 */
ref SubmissionEntry prepRemoveBuffers(return ref SubmissionEntry entry, int nr, ushort bgid) @safe
{
    entry.prepRW(Operation.REMOVE_BUFFERS, nr);
    entry.buf_group = bgid;
    return entry;
}

/**
 * Note: Available from Linux 5.8
 */
ref SubmissionEntry prepTee(return ref SubmissionEntry entry, int fd_in, int fd_out, uint nbytes, uint flags) @safe
{
    entry.prepRW(Operation.TEE, fd_out, null, nbytes, 0);
    entry.splice_off_in = 0;
    entry.splice_fd_in = fd_in;
    entry.splice_flags = flags;
    return entry;
}

/**
 * Note: Available from Linux 5.11
 */
ref SubmissionEntry prepShutdown(return ref SubmissionEntry entry, int fd, int how) @safe
{
    return entry.prepRW(Operation.SHUTDOWN, fd, null, how, 0);
}
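// Example (sketch, requires Linux 5.7): provide a group of kernel-selectable
// buffers, then receive into the group with the buffer-select prepRecv overload.
unittest
{
    int[2] sp;
    assert(socketpair(AF_UNIX, SOCK_STREAM, 0, sp) == 0);

    Uring io;
    assert(io.setup(4) == 0);

    static ubyte[64][4] pool; // 4 buffers of 64 bytes, group id 1, ids from 0
    io.putWith!((ref SubmissionEntry e) => e.prepProvideBuffers(pool, 1, 0))();
    assert(io.submitAndWait(1) == 1);
    assert(io.front.res >= 0);
    io.popFront();

    ubyte b = 7;
    write(sp[1], &b, 1);

    // fd, buffer group id, max length - the kernel picks a free buffer from the
    // group; the chosen buffer id is reported in the completion flags
    io.putWith!((ref SubmissionEntry e, int fd) => e.prepRecv(fd, cast(ushort)1, 64))(sp[0]);
    assert(io.submitAndWait(1) == 1);
    assert(io.front.res == 1); // one byte received into the selected buffer
    io.popFront();

    close(sp[0]); close(sp[1]);
}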
/**
 * Note: Available from Linux 5.11
 */
ref SubmissionEntry prepRenameat(return ref SubmissionEntry entry,
    int olddfd, const(char)* oldpath, int newfd, const(char)* newpath, int flags) @trusted
{
    entry.prepRW(Operation.RENAMEAT, olddfd, cast(void*)oldpath, newfd, cast(ulong)cast(void*)newpath);
    entry.rename_flags = flags;
    return entry;
}

/**
 * Note: Available from Linux 5.11
 */
ref SubmissionEntry prepUnlinkat(return ref SubmissionEntry entry, int dirfd, const(char)* path, int flags) @trusted
{
    entry.prepRW(Operation.UNLINKAT, dirfd, cast(void*)path, 0, 0);
    entry.unlink_flags = flags;
    return entry;
}

/**
 * Note: Available from Linux 5.15
 */
ref SubmissionEntry prepMkdirat(return ref SubmissionEntry entry, int dirfd, const(char)* path, mode_t mode) @trusted
{
    entry.prepRW(Operation.MKDIRAT, dirfd, cast(void*)path, mode, 0);
    return entry;
}

/**
 * Note: Available from Linux 5.15
 */
ref SubmissionEntry prepSymlinkat(return ref SubmissionEntry entry, const(char)* target, int newdirfd, const(char)* linkpath) @trusted
{
    entry.prepRW(Operation.SYMLINKAT, newdirfd, cast(void*)target, 0, cast(ulong)cast(void*)linkpath);
    return entry;
}

/**
 * Note: Available from Linux 5.15
 */
ref SubmissionEntry prepLinkat(return ref SubmissionEntry entry,
    int olddirfd, const(char)* oldpath,
    int newdirfd, const(char)* newpath, int flags) @trusted
{
    entry.prepRW(Operation.LINKAT, olddirfd, cast(void*)oldpath, newdirfd, cast(ulong)cast(void*)newpath);
    entry.hardlink_flags = flags;
    return entry;
}

private:

// uring cleanup
void dispose(ref Uring uring) @trusted
{
    if (uring.payload is null) return;
    // debug printf("uring(%d): dispose(%d)\n", uring.payload.fd, uring.payload.refs);
    if (--uring.payload.refs == 0)
    {
        import std.traits : hasElaborateDestructor;
        // debug printf("uring(%d): free\n", uring.payload.fd);
        static if (hasElaborateDestructor!UringDesc)
            destroy(*uring.payload); // call possible destructors
        free(cast(void*)uring.payload);
    }
    uring.payload = null;
}

// system fields descriptor
struct UringDesc
{
    nothrow @nogc:

    int fd;
    size_t refs;
    SetupParameters params;
    SubmissionQueue sq;
    CompletionQueue cq;

    iovec[] regBuffers;

    ~this() @trusted
    {
        if (regBuffers) free(cast(void*)&regBuffers[0]);
        if (sq.ring) munmap(sq.ring, sq.ringSize);
        if (sq.sqes) munmap(cast(void*)&sq.sqes[0], sq.sqes.length * SubmissionEntry.sizeof);
        if (cq.ring && cq.ring != sq.ring) munmap(cq.ring, cq.ringSize);
        close(fd);
    }

    private auto mapRings() @trusted
    {
        sq.ringSize = params.sq_off.array + params.sq_entries * uint.sizeof;
        cq.ringSize = params.cq_off.cqes + params.cq_entries * CompletionEntry.sizeof;

        if (params.features & SetupFeatures.SINGLE_MMAP)
        {
            if (cq.ringSize > sq.ringSize) sq.ringSize = cq.ringSize;
            cq.ringSize = sq.ringSize;
        }

        sq.ring = mmap(null, sq.ringSize,
            PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
            fd, SetupParameters.SUBMISSION_QUEUE_RING_OFFSET
        );

        if (sq.ring == MAP_FAILED)
        {
            sq.ring = null;
            return -errno;
        }

        if (params.features & SetupFeatures.SINGLE_MMAP)
            cq.ring = sq.ring;
        else
        {
            cq.ring = mmap(null, cq.ringSize,
                PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
                fd, SetupParameters.COMPLETION_QUEUE_RING_OFFSET
            );

            if (cq.ring == MAP_FAILED)
            {
                cq.ring = null;
                return -errno; // cleanup is done in struct destructors
            }
        }
        uint entries = *cast(uint*)(sq.ring + params.sq_off.ring_entries);
        sq.khead = cast(uint*)(sq.ring + params.sq_off.head);
        sq.ktail = cast(uint*)(sq.ring + params.sq_off.tail);
        sq.localTail = *sq.ktail;
        sq.ringMask = *cast(uint*)(sq.ring + params.sq_off.ring_mask);
        sq.kflags = cast(uint*)(sq.ring + params.sq_off.flags);
        sq.kdropped = cast(uint*)(sq.ring + params.sq_off.dropped);

        // Indirection array of indexes to the sqes array (head and tail are pointing to this array).
        // As we don't need some fancy mappings, just initialize it with constant indexes and forget about it.
        // That way, head and tail are actually indexes to our sqes array.
        foreach (i; 0..entries)
        {
            *((cast(uint*)(sq.ring + params.sq_off.array)) + i) = i;
        }

        auto psqes = mmap(
            null, entries * SubmissionEntry.sizeof,
            PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
            fd, SetupParameters.SUBMISSION_QUEUE_ENTRIES_OFFSET
        );

        if (psqes == MAP_FAILED) return -errno;
        sq.sqes = (cast(SubmissionEntry*)psqes)[0..entries];

        entries = *cast(uint*)(cq.ring + params.cq_off.ring_entries);
        cq.khead = cast(uint*)(cq.ring + params.cq_off.head);
        cq.localHead = *cq.khead;
        cq.ktail = cast(uint*)(cq.ring + params.cq_off.tail);
        cq.ringMask = *cast(uint*)(cq.ring + params.cq_off.ring_mask);
        cq.koverflow = cast(uint*)(cq.ring + params.cq_off.overflow);
        cq.cqes = (cast(CompletionEntry*)(cq.ring + params.cq_off.cqes))[0..entries];
        cq.kflags = cast(uint*)(cq.ring + params.cq_off.flags);
        return 0;
    }
}

/// Wrapper for `SubmissionEntry` queue
struct SubmissionQueue
{
    nothrow @nogc:

    // mmaped fields
    uint* khead; // controlled by kernel
    uint* ktail; // controlled by us
    uint* kflags; // controlled by kernel (ie IORING_SQ_NEED_WAKEUP)
    uint* kdropped; // counter of invalid submissions (out of bound index)
    uint ringMask; // constant mask used to determine array index from head/tail

    // mmap details (for cleanup)
    void* ring; // pointer to the mmaped region
    size_t ringSize; // size of mmaped memory block

    // mmapped list of entries (fixed length)
    SubmissionEntry[] sqes;

    uint localTail; // used for batch submission

    uint head() const @safe pure { return atomicLoad!(MemoryOrder.acq)(*khead); }
    uint tail() const @safe pure { return localTail; }

    void flushTail() @safe pure
    {
        pragma(inline, true);
        // debug printf("SQ updating tail: %d\n", localTail);
        atomicStore!(MemoryOrder.rel)(*ktail, localTail);
    }

    SubmissionQueueFlags flags() const @safe pure
    {
        return cast(SubmissionQueueFlags)atomicLoad!(MemoryOrder.raw)(*kflags);
    }

    bool full() const @safe pure { return sqes.length == length; }

    size_t length() const @safe pure { return tail - head; }

    size_t capacity() const @safe pure { return sqes.length - length; }

    ref SubmissionEntry next()() @safe pure return
    {
        assert(!full, "SubmissionQueue is full");
        return sqes[localTail++ & ringMask];
    }

    void put()(auto ref SubmissionEntry entry) @safe pure
    {
        assert(!full, "SubmissionQueue is full");
        sqes[localTail++ & ringMask] = entry;
    }

    void put(OP)(auto ref OP op)
    if (!is(OP == SubmissionEntry))
    {
        assert(!full, "SubmissionQueue is full");
        sqes[localTail++ & ringMask].fill(op);
    }
    private void putWith(alias FN, ARGS...)(auto ref ARGS args)
    {
        import std.traits : Parameters, ParameterStorageClass, ParameterStorageClassTuple;

        static assert(
            Parameters!FN.length >= 1
            && is(Parameters!FN[0] == SubmissionEntry)
            && ParameterStorageClassTuple!FN[0] == ParameterStorageClass.ref_,
            "Alias function must accept at least `ref SubmissionEntry`");

        static assert(
            is(typeof(FN(sqes[localTail & ringMask], args))),
            "Provided function is not callable with " ~ (Parameters!((ref SubmissionEntry e, ARGS args) {})).stringof);

        assert(!full, "SubmissionQueue is full");
        FN(sqes[localTail++ & ringMask], args);
    }

    uint dropped() const @safe pure { return atomicLoad!(MemoryOrder.raw)(*kdropped); }
}

struct CompletionQueue
{
    nothrow @nogc:

    // mmaped fields
    uint* khead; // controlled by us (increment after entry at head was read)
    uint* ktail; // updated by kernel
    uint* koverflow;
    uint* kflags;
    CompletionEntry[] cqes; // array of entries (fixed length)

    uint ringMask; // constant mask used to determine array index from head/tail

    // mmap details (for cleanup)
    void* ring;
    size_t ringSize;

    uint localHead; // used for bulk reading

    uint head() const @safe pure { return localHead; }
    uint tail() const @safe pure { return atomicLoad!(MemoryOrder.acq)(*ktail); }

    void flushHead() @safe pure
    {
        pragma(inline, true);
        // debug printf("CQ updating head: %d\n", localHead);
        atomicStore!(MemoryOrder.rel)(*khead, localHead);
    }

    bool empty() const @safe pure { return head == tail; }

    ref CompletionEntry front() @safe pure return
    {
        assert(!empty, "CompletionQueue is empty");
        return cqes[localHead & ringMask];
    }

    void popFront() @safe pure
    {
        pragma(inline);
        assert(!empty, "CompletionQueue is empty");
        localHead++;
        flushHead();
    }

    size_t length() const @safe pure { return tail - localHead; }

    uint overflow() const @safe pure { return atomicLoad!(MemoryOrder.raw)(*koverflow); }

    /// Runtime CQ flags - written by the application, shouldn't be modified by the kernel.
    void flags(CQRingFlags flags) @safe pure { atomicStore!(MemoryOrder.raw)(*kflags, flags); }
}

// just a helper to use atomicStore more easily with older compilers
void atomicStore(MemoryOrder ms, T, V)(ref T val, V newVal) @trusted
{
    pragma(inline, true);
    import core.atomic : store = atomicStore;
    static if (__VERSION__ >= 2089) store!ms(val, newVal);
    else store!ms(*(cast(shared T*)&val), newVal);
}

// just a helper to use atomicLoad more easily with older compilers
T atomicLoad(MemoryOrder ms, T)(ref const T val) @trusted
{
    pragma(inline, true);
    import core.atomic : load = atomicLoad;
    static if (__VERSION__ >= 2089) return load!ms(val);
    else return load!ms(*(cast(const shared T*)&val));
}

version (assert)
{
    import std.range.primitives : ElementType, isInputRange, isOutputRange;
    static assert(isInputRange!Uring && is(ElementType!Uring == CompletionEntry));
    static assert(isOutputRange!(Uring, SubmissionEntry));
}

version (LDC)
{
    import ldc.intrinsics : llvm_expect;
    alias _expect = llvm_expect;
}
else
{
    T _expect(T)(T val, T expected_val) if (__traits(isIntegral, T))
    {
        pragma(inline, true);
        return val;
    }
}