/**
 * Simple idiomatic dlang wrapper around the linux io_uring
 * (see: https://kernel.dk/io_uring.pdf) asynchronous API.
 */
module during;

version(linux):

public import during.io_uring;
import during.openat2;

import core.atomic : MemoryOrder;
debug import core.stdc.stdio;
import core.stdc.stdlib;
import core.sys.linux.epoll;
import core.sys.linux.errno;
import core.sys.linux.sched;
import core.sys.linux.sys.mman;
import core.sys.linux.unistd;
import core.sys.posix.signal;
import core.sys.posix.sys.socket;
import core.sys.posix.sys.types;
import core.sys.posix.sys.uio;
import std.algorithm.comparison : among;

nothrow @nogc:

/**
 * Setup new instance of io_uring into provided `Uring` structure.
 *
 * Params:
 *     uring = `Uring` structure to be initialized (must not be already initialized)
 *     entries = Number of entries to initialize uring with
 *     flags = `SetupFlags` to use to initialize uring.
 *
 * Returns: On success it returns 0, `-errno` otherwise.
 */
int setup(ref Uring uring, uint entries = 128, SetupFlags flags = SetupFlags.NONE) @safe
{
    assert(uring.payload is null, "Uring is already initialized");
    uring.payload = () @trusted { return cast(UringDesc*)calloc(1, UringDesc.sizeof); }();
    if (uring.payload is null) return -errno;

    uring.payload.params.flags = flags;
    uring.payload.refs = 1;
    auto r = io_uring_setup(entries, uring.payload.params);
    if (r < 0) return -errno;

    uring.payload.fd = r;

    r = uring.payload.mapRings();
    if (r < 0)
    {
        dispose(uring);
        return r;
    }

    // debug printf("uring(%d): setup\n", uring.payload.fd);

    return 0;
}
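
/// A minimal usage sketch: the entry count and error handling here are illustrative only;
/// `setup` returns 0 on success, `-errno` otherwise.
@safe nothrow @nogc unittest
{
    Uring io;
    immutable r = io.setup(4);
    if (r != 0) return; // e.g. running on a kernel without io_uring support
    // io is disposed automatically by the destructor (refcounted payload)
}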

/**
 * Simplified wrapper around `io_uring_probe` that is used to check which io_uring operations the
 * current kernel actually supports.
 */
struct Probe
{
    static assert (Operation.max < 64, "Needs to be adjusted");
    private
    {
        io_uring_probe probe;
        io_uring_probe_op[64] ops;
        int err;
    }

    const @safe pure nothrow @nogc:

    /// Is operation supported?
    bool isSupported(Operation op)
    in (op <= Operation.max, "Invalid operation")
    {
        if (op > probe.last_op) return false;
        assert(ops[op].op == op, "Operation differs");
        return (ops[op].flags & IO_URING_OP_SUPPORTED) != 0;
    }

    /// Error code when we fail to get `Probe`.
    @property int error() { return err; }

    /// `true` if probe was successfully retrieved.
    T opCast(T)() if (is(T == bool)) { return err == 0; }
}

/// Probes supported operations on a temporarily created uring instance
Probe probe() @safe nothrow @nogc
{
    Uring io;
    immutable ret = io.setup(2);
    if (ret < 0) {
        Probe res;
        res.err = ret;
        return res;
    }

    return io.probe();
}
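
/// Illustrative sketch: check operation support before relying on it
/// (`Operation.NOP` is just an example here).
@safe nothrow @nogc unittest
{
    auto p = probe();
    if (!p) return; // couldn't retrieve the probe, see p.error
    if (p.isSupported(Operation.NOP)) { /* safe to submit NOPs */ }
}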

/**
 * Main entry point to work with io_uring.
 *
 * It hides `SubmissionQueue` and `CompletionQueue` behind standard range interface.
 * We put in `SubmissionEntry` entries and take out `CompletionEntry` entries.
 *
 * Use predefined `prepXX` methods to fill required fields of `SubmissionEntry` before `put` or during `putWith`.
 *
 * Note: `prepXX` functions don't touch the previous entry state, they just fill in the operation properties.
 * For a less error prone interface, the entry is cleared automatically when prepared using `putWith`. So when
 * using your own `SubmissionEntry` (outside the submission queue) that would be added to the submission queue
 * using `put`, be sure it's cleared if it's reused for multiple operations.
 */
struct Uring
{
    nothrow @nogc:

    private UringDesc* payload;

    /// Copy constructor
    this(ref return scope Uring rhs) @safe pure
    {
        assert(rhs.payload !is null, "rhs payload is null");
        // debug printf("uring(%d): copy\n", rhs.payload.fd);
        this.payload = rhs.payload;
        this.payload.refs++;
    }

    /// Destructor
    ~this() @safe
    {
        dispose(this);
    }

    /// Probes supported operations
    Probe probe() @safe
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        Probe res;
        immutable ret = () @trusted { return io_uring_register(
            payload.fd,
            RegisterOpCode.REGISTER_PROBE,
            cast(void*)&res.probe, res.ops.length
        ); }();
        if (ret < 0) res.err = ret;
        return res;
    }

    /// Native io_uring file descriptor
    int fd() const @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.fd;
    }

    /// io_uring parameters
    SetupParameters params() const @safe pure return
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.params;
    }

    /// Check if there is some `CompletionEntry` to process.
    bool empty() const @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.cq.empty;
    }

    /// Check if there is space for another `SubmissionEntry` to submit.
    bool full() const @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.sq.full;
    }

    /// Available space in submission queue before it becomes full
    size_t capacity() const @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.sq.capacity;
    }

    /// Number of entries in completion queue
    size_t length() const @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.cq.length;
    }

    /// Get first `CompletionEntry` from cq ring
    ref CompletionEntry front() @safe pure return
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.cq.front;
    }

    /// Move to next `CompletionEntry`
    void popFront() @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.cq.popFront;
    }

    /**
     * Adds new entry to the `SubmissionQueue`.
     *
     * Note that this just adds the entry to the queue and doesn't advance the tail
     * marker the kernel sees. For that, `finishSq()` needs to be called next.
     *
     * Also note that to actually enter the new entries to the kernel,
     * `submit()` must be called.
     *
     * Params:
     *     FN    = Function to fill next entry in queue by `ref` (should be faster).
     *             It is expected to be in a form of `void function(ARGS)(ref SubmissionEntry, auto ref ARGS)`.
     *             Note that in this case the queue entry is cleared first before the function is called.
     *     entry = Custom built `SubmissionEntry` to be posted as is.
     *             Note that in this case it is copied whole over the one in the `SubmissionQueue`.
     *     args  = Optional arguments passed to the function
     *
     * Returns: reference to `Uring` structure so it's possible to chain multiple commands.
     */
    ref Uring put()(auto ref SubmissionEntry entry) @safe pure return
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        payload.sq.put(entry);
        return this;
    }

    /// ditto
    ref Uring putWith(alias FN, ARGS...)(auto ref ARGS args) return
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        import std.functional : forward;
        payload.sq.putWith!FN(forward!args);
        return this;
    }
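
    /// Sketch of both enqueue styles using a NOP (illustrative only): `putWith`
    /// clears the entry first, while a manually built entry is copied over as is.
    nothrow @nogc unittest
    {
        Uring io;
        if (io.setup(4) != 0) return;
        io.putWith!((ref SubmissionEntry e) { e.prepNop(); })();
        SubmissionEntry entry;
        entry.prepNop();
        io.put(entry);
    }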

    /**
     * Similar to `put(SubmissionEntry)` but in this case we can provide our custom type (args) to be filled
     * to next `SubmissionEntry` in queue.
     *
     * Fields in the provided type must use the same names as in `SubmissionEntry` to be automagically copied.
     *
     * Params:
     *   op = Custom operation definition.
     *
     * Returns: reference to `Uring` structure so it's possible to chain multiple commands.
     */
    ref Uring put(OP)(auto ref OP op) return
        if (!is(OP == SubmissionEntry))
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        payload.sq.put(op);
        return this;
    }

    /**
     * Advances the userspace submission queue and returns the last `SubmissionEntry`.
     */
    ref SubmissionEntry next()() @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.sq.next();
    }

    /**
     * If the completion queue is full, new events may be dropped.
     * This value records the number of dropped events.
     */
    uint overflow() const @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.cq.overflow;
    }

    /// Counter of invalid submissions (out-of-bound index in submission array)
    uint dropped() const @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.sq.dropped;
    }

    /**
     * Submits queued `SubmissionEntry` entries to be processed by the kernel.
     *
     * Params:
     *     want  = number of `CompletionEntries` to wait for.
     *             If 0, this just submits queued entries and returns.
     *             If > 0, it blocks until at least the wanted number of entries were completed.
     *     sig   = See io_uring_enter(2) man page
     *
     * Returns: Number of submitted entries on success, `-errno` on error
     */
    int submit(S)(uint want, const S* args)
        if (is(S == sigset_t) || is(S == io_uring_getevents_arg))
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        if (_expect(want > 0, false)) return submitAndWait(want, args);
        return submit(args);
    }

    /// ditto
    int submit(uint want) @safe
    {
        pragma(inline, true)
        if (_expect(want > 0, true)) return submitAndWait(want, cast(sigset_t*)null);
        return submit(cast(sigset_t*)null);
    }

    /// ditto
    int submit(S)(const S* args) @trusted
        if (is(S == sigset_t) || is(S == io_uring_getevents_arg))
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        immutable len = cast(int)payload.sq.length;
        if (_expect(len > 0, true)) // anything to submit?
        {
            payload.sq.flushTail(); // advance queue index

            EnterFlags flags;
            if (payload.params.flags & SetupFlags.SQPOLL)
            {
                if (_expect(payload.sq.flags & SubmissionQueueFlags.NEED_WAKEUP, false))
                    flags |= EnterFlags.SQ_WAKEUP;
                else return len; // fast poll
            }
            immutable r = io_uring_enter(payload.fd, len, 0, flags, args);
            if (_expect(r < 0, false)) return -errno;
            return r;
        }
        return 0;
    }

    /// ditto
    int submit() @safe
    {
        pragma(inline, true)
        return submit(cast(sigset_t*)null);
    }

    /**
     * Flushes submission queue index to the kernel.
     * Doesn't call any syscall, it just advances the SQE queue for kernel.
     * This can be used with `IORING_SETUP_SQPOLL` when kernel polls the submission queue.
     */
    void flush() @safe
    {
        payload.sq.flushTail(); // advance queue index
    }

    /**
     * Similar to `submit`, but with this method we just wait for the required number
     * of `CompletionEntries`.
     *
     * Returns: `0` on success, `-errno` on error
     */
    int wait(S)(uint want = 1, const S* args = null) @trusted
        if (is(S == sigset_t) || is(S == io_uring_getevents_arg))
    in (payload !is null, "Uring hasn't been initialized yet")
    in (want > 0, "Invalid want value")
    {
        pragma(inline);
        if (payload.cq.length >= want) return 0; // we don't need to syscall
        immutable r = io_uring_enter(payload.fd, 0, want, EnterFlags.GETEVENTS, args);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /// ditto
    int wait(uint want = 1)
    {
        pragma(inline, true)
        return wait(want, cast(sigset_t*)null);
    }

    /**
     * Special case of a `submit` that can be used when we know beforehand that we want to wait for
     * a certain number of CQEs.
     */
    int submitAndWait(S)(uint want, const S* args) @trusted
        if (is(S == sigset_t) || is(S == io_uring_getevents_arg))
    in (payload !is null, "Uring hasn't been initialized yet")
    in (want > 0, "Invalid want value")
    {
        immutable len = cast(int)payload.sq.length;
        if (_expect(len > 0, true)) // anything to submit?
        {
            payload.sq.flushTail(); // advance queue index

            EnterFlags flags = EnterFlags.GETEVENTS;
            if (payload.params.flags & SetupFlags.SQPOLL)
            {
                if (_expect(payload.sq.flags & SubmissionQueueFlags.NEED_WAKEUP, false))
                    flags |= EnterFlags.SQ_WAKEUP;
            }
            immutable r = io_uring_enter(payload.fd, len, want, flags, args);
            if (_expect(r < 0, false)) return -errno;
            return r;
        }
        return wait(want); // just simple wait
    }

    /// ditto
    int submitAndWait(uint want) @safe
    {
        pragma(inline, true)
        return submitAndWait(want, cast(sigset_t*)null);
    }
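
    /// End-to-end sketch (illustrative): queue a NOP, submit it and wait for one
    /// completion, then consume the CQE via the range interface.
    nothrow @nogc unittest
    {
        Uring io;
        if (io.setup(4) != 0) return;
        io.putWith!((ref SubmissionEntry e) { e.prepNop(); })();
        if (io.submitAndWait(1) < 0) return;
        assert(!io.empty);
        immutable res = io.front.res; // 0 for a successful NOP
        io.popFront();
    }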

    /**
     * Register single buffer to be mapped into the kernel for faster buffered operations.
     *
     * To use the buffers, the application must specify the fixed variants of the operations,
     * `READ_FIXED` or `WRITE_FIXED`, in the `SubmissionEntry`, along with the `buf_index` set
     * in the entry extra data.
     *
     * An application can increase or decrease the size or number of registered buffers by first
     * unregistering the existing buffers, and then issuing a new call to io_uring_register() with
     * the new buffers.
     *
     * Params:
     *   buffers = Buffers to be registered
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int registerBuffers(T)(T buffers)
        if (is(T == ubyte[]) || is(T == ubyte[][])) // TODO: something else?
    in (payload !is null, "Uring hasn't been initialized yet")
    in (buffers.length, "Empty buffer")
    {
        if (payload.regBuffers !is null)
            return -EBUSY; // buffers were already registered

        static if (is(T == ubyte[]))
        {
            auto p = malloc(iovec.sizeof);
            if (_expect(p is null, false)) return -errno;
            payload.regBuffers = (cast(iovec*)p)[0..1];
            payload.regBuffers[0].iov_base = cast(void*)&buffers[0];
            payload.regBuffers[0].iov_len = buffers.length;
        }
        else static if (is(T == ubyte[][]))
        {
            auto p = malloc(buffers.length * iovec.sizeof);
            if (_expect(p is null, false)) return -errno;
            payload.regBuffers = (cast(iovec*)p)[0..buffers.length];

            foreach (i, b; buffers)
            {
                assert(b.length, "Empty buffer");
                payload.regBuffers[i].iov_base = cast(void*)&b[0];
                payload.regBuffers[i].iov_len = b.length;
            }
        }

        immutable r = io_uring_register(
                payload.fd,
                RegisterOpCode.REGISTER_BUFFERS,
                cast(const(void)*)payload.regBuffers.ptr, cast(uint)payload.regBuffers.length
            );

        if (_expect(r < 0, false)) return -errno;
        return 0;
    }
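
    /// Sketch (fd and sizes are illustrative): register one fixed buffer, then
    /// prepare a `READ_FIXED` that references it through `buf_index` 0.
    nothrow @nogc unittest
    {
        Uring io;
        if (io.setup(4) != 0) return;
        static ubyte[4096] data;
        if (io.registerBuffers(data[]) != 0) return;
        SubmissionEntry e;
        e.prepReadFixed(0 /* illustrative fd */, 0, data[], 0);
    }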

    /**
     * Releases all previously registered buffers associated with the `io_uring` instance.
     *
     * An application need not unregister buffers explicitly before shutting down the io_uring instance.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int unregisterBuffers() @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        if (payload.regBuffers is null)
            return -ENXIO; // no buffers were registered

        free(cast(void*)&payload.regBuffers[0]);
        payload.regBuffers = null;

        immutable r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_BUFFERS, null, 0);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * Register files for I/O.
     *
     * To make use of the registered files, the `IOSQE_FIXED_FILE` flag must be set in the flags
     * member of the `SubmissionEntry`, and the `fd` member is set to the index of the file in the
     * file descriptor array.
     *
     * Files are automatically unregistered when the `io_uring` instance is torn down. An application
     * need only unregister if it wishes to register a new set of fds.
     *
     * Use `-1` as a file descriptor to mark it as reserved in the array.
     *
     * Params: fds = array of file descriptors to be registered
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int registerFiles(const(int)[] fds)
    in (payload !is null, "Uring hasn't been initialized yet")
    in (fds.length, "No file descriptors provided")
    in (fds.length < uint.max, "Too many file descriptors")
    {
        // arg contains a pointer to an array of nr_args file descriptors (signed 32 bit integers).
        immutable r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_FILES, &fds[0], cast(uint)fds.length);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }
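
    /// Sketch: register a small fixed file table; `-1` marks reserved slots that can
    /// be filled later with `registerFilesUpdate` (the fds here are illustrative).
    nothrow @nogc unittest
    {
        Uring io;
        if (io.setup(4) != 0) return;
        int[4] fds = [0, 1, -1, -1];
        if (io.registerFiles(fds[]) != 0) return;
        // requests may now use IOSQE_FIXED_FILE with fd set to a table index
    }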

    /**
     * Register an update for an existing file set. The updates will start at
     * `off` in the original array.
     *
     * Use `-1` as a file descriptor to mark it as reserved in the array.
     *
     * Params:
     *      off = offset to the original registered files to be updated
     *      fds = array of file descriptors to update with
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int registerFilesUpdate(uint off, const(int)[] fds) @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    in (fds.length, "No file descriptors provided to update")
    in (fds.length < uint.max, "Too many file descriptors")
    {
        struct Update // represents io_uring_files_update (obsolete) or io_uring_rsrc_update
        {
            uint offset;
            uint _resv;
            ulong data;
        }

        static assert (Update.sizeof == 16);

        Update u = { offset: off, data: cast(ulong)&fds[0] };
        immutable r = io_uring_register(
            payload.fd,
            RegisterOpCode.REGISTER_FILES_UPDATE,
            &u, cast(uint)fds.length);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * All previously registered files associated with the `io_uring` instance will be unregistered.
     *
     * Files are automatically unregistered when the `io_uring` instance is torn down. An application
     * need only unregister if it wishes to register a new set of fds.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int unregisterFiles() @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_FILES, null, 0);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * Registers event file descriptor that would be used as a notification mechanism on completion
     * queue change.
     *
     * Params: eventFD = event file descriptor to be notified about change
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int registerEventFD(int eventFD) @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_EVENTFD, &eventFD, 1);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * Unregister previously registered notification event file descriptor.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int unregisterEventFD() @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_EVENTFD, null, 0);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * Generic means to register resources.
     *
     * Note: Available from Linux 5.13
     */
    int registerRsrc(R)(RegisterOpCode type, const(R)[] data, const(ulong)[] tags)
    in (payload !is null, "Uring hasn't been initialized yet")
    in (data.length == tags.length, "Different array lengths")
    in (data.length < uint.max, "Too many resources")
    {
        struct Register // represents io_uring_rsrc_register
        {
            uint nr;
            uint _resv;
            ulong _resv2;
            ulong data;
            ulong tags;
        }

        static assert (Register.sizeof == 32);

        Register r = {
            nr: cast(uint)data.length,
            data: cast(ulong)&data[0],
            tags: cast(ulong)&tags[0]
        };
        immutable ret = io_uring_register(
            payload.fd,
            type,
            &r, cast(uint)Register.sizeof);
        if (_expect(ret < 0, false)) return -errno;
        return 0;
    }

    /**
     * Generic means to update registered resources.
     *
     * Note: Available from Linux 5.13
     */
    int registerRsrcUpdate(R)(RegisterOpCode type, uint off, const(R)[] data, const(ulong)[] tags) @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    in (data.length == tags.length, "Different array lengths")
    in (data.length < uint.max, "Too many resources")
    {
        struct Update // represents io_uring_rsrc_update2
        {
            uint offset;
            uint _resv;
            ulong data;
            ulong tags;
            uint nr;
            uint _resv2;
        }

        static assert (Update.sizeof == 32);

        Update u = {
            offset: off,
            data: cast(ulong)&data[0],
            tags: cast(ulong)&tags[0],
            nr: cast(uint)data.length,
        };
        immutable r = io_uring_register(payload.fd, type, &u, cast(uint)Update.sizeof);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * Register a feature whitelist. Attempting to call any operations which are not whitelisted
     * will result in an error.
     *
     * Note: Can only be called once to prevent other code from bypassing the whitelist.
     *
     * Params: res = the struct containing the restriction info.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     *
     * Note: Available from Linux 5.10
     */
    int registerRestrictions(scope ref io_uring_restriction res) @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_RESTRICTIONS, &res, 1);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * Enable the "rings" of this Uring if they were previously disabled with
     * `IORING_SETUP_R_DISABLED`.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     *
     * Note: Available from Linux 5.10
     */
    int enableRings() @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.ENABLE_RINGS, null, 0);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * By default, async workers created by io_uring will inherit the CPU mask of its parent. This
     * is usually all the CPUs in the system, unless the parent is being run with a limited set. If
     * this isn't the desired outcome, the application may explicitly tell io_uring what CPUs the
     * async workers may run on.
     *
     * Note: Available since 5.14.
     */
    int registerIOWQAffinity(cpu_set_t[] cpus) @trusted
    in (cpus.length)
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_IOWQ_AFF, cpus.ptr, cast(uint)cpus.length);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * Undoes a CPU mask previously set with `registerIOWQAffinity`.
     *
     * Note: Available since 5.14
     */
    int unregisterIOWQAffinity() @trusted
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_IOWQ_AFF, null, 0);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * By default, io_uring limits the unbounded workers created to the maximum processor count set
     * by `RLIMIT_NPROC`, and the bounded worker count is a function of the SQ ring size and the number
     * of CPUs in the system. Sometimes this can be excessive (or too little, for bounded), and this
     * command provides a way to change the count per ring (per NUMA node) instead.
     *
     * `workers` is an array of two values, with the values in the array being set to the maximum
     * count of workers per NUMA node. Index 0 holds the bounded worker count, and index 1 holds the
     * unbounded worker count. On successful return, the passed in array will contain the previous
     * maximum values for each type. If the count being passed in is 0, then this command returns
     * the current maximum values and doesn't modify the current setting.
     *
     * Note: Available since 5.15
     */
    int registerIOWQMaxWorkers(ref uint[2] workers) @trusted
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_IOWQ_MAX_WORKERS, &workers, 2);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }
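
    /// Sketch: passing zeros just queries the current limits; on return the array
    /// holds the previous [bounded, unbounded] maximums (needs Linux 5.15+).
    @safe nothrow @nogc unittest
    {
        Uring io;
        if (io.setup(4) != 0) return;
        uint[2] workers; // zero-initialized: query only
        if (io.registerIOWQMaxWorkers(workers) != 0) return;
    }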
}

/**
 * Uses custom operation definition to fill fields of `SubmissionEntry`.
 * Can be used in cases when the builtin prep* functions aren't enough.
 *
 * Custom definition fields must correspond to fields of `SubmissionEntry` for this to work.
 *
 * Note: This doesn't touch the previous state of the entry, it just fills the corresponding fields.
 *       So it might be needed to call `clear` first on the entry (depends on usage).
 *
 * Params:
 *   entry = entry to set parameters to
 *   op = operation to fill entry with (can be custom type)
 */
ref SubmissionEntry fill(E)(return ref SubmissionEntry entry, auto ref E op)
{
    pragma(inline);
    import std.traits : hasMember, FieldNameTuple;

    // fill entry from provided operation fields (they must have same name as in SubmissionEntry)
    foreach (m; FieldNameTuple!E)
    {
        static assert(hasMember!(SubmissionEntry, m), "unknown member: " ~ E.stringof ~ "." ~ m);
        __traits(getMember, entry, m) = __traits(getMember, op, m);
    }

    return entry;
}
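
/// Sketch of a custom operation type: the field names mirror `SubmissionEntry`
/// members, so `fill` copies them over (the `MyNop` type itself is hypothetical).
nothrow @nogc unittest
{
    static struct MyNop
    {
        Operation opcode = Operation.NOP;
        int fd = -1;
    }
    SubmissionEntry entry;
    entry.clear(); // fill() doesn't touch unrelated previous state
    entry.fill(MyNop());
    assert(entry.opcode == Operation.NOP);
}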

/**
 * Template function to help set `SubmissionEntry` `user_data` field.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      data = data to set to the `SubmissionEntry`
 *
 * Note: data is passed by ref and must outlive the whole operation.
 */
ref SubmissionEntry setUserData(D)(return ref SubmissionEntry entry, ref D data) @trusted
{
    pragma(inline);
    entry.user_data = cast(ulong)(cast(void*)&data);
    return entry;
}

/**
 * Template function to help set `SubmissionEntry` `user_data` field. This differs from `setUserData`
 * in that it emplaces the provided data directly into the SQE `user_data` field and not a pointer to
 * the data.
 *
 * Because of that, data must be exactly `ulong.sizeof` bytes.
 */
ref SubmissionEntry setUserDataRaw(D)(return ref SubmissionEntry entry, auto ref D data) @trusted
    if (D.sizeof == ulong.sizeof)
{
    pragma(inline);
    entry.user_data = *(cast(ulong*)(cast(void*)&data));
    return entry;
}

/**
 * Helper function to retrieve data set directly to the `CompletionEntry` user_data (set by `setUserDataRaw`).
 */
D userDataAs(D)(ref CompletionEntry entry) @trusted
    if (D.sizeof == ulong.sizeof)
{
    pragma(inline);
    return *(cast(D*)(cast(void*)&entry.user_data));
}
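
/// Sketch: pack a small POD straight into `user_data` and recover it from the
/// completion (the `Key` type is hypothetical; it must be exactly 8 bytes).
nothrow @nogc unittest
{
    static struct Key { uint id; uint gen; }
    static assert(Key.sizeof == ulong.sizeof);
    SubmissionEntry sqe;
    sqe.prepNop().setUserDataRaw(Key(1, 2));
    CompletionEntry cqe;
    cqe.user_data = sqe.user_data; // the kernel passes it through verbatim
    assert(cqe.userDataAs!Key.id == 1);
}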

ref SubmissionEntry prepRW(return ref SubmissionEntry entry, Operation op,
    int fd = -1, const void* addr = null, uint len = 0, ulong offset = 0) @safe
{
    pragma(inline);
    entry.opcode = op;
    entry.fd = fd;
    entry.off = offset;
    entry.flags = SubmissionEntryFlags.NONE;
    entry.ioprio = 0;
    entry.addr = cast(ulong)addr;
    entry.len = len;
    entry.rw_flags = ReadWriteFlags.NONE;
    entry.user_data = 0;
    entry.buf_index = 0;
    entry.personality = 0;
    entry.file_index = 0;
    entry.__pad2[0] = entry.__pad2[1] = 0;
    return entry;
}

/**
 * Prepares `nop` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 */
ref SubmissionEntry prepNop(return ref SubmissionEntry entry) @safe
{
    entry.prepRW(Operation.NOP);
    return entry;
}

/**
 * Prepares `readv` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor of file we are operating on
 *      offset = offset
 *      buffer = iovec buffers to be used by the operation
 */
ref SubmissionEntry prepReadv(V)(return ref SubmissionEntry entry, int fd, ref const V buffer, long offset) @trusted
    if (is(V == iovec[]) || is(V == iovec))
{
    static if (is(V == iovec[]))
    {
        assert(buffer.length, "Empty buffer");
        assert(buffer.length < uint.max, "Too many iovec buffers");
        return entry.prepRW(Operation.READV, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    }
    else return entry.prepRW(Operation.READV, fd, cast(void*)&buffer, 1, offset);
}
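
/// Sketch: queue a `readv` into a single `iovec` (fd 0 is illustrative only).
nothrow @nogc unittest
{
    static ubyte[64] buf;
    iovec v = iovec(&buf[0], buf.length);
    SubmissionEntry e;
    e.prepReadv(0, v, 0);
}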

/**
 * Prepares `writev` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor of file we are operating on
 *      offset = offset
 *      buffer = iovec buffers to be used by the operation
 */
ref SubmissionEntry prepWritev(V)(return ref SubmissionEntry entry, int fd, ref const V buffer, long offset) @trusted
    if (is(V == iovec[]) || is(V == iovec))
{
    static if (is(V == iovec[]))
    {
        assert(buffer.length, "Empty buffer");
        assert(buffer.length < uint.max, "Too many iovec buffers");
        return entry.prepRW(Operation.WRITEV, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    }
    else return entry.prepRW(Operation.WRITEV, fd, cast(void*)&buffer, 1, offset);
}

/**
 * Prepares `read_fixed` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor of file we are operating on
 *      offset = offset
 *      buffer = slice to preregistered buffer
 *      bufferIndex = index to the preregistered buffers array buffer belongs to
 */
ref SubmissionEntry prepReadFixed(return ref SubmissionEntry entry, int fd, long offset, ubyte[] buffer, ushort bufferIndex) @safe
{
    assert(buffer.length, "Empty buffer");
    assert(buffer.length < uint.max, "Buffer too large");
    entry.prepRW(Operation.READ_FIXED, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    entry.buf_index = bufferIndex;
    return entry;
}

/**
 * Prepares `write_fixed` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor of file we are operating on
 *      offset = offset
 *      buffer = slice to preregistered buffer
 *      bufferIndex = index to the preregistered buffers array buffer belongs to
 */
ref SubmissionEntry prepWriteFixed(return ref SubmissionEntry entry, int fd, long offset, ubyte[] buffer, ushort bufferIndex) @safe
{
    assert(buffer.length, "Empty buffer");
    assert(buffer.length < uint.max, "Buffer too large");
    entry.prepRW(Operation.WRITE_FIXED, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    entry.buf_index = bufferIndex;
    return entry;
}

/**
 * Prepares `recvmsg(2)` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor of file we are operating on
 *      msg = message to operate with
 *      flags = `recvmsg` operation flags
 *
 * Note: Available from Linux 5.3
 *
 * See_Also: `recvmsg(2)` man page for details.
 */
ref SubmissionEntry prepRecvMsg(return ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE) @trusted
{
    entry.prepRW(Operation.RECVMSG, fd, cast(void*)&msg, 1, 0);
    entry.msg_flags = flags;
    return entry;
}

/**
 * Prepares `sendmsg(2)` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor of file we are operating on
 *      msg = message to operate with
 *      flags = `sendmsg` operation flags
 *
 * Note: Available from Linux 5.3
 *
 * See_Also: `sendmsg(2)` man page for details.
 */
ref SubmissionEntry prepSendMsg(return ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE) @trusted
{
    entry.prepRW(Operation.SENDMSG, fd, cast(void*)&msg, 1, 0);
    entry.msg_flags = flags;
    return entry;
}

/**
 * Prepares `fsync` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor of a file to call `fsync` on
 *      flags = `fsync` operation flags
 */
ref SubmissionEntry prepFsync(return ref SubmissionEntry entry, int fd, FsyncFlags flags = FsyncFlags.NORMAL) @safe
{
    entry.prepRW(Operation.FSYNC, fd);
    entry.fsync_flags = flags;
    return entry;
}

/**
 * Poll the fd specified in the submission queue entry for the events specified in the poll_events
 * field. Unlike poll or epoll without `EPOLLONESHOT`, this interface always works in one shot mode.
 * That is, once the poll operation is completed, it will have to be resubmitted.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor to poll
 *      events = events to poll on the FD
 *      flags = poll operation flags
 */
ref SubmissionEntry prepPollAdd(return ref SubmissionEntry entry,
    int fd, PollEvents events, PollFlags flags = PollFlags.NONE) @safe
{
    import std.system : endian, Endian;

    entry.prepRW(Operation.POLL_ADD, fd, null, flags);
    static if (endian == Endian.bigEndian)
        entry.poll_events32 = (events & 0x0000ffffUL) << 16 | (events & 0xffff0000) >> 16;
    else
        entry.poll_events32 = events;
    return entry;
}
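
/// Sketch: one-shot poll for readability (assumes `PollEvents.IN` mirrors `POLLIN`;
/// fd 0 is illustrative only).
@safe nothrow @nogc unittest
{
    SubmissionEntry e;
    e.prepPollAdd(0, PollEvents.IN);
}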

/**
 * Remove an existing poll request. If found, the res field of the `CompletionEntry` will contain
 * `0`. If not found, res will contain `-ENOENT`.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      userData = data with the previously issued poll operation
 */
ref SubmissionEntry prepPollRemove(D)(return ref SubmissionEntry entry, ref D userData) @trusted
{
    return entry.prepRW(Operation.POLL_REMOVE, -1, cast(void*)&userData);
}

/**
 * Allows updating the events and user_data of a running poll request.
 *
 * Note: available from Linux 5.13
 */
ref SubmissionEntry prepPollUpdate(U, V)(return ref SubmissionEntry entry,
    ref U oldUserData, ref V newUserData, PollEvents events = PollEvents.NONE) @trusted
{
    import std.system : endian, Endian;

    PollFlags flags;
    if (events != PollEvents.NONE) flags |= PollFlags.UPDATE_EVENTS;
    if (cast(void*)&oldUserData !is cast(void*)&newUserData) flags |= PollFlags.UPDATE_USER_DATA;

    entry.prepRW(
        Operation.POLL_REMOVE,
        -1,
        cast(void*)&oldUserData,
        flags,
        cast(ulong)cast(void*)&newUserData
    );
    static if (endian == Endian.bigEndian)
        entry.poll_events32 = (events & 0x0000ffffUL) << 16 | (events & 0xffff0000) >> 16;
    else
        entry.poll_events32 = events;
    return entry;
}

/**
 * Prepares `sync_file_range(2)` operation.
 *
 * Syncs a file segment with disk; permits fine control when synchronizing the open file referred to
 * by the file descriptor fd with disk.
 *
 * If `len` is 0, then all bytes from `offset` through to the end of file are synchronized.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = is the file descriptor to sync
 *      offset = the starting byte of the file range to be synchronized
 *      len = the length of the range to be synchronized, in bytes
 *      flags = the flags for the command.
 *
 * See_Also: `sync_file_range(2)` for the general description of the related system call.
 *
 * Note: available from Linux 5.2
 */
ref SubmissionEntry prepSyncFileRange(return ref SubmissionEntry entry, int fd, ulong offset, uint len,
    SyncFileRangeFlags flags = SyncFileRangeFlags.WRITE_AND_WAIT) @safe
{
    entry.prepRW(Operation.SYNC_FILE_RANGE, fd, null, len, offset);
    entry.sync_range_flags = flags;
    return entry;
}

/**
 * This command will register a timeout operation.
 *
 * A timeout will trigger a wakeup event on the completion ring for anyone waiting for events. A
 * timeout condition is met when either the specified timeout expires, or the specified number of
 * events have completed. Either condition will trigger the event. The request will complete with
 * `-ETIME` if the timeout got completed through expiration of the timer, or `0` if the timeout got
 * completed through requests completing on their own. If the timeout was cancelled before it
 * expired, the request will complete with `-ECANCELED`.
 *
 * Applications may delete existing timeouts before they occur with `TIMEOUT_REMOVE` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      time = reference to `time64` data structure
 *      count = completion event count
 *      flags = define if it's a relative or absolute time
 *
 * Note: Available from Linux 5.4
 */
ref SubmissionEntry prepTimeout(return ref SubmissionEntry entry, ref KernelTimespec time,
    ulong count = 0, TimeoutFlags flags = TimeoutFlags.REL) @trusted
{
    entry.prepRW(Operation.TIMEOUT, -1, cast(void*)&time, 1, count);
    entry.timeout_flags = flags;
    return entry;
}
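
/// Sketch: a relative 1 second timeout that also completes once 8 CQEs have arrived.
@safe nothrow @nogc unittest
{
    KernelTimespec ts = KernelTimespec(1, 0); // 1s, 0ns
    SubmissionEntry e;
    e.prepTimeout(ts, 8, TimeoutFlags.REL);
}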

/**
 * Prepares operations to remove existing timeout registered using `TIMEOUT` operation.
 *
 * Attempt to remove an existing timeout operation. If the specified timeout request is found and
 * cancelled successfully, this request will terminate with a result value of `-ECANCELED`. If the
 * timeout request was found but expiration was already in progress, this request will terminate
 * with a result value of `-EALREADY`. If the timeout request wasn't found, the request will
 * terminate with a result value of `-ENOENT`.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      userData = user data provided with the previously issued timeout operation
 *
 * Note: Available from Linux 5.5
 */
ref SubmissionEntry prepTimeoutRemove(D)(return ref SubmissionEntry entry, ref D userData) @trusted
{
    return entry.prepRW(Operation.TIMEOUT_REMOVE, -1, cast(void*)&userData);
}

/**
 * Prepares operations to update existing timeout registered using `TIMEOUT` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      userData = user data provided with the previously issued timeout operation
 *      time = reference to `time64` data structure with a new time spec
 *      flags = define if it's a relative or absolute time
 *
 * Note: Available from Linux 5.11
 */
ref SubmissionEntry prepTimeoutUpdate(D)(return ref SubmissionEntry entry,
    ref KernelTimespec time, ref D userData, TimeoutFlags flags) @trusted
{
    entry.prepRW(Operation.TIMEOUT_REMOVE, -1, cast(void*)&userData, 0, cast(ulong)(cast(void*)&time));
    entry.timeout_flags = flags | TimeoutFlags.UPDATE;
    return entry;
}

/**
 * Prepares `accept4(2)` operation.
 *
 * See_Also: `accept4(2)` for the general description of the related system call.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = socket file descriptor
 *      addr = reference to one of the sockaddr structures to be filled with the accepted client address
 *      addrlen = reference to addrlen field that would be filled with accepted client address length
 *
 * Note: Available from Linux 5.5
 */
ref SubmissionEntry prepAccept(ADDR)(return ref SubmissionEntry entry, int fd, ref ADDR addr, ref socklen_t addrlen,
    AcceptFlags flags = AcceptFlags.NONE) @trusted
{
    entry.prepRW(Operation.ACCEPT, fd, cast(void*)&addr, 0, cast(ulong)(cast(void*)&addrlen));
    entry.accept_flags = flags;
    return entry;
}
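
/// Sketch: queue an accept on a listening socket (fd 3 is illustrative only).
nothrow @nogc unittest
{
    import core.sys.posix.netinet.in_ : sockaddr_in;
    sockaddr_in addr;
    socklen_t alen = addr.sizeof;
    SubmissionEntry e;
    e.prepAccept(3, addr, alen);
}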

/**
 * Same as `prepAccept`, but fd is put directly into fixed file table on `fileIndex`.
 * Note: available from Linux 5.15
 */
ref SubmissionEntry prepAcceptDirect(ADDR)(return ref SubmissionEntry entry, int fd, ref ADDR addr, ref socklen_t addrlen,
    uint fileIndex, AcceptFlags flags = AcceptFlags.NONE) @trusted
{
    entry.prepRW(Operation.ACCEPT, fd, cast(void*)&addr, 0, cast(ulong)(cast(void*)&addrlen));
    entry.accept_flags = flags;
    entry.file_index = fileIndex+1;
    return entry;
}

/**
 * Prepares operation that cancels existing async work.
 *
 * This works with any read/write request, accept, send/recvmsg, etc. There’s an important
 * distinction to make here with the different kinds of commands. A read/write on a regular file
 * will generally be waiting for IO completion in an uninterruptible state. This means it’ll ignore
 * any signals or attempts to cancel it, as these operations are uncancellable. io_uring can cancel
 * these operations if they haven’t yet been started. If they have been started, cancellations on
 * these will fail. Network IO will generally be waiting interruptibly, and can hence be cancelled
 * at any time. The completion event for this request will have a result of 0 if done successfully,
 * `-EALREADY` if the operation is already in progress, and `-ENOENT` if the original request
 * specified cannot be found. For cancellation requests that return `-EALREADY`, io_uring may or may
 * not cause this request to be stopped sooner. For blocking IO, the original request will complete
 * as it originally would have. For IO that is cancellable, it will terminate sooner if at all
 * possible.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      userData = `user_data` field of the request that should be cancelled
 *
 * Note: Available from Linux 5.5
 */
ref SubmissionEntry prepCancel(D)(return ref SubmissionEntry entry, ref D userData, uint flags = 0) @trusted
{
    entry.prepRW(Operation.ASYNC_CANCEL, -1, cast(void*)&userData);
    entry.cancel_flags = flags;
    return entry;
}

/**
 * Prepares linked timeout operation.
 *
 * This request must be linked with another request through `IOSQE_IO_LINK`.
 * Unlike `IORING_OP_TIMEOUT`, `IORING_OP_LINK_TIMEOUT` acts on the linked request, not the completion
 * queue. The format of the command is otherwise like `IORING_OP_TIMEOUT`, except there's no
 * completion event count as it's tied to a specific request. If used, the timeout specified in the
 * command will cancel the linked command, unless the linked command completes before the
 * timeout. The timeout will complete with `-ETIME` if the timer expired and cancellation of the
 * linked request was attempted, or `-ECANCELED` if the timer got cancelled because of completion
 * of the linked request.
 *
 * Note: Available from Linux 5.5
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      time = time specification
 *      flags = define if it's a relative or absolute time
 */
ref SubmissionEntry prepLinkTimeout(return ref SubmissionEntry entry, ref KernelTimespec time, TimeoutFlags flags = TimeoutFlags.REL) @trusted
{
    entry.prepRW(Operation.LINK_TIMEOUT, -1, cast(void*)&time, 1, 0);
    entry.timeout_flags = flags;
    return entry;
}

/**
 * Note: Available from Linux 5.5
 */
ref SubmissionEntry prepConnect(ADDR)(return ref SubmissionEntry entry, int fd, ref const(ADDR) addr) @trusted
{
    return entry.prepRW(Operation.CONNECT, fd, cast(void*)&addr, 0, ADDR.sizeof);
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepFilesUpdate(return ref SubmissionEntry entry, int[] fds, int offset) @safe
{
    return entry.prepRW(Operation.FILES_UPDATE, -1, cast(void*)&fds[0], cast(uint)fds.length, offset);
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepFallocate(return ref SubmissionEntry entry, int fd, int mode, long offset, long len) @trusted
{
    return entry.prepRW(Operation.FALLOCATE, fd, cast(void*)len, mode, offset);
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepOpenat(return ref SubmissionEntry entry, int fd, const(char)* path, int flags, uint mode) @trusted
{
    entry.prepRW(Operation.OPENAT, fd, cast(void*)path, mode, 0);
    entry.open_flags = flags;
    return entry;
}

/**
 * Same as `prepOpenat`, but fd is put directly into fixed file table on `fileIndex`.
 * Note: available from Linux 5.15
 */
ref SubmissionEntry prepOpenatDirect(return ref SubmissionEntry entry, int fd, const(char)* path, int flags, uint mode, uint fileIndex) @trusted
{
    entry.prepRW(Operation.OPENAT, fd, cast(void*)path, mode, 0);
    entry.open_flags = flags;
    entry.file_index = fileIndex+1;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepClose(return ref SubmissionEntry entry, int fd) @safe
{
    return entry.prepRW(Operation.CLOSE, fd);
}

/**
 * Same as `prepClose` but operation works directly with fd registered in fixed file table on index `fileIndex`.
 * Note: Available from Linux 5.15
 */
ref SubmissionEntry prepCloseDirect(return ref SubmissionEntry entry, int fd, uint fileIndex) @safe
{
    entry.prepRW(Operation.CLOSE, fd);
    entry.file_index = fileIndex+1;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepRead(return ref SubmissionEntry entry, int fd, ubyte[] buffer, long offset) @safe
{
    return entry.prepRW(Operation.READ, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepWrite(return ref SubmissionEntry entry, int fd, const(ubyte)[] buffer, long offset) @trusted
{
    return entry.prepRW(Operation.WRITE, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepStatx(Statx)(return ref SubmissionEntry entry, int fd, const(char)* path, int flags, uint mask, ref Statx statxbuf) @trusted
{
    entry.prepRW(Operation.STATX, fd, cast(void*)path, mask, cast(ulong)(cast(void*)&statxbuf));
    entry.statx_flags = flags;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepFadvise(return ref SubmissionEntry entry, int fd, long offset, uint len, int advice) @safe
{
    entry.prepRW(Operation.FADVISE, fd, null, len, offset);
    entry.fadvise_advice = advice;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepMadvise(return ref SubmissionEntry entry, const(ubyte)[] block, int advice) @trusted
{
    entry.prepRW(Operation.MADVISE, -1, cast(void*)&block[0], cast(uint)block.length, 0);
    entry.fadvise_advice = advice;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepSend(return ref SubmissionEntry entry,
    int sockfd, const(ubyte)[] buf, MsgFlags flags = MsgFlags.NONE) @trusted
{
    entry.prepRW(Operation.SEND, sockfd, cast(void*)&buf[0], cast(uint)buf.length, 0);
    entry.msg_flags = flags;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepRecv(return ref SubmissionEntry entry,
    int sockfd, ubyte[] buf, MsgFlags flags = MsgFlags.NONE) @trusted
{
    entry.prepRW(Operation.RECV, sockfd, cast(void*)&buf[0], cast(uint)buf.length, 0);
    entry.msg_flags = flags;
    return entry;
}

/**
 * Variant that uses registered buffers group.
 *
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepRecv(return ref SubmissionEntry entry,
    int sockfd, ushort gid, uint len, MsgFlags flags = MsgFlags.NONE) @safe
{
    entry.prepRW(Operation.RECV, sockfd, null, len, 0);
    entry.msg_flags = flags;
    entry.buf_group = gid;
    entry.flags |= SubmissionEntryFlags.BUFFER_SELECT;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepOpenat2(return ref SubmissionEntry entry, int fd, const char *path, ref OpenHow how) @trusted
{
    return entry.prepRW(Operation.OPENAT2, fd, cast(void*)path, cast(uint)OpenHow.sizeof, cast(ulong)(cast(void*)&how));
}

/**
 * Same as `prepOpenat2`, but fd is put directly into fixed file table on `fileIndex`.
 * Note: available from Linux 5.15
 */
ref SubmissionEntry prepOpenat2Direct(return ref SubmissionEntry entry, int fd, const char *path, ref OpenHow how, uint fileIndex) @trusted
{
    entry.prepRW(Operation.OPENAT2, fd, cast(void*)path, cast(uint)OpenHow.sizeof, cast(ulong)(cast(void*)&how));
    entry.file_index = fileIndex+1;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepEpollCtl(return ref SubmissionEntry entry, int epfd, int fd, int op, ref epoll_event ev) @trusted
{
    return entry.prepRW(Operation.EPOLL_CTL, epfd, cast(void*)&ev, op, fd);
}

/**
 * Note: Available from Linux 5.7
 *
 * This splice operation can be used to implement sendfile by splicing to an intermediate pipe
 * first, then splice to the final destination. In fact, the implementation of sendfile in kernel
 * uses splice internally.
 *
 * NOTE that even if fd_in or fd_out refers to a pipe, the splice operation can still fail with
 * EINVAL if one of the fds doesn't explicitly support the splice operation, e.g. reading from a
 * terminal is unsupported from kernel 5.7 to 5.11. Check issue #291 for more information.
 *
 * Either fd_in or fd_out must be a pipe.
 *
 * Params:
 *   fd_in = input file descriptor
 *   off_in = If fd_in refers to a pipe, off_in must be -1.
 *            If fd_in does not refer to a pipe and off_in is -1, then bytes are read from
 *            fd_in starting from the file offset and it is adjusted appropriately;
 *            If fd_in does not refer to a pipe and off_in is not -1, then the starting
 *            offset of fd_in will be off_in.
 *   fd_out = output file descriptor
 *   off_out = The description of off_in also applies to off_out.
 *   len = Up to len bytes would be transferred between file descriptors.
 *   splice_flags = see man splice(2) for description of flags.
 */
ref SubmissionEntry prepSplice(return ref SubmissionEntry entry,
    int fd_in, ulong off_in,
    int fd_out, ulong off_out,
    uint len, uint splice_flags) @safe
{
    entry.prepRW(Operation.SPLICE, fd_out, null, len, off_out);
    entry.splice_off_in = off_in;
    entry.splice_fd_in = fd_in;
    entry.splice_flags = splice_flags;
    return entry;
}
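
/// Sketch: splice up to 4096 bytes from a file into a pipe's write end
/// (the fds are illustrative; a pipe offset must be passed as -1).
@safe nothrow @nogc unittest
{
    SubmissionEntry e;
    e.prepSplice(4 /* file fd */, 0, 5 /* pipe write fd */, cast(ulong)-1, 4096, 0);
}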
1451 
1452 /**
1453  * Note: Available from Linux 5.7
1454  *
1455  * Params:
1456  *    entry = `SubmissionEntry` to prepare
1457  *    buf   = buffers to provide
1458  *    len   = length of each buffer to add
1459  *    bgid  = buffers group id
1460  *    bid   = starting buffer id
1461  */
ref SubmissionEntry prepProvideBuffers(return ref SubmissionEntry entry, ubyte[][] buf, uint len, ushort bgid, int bid) @safe
{
    assert(buf.length <= int.max, "Too many buffers");
    version (assert) {
        // the kernel treats the memory as buf.length consecutive buffers of len bytes
        // starting at &buf[0][0], so the slices are expected to form one contiguous block
        foreach (b; buf) assert(b.length <= len, "Invalid buffer length");
    }
    entry.prepRW(Operation.PROVIDE_BUFFERS, cast(int)buf.length, cast(void*)&buf[0][0], len, bid);
    entry.buf_group = bgid;
    return entry;
}

/// ditto
ref SubmissionEntry prepProvideBuffers(size_t M, size_t N)(return ref SubmissionEntry entry, ref ubyte[M][N] buf, ushort bgid, int bid) @safe
{
    static assert(N <= int.max, "Too many buffers");
    static assert(M <= uint.max, "Buffer too large");
    entry.prepRW(Operation.PROVIDE_BUFFERS, cast(int)N, cast(void*)&buf[0][0], cast(uint)M, bid);
    entry.buf_group = bgid;
    return entry;
}

/// ditto
ref SubmissionEntry prepProvideBuffer(size_t N)(return ref SubmissionEntry entry, ref ubyte[N] buf, ushort bgid, int bid) @safe
{
    static assert(N <= uint.max, "Buffer too large");
    entry.prepRW(Operation.PROVIDE_BUFFERS, 1, cast(void*)&buf[0], cast(uint)N, bid);
    entry.buf_group = bgid;
    return entry;
}

/// ditto
ref SubmissionEntry prepProvideBuffer(return ref SubmissionEntry entry, ref ubyte[] buf, ushort bgid, int bid) @safe
{
    assert(buf.length <= uint.max, "Buffer too large");
    entry.prepRW(Operation.PROVIDE_BUFFERS, 1, cast(void*)&buf[0], cast(uint)buf.length, bid);
    entry.buf_group = bgid;
    return entry;
}
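
/*
 * Example (illustrative sketch, kept as a comment): hand the kernel 64 buffers
 * of 4096 bytes as buffer group 1, starting with buffer id 0. A later read/recv
 * SQE can then set `SubmissionEntryFlags.BUFFER_SELECT` and `buf_group = 1`
 * (names from this module, as assumed here) to let the kernel pick a free
 * buffer; the chosen buffer id is reported back in the completion entry flags.
 * The buffers must stay alive while the kernel owns them, i.e. until they are
 * consumed or returned with `prepRemoveBuffers`.
 *
 *     static ubyte[4096][64] buffers;
 *     io.putWith!((ref SubmissionEntry e) => e.prepProvideBuffers(buffers, 1, 0));
 *     io.submit(1);
 */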

/**
 * Note: Available from Linux 5.7
 */
ref SubmissionEntry prepRemoveBuffers(return ref SubmissionEntry entry, int nr, ushort bgid) @safe
{
    entry.prepRW(Operation.REMOVE_BUFFERS, nr);
    entry.buf_group = bgid;
    return entry;
}

/**
 * Note: Available from Linux 5.8
 */
ref SubmissionEntry prepTee(return ref SubmissionEntry entry, int fd_in, int fd_out, uint nbytes, uint flags) @safe
{
    entry.prepRW(Operation.TEE, fd_out, null, nbytes, 0);
    entry.splice_off_in = 0;
    entry.splice_fd_in = fd_in;
    entry.splice_flags = flags;
    return entry;
}
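
/*
 * Example (illustrative sketch, kept as a comment): duplicate up to 4096 bytes
 * from one pipe into another without consuming the input, i.e. tee(2)
 * semantics. Both descriptors must refer to pipes; `io`, `pipeA` and `pipeB`
 * are assumed to be set up by the caller.
 *
 *     io.putWith!((ref SubmissionEntry e) => e.prepTee(pipeA[0], pipeB[1], 4096, 0));
 *     io.submit(1);
 */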

/**
 * Note: Available from Linux 5.11
 */
ref SubmissionEntry prepShutdown(return ref SubmissionEntry entry, int fd, int how) @safe
{
    return entry.prepRW(Operation.SHUTDOWN, fd, null, how, 0);
}

/**
 * Note: Available from Linux 5.11
 */
ref SubmissionEntry prepRenameat(return ref SubmissionEntry entry,
    int olddfd, const(char)* oldpath, int newfd, const(char)* newpath, int flags) @trusted
{
    entry.prepRW(Operation.RENAMEAT, olddfd, cast(void*)oldpath, newfd, cast(ulong)cast(void*)newpath);
    entry.rename_flags = flags;
    return entry;
}

/**
 * Note: Available from Linux 5.11
 */
ref SubmissionEntry prepUnlinkat(return ref SubmissionEntry entry, int dirfd, const(char)* path, int flags) @trusted
{
    entry.prepRW(Operation.UNLINKAT, dirfd, cast(void*)path, 0, 0);
    entry.unlink_flags = flags;
    return entry;
}

/**
 * Note: Available from Linux 5.15
 */
ref SubmissionEntry prepMkdirat(return ref SubmissionEntry entry, int dirfd, const(char)* path, mode_t mode) @trusted
{
    entry.prepRW(Operation.MKDIRAT, dirfd, cast(void*)path, mode, 0);
    return entry;
}

/**
 * Note: Available from Linux 5.15
 */
ref SubmissionEntry prepSymlinkat(return ref SubmissionEntry entry, const(char)* target, int newdirfd, const(char)* linkpath) @trusted
{
    entry.prepRW(Operation.SYMLINKAT, newdirfd, cast(void*)target, 0, cast(ulong)cast(void*)linkpath);
    return entry;
}

/**
 * Note: Available from Linux 5.15
 */
ref SubmissionEntry prepLinkat(return ref SubmissionEntry entry,
    int olddirfd, const(char)* oldpath,
    int newdirfd, const(char)* newpath, int flags) @trusted
{
    entry.prepRW(Operation.LINKAT, olddirfd, cast(void*)oldpath, newdirfd, cast(ulong)cast(void*)newpath);
    entry.hardlink_flags = flags;
    return entry;
}
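
/*
 * Example (illustrative sketch, kept as a comment): create a directory and a
 * symlink inside it in a single submission, chained with
 * `SubmissionEntryFlags.IO_LINK` so the symlink is attempted only after the
 * mkdir succeeded. `AT_FDCWD` (core.sys.posix.fcntl) and `octal` (std.conv)
 * are standard; `io` is assumed to be a set up `Uring`.
 *
 *     io.putWith!((ref SubmissionEntry e) {
 *         e.prepMkdirat(AT_FDCWD, "dir".ptr, octal!755);
 *         e.flags = SubmissionEntryFlags.IO_LINK;
 *     });
 *     io.putWith!((ref SubmissionEntry e) =>
 *         e.prepSymlinkat("target".ptr, AT_FDCWD, "dir/link".ptr));
 *     io.submit(2);
 */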

private:

// uring cleanup
void dispose(ref Uring uring) @trusted
{
    if (uring.payload is null) return;
    // debug printf("uring(%d): dispose(%d)\n", uring.payload.fd, uring.payload.refs);
    if (--uring.payload.refs == 0)
    {
        import std.traits : hasElaborateDestructor;
        // debug printf("uring(%d): free\n", uring.payload.fd);
        static if (hasElaborateDestructor!UringDesc)
            destroy(*uring.payload); // call possible destructors
        free(cast(void*)uring.payload);
    }
    uring.payload = null;
}

// internal ring state descriptor (ref-counted, shared between `Uring` copies)
struct UringDesc
{
    nothrow @nogc:

    int fd;
    size_t refs;
    SetupParameters params;
    SubmissionQueue sq;
    CompletionQueue cq;

    iovec[] regBuffers;

    ~this() @trusted
    {
        if (regBuffers) free(cast(void*)&regBuffers[0]);
        if (sq.ring) munmap(sq.ring, sq.ringSize);
        if (sq.sqes) munmap(cast(void*)&sq.sqes[0], sq.sqes.length * SubmissionEntry.sizeof);
        if (cq.ring && cq.ring != sq.ring) munmap(cq.ring, cq.ringSize);
        close(fd);
    }

    private auto mapRings() @trusted
    {
        sq.ringSize = params.sq_off.array + params.sq_entries * uint.sizeof;
        cq.ringSize = params.cq_off.cqes + params.cq_entries * CompletionEntry.sizeof;

        if (params.features & SetupFeatures.SINGLE_MMAP)
        {
            if (cq.ringSize > sq.ringSize) sq.ringSize = cq.ringSize;
            cq.ringSize = sq.ringSize;
        }

        sq.ring = mmap(null, sq.ringSize,
            PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
            fd, SetupParameters.SUBMISSION_QUEUE_RING_OFFSET
        );

        if (sq.ring == MAP_FAILED)
        {
            sq.ring = null;
            return -errno;
        }

        if (params.features & SetupFeatures.SINGLE_MMAP)
            cq.ring = sq.ring;
        else
        {
            cq.ring = mmap(null, cq.ringSize,
                PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
                fd, SetupParameters.COMPLETION_QUEUE_RING_OFFSET
            );

            if (cq.ring == MAP_FAILED)
            {
                cq.ring = null;
                return -errno; // cleanup is done in struct destructors
            }
        }

        uint entries    = *cast(uint*)(sq.ring + params.sq_off.ring_entries);
        sq.khead        = cast(uint*)(sq.ring + params.sq_off.head);
        sq.ktail        = cast(uint*)(sq.ring + params.sq_off.tail);
        sq.localTail    = *sq.ktail;
        sq.ringMask     = *cast(uint*)(sq.ring + params.sq_off.ring_mask);
        sq.kflags       = cast(uint*)(sq.ring + params.sq_off.flags);
        sq.kdropped     = cast(uint*)(sq.ring + params.sq_off.dropped);
        // Indirection array of indexes into the sqes array (head and tail point into this array).
        // Since we don't need any fancy mapping, we fill it once with identity indexes and forget
        // about it. That way, head and tail effectively index our sqes array directly.
        foreach (i; 0..entries)
        {
            *((cast(uint*)(sq.ring + params.sq_off.array)) + i) = i;
        }

        auto psqes = mmap(
            null, entries * SubmissionEntry.sizeof,
            PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
            fd, SetupParameters.SUBMISSION_QUEUE_ENTRIES_OFFSET
        );

        if (psqes == MAP_FAILED) return -errno;
        sq.sqes = (cast(SubmissionEntry*)psqes)[0..entries];

        entries = *cast(uint*)(cq.ring + params.cq_off.ring_entries);
        cq.khead        = cast(uint*)(cq.ring + params.cq_off.head);
        cq.localHead    = *cq.khead;
        cq.ktail        = cast(uint*)(cq.ring + params.cq_off.tail);
        cq.ringMask     = *cast(uint*)(cq.ring + params.cq_off.ring_mask);
        cq.koverflow    = cast(uint*)(cq.ring + params.cq_off.overflow);
        cq.cqes         = (cast(CompletionEntry*)(cq.ring + params.cq_off.cqes))[0..entries];
        cq.kflags       = cast(uint*)(cq.ring + params.cq_off.flags);
        return 0;
    }
}
/// Wrapper for `SubmissionEntry` queue
struct SubmissionQueue
{
    nothrow @nogc:

    // mmapped fields
    uint* khead; // controlled by kernel
    uint* ktail; // controlled by us
    uint* kflags; // controlled by kernel (ie IORING_SQ_NEED_WAKEUP)
    uint* kdropped; // counter of invalid submissions (out-of-bounds index)
    uint ringMask; // constant mask used to determine array index from head/tail

    // mmap details (for cleanup)
    void* ring; // pointer to the mmapped region
    size_t ringSize; // size of mmapped memory block

    // mmapped list of entries (fixed length)
    SubmissionEntry[] sqes;

    uint localTail; // used for batch submission

    uint head() const @safe pure { return atomicLoad!(MemoryOrder.acq)(*khead); }
    uint tail() const @safe pure { return localTail; }

    void flushTail() @safe pure
    {
        pragma(inline, true);
        // debug printf("SQ updating tail: %d\n", localTail);
        atomicStore!(MemoryOrder.rel)(*ktail, localTail);
    }

    SubmissionQueueFlags flags() const @safe pure
    {
        return cast(SubmissionQueueFlags)atomicLoad!(MemoryOrder.raw)(*kflags);
    }

    bool full() const @safe pure { return sqes.length == length; }

    // head and tail are free-running uint counters, so `tail - head` stays correct
    // across wraparound thanks to modular arithmetic
    size_t length() const @safe pure { return tail - head; }

    size_t capacity() const @safe pure { return sqes.length - length; }

    ref SubmissionEntry next()() @safe pure return
    {
        assert(!full, "SubmissionQueue is full");
        return sqes[localTail++ & ringMask];
    }

    void put()(auto ref SubmissionEntry entry) @safe pure
    {
        assert(!full, "SubmissionQueue is full");
        sqes[localTail++ & ringMask] = entry;
    }

    void put(OP)(auto ref OP op)
        if (!is(OP == SubmissionEntry))
    {
        assert(!full, "SubmissionQueue is full");
        sqes[localTail++ & ringMask].fill(op);
    }

    private void putWith(alias FN, ARGS...)(auto ref ARGS args)
    {
        import std.traits : Parameters, ParameterStorageClass, ParameterStorageClassTuple;

        static assert(
            Parameters!FN.length >= 1
            && is(Parameters!FN[0] == SubmissionEntry)
            && ParameterStorageClassTuple!FN[0] == ParameterStorageClass.ref_,
            "Alias function must accept at least `ref SubmissionEntry`");

        static assert(
            is(typeof(FN(sqes[localTail & ringMask], args))),
            "Provided function is not callable with " ~ (Parameters!((ref SubmissionEntry e, ARGS args) {})).stringof);

        assert(!full, "SubmissionQueue is full");
        FN(sqes[localTail++ & ringMask], args);
    }

    uint dropped() const @safe pure { return atomicLoad!(MemoryOrder.raw)(*kdropped); }
}

struct CompletionQueue
{
    nothrow @nogc:

    // mmapped fields
    uint* khead; // controlled by us (incremented after the entry at head has been read)
    uint* ktail; // updated by kernel
    uint* koverflow;
    uint* kflags;
    CompletionEntry[] cqes; // array of entries (fixed length)

    uint ringMask; // constant mask used to determine array index from head/tail

    // mmap details (for cleanup)
    void* ring;
    size_t ringSize;

    uint localHead; // used for bulk reading

    uint head() const @safe pure { return localHead; }
    uint tail() const @safe pure { return atomicLoad!(MemoryOrder.acq)(*ktail); }

    void flushHead() @safe pure
    {
        pragma(inline, true);
        // debug printf("CQ updating head: %d\n", localHead);
        atomicStore!(MemoryOrder.rel)(*khead, localHead);
    }

    bool empty() const @safe pure { return head == tail; }

    ref CompletionEntry front() @safe pure return
    {
        assert(!empty, "CompletionQueue is empty");
        return cqes[localHead & ringMask];
    }

    void popFront() @safe pure
    {
        pragma(inline, true);
        assert(!empty, "CompletionQueue is empty");
        localHead++;
        flushHead();
    }

    size_t length() const @safe pure { return tail - localHead; }

    uint overflow() const @safe pure { return atomicLoad!(MemoryOrder.raw)(*koverflow); }

    /// Runtime CQ flags - written by the application, shouldn't be modified by the kernel.
    void flags(CQRingFlags flags) @safe pure { atomicStore!(MemoryOrder.raw)(*kflags, flags); }
}

// just a helper to use atomicStore more easily with older compilers
void atomicStore(MemoryOrder ms, T, V)(ref T val, V newVal) @trusted
{
    pragma(inline, true);
    import core.atomic : store = atomicStore;
    static if (__VERSION__ >= 2089) store!ms(val, newVal);
    else store!ms(*(cast(shared T*)&val), newVal);
}

// just a helper to use atomicLoad more easily with older compilers
T atomicLoad(MemoryOrder ms, T)(ref const T val) @trusted
{
    pragma(inline, true);
    import core.atomic : load = atomicLoad;
    static if (__VERSION__ >= 2089) return load!ms(val);
    else return load!ms(*(cast(const shared T*)&val));
}

version (assert)
{
    import std.range.primitives : ElementType, isInputRange, isOutputRange;
    static assert(isInputRange!Uring && is(ElementType!Uring == CompletionEntry));
    static assert(isOutputRange!(Uring, SubmissionEntry));
}
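
/*
 * Example (illustrative sketch, kept as a comment): the asserts above mean the
 * ring composes with std.range — entries go in with `put`, completions come out
 * with `front`/`popFront` (or a plain foreach):
 *
 *     SubmissionEntry entry;
 *     entry.prepNop();
 *     io.put(entry);
 *     io.submit(1); // wait for at least one completion
 *     foreach (ref cqe; io) { /+ handle cqe.res / cqe.user_data +/ }
 */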

version (LDC)
{
    import ldc.intrinsics : llvm_expect;
    alias _expect = llvm_expect;
}
else
{
    T _expect(T)(T val, T expected_val) if (__traits(isIntegral, T))
    {
        pragma(inline, true);
        return val;
    }
}