/**
 * Simple idiomatic dlang wrapper around linux io_uring
 * (see: https://kernel.dk/io_uring.pdf) asynchronous API.
 */
module during;

version(linux):

public import during.io_uring;
import during.openat2;

import core.atomic : MemoryOrder;
debug import core.stdc.stdio;
import core.stdc.stdlib;
import core.sys.linux.epoll;
import core.sys.linux.errno;
import core.sys.linux.sched;
import core.sys.linux.sys.mman;
import core.sys.linux.unistd;
import core.sys.posix.signal;
import core.sys.posix.sys.socket;
import core.sys.posix.sys.types;
import core.sys.posix.sys.uio;
import std.algorithm.comparison : among;

nothrow @nogc:

/**
 * Setup new instance of io_uring into provided `Uring` structure.
 *
 * Params:
 *     uring = `Uring` structure to be initialized (must not be already initialized)
 *     entries = Number of entries to initialize uring with
 *     flags = `SetupFlags` to use to initialize uring.
 *
 * Returns: On success it returns 0, `-errno` otherwise.
 */
int setup(ref Uring uring, uint entries = 128, SetupFlags flags = SetupFlags.NONE) @safe
{
    assert(uring.payload is null, "Uring is already initialized");
    uring.payload = () @trusted { return cast(UringDesc*)calloc(1, UringDesc.sizeof); }();
    if (uring.payload is null) return -errno;

    uring.payload.params.flags = flags;
    uring.payload.refs = 1;
    auto r = io_uring_setup(entries, uring.payload.params);
    if (r < 0)
    {
        // io_uring_setup failed: no ring fd was created. Release the descriptor
        // right away and reset the payload, otherwise it would be left allocated
        // with refs=1 and fd still 0 (from calloc), and the eventual `Uring`
        // destructor would try to tear down a ring that never existed.
        immutable err = -errno; // capture before free() can touch errno
        () @trusted { free(cast(void*)uring.payload); }();
        uring.payload = null;
        return err;
    }

    uring.payload.fd = r;

    r = uring.payload.mapRings();
    if (r < 0)
    {
        dispose(uring);
        return r;
    }

    return 0;
}

/**
 * Simplified wrapper around `io_uring_probe` that is used to check what io_uring operations current
 * kernel is actually supporting.
66 */ 67 struct Probe 68 { 69 static assert (Operation.max < 64, "Needs to be adjusted"); 70 private 71 { 72 io_uring_probe probe; 73 io_uring_probe_op[64] ops; 74 int err; 75 } 76 77 const @safe pure nothrow @nogc: 78 79 /// Is operation supported? 80 bool isSupported(Operation op) 81 in (op <= Operation.max, "Invalid operation") 82 { 83 if (op > probe.last_op) return false; 84 assert(ops[op].op == op, "Operations differs"); 85 return (ops[op].flags & IO_URING_OP_SUPPORTED) != 0; 86 } 87 88 /// Error code when we fail to get `Probe`. 89 @property int error() { return err; } 90 91 /// `true` if probe was sucesfully retrieved. 92 T opCast(T)() if (is(T == bool)) { return err == 0; } 93 } 94 95 /// Probes supported operations on a temporary created uring instance 96 Probe probe() @safe nothrow @nogc 97 { 98 Uring io; 99 immutable ret = io.setup(2); 100 if (ret < 0) { 101 Probe res; 102 res.err = ret; 103 return res; 104 } 105 106 return io.probe(); 107 } 108 109 /** 110 * Main entry point to work with io_uring. 111 * 112 * It hides `SubmissionQueue` and `CompletionQueue` behind standard range interface. 113 * We put in `SubmissionEntry` entries and take out `CompletionEntry` entries. 114 * 115 * Use predefined `prepXX` methods to fill required fields of `SubmissionEntry` before `put` or during `putWith`. 116 * 117 * Note: `prepXX` functions doesn't touch previous entry state, just fills in operation properties. This is because for 118 * less error prone interface it is cleared automatically when prepared using `putWith`. So when using on own `SubmissionEntry` 119 * (outside submission queue), that would be added to the submission queue using `put`, be sure its cleared if it's 120 * reused for multiple operations. 
121 */ 122 struct Uring 123 { 124 nothrow @nogc: 125 126 private UringDesc* payload; 127 128 /// Copy constructor 129 this(ref return scope Uring rhs) @safe pure 130 { 131 assert(rhs.payload !is null, "rhs payload is null"); 132 // debug printf("uring(%d): copy\n", rhs.payload.fd); 133 this.payload = rhs.payload; 134 this.payload.refs++; 135 } 136 137 /// Destructor 138 ~this() @safe 139 { 140 dispose(this); 141 } 142 143 /// Probes supported operations 144 Probe probe() @safe 145 in (payload !is null, "Uring hasn't been initialized yet") 146 { 147 Probe res; 148 immutable ret = () @trusted { return io_uring_register( 149 payload.fd, 150 RegisterOpCode.REGISTER_PROBE, 151 cast(void*)&res.probe, res.ops.length 152 ); }(); 153 if (ret < 0) res.err = ret; 154 return res; 155 } 156 157 /// Native io_uring file descriptor 158 int fd() const @safe pure 159 in (payload !is null, "Uring hasn't been initialized yet") 160 { 161 return payload.fd; 162 } 163 164 /// io_uring parameters 165 SetupParameters params() const @safe pure return 166 in (payload !is null, "Uring hasn't been initialized yet") 167 { 168 return payload.params; 169 } 170 171 /// Check if there is some `CompletionEntry` to process. 172 bool empty() const @safe pure 173 in (payload !is null, "Uring hasn't been initialized yet") 174 { 175 return payload.cq.empty; 176 } 177 178 /// Check if there is space for another `SubmissionEntry` to submit. 
179 bool full() const @safe pure 180 in (payload !is null, "Uring hasn't been initialized yet") 181 { 182 return payload.sq.full; 183 } 184 185 /// Available space in submission queue before it becomes full 186 size_t capacity() const @safe pure 187 in (payload !is null, "Uring hasn't been initialized yet") 188 { 189 return payload.sq.capacity; 190 } 191 192 /// Number of entries in completion queue 193 size_t length() const @safe pure 194 in (payload !is null, "Uring hasn't been initialized yet") 195 { 196 return payload.cq.length; 197 } 198 199 /// Get first `CompletionEntry` from cq ring 200 ref CompletionEntry front() @safe pure return 201 in (payload !is null, "Uring hasn't been initialized yet") 202 { 203 return payload.cq.front; 204 } 205 206 /// Move to next `CompletionEntry` 207 void popFront() @safe pure 208 in (payload !is null, "Uring hasn't been initialized yet") 209 { 210 return payload.cq.popFront; 211 } 212 213 /** 214 * Adds new entry to the `SubmissionQueue`. 215 * 216 * Note that this just adds entry to the queue and doesn't advance the tail 217 * marker kernel sees. For that `finishSq()` is needed to be called next. 218 * 219 * Also note that to actually enter new entries to kernel, 220 * it's needed to call `submit()`. 221 * 222 * Params: 223 * FN = Function to fill next entry in queue by `ref` (should be faster). 224 * It is expected to be in a form of `void function(ARGS)(ref SubmissionEntry, auto ref ARGS)`. 225 * Note that in this case queue entry is cleaned first before function is called. 226 * entry = Custom built `SubmissionEntry` to be posted as is. 227 * Note that in this case it is copied whole over one in the `SubmissionQueue`. 228 * args = Optional arguments passed to the function 229 * 230 * Returns: reference to `Uring` structure so it's possible to chain multiple commands. 
231 */ 232 ref Uring put()(auto ref SubmissionEntry entry) @safe pure return 233 in (payload !is null, "Uring hasn't been initialized yet") 234 { 235 payload.sq.put(entry); 236 return this; 237 } 238 239 /// ditto 240 ref Uring putWith(alias FN, ARGS...)(auto ref ARGS args) return 241 in (payload !is null, "Uring hasn't been initialized yet") 242 { 243 import std.functional : forward; 244 payload.sq.putWith!FN(forward!args); 245 return this; 246 } 247 248 /** 249 * Similar to `put(SubmissionEntry)` but in this case we can provide our custom type (args) to be filled 250 * to next `SubmissionEntry` in queue. 251 * 252 * Fields in the provided type must use the same names as in `SubmissionEntry` to be automagically copied. 253 * 254 * Params: 255 * op = Custom operation definition. 256 * Returns: 257 */ 258 ref Uring put(OP)(auto ref OP op) return 259 if (!is(OP == SubmissionEntry)) 260 in (payload !is null, "Uring hasn't been initialized yet") 261 { 262 payload.sq.put(op); 263 return this; 264 } 265 266 /** 267 * Advances the userspace submision queue and returns last `SubmissionEntry`. 268 */ 269 ref SubmissionEntry next()() @safe pure 270 in (payload !is null, "Uring hasn't been initialized yet") 271 { 272 return payload.sq.next(); 273 } 274 275 /** 276 * If completion queue is full, the new event maybe dropped. 277 * This value records number of dropped events. 278 */ 279 uint overflow() const @safe pure 280 in (payload !is null, "Uring hasn't been initialized yet") 281 { 282 return payload.cq.overflow; 283 } 284 285 /// Counter of invalid submissions (out-of-bound index in submission array) 286 uint dropped() const @safe pure 287 in (payload !is null, "Uring hasn't been initialized yet") 288 { 289 return payload.sq.dropped; 290 } 291 292 /** 293 * Submits qued `SubmissionEntry` to be processed by kernel. 294 * 295 * Params: 296 * want = number of `CompletionEntries` to wait for. 297 * If 0, this just submits queued entries and returns. 
298 * If > 0, it blocks until at least wanted number of entries were completed. 299 * sig = See io_uring_enter(2) man page 300 * 301 * Returns: Number of submitted entries on success, `-errno` on error 302 */ 303 int submit(S)(uint want, const S* args) 304 if (is(S == sigset_t) || is(S == io_uring_getevents_arg)) 305 in (payload !is null, "Uring hasn't been initialized yet") 306 { 307 if (_expect(want > 0, false)) return submitAndWait(want, args); 308 return submit(args); 309 } 310 311 /// ditto 312 int submit(uint want) @safe 313 { 314 pragma(inline, true) 315 if (_expect(want > 0, true)) return submitAndWait(want, cast(sigset_t*)null); 316 return submit(cast(sigset_t*)null); 317 } 318 319 /// ditto 320 int submit(S)(const S* args) @trusted 321 if (is(S == sigset_t) || is(S == io_uring_getevents_arg)) 322 in (payload !is null, "Uring hasn't been initialized yet") 323 { 324 immutable len = cast(int)payload.sq.length; 325 if (_expect(len > 0, true)) // anything to submit? 326 { 327 payload.sq.flushTail(); // advance queue index 328 329 EnterFlags flags; 330 if (payload.params.flags & SetupFlags.SQPOLL) 331 { 332 if (_expect(payload.sq.flags & SubmissionQueueFlags.NEED_WAKEUP, false)) 333 flags |= EnterFlags.SQ_WAKEUP; 334 else return len; // fast poll 335 } 336 immutable r = io_uring_enter(payload.fd, len, 0, flags, args); 337 if (_expect(r < 0, false)) return -errno; 338 return r; 339 } 340 return 0; 341 } 342 343 /// ditto 344 int submit() @safe 345 { 346 pragma(inline, true) 347 return submit(cast(sigset_t*)null); 348 } 349 350 /** 351 * Flushes submission queue index to the kernel. 352 * Doesn't call any syscall, it just advances the SQE queue for kernel. 353 * This can be used with `IORING_SETUP_SQPOLL` when kernel polls the submission queue. 354 */ 355 void flush() @safe 356 { 357 payload.sq.flushTail(); // advance queue index 358 } 359 360 /** 361 * Simmilar to `submit` but with this method we just wait for required number 362 * of `CompletionEntries`. 
363 * 364 * Returns: `0` on success, `-errno` on error 365 */ 366 int wait(S)(uint want = 1, const S* args = null) @trusted 367 if (is(S == sigset_t) || is(S == io_uring_getevents_arg)) 368 in (payload !is null, "Uring hasn't been initialized yet") 369 in (want > 0, "Invalid want value") 370 { 371 pragma(inline); 372 if (payload.cq.length >= want) return 0; // we don't need to syscall 373 immutable r = io_uring_enter(payload.fd, 0, want, EnterFlags.GETEVENTS, args); 374 if (_expect(r < 0, false)) return -errno; 375 return 0; 376 } 377 378 /// ditto 379 int wait(uint want = 1) 380 { 381 pragma(inline, true) 382 return wait(want, cast(sigset_t*)null); 383 } 384 385 /** 386 * Special case of a `submit` that can be used when we know beforehead that we want to wait for 387 * some amount of CQEs. 388 */ 389 int submitAndWait(S)(uint want, const S* args) @trusted 390 if (is(S == sigset_t) || is(S == io_uring_getevents_arg)) 391 in (payload !is null, "Uring hasn't been initialized yet") 392 in (want > 0, "Invalid want value") 393 { 394 immutable len = cast(int)payload.sq.length; 395 if (_expect(len > 0, true)) // anything to submit? 396 { 397 payload.sq.flushTail(); // advance queue index 398 399 EnterFlags flags = EnterFlags.GETEVENTS; 400 if (payload.params.flags & SetupFlags.SQPOLL) 401 { 402 if (_expect(payload.sq.flags & SubmissionQueueFlags.NEED_WAKEUP, false)) 403 flags |= EnterFlags.SQ_WAKEUP; 404 } 405 immutable r = io_uring_enter(payload.fd, len, want, flags, args); 406 if (_expect(r < 0, false)) return -errno; 407 return r; 408 } 409 return wait(want); // just simple wait 410 } 411 412 /// ditto 413 int submitAndWait(uint want) @safe 414 { 415 pragma(inline, true) 416 return submitAndWait(want, cast(sigset_t*)null); 417 } 418 419 /** 420 * Register single buffer to be mapped into the kernel for faster buffered operations. 
421 * 422 * To use the buffers, the application must specify the fixed variants for of operations, 423 * `READ_FIXED` or `WRITE_FIXED` in the `SubmissionEntry` also with used `buf_index` set 424 * in entry extra data. 425 * 426 * An application can increase or decrease the size or number of registered buffers by first 427 * unregistering the existing buffers, and then issuing a new call to io_uring_register() with 428 * the new buffers. 429 * 430 * Params: 431 * buffer = Buffers to be registered 432 * 433 * Returns: On success, returns 0. On error, `-errno` is returned. 434 */ 435 int registerBuffers(T)(T buffers) 436 if (is(T == ubyte[]) || is(T == ubyte[][])) // TODO: something else? 437 in (payload !is null, "Uring hasn't been initialized yet") 438 in (buffers.length, "Empty buffer") 439 { 440 if (payload.regBuffers !is null) 441 return -EBUSY; // buffers were already registered 442 443 static if (is(T == ubyte[])) 444 { 445 auto p = malloc(iovec.sizeof); 446 if (_expect(p is null, false)) return -errno; 447 payload.regBuffers = (cast(iovec*)p)[0..1]; 448 payload.regBuffers[0].iov_base = cast(void*)&buffers[0]; 449 payload.regBuffers[0].iov_len = buffers.length; 450 } 451 else static if (is(T == ubyte[][])) 452 { 453 auto p = malloc(buffers.length * iovec.sizeof); 454 if (_expect(p is null, false)) return -errno; 455 payload.regBuffers = (cast(iovec*)p)[0..buffers.length]; 456 457 foreach (i, b; buffers) 458 { 459 assert(b.length, "Empty buffer"); 460 payload.regBuffers[i].iov_base = cast(void*)&b[0]; 461 payload.regBuffers[i].iov_len = b.length; 462 } 463 } 464 465 immutable r = io_uring_register( 466 payload.fd, 467 RegisterOpCode.REGISTER_BUFFERS, 468 cast(const(void)*)payload.regBuffers.ptr, 1 469 ); 470 471 if (_expect(r < 0, false)) return -errno; 472 return 0; 473 } 474 475 /** 476 * Releases all previously registered buffers associated with the `io_uring` instance. 
477 * 478 * An application need not unregister buffers explicitly before shutting down the io_uring instance. 479 * 480 * Returns: On success, returns 0. On error, `-errno` is returned. 481 */ 482 int unregisterBuffers() @trusted 483 in (payload !is null, "Uring hasn't been initialized yet") 484 { 485 if (payload.regBuffers is null) 486 return -ENXIO; // no buffers were registered 487 488 free(cast(void*)&payload.regBuffers[0]); 489 payload.regBuffers = null; 490 491 immutable r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_BUFFERS, null, 0); 492 if (_expect(r < 0, false)) return -errno; 493 return 0; 494 } 495 496 /** 497 * Register files for I/O. 498 * 499 * To make use of the registered files, the `IOSQE_FIXED_FILE` flag must be set in the flags 500 * member of the `SubmissionEntry`, and the `fd` member is set to the index of the file in the 501 * file descriptor array. 502 * 503 * Files are automatically unregistered when the `io_uring` instance is torn down. An application 504 * need only unregister if it wishes to register a new set of fds. 505 * 506 * Use `-1` as a file descriptor to mark it as reserved in the array.* 507 * Params: fds = array of file descriptors to be registered 508 * 509 * Returns: On success, returns 0. On error, `-errno` is returned. 510 */ 511 int registerFiles(const(int)[] fds) 512 in (payload !is null, "Uring hasn't been initialized yet") 513 in (fds.length, "No file descriptors provided") 514 in (fds.length < uint.max, "Too many file descriptors") 515 { 516 // arg contains a pointer to an array of nr_args file descriptors (signed 32 bit integers). 517 immutable r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_FILES, &fds[0], cast(uint)fds.length); 518 if (_expect(r < 0, false)) return -errno; 519 return 0; 520 } 521 522 /* 523 * Register an update for an existing file set. The updates will start at 524 * `off` in the original array. 525 * 526 * Use `-1` as a file descriptor to mark it as reserved in the array. 
527 * 528 * Params: 529 * off = offset to the original registered files to be updated 530 * files = array of file descriptors to update with 531 * 532 * Returns: number of files updated on success, -errno on failure. 533 */ 534 int registerFilesUpdate(uint off, const(int)[] fds) @trusted 535 in (payload !is null, "Uring hasn't been initialized yet") 536 in (fds.length, "No file descriptors provided to update") 537 in (fds.length < uint.max, "Too many file descriptors") 538 { 539 struct Update // represents io_uring_files_update (obsolete) or io_uring_rsrc_update 540 { 541 uint offset; 542 uint _resv; 543 ulong data; 544 } 545 546 static assert (Update.sizeof == 16); 547 548 Update u = { offset: off, data: cast(ulong)&fds[0] }; 549 immutable r = io_uring_register( 550 payload.fd, 551 RegisterOpCode.REGISTER_FILES_UPDATE, 552 &u, cast(uint)fds.length); 553 if (_expect(r < 0, false)) return -errno; 554 return 0; 555 } 556 557 /** 558 * All previously registered files associated with the `io_uring` instance will be unregistered. 559 * 560 * Files are automatically unregistered when the `io_uring` instance is torn down. An application 561 * need only unregister if it wishes to register a new set of fds. 562 * 563 * Returns: On success, returns 0. On error, `-errno` is returned. 564 */ 565 int unregisterFiles() @trusted 566 in (payload !is null, "Uring hasn't been initialized yet") 567 { 568 immutable r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_FILES, null, 0); 569 if (_expect(r < 0, false)) return -errno; 570 return 0; 571 } 572 573 /** 574 * Registers event file descriptor that would be used as a notification mechanism on completion 575 * queue change. 576 * 577 * Params: eventFD = event filedescriptor to be notified about change 578 * 579 * Returns: On success, returns 0. On error, `-errno` is returned. 
580 */ 581 int registerEventFD(int eventFD) @trusted 582 in (payload !is null, "Uring hasn't been initialized yet") 583 { 584 immutable r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_EVENTFD, &eventFD, 1); 585 if (_expect(r < 0, false)) return -errno; 586 return 0; 587 } 588 589 /** 590 * Unregister previously registered notification event file descriptor. 591 * 592 * Returns: On success, returns 0. On error, `-errno` is returned. 593 */ 594 int unregisterEventFD() @trusted 595 in (payload !is null, "Uring hasn't been initialized yet") 596 { 597 immutable r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_EVENTFD, null, 0); 598 if (_expect(r < 0, false)) return -errno; 599 return 0; 600 } 601 602 /** 603 * Generic means to register resources. 604 * 605 * Note: Available from Linux 5.13 606 */ 607 int registerRsrc(R)(RegisterOpCode type, const(R)[] data, const(ulong)[] tags) 608 in (payload !is null, "Uring hasn't been initialized yet") 609 in (data.length == tags.length, "Different array lengths") 610 in (data.length < uint.max, "Too many resources") 611 { 612 struct Register // represents io_uring_rsrc_register 613 { 614 uint nr; 615 uint _resv; 616 ulong _resv2; 617 ulong data; 618 ulong tags; 619 } 620 621 static assert (Register.sizeof == 32); 622 623 Register r = { 624 nr: cast(uint)data.length, 625 data: cast(ulong)&data[0], 626 tags: cast(ulong)&tags[0] 627 }; 628 immutable ret = io_uring_register( 629 payload.fd, 630 type, 631 &r, sizeof(r)); 632 if (_expect(ret < 0, false)) return -errno; 633 return 0; 634 } 635 636 /** 637 * Generic means to update registered resources. 
638 * 639 * Note: Available from Linux 5.13 640 */ 641 int registerRsrcUpdate(R)(RegisterOpCode type, uint off, const(R)[] data, const(ulong)[] tags) @trusted 642 in (payload !is null, "Uring hasn't been initialized yet") 643 in (data.length == tags.length, "Different array lengths") 644 in (data.length < uint.max, "Too many file descriptors") 645 { 646 struct Update // represents io_uring_rsrc_update2 647 { 648 uint offset; 649 uint _resv; 650 ulong data; 651 ulong tags; 652 uint nr; 653 uint _resv2; 654 } 655 656 static assert (Update.sizeof == 32); 657 658 Update u = { 659 offset: off, 660 data: cast(ulong)&data[0], 661 tags: cast(ulong)&tags[0], 662 nr: cast(uint)data.length, 663 }; 664 immutable r = io_uring_register(payload.fd, type, &u, sizeof(u)); 665 if (_expect(r < 0, false)) return -errno; 666 return 0; 667 } 668 669 /** 670 * Register a feature whitelist. Attempting to call any operations which are not whitelisted 671 * will result in an error. 672 * 673 * Note: Can only be called once to prevent other code from bypassing the whitelist. 674 * 675 * Params: res = the struct containing the restriction info. 676 * 677 * Returns: On success, returns 0. On error, `-errno` is returned. 678 * 679 * Note: Available from Linux 5.10 680 */ 681 int registerRestrictions(scope ref io_uring_restriction res) @trusted 682 in (payload !is null, "Uring hasn't been initialized yet") 683 { 684 immutable r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_RESTRICTIONS, &res, 1); 685 if (_expect(r < 0, false)) return -errno; 686 return 0; 687 } 688 689 /** 690 * Enable the "rings" of this Uring if they were previously disabled with 691 * `IORING_SETUP_R_DISABLED`. 692 * 693 * Returns: On success, returns 0. On error, `-errno` is returned. 
694 * 695 * Note: Available from Linux 5.10 696 */ 697 int enableRings() @trusted 698 in (payload !is null, "Uring hasn't been initialized yet") 699 { 700 immutable r = io_uring_register(payload.fd, RegisterOpCode.ENABLE_RINGS, null, 0); 701 if (_expect(r < 0, false)) return -errno; 702 return 0; 703 } 704 705 /** 706 * By default, async workers created by io_uring will inherit the CPU mask of its parent. This 707 * is usually all the CPUs in the system, unless the parent is being run with a limited set. If 708 * this isn't the desired outcome, the application may explicitly tell io_uring what CPUs the 709 * async workers may run on. 710 * 711 * Note: Available since 5.14. 712 */ 713 int registerIOWQAffinity(cpu_set_t[] cpus) @trusted 714 in (cpus.length) 715 { 716 immutable r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_IOWQ_AFF, cpus.ptr, cast(uint)cpus.length); 717 if (_expect(r < 0, false)) return -errno; 718 return 0; 719 } 720 721 /** 722 * Undoes a CPU mask previously set with `registerIOWQAffinity`. 723 * 724 * Note: Available since 5.14 725 */ 726 int unregisterIOWQAffinity() @trusted 727 { 728 immutable r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_IOWQ_AFF, null, 0); 729 if (_expect(r < 0, false)) return -errno; 730 return 0; 731 } 732 733 /** 734 * By default, io_uring limits the unbounded workers created to the maximum processor count set 735 * by `RLIMIT_NPROC` and the bounded workers is a function of the SQ ring size and the number of 736 * CPUs in the system. Sometimes this can be excessive (or too little, for bounded), and this 737 * command provides a way to change the count per ring (per NUMA node) instead. 738 * 739 * `val` must be set to an `uint` pointer to an array of two values, with the values in the 740 * array being set to the maximum count of workers per NUMA node. Index 0 holds the bounded 741 * worker count, and index 1 holds the unbounded worker count. 
On successful return, the passed 742 * in array will contain the previous maximum values for each type. If the count being passed in 743 * is 0, then this command returns the current maximum values and doesn't modify the current 744 * setting. 745 * 746 * Note: Available since 5.15 747 */ 748 int registerIOWQMaxWorkers(ref uint[2] workers) @trusted 749 { 750 immutable r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_IOWQ_MAX_WORKERS, &workers, 2); 751 if (_expect(r < 0, false)) return -errno; 752 return 0; 753 } 754 } 755 756 /** 757 * Uses custom operation definition to fill fields of `SubmissionEntry`. 758 * Can be used in cases, when builtin prep* functions aren't enough. 759 * 760 * Custom definition fields must correspond to fields of `SubmissionEntry` for this to work. 761 * 762 * Note: This doesn't touch previous state of the entry, just fills the corresponding fields. 763 * So it might be needed to call `clear` first on the entry (depends on usage). 764 * 765 * Params: 766 * entry = entry to set parameters to 767 * op = operation to fill entry with (can be custom type) 768 */ 769 ref SubmissionEntry fill(E)(return ref SubmissionEntry entry, auto ref E op) 770 { 771 pragma(inline); 772 import std.traits : hasMember, FieldNameTuple; 773 774 // fill entry from provided operation fields (they must have same name as in SubmissionEntry) 775 foreach (m; FieldNameTuple!E) 776 { 777 static assert(hasMember!(SubmissionEntry, m), "unknown member: " ~ E.stringof ~ "." ~ m); 778 __traits(getMember, entry, m) = __traits(getMember, op, m); 779 } 780 781 return entry; 782 } 783 784 /** 785 * Template function to help set `SubmissionEntry` `user_data` field. 786 * 787 * Params: 788 * entry = `SubmissionEntry` to prepare 789 * data = data to set to the `SubmissionEntry` 790 * 791 * Note: data are passed by ref and must live during whole operation. 
 */
ref SubmissionEntry setUserData(D)(return ref SubmissionEntry entry, ref D data) @trusted
{
    pragma(inline);
    // Store the *address* of the caller's data in the SQE; the caller must keep
    // `data` alive until the matching completion arrives.
    entry.user_data = cast(ulong)(cast(void*)&data);
    return entry;
}

/**
 * Template function to help set `SubmissionEntry` `user_data` field. This differs to `setUserData`
 * in that it emplaces the provided data directly into SQE `user_data` field and not the pointer to
 * the data.
 *
 * Because of that, data must be of `ulong.sizeof`.
 */
ref SubmissionEntry setUserDataRaw(D)(return ref SubmissionEntry entry, auto ref D data) @trusted
{
    pragma(inline);
    // Reinterpret the 8-byte value as ulong and store it by value - no lifetime
    // requirement on `data` after this call.
    entry.user_data = *(cast(ulong*)(cast(void*)&data));
    return entry;
}
static assert(true); // (note: constraint lives on the template below)

/**
 * Helper function to retrieve data set directly to the `CompletionEntry` user_data (set by `setUserDataRaw`).
 */
D userDataAs(D)(ref CompletionEntry entry) @trusted
if (D.sizeof == ulong.sizeof)
{
    pragma(inline);
    // Inverse of `setUserDataRaw`: reinterpret the stored 8 bytes as D.
    return *(cast(D*)(cast(void*)&entry.user_data));
}

/**
 * Common helper that fills the basic read/write-shaped fields of a `SubmissionEntry`
 * and resets all remaining fields to their neutral values.
 *
 * Params:
 *     entry = `SubmissionEntry` to prepare
 *     op = operation code to set
 *     fd = file descriptor (`-1` when the operation has no fd)
 *     addr = operation address argument (buffer, iovec array, user data pointer, ...)
 *     len = operation length argument (byte count, iovec count, or op-specific flags)
 *     offset = operation offset argument
 */
ref SubmissionEntry prepRW(return ref SubmissionEntry entry, Operation op,
    int fd = -1, const void* addr = null, uint len = 0, ulong offset = 0) @safe
{
    pragma(inline);
    entry.opcode = op;
    entry.fd = fd;
    entry.off = offset;
    entry.flags = SubmissionEntryFlags.NONE;
    entry.ioprio = 0;
    entry.addr = cast(ulong)addr;
    entry.len = len;
    entry.rw_flags = ReadWriteFlags.NONE;
    entry.user_data = 0;
    entry.buf_index = 0;
    entry.personality = 0;
    entry.file_index = 0;
    entry.__pad2[0] = entry.__pad2[1] = 0;
    return entry;
}

/**
 * Prepares `nop` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 */
ref SubmissionEntry prepNop(return ref SubmissionEntry entry) @safe
{
    entry.prepRW(Operation.NOP);
    return entry;
}

/**
 * Prepares `readv` operation.
859 * 860 * Params: 861 * entry = `SubmissionEntry` to prepare 862 * fd = file descriptor of file we are operating on 863 * offset = offset 864 * buffer = iovec buffers to be used by the operation 865 */ 866 ref SubmissionEntry prepReadv(V)(return ref SubmissionEntry entry, int fd, ref const V buffer, long offset) @trusted 867 if (is(V == iovec[]) || is(V == iovec)) 868 { 869 static if (is(V == iovec[])) 870 { 871 assert(buffer.length, "Empty buffer"); 872 assert(buffer.length < uint.max, "Too many iovec buffers"); 873 return entry.prepRW(Operation.READV, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset); 874 } 875 else return entry.prepRW(Operation.READV, fd, cast(void*)&buffer, 1, offset); 876 } 877 878 /** 879 * Prepares `writev` operation. 880 * 881 * Params: 882 * entry = `SubmissionEntry` to prepare 883 * fd = file descriptor of file we are operating on 884 * offset = offset 885 * buffer = iovec buffers to be used by the operation 886 */ 887 ref SubmissionEntry prepWritev(V)(return ref SubmissionEntry entry, int fd, ref const V buffer, long offset) @trusted 888 if (is(V == iovec[]) || is(V == iovec)) 889 { 890 static if (is(V == iovec[])) 891 { 892 assert(buffer.length, "Empty buffer"); 893 assert(buffer.length < uint.max, "Too many iovec buffers"); 894 return entry.prepRW(Operation.WRITEV, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset); 895 } 896 else return entry.prepRW(Operation.WRITEV, fd, cast(void*)&buffer, 1, offset); 897 } 898 899 /** 900 * Prepares `read_fixed` operation. 
901 * 902 * Params: 903 * entry = `SubmissionEntry` to prepare 904 * fd = file descriptor of file we are operating on 905 * offset = offset 906 * buffer = slice to preregistered buffer 907 * bufferIndex = index to the preregistered buffers array buffer belongs to 908 */ 909 ref SubmissionEntry prepReadFixed(return ref SubmissionEntry entry, int fd, long offset, ubyte[] buffer, ushort bufferIndex) @safe 910 { 911 assert(buffer.length, "Empty buffer"); 912 assert(buffer.length < uint.max, "Buffer too large"); 913 entry.prepRW(Operation.READ_FIXED, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset); 914 entry.buf_index = bufferIndex; 915 return entry; 916 } 917 918 /** 919 * Prepares `write_fixed` operation. 920 * 921 * Params: 922 * entry = `SubmissionEntry` to prepare 923 * fd = file descriptor of file we are operating on 924 * offset = offset 925 * buffer = slice to preregistered buffer 926 * bufferIndex = index to the preregistered buffers array buffer belongs to 927 */ 928 ref SubmissionEntry prepWriteFixed(return ref SubmissionEntry entry, int fd, long offset, ubyte[] buffer, ushort bufferIndex) @safe 929 { 930 assert(buffer.length, "Empty buffer"); 931 assert(buffer.length < uint.max, "Buffer too large"); 932 entry.prepRW(Operation.WRITE_FIXED, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset); 933 entry.buf_index = bufferIndex; 934 return entry; 935 } 936 937 /** 938 * Prepares `recvmsg(2)` operation. 939 * 940 * Params: 941 * entry = `SubmissionEntry` to prepare 942 * fd = file descriptor of file we are operating on 943 * msg = message to operate with 944 * flags = `recvmsg` operation flags 945 * 946 * Note: Available from Linux 5.3 947 * 948 * See_Also: `recvmsg(2)` man page for details. 
949 */ 950 ref SubmissionEntry prepRecvMsg(return ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE) @trusted 951 { 952 entry.prepRW(Operation.RECVMSG, fd, cast(void*)&msg, 1, 0); 953 entry.msg_flags = flags; 954 return entry; 955 } 956 957 /** 958 * Prepares `sendmsg(2)` operation. 959 * 960 * Params: 961 * entry = `SubmissionEntry` to prepare 962 * fd = file descriptor of file we are operating on 963 * msg = message to operate with 964 * flags = `sendmsg` operation flags 965 * 966 * Note: Available from Linux 5.3 967 * 968 * See_Also: `sendmsg(2)` man page for details. 969 */ 970 ref SubmissionEntry prepSendMsg(return ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE) @trusted 971 { 972 entry.prepRW(Operation.SENDMSG, fd, cast(void*)&msg, 1, 0); 973 entry.msg_flags = flags; 974 return entry; 975 } 976 977 /** 978 * Prepares `fsync` operation. 979 * 980 * Params: 981 * entry = `SubmissionEntry` to prepare 982 * fd = file descriptor of a file to call `fsync` on 983 * flags = `fsync` operation flags 984 */ 985 ref SubmissionEntry prepFsync(return ref SubmissionEntry entry, int fd, FsyncFlags flags = FsyncFlags.NORMAL) @safe 986 { 987 entry.prepRW(Operation.FSYNC, fd); 988 entry.fsync_flags = flags; 989 return entry; 990 } 991 992 /** 993 * Poll the fd specified in the submission queue entry for the events specified in the poll_events 994 * field. Unlike poll or epoll without `EPOLLONESHOT`, this interface always works in one shot mode. 995 * That is, once the poll operation is completed, it will have to be resubmitted. 
996 * 997 * Params: 998 * entry = `SubmissionEntry` to prepare 999 * fd = file descriptor to poll 1000 * events = events to poll on the FD 1001 * flags = poll operation flags 1002 */ 1003 ref SubmissionEntry prepPollAdd(return ref SubmissionEntry entry, 1004 int fd, PollEvents events, PollFlags flags = PollFlags.NONE) @safe 1005 { 1006 import std.system : endian, Endian; 1007 1008 entry.prepRW(Operation.POLL_ADD, fd, null, flags); 1009 static if (endian == Endian.bigEndian) 1010 entry.poll_events32 = (events & 0x0000ffffUL) << 16 | (events & 0xffff0000) >> 16; 1011 else 1012 entry.poll_events32 = events; 1013 return entry; 1014 } 1015 1016 /** 1017 * Remove an existing poll request. If found, the res field of the `CompletionEntry` will contain 1018 * `0`. If not found, res will contain `-ENOENT`. 1019 * 1020 * Params: 1021 * entry = `SubmissionEntry` to prepare 1022 * userData = data with the previously issued poll operation 1023 */ 1024 ref SubmissionEntry prepPollRemove(D)(return ref SubmissionEntry entry, ref D userData) @trusted 1025 { 1026 return entry.prepRW(Operation.POLL_REMOVE, -1, cast(void*)&userData); 1027 } 1028 1029 /** 1030 * Allow events and user_data update of running poll requests. 
1031 * 1032 * Note: available from Linux 5.13 1033 */ 1034 ref SubmissionEntry prepPollUpdate(U, V)(return ref SubmissionEntry entry, 1035 ref U oldUserData, ref V newUserData, PollEvents events = PollEvents.NONE) @trusted 1036 { 1037 import std.system : endian, Endian; 1038 1039 PollFlags flags; 1040 if (events != PollEvents.NONE) flags |= PollFlags.UPDATE_EVENTS; 1041 if (cast(void*)&oldUserData !is cast(void*)&newUserData) flags |= PollFlags.UPDATE_USER_DATA; 1042 1043 entry.prepRW( 1044 Operation.POLL_REMOVE, 1045 -1, 1046 cast(void*)&oldUserData, 1047 flags, 1048 cast(ulong)cast(void*)&newUserData 1049 ); 1050 static if (endian == Endian.bigEndian) 1051 entry.poll_events32 = (events & 0x0000ffffUL) << 16 | (events & 0xffff0000) >> 16; 1052 else 1053 entry.poll_events32 = events; 1054 return entry; 1055 } 1056 1057 /** 1058 * Prepares `sync_file_range(2)` operation. 1059 * 1060 * Sync a file segment with disk, permits fine control when synchronizing the open file referred to 1061 * by the file descriptor fd with disk. 1062 * 1063 * If `len` is 0, then all bytes from `offset` through to the end of file are synchronized. 1064 * 1065 * Params: 1066 * entry = `SubmissionEntry` to prepare 1067 * fd = is the file descriptor to sync 1068 * offset = the starting byte of the file range to be synchronized 1069 * len = the length of the range to be synchronized, in bytes 1070 * flags = the flags for the command. 1071 * 1072 * See_Also: `sync_file_range(2)` for the general description of the related system call. 1073 * 1074 * Note: available from Linux 5.2 1075 */ 1076 ref SubmissionEntry prepSyncFileRange(return ref SubmissionEntry entry, int fd, ulong offset, uint len, 1077 SyncFileRangeFlags flags = SyncFileRangeFlags.WRITE_AND_WAIT) @safe 1078 { 1079 entry.prepRW(Operation.SYNC_FILE_RANGE, fd, null, len, offset); 1080 entry.sync_range_flags = flags; 1081 return entry; 1082 } 1083 1084 /** 1085 * This command will register a timeout operation. 
1086 * 1087 * A timeout will trigger a wakeup event on the completion ring for anyone waiting for events. A 1088 * timeout condition is met when either the specified timeout expires, or the specified number of 1089 * events have completed. Either condition will trigger the event. The request will complete with 1090 * `-ETIME` if the timeout got completed through expiration of the timer, or `0` if the timeout got 1091 * completed through requests completing on their own. If the timeout was cancelled before it 1092 * expired, the request will complete with `-ECANCELED`. 1093 * 1094 * Applications may delete existing timeouts before they occur with `TIMEOUT_REMOVE` operation. 1095 * 1096 * Params: 1097 * entry = `SubmissionEntry` to prepare 1098 * time = reference to `time64` data structure 1099 * count = completion event count 1100 * flags = define if it's a relative or absolute time 1101 * 1102 * Note: Available from Linux 5.4 1103 */ 1104 ref SubmissionEntry prepTimeout(return ref SubmissionEntry entry, ref KernelTimespec time, 1105 ulong count = 0, TimeoutFlags flags = TimeoutFlags.REL) @trusted 1106 { 1107 entry.prepRW(Operation.TIMEOUT, -1, cast(void*)&time, 1, count); 1108 entry.timeout_flags = flags; 1109 return entry; 1110 } 1111 1112 /** 1113 * Prepares operations to remove existing timeout registered using `TIMEOUT`operation. 1114 * 1115 * Attempt to remove an existing timeout operation. If the specified timeout request is found and 1116 * cancelled successfully, this request will terminate with a result value of `-ECANCELED`. If the 1117 * timeout request was found but expiration was already in progress, this request will terminate 1118 * with a result value of `-EALREADY`. If the timeout request wasn't found, the request will 1119 * terminate with a result value of `-ENOENT`. 
1120 * 1121 * Params: 1122 * entry = `SubmissionEntry` to prepare 1123 * userData = user data provided with the previously issued timeout operation 1124 * 1125 * Note: Available from Linux 5.5 1126 */ 1127 ref SubmissionEntry prepTimeoutRemove(D)(return ref SubmissionEntry entry, ref D userData) @trusted 1128 { 1129 return entry.prepRW(Operation.TIMEOUT_REMOVE, -1, cast(void*)&userData); 1130 } 1131 1132 /** 1133 * Prepares operations to update existing timeout registered using `TIMEOUT`operation. 1134 * 1135 * Params: 1136 * entry = `SubmissionEntry` to prepare 1137 * userData = user data provided with the previously issued timeout operation 1138 * time = reference to `time64` data structure with a new time spec 1139 * flags = define if it's a relative or absolute time 1140 * 1141 * Note: Available from Linux 5.11 1142 */ 1143 ref SubmissionEntry prepTimeoutUpdate(D)(return ref SubmissionEntry entry, 1144 ref KernelTimespec time, ref D userData, TimeoutFlags flags) @trusted 1145 { 1146 entry.prepRW(Operation.TIMEOUT_REMOVE, -1, cast(void*)&userData, 0, cast(ulong)(cast(void*)&time)); 1147 entry.timeout_flags = flags | TimeoutFlags.UPDATE; 1148 return entry; 1149 } 1150 1151 /** 1152 * Prepares `accept4(2)` operation. 1153 * 1154 * See_Also: `accept4(2)`` for the general description of the related system call. 
1155 * 1156 * Params: 1157 * entry = `SubmissionEntry` to prepare 1158 * fd = socket file descriptor 1159 * addr = reference to one of sockaddr structires to be filled with accepted client address 1160 * addrlen = reference to addrlen field that would be filled with accepted client address length 1161 * 1162 * Note: Available from Linux 5.5 1163 */ 1164 ref SubmissionEntry prepAccept(ADDR)(return ref SubmissionEntry entry, int fd, ref ADDR addr, ref socklen_t addrlen, 1165 AcceptFlags flags = AcceptFlags.NONE) @trusted 1166 { 1167 entry.prepRW(Operation.ACCEPT, fd, cast(void*)&addr, 0, cast(ulong)(cast(void*)&addrlen)); 1168 entry.accept_flags = flags; 1169 return entry; 1170 } 1171 1172 /** 1173 * Same as `prepAccept`, but fd is put directly into fixed file table on `fileIndex`. 1174 * Note: available from Linux 5.15 1175 */ 1176 ref SubmissionEntry prepAcceptDirect(ADDR)(return ref SubmissionEntry entry, int fd, ref ADDR addr, ref socklen_t addrlen, 1177 uint fileIndex, AcceptFlags flags = AcceptFlags.NONE) @trusted 1178 { 1179 entry.prepRW(Operation.ACCEPT, fd, cast(void*)&addr, 0, cast(ulong)(cast(void*)&addrlen)); 1180 entry.accept_flags = flags; 1181 entry.file_index = fileIndex+1; 1182 return entry; 1183 } 1184 1185 /** 1186 * Prepares operation that cancels existing async work. 1187 * 1188 * This works with any read/write request, accept,send/recvmsg, etc. There’s an important 1189 * distinction to make here with the different kinds of commands. A read/write on a regular file 1190 * will generally be waiting for IO completion in an uninterruptible state. This means it’ll ignore 1191 * any signals or attempts to cancel it, as these operations are uncancellable. io_uring can cancel 1192 * these operations if they haven’t yet been started. If they have been started, cancellations on 1193 * these will fail. Network IO will generally be waiting interruptibly, and can hence be cancelled 1194 * at any time. 
The completion event for this request will have a result of 0 if done successfully, 1195 * `-EALREADY` if the operation is already in progress, and `-ENOENT` if the original request 1196 * specified cannot be found. For cancellation requests that return `-EALREADY`, io_uring may or may 1197 * not cause this request to be stopped sooner. For blocking IO, the original request will complete 1198 * as it originally would have. For IO that is cancellable, it will terminate sooner if at all 1199 * possible. 1200 * 1201 * Params: 1202 * entry = `SubmissionEntry` to prepare 1203 * userData = `user_data` field of the request that should be cancelled 1204 * 1205 * Note: Available from Linux 5.5 1206 */ 1207 ref SubmissionEntry prepCancel(D)(return ref SubmissionEntry entry, ref D userData, uint flags = 0) @trusted 1208 { 1209 entry.prepRW(Operation.ASYNC_CANCEL, -1, cast(void*)&userData); 1210 entry.cancel_flags = flags; 1211 return entry; 1212 } 1213 1214 /** 1215 * Prepares linked timeout operation. 1216 * 1217 * This request must be linked with another request through `IOSQE_IO_LINK` which is described below. 1218 * Unlike `IORING_OP_TIMEOUT`, `IORING_OP_LINK_TIMEOUT` acts on the linked request, not the completion 1219 * queue. The format of the command is otherwise like `IORING_OP_TIMEOUT`, except there's no 1220 * completion event count as it's tied to a specific request. If used, the timeout specified in the 1221 * command will cancel the linked command, unless the linked command completes before the 1222 * timeout. The timeout will complete with `-ETIME` if the timer expired and the linked request was 1223 * attempted cancelled, or `-ECANCELED` if the timer got cancelled because of completion of the linked 1224 * request. 
1225 * 1226 * Note: Available from Linux 5.5 1227 * 1228 * Params: 1229 * entry = `SubmissionEntry` to prepare 1230 * time = time specification 1231 * flags = define if it's a relative or absolute time 1232 */ 1233 ref SubmissionEntry prepLinkTimeout(return ref SubmissionEntry entry, ref KernelTimespec time, TimeoutFlags flags = TimeoutFlags.REL) @trusted 1234 { 1235 entry.prepRW(Operation.LINK_TIMEOUT, -1, cast(void*)&time, 1, 0); 1236 entry.timeout_flags = flags; 1237 return entry; 1238 } 1239 1240 /** 1241 * Note: Available from Linux 5.5 1242 */ 1243 ref SubmissionEntry prepConnect(ADDR)(return ref SubmissionEntry entry, int fd, ref const(ADDR) addr) @trusted 1244 { 1245 return entry.prepRW(Operation.CONNECT, fd, cast(void*)&addr, 0, ADDR.sizeof); 1246 } 1247 1248 /** 1249 * Note: Available from Linux 5.6 1250 */ 1251 ref SubmissionEntry prepFilesUpdate(return ref SubmissionEntry entry, int[] fds, int offset) @safe 1252 { 1253 return entry.prepRW(Operation.FILES_UPDATE, -1, cast(void*)&fds[0], cast(uint)fds.length, offset); 1254 } 1255 1256 /** 1257 * Note: Available from Linux 5.6 1258 */ 1259 ref SubmissionEntry prepFallocate(return ref SubmissionEntry entry, int fd, int mode, long offset, long len) @trusted 1260 { 1261 return entry.prepRW(Operation.FALLOCATE, fd, cast(void*)len, mode, offset); 1262 } 1263 1264 /** 1265 * Note: Available from Linux 5.6 1266 */ 1267 ref SubmissionEntry prepOpenat(return ref SubmissionEntry entry, int fd, const(char)* path, int flags, uint mode) @trusted 1268 { 1269 entry.prepRW(Operation.OPENAT, fd, cast(void*)path, mode, 0); 1270 entry.open_flags = flags; 1271 return entry; 1272 } 1273 1274 /** 1275 * Same as `prepOpenat`, but fd is put directly into fixed file table on `fileIndex`. 
1276 * Note: available from Linux 5.15 1277 */ 1278 ref SubmissionEntry prepOpenatDirect(return ref SubmissionEntry entry, int fd, const(char)* path, int flags, uint mode, uint fileIndex) @trusted 1279 { 1280 entry.prepRW(Operation.OPENAT, fd, cast(void*)path, mode, 0); 1281 entry.open_flags = flags; 1282 entry.file_index = fileIndex+1; 1283 return entry; 1284 } 1285 1286 /** 1287 * Note: Available from Linux 5.6 1288 */ 1289 ref SubmissionEntry prepClose(return ref SubmissionEntry entry, int fd) @safe 1290 { 1291 return entry.prepRW(Operation.CLOSE, fd); 1292 } 1293 1294 /** 1295 * Same as `prepClose` but operation works directly with fd registered in fixed file table on index `fileIndex`. 1296 * Note: Available from Linux 5.15 1297 */ 1298 ref SubmissionEntry prepCloseDirect(return ref SubmissionEntry entry, int fd, uint fileIndex) @safe 1299 { 1300 entry.prepRW(Operation.CLOSE, fd); 1301 entry.file_index = fileIndex+1; 1302 return entry; 1303 } 1304 1305 /** 1306 * Note: Available from Linux 5.6 1307 */ 1308 ref SubmissionEntry prepRead(return ref SubmissionEntry entry, int fd, ubyte[] buffer, long offset) @safe 1309 { 1310 return entry.prepRW(Operation.READ, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset); 1311 } 1312 1313 /** 1314 * Note: Available from Linux 5.6 1315 */ 1316 ref SubmissionEntry prepWrite(return ref SubmissionEntry entry, int fd, const(ubyte)[] buffer, long offset) @trusted 1317 { 1318 return entry.prepRW(Operation.WRITE, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset); 1319 } 1320 1321 /** 1322 * Note: Available from Linux 5.6 1323 */ 1324 ref SubmissionEntry prepStatx(Statx)(return ref SubmissionEntry entry, int fd, const(char)* path, int flags, uint mask, ref Statx statxbuf) @trusted 1325 { 1326 entry.prepRW(Operation.STATX, fd, cast(void*)path, mask, cast(ulong)(cast(void*)&statxbuf)); 1327 entry.statx_flags = flags; 1328 return entry; 1329 } 1330 1331 /** 1332 * Note: Available from Linux 5.6 1333 */ 1334 ref 
SubmissionEntry prepFadvise(return ref SubmissionEntry entry, int fd, long offset, uint len, int advice) @safe 1335 { 1336 entry.prepRW(Operation.FADVISE, fd, null, len, offset); 1337 entry.fadvise_advice = advice; 1338 return entry; 1339 } 1340 1341 /** 1342 * Note: Available from Linux 5.6 1343 */ 1344 ref SubmissionEntry prepMadvise(return ref SubmissionEntry entry, const(ubyte)[] block, int advice) @trusted 1345 { 1346 entry.prepRW(Operation.MADVISE, -1, cast(void*)&block[0], cast(uint)block.length, 0); 1347 entry.fadvise_advice = advice; 1348 return entry; 1349 } 1350 1351 /** 1352 * Note: Available from Linux 5.6 1353 */ 1354 ref SubmissionEntry prepSend(return ref SubmissionEntry entry, 1355 int sockfd, const(ubyte)[] buf, MsgFlags flags = MsgFlags.NONE) @trusted 1356 { 1357 entry.prepRW(Operation.SEND, sockfd, cast(void*)&buf[0], cast(uint)buf.length, 0); 1358 entry.msg_flags = flags; 1359 return entry; 1360 } 1361 1362 /** 1363 * Note: Available from Linux 5.6 1364 */ 1365 ref SubmissionEntry prepRecv(return ref SubmissionEntry entry, 1366 int sockfd, ubyte[] buf, MsgFlags flags = MsgFlags.NONE) @trusted 1367 { 1368 entry.prepRW(Operation.RECV, sockfd, cast(void*)&buf[0], cast(uint)buf.length, 0); 1369 entry.msg_flags = flags; 1370 return entry; 1371 } 1372 1373 /** 1374 * Variant that uses registered buffers group. 
1375 * 1376 * Note: Available from Linux 5.6 1377 */ 1378 ref SubmissionEntry prepRecv(return ref SubmissionEntry entry, 1379 int sockfd, ushort gid, uint len, MsgFlags flags = MsgFlags.NONE) @safe 1380 { 1381 entry.prepRW(Operation.RECV, sockfd, null, len, 0); 1382 entry.msg_flags = flags; 1383 entry.buf_group = gid; 1384 entry.flags |= SubmissionEntryFlags.BUFFER_SELECT; 1385 return entry; 1386 } 1387 1388 /** 1389 * Note: Available from Linux 5.6 1390 */ 1391 ref SubmissionEntry prepOpenat2(return ref SubmissionEntry entry, int fd, const char *path, ref OpenHow how) @trusted 1392 { 1393 return entry.prepRW(Operation.OPENAT2, fd, cast(void*)path, cast(uint)OpenHow.sizeof, cast(ulong)(cast(void*)&how)); 1394 } 1395 1396 /** 1397 * Same as `prepOpenat2`, but fd is put directly into fixed file table on `fileIndex`. 1398 * Note: available from Linux 5.15 1399 */ 1400 ref SubmissionEntry prepOpenat2Direct(return ref SubmissionEntry entry, int fd, const char *path, ref OpenHow how, uint fileIndex) @trusted 1401 { 1402 entry.prepRW(Operation.OPENAT2, fd, cast(void*)path, cast(uint)OpenHow.sizeof, cast(ulong)(cast(void*)&how)); 1403 entry.file_index = fileIndex+1; 1404 return entry; 1405 } 1406 1407 /** 1408 * Note: Available from Linux 5.6 1409 */ 1410 ref SubmissionEntry prepEpollCtl(return ref SubmissionEntry entry, int epfd, int fd, int op, ref epoll_event ev) @trusted 1411 { 1412 return entry.prepRW(Operation.EPOLL_CTL, epfd, cast(void*)&ev, op, fd); 1413 } 1414 1415 /** 1416 * Note: Available from Linux 5.7 1417 * 1418 * This splice operation can be used to implement sendfile by splicing to an intermediate pipe 1419 * first, then splice to the final destination. In fact, the implementation of sendfile in kernel 1420 * uses splice internally. 1421 * 1422 * NOTE that even if fd_in or fd_out refers to a pipe, the splice operation can still fail with 1423 * EINVAL if one of the fd doesn't explicitly support splice operation, e.g. 
reading from terminal 1424 * is unsupported from kernel 5.7 to 5.11. Check issue #291 for more information. 1425 * 1426 * Either fd_in or fd_out must be a pipe. 1427 * 1428 * Params 1429 * fd_in = input file descriptor 1430 * off_in = If fd_in refers to a pipe, off_in must be -1. 1431 * If fd_in does not refer to a pipe and off_in is -1, then bytes are read from 1432 * fd_in starting from the file offset and it is adjust appropriately; 1433 * If fd_in does not refer to a pipe and off_in is not -1, then the starting 1434 * offset of fd_in will be off_in. 1435 * fd_out = output filedescriptor 1436 * off_out = The description of off_in also applied to off_out. 1437 * len = Up to len bytes would be transfered between file descriptors. 1438 * splice_flags = see man splice(2) for description of flags. 1439 */ 1440 ref SubmissionEntry prepSplice(return ref SubmissionEntry entry, 1441 int fd_in, ulong off_in, 1442 int fd_out, ulong off_out, 1443 uint len, uint splice_flags) @safe 1444 { 1445 entry.prepRW(Operation.SPLICE, fd_out, null, len, off_out); 1446 entry.splice_off_in = off_in; 1447 entry.splice_fd_in = fd_in; 1448 entry.splice_flags = splice_flags; 1449 return entry; 1450 } 1451 1452 /** 1453 * Note: Available from Linux 5.7 1454 * 1455 * Params: 1456 * entry = `SubmissionEntry` to prepare 1457 * buf = buffers to provide 1458 * len = length of each buffer to add 1459 * bgid = buffers group id 1460 * bid = starting buffer id 1461 */ 1462 ref SubmissionEntry prepProvideBuffers(return ref SubmissionEntry entry, ubyte[][] buf, uint len, ushort bgid, int bid) @safe 1463 { 1464 assert(buf.length <= int.max, "Too many buffers"); 1465 assert(len <= uint.max, "Buffer too large"); 1466 version (assert) { 1467 foreach (b; buf) assert(b.length <= len, "Invalid buffer length"); 1468 } 1469 entry.prepRW(Operation.PROVIDE_BUFFERS, cast(int)buf.length, cast(void*)&buf[0][0], len, bid); 1470 entry.buf_group = bgid; 1471 return entry; 1472 } 1473 1474 /// ditto 1475 ref 
SubmissionEntry prepProvideBuffers(size_t M, size_t N)(return ref SubmissionEntry entry, ref ubyte[M][N] buf, ushort bgid, int bid) @safe 1476 { 1477 static assert(N <= int.max, "Too many buffers"); 1478 static assert(M <= uint.max, "Buffer too large"); 1479 entry.prepRW(Operation.PROVIDE_BUFFERS, cast(int)N, cast(void*)&buf[0][0], cast(uint)M, bid); 1480 entry.buf_group = bgid; 1481 return entry; 1482 } 1483 1484 /// ditto 1485 ref SubmissionEntry prepProvideBuffer(size_t N)(return ref SubmissionEntry entry, ref ubyte[N] buf, ushort bgid, int bid) @safe 1486 { 1487 static assert(N <= uint.max, "Buffer too large"); 1488 entry.prepRW(Operation.PROVIDE_BUFFERS, 1, cast(void*)&buf[0], cast(uint)N, bid); 1489 entry.buf_group = bgid; 1490 return entry; 1491 } 1492 1493 /// ditto 1494 ref SubmissionEntry prepProvideBuffer(return ref SubmissionEntry entry, ref ubyte[] buf, ushort bgid, int bid) @safe 1495 { 1496 assert(buf.length <= uint.max, "Buffer too large"); 1497 entry.prepRW(Operation.PROVIDE_BUFFERS, 1, cast(void*)&buf[0], cast(uint)buf.length, bid); 1498 entry.buf_group = bgid; 1499 return entry; 1500 } 1501 1502 /** 1503 * Note: Available from Linux 5.7 1504 */ 1505 ref SubmissionEntry prepRemoveBuffers(return ref SubmissionEntry entry, int nr, ushort bgid) @safe 1506 { 1507 entry.prepRW(Operation.REMOVE_BUFFERS, nr); 1508 entry.buf_group = bgid; 1509 return entry; 1510 } 1511 1512 /** 1513 * Note: Available from Linux 5.8 1514 */ 1515 ref SubmissionEntry prepTee(return ref SubmissionEntry entry, int fd_in, int fd_out, uint nbytes, uint flags) @safe 1516 { 1517 entry.prepRW(Operation.TEE, fd_out, null, nbytes, 0); 1518 entry.splice_off_in = 0; 1519 entry.splice_fd_in = fd_in; 1520 entry.splice_flags = flags; 1521 return entry; 1522 } 1523 1524 /** 1525 * Note: Available from Linux 5.11 1526 */ 1527 ref SubmissionEntry prepShutdown(return ref SubmissionEntry entry, int fd, int how) @safe 1528 { 1529 return entry.prepRW(Operation.SHUTDOWN, fd, null, how, 0); 1530 } 

/**
 * Prepares `renameat2(2)` operation.
 *
 * Note: Available from Linux 5.11
 */
ref SubmissionEntry prepRenameat(return ref SubmissionEntry entry,
    int olddfd, const(char)* oldpath, int newfd, const(char)* newpath, int flags) @trusted
{
    entry.prepRW(Operation.RENAMEAT, olddfd, cast(void*)oldpath, newfd, cast(ulong)cast(void*)newpath);
    entry.rename_flags = flags;
    return entry;
}

/**
 * Prepares `unlinkat(2)` operation.
 *
 * Note: Available from Linux 5.11
 */
ref SubmissionEntry prepUnlinkat(return ref SubmissionEntry entry, int dirfd, const(char)* path, int flags) @trusted
{
    entry.prepRW(Operation.UNLINKAT, dirfd, cast(void*)path, 0, 0);
    entry.unlink_flags = flags;
    return entry;
}

/**
 * Prepares `mkdirat(2)` operation.
 *
 * Note: Available from Linux 5.15
 */
ref SubmissionEntry prepMkdirat(return ref SubmissionEntry entry, int dirfd, const(char)* path, mode_t mode) @trusted
{
    entry.prepRW(Operation.MKDIRAT, dirfd, cast(void*)path, mode, 0);
    return entry;
}

/**
 * Prepares `symlinkat(2)` operation.
 *
 * Note: Available from Linux 5.15
 */
ref SubmissionEntry prepSymlinkat(return ref SubmissionEntry entry, const(char)* target, int newdirfd, const(char)* linkpath) @trusted
{
    entry.prepRW(Operation.SYMLINKAT, newdirfd, cast(void*)target, 0, cast(ulong)cast(void*)linkpath);
    return entry;
}

/**
 * Prepares `linkat(2)` operation.
 *
 * Note: Available from Linux 5.15
 */
ref SubmissionEntry prepLinkat(return ref SubmissionEntry entry,
    int olddirfd, const(char)* oldpath,
    int newdirfd, const(char)* newpath, int flags) @trusted
{
    entry.prepRW(Operation.LINKAT, olddirfd, cast(void*)oldpath, newdirfd, cast(ulong)cast(void*)newpath);
    entry.hardlink_flags = flags;
    return entry;
}

private:

// uring cleanup - drops one reference and frees all resources (via UringDesc dtor) on the last one
void dispose(ref Uring uring) @trusted
{
    if (uring.payload is null) return;
    // debug printf("uring(%d): dispose(%d)\n", uring.payload.fd, uring.payload.refs);
    if (--uring.payload.refs == 0)
    {
        import std.traits : hasElaborateDestructor;
        // debug printf("uring(%d): free\n", uring.payload.fd);
        static if (hasElaborateDestructor!UringDesc)
            destroy(*uring.payload); // call possible destructors
        free(cast(void*)uring.payload);
    }
    uring.payload = null;
}

// system fields descriptor - reference counted state shared by all copies of a `Uring`
struct UringDesc
{
    nothrow @nogc:

    int fd;                 // io_uring instance file descriptor (from io_uring_setup)
    size_t refs;            // reference count (see dispose above)
    SetupParameters params; // setup parameters filled in by the kernel
    SubmissionQueue sq;
    CompletionQueue cq;

    iovec[] regBuffers;     // buffers registered via io_uring_register (owned, malloc'd)

    // Releases everything acquired in setup()/mapRings(); partially mapped state is
    // tolerated (each resource is checked before being released).
    ~this() @trusted
    {
        if (regBuffers) free(cast(void*)&regBuffers[0]);
        if (sq.ring) munmap(sq.ring, sq.ringSize);
        if (sq.sqes) munmap(cast(void*)&sq.sqes[0], sq.sqes.length * SubmissionEntry.sizeof);
        if (cq.ring && cq.ring != sq.ring) munmap(cq.ring, cq.ringSize);
        close(fd);
    }

    // Maps the kernel-shared SQ/CQ ring memory and the SQE array, and initializes the
    // queue pointers from the offsets the kernel returned in `params`.
    // Returns 0 on success, -errno on mmap failure (partial mappings cleaned by the dtor).
    private auto mapRings() @trusted
    {
        sq.ringSize = params.sq_off.array + params.sq_entries * uint.sizeof;
        cq.ringSize = params.cq_off.cqes + params.cq_entries * CompletionEntry.sizeof;

        // With SINGLE_MMAP (kernel 5.4+) both rings share one mapping of the larger size.
        if (params.features & SetupFeatures.SINGLE_MMAP)
        {
            if (cq.ringSize > sq.ringSize) sq.ringSize = cq.ringSize;
            cq.ringSize = sq.ringSize;
        }

        sq.ring = mmap(null, sq.ringSize,
            PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
            fd, SetupParameters.SUBMISSION_QUEUE_RING_OFFSET
        );

        if (sq.ring == MAP_FAILED)
        {
            sq.ring = null;
            return -errno;
        }

        if (params.features & SetupFeatures.SINGLE_MMAP)
            cq.ring = sq.ring;
        else
        {
            cq.ring = mmap(null, cq.ringSize,
                PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
                fd, SetupParameters.COMPLETION_QUEUE_RING_OFFSET
            );

            if (cq.ring == MAP_FAILED)
            {
                cq.ring = null;
                return -errno; // cleanup is done in struct destructors
            }
        }

        // Wire up SQ pointers into the shared mapping using kernel-provided offsets.
        uint entries = *cast(uint*)(sq.ring + params.sq_off.ring_entries);
        sq.khead = cast(uint*)(sq.ring + params.sq_off.head);
        sq.ktail = cast(uint*)(sq.ring + params.sq_off.tail);
        sq.localTail = *sq.ktail;
        sq.ringMask = *cast(uint*)(sq.ring + params.sq_off.ring_mask);
        sq.kflags = cast(uint*)(sq.ring + params.sq_off.flags);
        sq.kdropped = cast(uint*)(sq.ring + params.sq_off.dropped);

        // Indirection array of indexes to the sqes array (head and tail are pointing to this array).
        // As we don't need some fancy mappings, just initialize it with constant indexes and forget about it.
        // That way, head and tail are actually indexes to our sqes array.
        foreach (i; 0..entries)
        {
            *((cast(uint*)(sq.ring + params.sq_off.array)) + i) = i;
        }

        // The SQE array lives in its own mapping, separate from the ring.
        auto psqes = mmap(
            null, entries * SubmissionEntry.sizeof,
            PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
            fd, SetupParameters.SUBMISSION_QUEUE_ENTRIES_OFFSET
        );

        if (psqes == MAP_FAILED) return -errno;
        sq.sqes = (cast(SubmissionEntry*)psqes)[0..entries];

        // Wire up CQ pointers; CQEs are embedded directly in the CQ ring mapping.
        entries = *cast(uint*)(cq.ring + params.cq_off.ring_entries);
        cq.khead = cast(uint*)(cq.ring + params.cq_off.head);
        cq.localHead = *cq.khead;
        cq.ktail = cast(uint*)(cq.ring + params.cq_off.tail);
        cq.ringMask = *cast(uint*)(cq.ring + params.cq_off.ring_mask);
        cq.koverflow = cast(uint*)(cq.ring + params.cq_off.overflow);
        cq.cqes = (cast(CompletionEntry*)(cq.ring + params.cq_off.cqes))[0..entries];
        cq.kflags = cast(uint*)(cq.ring + params.cq_off.flags);
        return 0;
    }
}

/// Wrapper for `SubmissionEntry` queue
struct SubmissionQueue
{
    nothrow @nogc:

    // mmaped fields
    uint* khead; // controlled by kernel
    uint* ktail; // controlled by us
    uint* kflags; // controlled by kernel (ie IORING_SQ_NEED_WAKEUP)
    uint* kdropped; // counter of invalid submissions (out of bound index)
    uint ringMask; // constant mask used to determine array index from head/tail

    // mmap details (for cleanup)
    void* ring; // pointer to the mmaped region
    size_t ringSize; // size of mmaped memory block

    // mmapped list of entries (fixed length)
    SubmissionEntry[] sqes;

    uint localTail; // used for batch submission

    // Kernel-consumed head is read with acquire ordering; tail is tracked locally
    // until flushTail() publishes it to the kernel.
    uint head() const @safe pure { return atomicLoad!(MemoryOrder.acq)(*khead); }
    uint tail() const @safe pure { return localTail; }

    // Publish locally batched entries to the kernel (release store of the tail).
    void flushTail() @safe pure
    {
        pragma(inline, true);
        // debug printf("SQ updating tail: %d\n", localTail);
        atomicStore!(MemoryOrder.rel)(*ktail, localTail);
    }

    SubmissionQueueFlags flags() const @safe pure
    {
        return cast(SubmissionQueueFlags)atomicLoad!(MemoryOrder.raw)(*kflags);
    }

    bool full() const @safe pure { return sqes.length == length; }

    size_t length() const @safe pure { return tail - head; }

    size_t capacity() const @safe pure { return sqes.length - length; }

    // Returns a reference to the next free SQE slot (advances the local tail).
    ref SubmissionEntry next()() @safe pure return
    {
        assert(!full, "SumbissionQueue is full");
        return sqes[localTail++ & ringMask];
    }

    void put()(auto ref SubmissionEntry entry) @safe pure
    {
        assert(!full, "SumbissionQueue is full");
        sqes[localTail++ & ringMask] = entry;
    }

    // Accepts any operation type that `SubmissionEntry.fill` knows how to encode.
    void put(OP)(auto ref OP op)
        if (!is(OP == SubmissionEntry))
    {
        assert(!full, "SumbissionQueue is full");
        sqes[localTail++ & ringMask].fill(op);
    }

    // Lets `FN` prepare the next SQE slot in place (used by `Uring.putWith`).
    private void putWith(alias FN, ARGS...)(auto ref ARGS args)
    {
        import std.traits : Parameters, ParameterStorageClass, ParameterStorageClassTuple;

        static assert(
            Parameters!FN.length >= 1
            && is(Parameters!FN[0] == SubmissionEntry)
            && ParameterStorageClassTuple!FN[0] == ParameterStorageClass.ref_,
            "Alias function must accept at least `ref SubmissionEntry`");

        static assert(
            is(typeof(FN(sqes[localTail & ringMask], args))),
            "Provided function is not callable with " ~ (Parameters!((ref SubmissionEntry e, ARGS args) {})).stringof);

        assert(!full, "SumbissionQueue is full");
        FN(sqes[localTail++ & ringMask], args);
    }

    uint dropped() const @safe pure { return atomicLoad!(MemoryOrder.raw)(*kdropped); }
}

/// Wrapper for `CompletionEntry` queue
struct CompletionQueue
{
    nothrow @nogc:

    // mmaped fields
    uint* khead; // controlled by us (increment after entry at head was read)
    uint* ktail; // updated by kernel
    uint* koverflow;
    uint* kflags;
    CompletionEntry[] cqes; // array of entries (fixed length)

    uint ringMask; // constant mask used to determine array index from head/tail

    // mmap details (for cleanup)
    void* ring;
    size_t ringSize;

    uint localHead; // used for bulk reading

    // Head is tracked locally until flushHead() publishes it; kernel-written tail
    // is read with acquire ordering so CQE contents are visible after the load.
    uint head() const @safe pure { return localHead; }
    uint tail() const @safe pure { return atomicLoad!(MemoryOrder.acq)(*ktail); }

    // Tell the kernel which entries we've consumed (release store of the head).
    void flushHead() @safe pure
    {
        pragma(inline, true);
        // debug printf("CQ updating head: %d\n", localHead);
        atomicStore!(MemoryOrder.rel)(*khead, localHead);
    }

    bool empty() const @safe pure { return head == tail; }

    ref CompletionEntry front() @safe pure return
    {
        assert(!empty, "CompletionQueue is empty");
        return cqes[localHead & ringMask];
    }

    void popFront() @safe pure
    {
        pragma(inline);
        assert(!empty, "CompletionQueue is empty");
        localHead++;
        flushHead();
    }

    size_t length() const @safe pure { return tail - localHead; }

    uint overflow() const @safe pure { return atomicLoad!(MemoryOrder.raw)(*koverflow); }

    /// Runtime CQ flags - written by the application, shouldn't be modified by the kernel.
    void flags(CQRingFlags flags) @safe pure { atomicStore!(MemoryOrder.raw)(*kflags, flags); }
}

// just a helper to use atomicStore more easily with older compilers
// (before 2.089 core.atomic only accepted `shared` lvalues)
void atomicStore(MemoryOrder ms, T, V)(ref T val, V newVal) @trusted
{
    pragma(inline, true);
    import core.atomic : store = atomicStore;
    static if (__VERSION__ >= 2089) store!ms(val, newVal);
    else store!ms(*(cast(shared T*)&val), newVal);
}

// just a helper to use atomicLoad more easily with older compilers
// (before 2.089 core.atomic only accepted `shared` lvalues)
T atomicLoad(MemoryOrder ms, T)(ref const T val) @trusted
{
    pragma(inline, true);
    import core.atomic : load = atomicLoad;
    static if (__VERSION__ >= 2089) return load!ms(val);
    else return load!ms(*(cast(const shared T*)&val));
}

// Compile-time check that `Uring` satisfies the advertised range interfaces.
version (assert)
{
    import std.range.primitives : ElementType, isInputRange, isOutputRange;
    static assert(isInputRange!Uring && is(ElementType!Uring == CompletionEntry));
    static assert(isOutputRange!(Uring, SubmissionEntry));
}

// Branch prediction hint: maps to llvm.expect on LDC, no-op elsewhere.
version (LDC)
{
    import ldc.intrinsics : llvm_expect;
    alias _expect = llvm_expect;
}
else
{
    T _expect(T)(T val, T expected_val) if (__traits(isIntegral, T))
    {
        pragma(inline, true);
        return val;
    }
}