/**
 * Simple idiomatic dlang wrapper around linux io_uring
 * (see: https://kernel.dk/io_uring.pdf) asynchronous API.
 */
module during;

version(linux) {}
else {
    pragma(msg, "!!!!!!! during/io_uring is available ONLY on Linux systems (5.1+). This package is useless on your system. !!!!!!!!!");
    static assert(0, "during is not available on your system");
}

public import during.io_uring;
import during.openat2;

import core.atomic : MemoryOrder;
debug import core.stdc.stdio;
import core.stdc.stdlib;
import core.sys.linux.epoll;
import core.sys.linux.errno;
import core.sys.linux.sched;
import core.sys.linux.sys.mman;
import core.sys.linux.unistd;
import core.sys.posix.signal;
import core.sys.posix.sys.socket;
import core.sys.posix.sys.types;
import core.sys.posix.sys.uio;
import std.algorithm.comparison : among;

nothrow @nogc:

/**
 * Setup new instance of io_uring into provided `Uring` structure.
 *
 * Params:
 *     uring = `Uring` structure to be initialized (must not be already initialized)
 *     entries = Number of entries to initialize uring with
 *     flags = `SetupFlags` to use to initialize uring.
 *
 * Returns: On success it returns 0, `-errno` otherwise.
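 *
 * Example (an illustrative sketch; requires a Linux 5.1+ kernel):
 * ---
 * Uring io;
 * immutable ret = io.setup(32);
 * assert(ret == 0, "io_uring setup failed");
 * ---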
 */
int setup(ref Uring uring, uint entries = 128, SetupFlags flags = SetupFlags.NONE)
{
    assert(uring.payload is null, "Uring is already initialized");
    uring.payload = cast(UringDesc*)calloc(1, UringDesc.sizeof);
    if (uring.payload is null) return -errno;

    uring.payload.params.flags = flags;
    uring.payload.refs = 1;
    auto r = io_uring_setup(entries, uring.payload.params);
    if (r < 0) return -errno;

    uring.payload.fd = r;

    r = uring.payload.mapRings();
    if (r < 0)
    {
        dispose(uring);
        return r;
    }

    // debug printf("uring(%d): setup\n", uring.payload.fd);

    return 0;
}

/**
 * Main entry point to work with io_uring.
 *
 * It hides `SubmissionQueue` and `CompletionQueue` behind a standard range interface.
 * We put in `SubmissionEntry` entries and take out `CompletionEntry` entries.
 *
 * Use the predefined `prepXX` methods to fill the required fields of a `SubmissionEntry` before `put` or during `putWith`.
 *
 * Note: `prepXX` functions don't touch the previous entry state, they just fill in the operation properties. For a less
 * error-prone interface, the entry is cleared automatically when prepared using `putWith`. So when using your own
 * `SubmissionEntry` (outside the submission queue) that is added to the submission queue using `put`, be sure it's
 * cleared if it's reused for multiple operations.
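 *
 * Example (an illustrative sketch of the queue/submit/consume cycle):
 * ---
 * Uring io;
 * assert(io.setup() == 0);
 *
 * SubmissionEntry entry;
 * entry.prepNop();
 * entry.setUserDataRaw(42UL);
 *
 * io.put(entry);       // queue the prepared entry
 * io.submitAndWait(1); // enter the kernel and wait for one completion
 *
 * assert(!io.empty);
 * assert(io.front.userDataAs!ulong == 42);
 * io.popFront();       // release the CQE back to the ring
 * ---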
 */
struct Uring
{
    nothrow @nogc:

    private UringDesc* payload;

    /// Copy constructor
    this(ref return scope Uring rhs) @safe pure
    {
        assert(rhs.payload !is null, "rhs payload is null");
        // debug printf("uring(%d): copy\n", rhs.payload.fd);
        this.payload = rhs.payload;
        this.payload.refs++;
    }

    /// Destructor
    ~this() @safe
    {
        dispose(this);
    }

    /// Native io_uring file descriptor
    int fd() const @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.fd;
    }

    /// io_uring parameters
    SetupParameters params() const @safe pure return
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.params;
    }

    /// Check if there is some `CompletionEntry` to process.
    bool empty() const @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.cq.empty;
    }

    /// Check if there is space for another `SubmissionEntry` to submit.
    bool full() const @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.sq.full;
    }

    /// Available space in submission queue before it becomes full
    size_t capacity() const @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.sq.capacity;
    }

    /// Number of entries in completion queue
    size_t length() const @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.cq.length;
    }

    /// Get first `CompletionEntry` from cq ring
    ref CompletionEntry front() @safe pure return
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.cq.front;
    }

    /// Move to next `CompletionEntry`
    void popFront() @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.cq.popFront;
    }

    /**
     * Adds new entry to the `SubmissionQueue`.
     *
     * Note that this just adds the entry to the queue and doesn't advance the tail
     * marker the kernel sees. For that, `flush()` needs to be called next.
     *
     * Also note that to actually enter new entries to the kernel,
     * `submit()` needs to be called.
     *
     * Params:
     *     FN    = Function to fill next entry in queue by `ref` (should be faster).
     *             It is expected to be in a form of `void function(ARGS)(ref SubmissionEntry, auto ref ARGS)`.
     *             Note that in this case the queue entry is cleared first before the function is called.
     *     entry = Custom built `SubmissionEntry` to be posted as is.
     *             Note that in this case it is copied whole over one in the `SubmissionQueue`.
     *     args  = Optional arguments passed to the function
     *
     * Returns: reference to `Uring` structure so it's possible to chain multiple commands.
     */
    ref Uring put()(auto ref SubmissionEntry entry) @safe pure return
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        payload.sq.put(entry);
        return this;
    }

    /// ditto
    ref Uring putWith(alias FN, ARGS...)(auto ref ARGS args) return
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        import std.functional : forward;
        payload.sq.putWith!FN(forward!args);
        return this;
    }

    /**
     * Similar to `put(SubmissionEntry)`, but here we can provide our custom type (args) to be filled
     * into the next `SubmissionEntry` in the queue.
     *
     * Fields in the provided type must use the same names as in `SubmissionEntry` to be automagically copied.
     *
     * Params:
     *   op = Custom operation definition.
     *
     * Returns: reference to `Uring` structure so it's possible to chain multiple commands.
     */
    ref Uring put(OP)(auto ref OP op) return
        if (!is(OP == SubmissionEntry))
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        payload.sq.put(op);
        return this;
    }

    /**
     * Advances the userspace submission queue and returns the last `SubmissionEntry`.
     */
    ref SubmissionEntry next()() @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.sq.next();
    }

    /**
     * If the completion queue is full, new events may be dropped.
     * This value records the number of dropped events.
     */
    uint overflow() const @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.cq.overflow;
    }

    /// Counter of invalid submissions (out-of-bound index in submission array)
    uint dropped() const @safe pure
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        return payload.sq.dropped;
    }

    /**
     * Submits queued `SubmissionEntry` entries to be processed by the kernel.
     *
     * Params:
     *     want  = number of `CompletionEntries` to wait for.
     *             If 0, this just submits queued entries and returns.
     *             If > 0, it blocks until at least the wanted number of entries were completed.
     *     args  = See io_uring_enter(2) man page
     *
     * Returns: Number of submitted entries on success, `-errno` on error
     */
    int submit(S)(uint want, const S* args)
        if (is(S == sigset_t) || is(S == io_uring_getevents_arg))
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        if (_expect(want > 0, false)) return submitAndWait(want, args);
        return submit(args);
    }

    /// ditto
    int submit(uint want) @safe
    {
        pragma(inline, true)
        if (_expect(want > 0, true)) return submitAndWait(want, cast(sigset_t*)null);
        return submit(cast(sigset_t*)null);
    }

    /// ditto
    int submit(S)(const S* args) @trusted
        if (is(S == sigset_t) || is(S == io_uring_getevents_arg))
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        immutable len = cast(int)payload.sq.length;
        if (_expect(len > 0, true)) // anything to submit?
        {
            payload.sq.flushTail(); // advance queue index

            EnterFlags flags;
            if (payload.params.flags & SetupFlags.SQPOLL)
            {
                if (_expect(payload.sq.flags & SubmissionQueueFlags.NEED_WAKEUP, false))
                    flags |= EnterFlags.SQ_WAKEUP;
                else return len; // fast poll
            }
            immutable r = io_uring_enter(payload.fd, len, 0, flags, args);
            if (_expect(r < 0, false)) return -errno;
            return r;
        }
        return 0;
    }

    /// ditto
    int submit() @safe
    {
        pragma(inline, true)
        return submit(cast(sigset_t*)null);
    }

    /**
     * Flushes submission queue index to the kernel.
     * Doesn't call any syscall, it just advances the SQE queue for the kernel.
     * This can be used with `IORING_SETUP_SQPOLL` when the kernel polls the submission queue.
     */
    void flush() @safe
    {
        payload.sq.flushTail(); // advance queue index
    }

    /**
     * Similar to `submit`, but with this method we just wait for the required number
     * of `CompletionEntries`.
     *
     * Returns: `0` on success, `-errno` on error
     */
    int wait(S)(uint want = 1, const S* args = null) @trusted
        if (is(S == sigset_t) || is(S == io_uring_getevents_arg))
    in (payload !is null, "Uring hasn't been initialized yet")
    in (want > 0, "Invalid want value")
    {
        pragma(inline);
        if (payload.cq.length >= want) return 0; // we don't need to syscall
        immutable r = io_uring_enter(payload.fd, 0, want, EnterFlags.GETEVENTS, args);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /// ditto
    int wait(uint want = 1)
    {
        pragma(inline, true)
        return wait(want, cast(sigset_t*)null);
    }

    /**
     * Special case of a `submit` that can be used when we know beforehand that we want to wait for
     * some amount of CQEs.
     */
    int submitAndWait(S)(uint want, const S* args) @trusted
        if (is(S == sigset_t) || is(S == io_uring_getevents_arg))
    in (payload !is null, "Uring hasn't been initialized yet")
    in (want > 0, "Invalid want value")
    {
        immutable len = cast(int)payload.sq.length;
        if (_expect(len > 0, true)) // anything to submit?
        {
            payload.sq.flushTail(); // advance queue index

            EnterFlags flags = EnterFlags.GETEVENTS;
            if (payload.params.flags & SetupFlags.SQPOLL)
            {
                if (_expect(payload.sq.flags & SubmissionQueueFlags.NEED_WAKEUP, false))
                    flags |= EnterFlags.SQ_WAKEUP;
            }
            immutable r = io_uring_enter(payload.fd, len, want, flags, args);
            if (_expect(r < 0, false)) return -errno;
            return r;
        }
        return wait(want); // just simple wait
    }

    /// ditto
    int submitAndWait(uint want) @safe
    {
        pragma(inline, true)
        return submitAndWait(want, cast(sigset_t*)null);
    }

    /**
     * Register buffers to be mapped into the kernel for faster buffered operations.
     *
     * To use the buffers, the application must specify the fixed variants of operations,
     * `READ_FIXED` or `WRITE_FIXED` in the `SubmissionEntry`, with the used `buf_index` set
     * in the entry extra data.
     *
     * An application can increase or decrease the size or number of registered buffers by first
     * unregistering the existing buffers, and then issuing a new call to io_uring_register() with
     * the new buffers.
     *
     * Params:
     *   buffers = Buffers to be registered
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
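     *
     * Example (an illustrative sketch; assumes `io` is an initialized `Uring`):
     * ---
     * ubyte[4096] block;
     * immutable r = io.registerBuffers(block[]);
     * assert(r == 0);
     * // the buffer can now be used with prepReadFixed/prepWriteFixed and buf_index 0
     * ---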
     */
    int registerBuffers(T)(T buffers)
        if (is(T == ubyte[]) || is(T == ubyte[][])) // TODO: something else?
    in (payload !is null, "Uring hasn't been initialized yet")
    in (buffers.length, "Empty buffer")
    {
        if (payload.regBuffers !is null)
            return -EBUSY; // buffers were already registered

        static if (is(T == ubyte[]))
        {
            auto p = malloc(iovec.sizeof);
            if (_expect(p is null, false)) return -errno;
            payload.regBuffers = (cast(iovec*)p)[0..1];
            payload.regBuffers[0].iov_base = cast(void*)&buffers[0];
            payload.regBuffers[0].iov_len = buffers.length;
        }
        else static if (is(T == ubyte[][]))
        {
            auto p = malloc(buffers.length * iovec.sizeof);
            if (_expect(p is null, false)) return -errno;
            payload.regBuffers = (cast(iovec*)p)[0..buffers.length];

            foreach (i, b; buffers)
            {
                assert(b.length, "Empty buffer");
                payload.regBuffers[i].iov_base = cast(void*)&b[0];
                payload.regBuffers[i].iov_len = b.length;
            }
        }

        immutable r = io_uring_register(
                payload.fd,
                RegisterOpCode.REGISTER_BUFFERS,
                cast(const(void)*)&payload.regBuffers[0], cast(uint)payload.regBuffers.length
            );

        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * Releases all previously registered buffers associated with the `io_uring` instance.
     *
     * An application need not unregister buffers explicitly before shutting down the io_uring instance.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int unregisterBuffers() @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        if (payload.regBuffers is null)
            return -ENXIO; // no buffers were registered

        free(cast(void*)&payload.regBuffers[0]);
        payload.regBuffers = null;

        immutable r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_BUFFERS, null, 0);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * Register files for I/O.
     *
     * To make use of the registered files, the `IOSQE_FIXED_FILE` flag must be set in the flags
     * member of the `SubmissionEntry`, and the `fd` member is set to the index of the file in the
     * file descriptor array.
     *
     * Files are automatically unregistered when the `io_uring` instance is torn down. An application
     * need only unregister if it wishes to register a new set of fds.
     *
     * Use `-1` as a file descriptor to mark it as reserved in the array.
     *
     * Params: fds = array of file descriptors to be registered
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
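     *
     * Example (an illustrative sketch; assumes `io` is an initialized `Uring` and `sock` is an
     * already opened file descriptor):
     * ---
     * int[2] fds = [sock, -1]; // second slot reserved for a later update
     * immutable r = io.registerFiles(fds[]);
     * assert(r == 0);
     * // subsequent operations may use IOSQE_FIXED_FILE with fd = 0
     * ---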
     */
    int registerFiles(const(int)[] fds)
    in (payload !is null, "Uring hasn't been initialized yet")
    in (fds.length, "No file descriptors provided")
    in (fds.length < uint.max, "Too many file descriptors")
    {
        // arg contains a pointer to an array of nr_args file descriptors (signed 32 bit integers).
        immutable r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_FILES, &fds[0], cast(uint)fds.length);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * Register an update for an existing file set. The updates will start at
     * `off` in the original array.
     *
     * Use `-1` as a file descriptor to mark it as reserved in the array.
     *
     * Params:
     *      off = offset to the original registered files to be updated
     *      fds = array of file descriptors to update with
     *
     * Returns: number of files updated on success, `-errno` on failure.
     */
    int registerFilesUpdate(uint off, const(int)[] fds) @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    in (fds.length, "No file descriptors provided to update")
    in (fds.length < uint.max, "Too many file descriptors")
    {
        struct Update // represents io_uring_files_update (obsolete) or io_uring_rsrc_update
        {
            uint offset;
            uint _resv;
            ulong data;
        }

        static assert (Update.sizeof == 16);

        Update u = { offset: off, data: cast(ulong)&fds[0] };
        immutable r = io_uring_register(
            payload.fd,
            RegisterOpCode.REGISTER_FILES_UPDATE,
            &u, cast(uint)fds.length);
        if (_expect(r < 0, false)) return -errno;
        return r; // number of files updated
    }

    /**
     * All previously registered files associated with the `io_uring` instance will be unregistered.
     *
     * Files are automatically unregistered when the `io_uring` instance is torn down. An application
     * need only unregister if it wishes to register a new set of fds.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int unregisterFiles() @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_FILES, null, 0);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * Registers event file descriptor that would be used as a notification mechanism on completion
     * queue change.
     *
     * Params: eventFD = event file descriptor to be notified about change
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int registerEventFD(int eventFD) @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_EVENTFD, &eventFD, 1);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * Unregister previously registered notification event file descriptor.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int unregisterEventFD() @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_EVENTFD, null, 0);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * Generic means to register resources.
     *
     * Note: Available from Linux 5.13
     */
    int registerRsrc(R)(RegisterOpCode type, const(R)[] data, const(ulong)[] tags)
    in (payload !is null, "Uring hasn't been initialized yet")
    in (data.length == tags.length, "Different array lengths")
    in (data.length < uint.max, "Too many resources")
    {
        struct Register // represents io_uring_rsrc_register
        {
            uint nr;
            uint _resv;
            ulong _resv2;
            ulong data;
            ulong tags;
        }

        static assert (Register.sizeof == 32);

        Register r = {
            nr: cast(uint)data.length,
            data: cast(ulong)&data[0],
            tags: cast(ulong)&tags[0]
        };
        immutable ret = io_uring_register(
            payload.fd,
            type,
            &r, cast(uint)Register.sizeof);
        if (_expect(ret < 0, false)) return -errno;
        return 0;
    }

    /**
     * Generic means to update registered resources.
     *
     * Note: Available from Linux 5.13
     */
    int registerRsrcUpdate(R)(RegisterOpCode type, uint off, const(R)[] data, const(ulong)[] tags) @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    in (data.length == tags.length, "Different array lengths")
    in (data.length < uint.max, "Too many resources")
    {
        struct Update // represents io_uring_rsrc_update2
        {
            uint offset;
            uint _resv;
            ulong data;
            ulong tags;
            uint nr;
            uint _resv2;
        }

        static assert (Update.sizeof == 32);

        Update u = {
            offset: off,
            data: cast(ulong)&data[0],
            tags: cast(ulong)&tags[0],
            nr: cast(uint)data.length,
        };
        immutable r = io_uring_register(payload.fd, type, &u, cast(uint)Update.sizeof);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * Register a feature whitelist. Attempting to call any operations which are not whitelisted
     * will result in an error.
     *
     * Note: Can only be called once to prevent other code from bypassing the whitelist.
     *
     * Params: res = the struct containing the restriction info.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     *
     * Note: Available from Linux 5.10
     */
    int registerRestrictions(scope ref io_uring_restriction res) @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_RESTRICTIONS, &res, 1);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * Enable the "rings" of this Uring if they were previously disabled with
     * `IORING_SETUP_R_DISABLED`.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     *
     * Note: Available from Linux 5.10
     */
    int enableRings() @trusted
    in (payload !is null, "Uring hasn't been initialized yet")
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.ENABLE_RINGS, null, 0);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * By default, async workers created by io_uring will inherit the CPU mask of their parent. This
     * is usually all the CPUs in the system, unless the parent is being run with a limited set. If
     * this isn't the desired outcome, the application may explicitly tell io_uring what CPUs the
     * async workers may run on.
     *
     * Note: Available since 5.14.
     */
    int registerIOWQAffinity(cpu_set_t[] cpus) @trusted
    in (cpus.length)
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_IOWQ_AFF, cpus.ptr, cast(uint)cpus.length);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * Undoes a CPU mask previously set with `registerIOWQAffinity`.
     *
     * Note: Available since 5.14
     */
    int unregisterIOWQAffinity() @trusted
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_IOWQ_AFF, null, 0);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }

    /**
     * By default, io_uring limits the unbounded workers created to the maximum processor count set
     * by `RLIMIT_NPROC` and the bounded workers is a function of the SQ ring size and the number of
     * CPUs in the system. Sometimes this can be excessive (or too little, for bounded), and this
     * command provides a way to change the count per ring (per NUMA node) instead.
     *
     * `workers` is an array of two values: index 0 holds the maximum bounded worker count and
     * index 1 the maximum unbounded worker count (per NUMA node). On successful return, the passed
     * in array will contain the previous maximum values for each type. If a count being passed in
     * is 0, then this command returns the current maximum values and doesn't modify the current
     * setting.
     *
     * Note: Available since 5.15
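     *
     * Example (an illustrative sketch; assumes `io` is an initialized `Uring`):
     * ---
     * uint[2] workers = [4, 8]; // bounded: 4, unbounded: 8
     * immutable r = io.registerIOWQMaxWorkers(workers);
     * assert(r == 0);
     * // workers now holds the previous limits
     * ---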
     */
    int registerIOWQMaxWorkers(ref uint[2] workers) @trusted
    {
        immutable r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_IOWQ_MAX_WORKERS, &workers, 2);
        if (_expect(r < 0, false)) return -errno;
        return 0;
    }
}

/**
 * Uses custom operation definition to fill fields of `SubmissionEntry`.
 * Can be used in cases when the builtin prep* functions aren't enough.
 *
 * Custom definition fields must correspond to fields of `SubmissionEntry` for this to work.
 *
 * Note: This doesn't touch the previous state of the entry, it just fills the corresponding fields.
 *       So it might be needed to call `clear` first on the entry (depends on usage).
 *
 * Params:
 *   entry = entry to set parameters to
 *   op = operation to fill entry with (can be custom type)
 */
ref SubmissionEntry fill(E)(return ref SubmissionEntry entry, auto ref E op)
{
    pragma(inline);
    import std.traits : hasMember, FieldNameTuple;

    // fill entry from provided operation fields (they must have same name as in SubmissionEntry)
    foreach (m; FieldNameTuple!E)
    {
        static assert(hasMember!(SubmissionEntry, m), "unknown member: " ~ E.stringof ~ "." ~ m);
        __traits(getMember, entry, m) = __traits(getMember, op, m);
    }

    return entry;
}

/**
 * Template function to help set `SubmissionEntry` `user_data` field.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      data = data to set to the `SubmissionEntry`
 *
 * Note: data is passed by ref and must live for the duration of the whole operation.
 */
ref SubmissionEntry setUserData(D)(return ref SubmissionEntry entry, ref D data) @trusted
{
    pragma(inline);
    entry.user_data = cast(ulong)(cast(void*)&data);
    return entry;
}

/**
 * Template function to help set `SubmissionEntry` `user_data` field. This differs from `setUserData`
 * in that it emplaces the provided data directly into the SQE `user_data` field, not a pointer to
 * the data.
 *
 * Because of that, data must be of `ulong.sizeof`.
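 *
 * Example (an illustrative sketch; `entry` and `cqe` stand for a prepared `SubmissionEntry` and
 * its matching `CompletionEntry`):
 * ---
 * struct OpCtx { uint id; uint kind; } // exactly 8 bytes
 * static assert(OpCtx.sizeof == ulong.sizeof);
 * entry.setUserDataRaw(OpCtx(1, 2));
 * // ... after completion:
 * auto ctx = cqe.userDataAs!OpCtx;
 * assert(ctx.id == 1);
 * ---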
 */
ref SubmissionEntry setUserDataRaw(D)(return ref SubmissionEntry entry, auto ref D data) @trusted
    if (D.sizeof == ulong.sizeof)
{
    pragma(inline);
    entry.user_data = *(cast(ulong*)(cast(void*)&data));
    return entry;
}

/**
 * Helper function to retrieve data set directly to the `CompletionEntry` user_data (set by `setUserDataRaw`).
 */
D userDataAs(D)(ref CompletionEntry entry) @trusted
    if (D.sizeof == ulong.sizeof)
{
    pragma(inline);
    return *(cast(D*)(cast(void*)&entry.user_data));
}

ref SubmissionEntry prepRW(return ref SubmissionEntry entry, Operation op,
    int fd = -1, const void* addr = null, uint len = 0, ulong offset = 0) @safe
{
    pragma(inline);
    entry.opcode = op;
    entry.fd = fd;
    entry.off = offset;
    entry.flags = SubmissionEntryFlags.NONE;
    entry.ioprio = 0;
    entry.addr = cast(ulong)addr;
    entry.len = len;
    entry.rw_flags = ReadWriteFlags.NONE;
    entry.user_data = 0;
    entry.buf_index = 0;
    entry.personality = 0;
    entry.file_index = 0;
    entry.__pad2[0] = entry.__pad2[1] = 0;
    return entry;
}

/**
 * Prepares `nop` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 */
ref SubmissionEntry prepNop(return ref SubmissionEntry entry) @safe
{
    entry.prepRW(Operation.NOP);
    return entry;
}

/**
 * Prepares `readv` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor of file we are operating on
 *      offset = offset
 *      buffer = iovec buffers to be used by the operation
 */
ref SubmissionEntry prepReadv(V)(return ref SubmissionEntry entry, int fd, ref const V buffer, long offset) @trusted
    if (is(V == iovec[]) || is(V == iovec))
{
    static if (is(V == iovec[]))
    {
        assert(buffer.length, "Empty buffer");
        assert(buffer.length < uint.max, "Too many iovec buffers");
        return entry.prepRW(Operation.READV, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    }
    else return entry.prepRW(Operation.READV, fd, cast(void*)&buffer, 1, offset);
}

/**
 * Prepares `writev` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor of file we are operating on
 *      offset = offset
 *      buffer = iovec buffers to be used by the operation
 */
ref SubmissionEntry prepWritev(V)(return ref SubmissionEntry entry, int fd, ref const V buffer, long offset) @trusted
    if (is(V == iovec[]) || is(V == iovec))
{
    static if (is(V == iovec[]))
    {
        assert(buffer.length, "Empty buffer");
        assert(buffer.length < uint.max, "Too many iovec buffers");
        return entry.prepRW(Operation.WRITEV, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    }
    else return entry.prepRW(Operation.WRITEV, fd, cast(void*)&buffer, 1, offset);
}

/**
 * Prepares `read_fixed` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor of file we are operating on
 *      offset = offset
 *      buffer = slice to preregistered buffer
 *      bufferIndex = index to the preregistered buffers array buffer belongs to
 */
ref SubmissionEntry prepReadFixed(return ref SubmissionEntry entry, int fd, long offset, ubyte[] buffer, ushort bufferIndex) @safe
{
    assert(buffer.length, "Empty buffer");
    assert(buffer.length < uint.max, "Buffer too large");
    entry.prepRW(Operation.READ_FIXED, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    entry.buf_index = bufferIndex;
    return entry;
}

/**
 * Prepares `write_fixed` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor of file we are operating on
 *      offset = offset
 *      buffer = slice to preregistered buffer
 *      bufferIndex = index to the preregistered buffers array buffer belongs to
 */
ref SubmissionEntry prepWriteFixed(return ref SubmissionEntry entry, int fd, long offset, ubyte[] buffer, ushort bufferIndex) @safe
{
    assert(buffer.length, "Empty buffer");
    assert(buffer.length < uint.max, "Buffer too large");
    entry.prepRW(Operation.WRITE_FIXED, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    entry.buf_index = bufferIndex;
    return entry;
}

/**
 * Prepares `recvmsg(2)` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor of file we are operating on
 *      msg = message to operate with
 *      flags = `recvmsg` operation flags
 *
 * Note: Available from Linux 5.3
 *
 * See_Also: `recvmsg(2)` man page for details.
 */
ref SubmissionEntry prepRecvMsg(return ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE) @trusted
{
    entry.prepRW(Operation.RECVMSG, fd, cast(void*)&msg, 1, 0);
    entry.msg_flags = flags;
    return entry;
}

/**
 * Prepares `sendmsg(2)` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor of file we are operating on
 *      msg = message to operate with
 *      flags = `sendmsg` operation flags
 *
 * Note: Available from Linux 5.3
 *
 * See_Also: `sendmsg(2)` man page for details.
 */
ref SubmissionEntry prepSendMsg(return ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE) @trusted
{
    entry.prepRW(Operation.SENDMSG, fd, cast(void*)&msg, 1, 0);
    entry.msg_flags = flags;
    return entry;
}

/**
 * Prepares `fsync` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor of a file to call `fsync` on
 *      flags = `fsync` operation flags
 */
ref SubmissionEntry prepFsync(return ref SubmissionEntry entry, int fd, FsyncFlags flags = FsyncFlags.NORMAL) @safe
{
    entry.prepRW(Operation.FSYNC, fd);
    entry.fsync_flags = flags;
    return entry;
}

/**
 * Poll the fd specified in the submission queue entry for the events specified in the poll_events
 * field. Unlike poll or epoll without `EPOLLONESHOT`, this interface always works in one shot mode.
 * That is, once the poll operation is completed, it will have to be resubmitted.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor to poll
 *      events = events to poll on the FD
 *      flags = poll operation flags
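 *
 * Example (an illustrative sketch; assumes `io` is an initialized `Uring`, `sock` is a socket fd,
 * and `PollEvents.IN` mirrors `POLLIN`):
 * ---
 * io.putWith!((ref SubmissionEntry e, int fd) => e.prepPollAdd(fd, PollEvents.IN))(sock)
 *   .submit(1); // the completion res holds the signaled event mask (or -errno)
 * ---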
 */
ref SubmissionEntry prepPollAdd(return ref SubmissionEntry entry,
    int fd, PollEvents events, PollFlags flags = PollFlags.NONE) @safe
{
    import std.system : endian, Endian;

    entry.prepRW(Operation.POLL_ADD, fd, null, flags);
    static if (endian == Endian.bigEndian)
        entry.poll_events32 = (events & 0x0000ffffUL) << 16 | (events & 0xffff0000) >> 16;
    else
        entry.poll_events32 = events;
    return entry;
}

/**
 * Remove an existing poll request. If found, the res field of the `CompletionEntry` will contain
 * `0`. If not found, res will contain `-ENOENT`.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      userData = user data of the previously issued poll operation
 */
ref SubmissionEntry prepPollRemove(D)(return ref SubmissionEntry entry, ref D userData) @trusted
{
    return entry.prepRW(Operation.POLL_REMOVE, -1, cast(void*)&userData);
}

/**
 * Allows updating the events and user_data of a running poll request.
 *
 * Note: available from Linux 5.13
 */
ref SubmissionEntry prepPollUpdate(U, V)(return ref SubmissionEntry entry,
    ref U oldUserData, ref V newUserData, PollEvents events = PollEvents.NONE) @trusted
{
    import std.system : endian, Endian;

    PollFlags flags;
    if (events != PollEvents.NONE) flags |= PollFlags.UPDATE_EVENTS;
    if (cast(void*)&oldUserData !is cast(void*)&newUserData) flags |= PollFlags.UPDATE_USER_DATA;

    entry.prepRW(
        Operation.POLL_REMOVE,
        -1,
        cast(void*)&oldUserData,
        flags,
        cast(ulong)cast(void*)&newUserData
    );
    static if (endian == Endian.bigEndian)
        entry.poll_events32 = (events & 0x0000ffffUL) << 16 | (events & 0xffff0000) >> 16;
    else
        entry.poll_events32 = events;
    return entry;
}

/**
 * Prepares `sync_file_range(2)` operation.
 *
 * Syncs a file segment with disk; permits fine control when synchronizing the open file referred to
 * by the file descriptor fd with disk.
 *
 * If `len` is 0, then all bytes from `offset` through to the end of file are synchronized.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = is the file descriptor to sync
 *      offset = the starting byte of the file range to be synchronized
 *      len = the length of the range to be synchronized, in bytes
 *      flags = the flags for the command.
 *
 * See_Also: `sync_file_range(2)` for the general description of the related system call.
 *
 * Note: available from Linux 5.2
 */
ref SubmissionEntry prepSyncFileRange(return ref SubmissionEntry entry, int fd, ulong offset, uint len,
    SyncFileRangeFlags flags = SyncFileRangeFlags.WRITE_AND_WAIT) @safe
{
    entry.prepRW(Operation.SYNC_FILE_RANGE, fd, null, len, offset);
    entry.sync_range_flags = flags;
    return entry;
}

/**
 * This command will register a timeout operation.
 *
 * A timeout will trigger a wakeup event on the completion ring for anyone waiting for events. A
 * timeout condition is met when either the specified timeout expires, or the specified number of
 * events have completed. Either condition will trigger the event. The request will complete with
 * `-ETIME` if the timeout got completed through expiration of the timer, or `0` if the timeout got
 * completed through requests completing on their own. If the timeout was cancelled before it
 * expired, the request will complete with `-ECANCELED`.
 *
 * Applications may delete existing timeouts before they occur with the `TIMEOUT_REMOVE` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      time = reference to `time64` data structure
 *      count = completion event count
 *      flags = define if it's a relative or absolute time
 *
 * Note: Available from Linux 5.4
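 *
 * Example (an illustrative sketch; assumes `io` is an initialized `Uring` and that
 * `KernelTimespec` holds seconds and nanoseconds):
 * ---
 * KernelTimespec ts = { 1, 0 }; // 1 second, relative
 * io.putWith!((ref SubmissionEntry e, ref KernelTimespec t) => e.prepTimeout(t))(ts)
 *   .submit(1); // the CQE res is -ETIME once the timer expires
 * ---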
 */
ref SubmissionEntry prepTimeout(return ref SubmissionEntry entry, ref KernelTimespec time,
    ulong count = 0, TimeoutFlags flags = TimeoutFlags.REL) @trusted
{
    entry.prepRW(Operation.TIMEOUT, -1, cast(void*)&time, 1, count);
    entry.timeout_flags = flags;
    return entry;
}

/**
 * Prepares operation to remove an existing timeout registered using the `TIMEOUT` operation.
 *
 * Attempt to remove an existing timeout operation. If the specified timeout request is found and
 * cancelled successfully, this request will terminate with a result value of `-ECANCELED`. If the
 * timeout request was found but expiration was already in progress, this request will terminate
 * with a result value of `-EALREADY`. If the timeout request wasn't found, the request will
 * terminate with a result value of `-ENOENT`.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      userData = user data provided with the previously issued timeout operation
 *
 * Note: Available from Linux 5.5
 */
ref SubmissionEntry prepTimeoutRemove(D)(return ref SubmissionEntry entry, ref D userData) @trusted
{
    return entry.prepRW(Operation.TIMEOUT_REMOVE, -1, cast(void*)&userData);
}

/**
 * Prepares operation to update an existing timeout registered using the `TIMEOUT` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      userData = user data provided with the previously issued timeout operation
 *      time = reference to `time64` data structure with a new time spec
 *      flags = define if it's a relative or absolute time
 *
 * Note: Available from Linux 5.11
 */
ref SubmissionEntry prepTimeoutUpdate(D)(return ref SubmissionEntry entry,
    ref KernelTimespec time, ref D userData, TimeoutFlags flags) @trusted
{
    entry.prepRW(Operation.TIMEOUT_REMOVE, -1, cast(void*)&userData, 0, cast(ulong)(cast(void*)&time));
    entry.timeout_flags = flags | TimeoutFlags.UPDATE;
    return entry;
}

/**
 * Prepares `accept4(2)` operation.
 *
 * See_Also: `accept4(2)` for the general description of the related system call.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = socket file descriptor
 *      addr = reference to one of the sockaddr structures to be filled with the accepted client address
 *      addrlen = reference to addrlen field that would be filled with accepted client address length
 *
 * Note: Available from Linux 5.5
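 *
 * Example (an illustrative sketch; assumes `io` is an initialized `Uring` and `listenFd` is a
 * listening socket fd):
 * ---
 * import core.sys.posix.netinet.in_ : sockaddr_in;
 * sockaddr_in addr;
 * socklen_t alen = addr.sizeof;
 * io.putWith!((ref SubmissionEntry e, int fd, ref sockaddr_in a, ref socklen_t l)
 *         => e.prepAccept(fd, a, l))(listenFd, addr, alen)
 *   .submit(1); // the CQE res is the accepted fd or -errno
 * ---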
 */
ref SubmissionEntry prepAccept(ADDR)(return ref SubmissionEntry entry, int fd, ref ADDR addr, ref socklen_t addrlen,
    AcceptFlags flags = AcceptFlags.NONE) @trusted
{
    entry.prepRW(Operation.ACCEPT, fd, cast(void*)&addr, 0, cast(ulong)(cast(void*)&addrlen));
    entry.accept_flags = flags;
    return entry;
}

/**
 * Same as `prepAccept`, but fd is put directly into fixed file table on `fileIndex`.
 * Note: available from Linux 5.15
 */
ref SubmissionEntry prepAcceptDirect(ADDR)(return ref SubmissionEntry entry, int fd, ref ADDR addr, ref socklen_t addrlen,
    uint fileIndex, AcceptFlags flags = AcceptFlags.NONE) @trusted
{
    entry.prepRW(Operation.ACCEPT, fd, cast(void*)&addr, 0, cast(ulong)(cast(void*)&addrlen));
    entry.accept_flags = flags;
    entry.file_index = fileIndex+1;
    return entry;
}

/**
 * Prepares operation that cancels existing async work.
 *
 * This works with any read/write request, accept, send/recvmsg, etc. There's an important
 * distinction to make here with the different kinds of commands. A read/write on a regular file
 * will generally be waiting for IO completion in an uninterruptible state. This means it'll ignore
 * any signals or attempts to cancel it, as these operations are uncancellable. io_uring can cancel
 * these operations if they haven't yet been started. If they have been started, cancellations on
 * these will fail. Network IO will generally be waiting interruptibly, and can hence be cancelled
 * at any time. The completion event for this request will have a result of 0 if done successfully,
 * `-EALREADY` if the operation is already in progress, and `-ENOENT` if the original request
 * specified cannot be found. For cancellation requests that return `-EALREADY`, io_uring may or may
 * not cause this request to be stopped sooner. For blocking IO, the original request will complete
 * as it originally would have. For IO that is cancellable, it will terminate sooner if at all
 * possible.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      userData = `user_data` field of the request that should be cancelled
 *
 * Note: Available from Linux 5.5
 */
ref SubmissionEntry prepCancel(D)(return ref SubmissionEntry entry, ref D userData, uint flags = 0) @trusted
{
    entry.prepRW(Operation.ASYNC_CANCEL, -1, cast(void*)&userData);
    entry.cancel_flags = flags;
    return entry;
}

/**
 * Prepares linked timeout operation.
 *
 * This request must be linked with another request through `IOSQE_IO_LINK`.
 * Unlike `IORING_OP_TIMEOUT`, `IORING_OP_LINK_TIMEOUT` acts on the linked request, not the completion
 * queue. The format of the command is otherwise like `IORING_OP_TIMEOUT`, except there's no
 * completion event count as it's tied to a specific request. If used, the timeout specified in the
 * command will cancel the linked command, unless the linked command completes before the
 * timeout. The timeout will complete with `-ETIME` if the timer expired and cancellation of the
 * linked request was attempted, or `-ECANCELED` if the timer got cancelled because of completion
 * of the linked request.
 *
 * Note: Available from Linux 5.5
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      time = time specification
 *      flags = define if it's a relative or absolute time
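 *
 * Example (an illustrative sketch; assumes `io` is an initialized `Uring`, `fd`/`buf` stand for an
 * open file descriptor and a read buffer, and `SubmissionEntryFlags.IO_LINK` mirrors `IOSQE_IO_LINK`):
 * ---
 * KernelTimespec ts = { 1, 0 }; // cancel the linked read after 1 second
 * io.putWith!((ref SubmissionEntry e, int f, ubyte[] b) {
 *         e.prepRead(f, b, 0);
 *         e.flags |= SubmissionEntryFlags.IO_LINK; // link with the next SQE
 *     })(fd, buf)
 *   .putWith!((ref SubmissionEntry e, ref KernelTimespec t) => e.prepLinkTimeout(t))(ts)
 *   .submit(2);
 * ---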
 */
ref SubmissionEntry prepLinkTimeout(return ref SubmissionEntry entry, ref KernelTimespec time, TimeoutFlags flags = TimeoutFlags.REL) @trusted
{
    entry.prepRW(Operation.LINK_TIMEOUT, -1, cast(void*)&time, 1, 0);
    entry.timeout_flags = flags;
    return entry;
}

/**
 * Note: Available from Linux 5.5
 */
ref SubmissionEntry prepConnect(ADDR)(return ref SubmissionEntry entry, int fd, ref const(ADDR) addr) @trusted
{
    return entry.prepRW(Operation.CONNECT, fd, cast(void*)&addr, 0, ADDR.sizeof);
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepFilesUpdate(return ref SubmissionEntry entry, int[] fds, int offset) @safe
{
    return entry.prepRW(Operation.FILES_UPDATE, -1, cast(void*)&fds[0], cast(uint)fds.length, offset);
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepFallocate(return ref SubmissionEntry entry, int fd, int mode, long offset, long len) @trusted
{
    return entry.prepRW(Operation.FALLOCATE, fd, cast(void*)len, mode, offset);
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepOpenat(return ref SubmissionEntry entry, int fd, const(char)* path, int flags, uint mode) @trusted
{
    entry.prepRW(Operation.OPENAT, fd, cast(void*)path, mode, 0);
    entry.open_flags = flags;
    return entry;
}

/**
 * Same as `prepOpenat`, but fd is put directly into fixed file table on `fileIndex`.
 * Note: available from Linux 5.15
 */
ref SubmissionEntry prepOpenatDirect(return ref SubmissionEntry entry, int fd, const(char)* path, int flags, uint mode, uint fileIndex) @trusted
{
    entry.prepRW(Operation.OPENAT, fd, cast(void*)path, mode, 0);
    entry.open_flags = flags;
    entry.file_index = fileIndex+1;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepClose(return ref SubmissionEntry entry, int fd) @safe
{
    return entry.prepRW(Operation.CLOSE, fd);
}

/**
 * Same as `prepClose` but operation works directly with fd registered in fixed file table on index `fileIndex`.
 * Note: Available from Linux 5.15
 */
ref SubmissionEntry prepCloseDirect(return ref SubmissionEntry entry, int fd, uint fileIndex) @safe
{
    entry.prepRW(Operation.CLOSE, fd);
    entry.file_index = fileIndex+1;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepRead(return ref SubmissionEntry entry, int fd, ubyte[] buffer, long offset) @safe
{
    return entry.prepRW(Operation.READ, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepWrite(return ref SubmissionEntry entry, int fd, const(ubyte)[] buffer, long offset) @trusted
{
    return entry.prepRW(Operation.WRITE, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepStatx(Statx)(return ref SubmissionEntry entry, int fd, const(char)* path, int flags, uint mask, ref Statx statxbuf) @trusted
{
    entry.prepRW(Operation.STATX, fd, cast(void*)path, mask, cast(ulong)(cast(void*)&statxbuf));
    entry.statx_flags = flags;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepFadvise(return ref SubmissionEntry entry, int fd, long offset, uint len, int advice) @safe
{
    entry.prepRW(Operation.FADVISE, fd, null, len, offset);
    entry.fadvise_advice = advice;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepMadvise(return ref SubmissionEntry entry, const(ubyte)[] block, int advice) @trusted
{
    entry.prepRW(Operation.MADVISE, -1, cast(void*)&block[0], cast(uint)block.length, 0);
    entry.fadvise_advice = advice;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepSend(return ref SubmissionEntry entry,
    int sockfd, const(ubyte)[] buf, MsgFlags flags = MsgFlags.NONE) @trusted
{
    entry.prepRW(Operation.SEND, sockfd, cast(void*)&buf[0], cast(uint)buf.length, 0);
    entry.msg_flags = flags;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepRecv(return ref SubmissionEntry entry,
    int sockfd, ubyte[] buf, MsgFlags flags = MsgFlags.NONE) @trusted
{
    entry.prepRW(Operation.RECV, sockfd, cast(void*)&buf[0], cast(uint)buf.length, 0);
    entry.msg_flags = flags;
    return entry;
}

/**
 * Variant that uses registered buffers group.
 *
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepRecv(return ref SubmissionEntry entry,
    int sockfd, ushort gid, uint len, MsgFlags flags = MsgFlags.NONE) @safe
{
    entry.prepRW(Operation.RECV, sockfd, null, len, 0);
    entry.msg_flags = flags;
    entry.buf_group = gid;
    entry.flags |= SubmissionEntryFlags.BUFFER_SELECT;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepOpenat2(return ref SubmissionEntry entry, int fd, const char *path, ref OpenHow how) @trusted
{
    return entry.prepRW(Operation.OPENAT2, fd, cast(void*)path, cast(uint)OpenHow.sizeof, cast(ulong)(cast(void*)&how));
}

/**
 * Same as `prepOpenat2`, but fd is put directly into fixed file table on `fileIndex`.
 * Note: available from Linux 5.15
 */
ref SubmissionEntry prepOpenat2Direct(return ref SubmissionEntry entry, int fd, const char *path, ref OpenHow how, uint fileIndex) @trusted
{
    entry.prepRW(Operation.OPENAT2, fd, cast(void*)path, cast(uint)OpenHow.sizeof, cast(ulong)(cast(void*)&how));
    entry.file_index = fileIndex+1;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepEpollCtl(return ref SubmissionEntry entry, int epfd, int fd, int op, ref epoll_event ev) @trusted
{
    return entry.prepRW(Operation.EPOLL_CTL, epfd, cast(void*)&ev, op, fd);
}

/**
 * Note: Available from Linux 5.7
 *
 * This splice operation can be used to implement sendfile by splicing to an intermediate pipe
 * first, then splice to the final destination. In fact, the implementation of sendfile in kernel
 * uses splice internally.
 *
 * NOTE that even if fd_in or fd_out refers to a pipe, the splice operation can still fail with
 * EINVAL if one of the fds doesn't explicitly support the splice operation, e.g. reading from a terminal
 * is unsupported from kernel 5.7 to 5.11. Check issue #291 for more information.
 *
 * Either fd_in or fd_out must be a pipe.
 *
 * Params:
 *   fd_in = input file descriptor
 *   off_in = If fd_in refers to a pipe, off_in must be -1.
 *            If fd_in does not refer to a pipe and off_in is -1, then bytes are read from
 *            fd_in starting from the file offset and it is adjusted appropriately;
 *            If fd_in does not refer to a pipe and off_in is not -1, then the starting
 *            offset of fd_in will be off_in.
 *   fd_out = output file descriptor
 *   off_out = The description of off_in also applies to off_out.
 *   len = Up to len bytes will be transferred between the file descriptors.
 *   splice_flags = see man splice(2) for description of flags.
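 *
 * Example (an illustrative sketch; assumes `io` is an initialized `Uring`; `pipeRead` and
 * `fileOut` stand for a pipe read end and an output file descriptor):
 * ---
 * io.putWith!((ref SubmissionEntry e, int pin, int pout)
 *         => e.prepSplice(pin, ulong.max, pout, 0, 4096, 0))(pipeRead, fileOut) // off_in = -1 (pipe)
 *   .submit(1); // the CQE res is the number of bytes spliced or -errno
 * ---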
 */
ref SubmissionEntry prepSplice(return ref SubmissionEntry entry,
    int fd_in, ulong off_in,
    int fd_out, ulong off_out,
    uint len, uint splice_flags) @safe
{
    entry.prepRW(Operation.SPLICE, fd_out, null, len, off_out);
    entry.splice_off_in = off_in;
    entry.splice_fd_in = fd_in;
    entry.splice_flags = splice_flags;
    return entry;
}

/**
 * Note: Available from Linux 5.7
 *
 * Params:
 *    entry = `SubmissionEntry` to prepare
 *    buf   = buffers to provide
 *    len   = length of each buffer to add
 *    bgid  = buffers group id
 *    bid   = starting buffer id
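 *
 * Example (an illustrative sketch; assumes `io` is an initialized `Uring` and uses the static
 * array overload below):
 * ---
 * ubyte[256][8] pool; // 8 buffers of 256 bytes in group 1, ids starting at 0
 * io.putWith!((ref SubmissionEntry e, ref ubyte[256][8] p)
 *         => e.prepProvideBuffers(p, 1, 0))(pool)
 *   .submit(1);
 * // a later prepRecv(sock, 1, 256) may then pick one of the provided buffers
 * ---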
1405  */
1406 ref SubmissionEntry prepProvideBuffers(return ref SubmissionEntry entry, ubyte[][] buf, uint len, ushort bgid, int bid) @safe
1407 {
1408     assert(buf.length <= int.max, "Too many buffers");
1409     assert(len <= uint.max, "Buffer too large");
1410     version (assert) {
1411         foreach (b; buf) assert(b.length <= len, "Invalid buffer length");
1412     }
1413     entry.prepRW(Operation.PROVIDE_BUFFERS, cast(int)buf.length, cast(void*)&buf[0][0], len, bid);
1414     entry.buf_group = bgid;
1415     return entry;
1416 }
1417 
1418 /// ditto
1419 ref SubmissionEntry prepProvideBuffers(size_t M, size_t N)(return ref SubmissionEntry entry, ref ubyte[M][N] buf, ushort bgid, int bid) @safe
1420 {
1421     static assert(N <= int.max, "Too many buffers");
1422     static assert(M <= uint.max, "Buffer too large");
1423     entry.prepRW(Operation.PROVIDE_BUFFERS, cast(int)N, cast(void*)&buf[0][0], cast(uint)M, bid);
1424     entry.buf_group = bgid;
1425     return entry;
1426 }
1427 
1428 /// ditto
1429 ref SubmissionEntry prepProvideBuffer(size_t N)(return ref SubmissionEntry entry, ref ubyte[N] buf, ushort bgid, int bid) @safe
1430 {
1431     static assert(N <= uint.max, "Buffer too large");
1432     entry.prepRW(Operation.PROVIDE_BUFFERS, 1, cast(void*)&buf[0], cast(uint)N, bid);
1433     entry.buf_group = bgid;
1434     return entry;
1435 }
1436 
1437 /// ditto
1438 ref SubmissionEntry prepProvideBuffer(return ref SubmissionEntry entry, ref ubyte[] buf, ushort bgid, int bid) @safe
1439 {
1440     assert(buf.length <= uint.max, "Buffer too large");
1441     entry.prepRW(Operation.PROVIDE_BUFFERS, 1, cast(void*)&buf[0], cast(uint)buf.length, bid);
1442     entry.buf_group = bgid;
1443     return entry;
1444 }

/**
 * Removes `nr` buffers from the buffer group `bgid` (the counterpart of `prepProvideBuffers`).
 *
 * Note: Available from Linux 5.7
 */
ref SubmissionEntry prepRemoveBuffers(return ref SubmissionEntry entry, int nr, ushort bgid) @safe
{
    entry.prepRW(Operation.REMOVE_BUFFERS, nr);
    entry.buf_group = bgid;
    return entry;
}
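
/*
 * Example (sketch): remove the four buffers provided above from group 1:
 *
 *     io.putWith!((ref SubmissionEntry e) => e.prepRemoveBuffers(4, 1));
 *     io.submit(1);
 */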

/**
 * Duplicates up to `nbytes` of data from the pipe `fd_in` to the pipe `fd_out` without
 * consuming the input (see man tee(2)); both descriptors must refer to pipes.
 *
 * Note: Available from Linux 5.8
 */
ref SubmissionEntry prepTee(return ref SubmissionEntry entry, int fd_in, int fd_out, uint nbytes, uint flags) @safe
{
    entry.prepRW(Operation.TEE, fd_out, null, nbytes, 0);
    entry.splice_off_in = 0;
    entry.splice_fd_in = fd_in;
    entry.splice_flags = flags;
    return entry;
}
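
/*
 * Example (sketch; `src` and `dst` must both be pipe descriptors):
 *
 *     io.putWith!((ref SubmissionEntry e) => e.prepTee(src, dst, 4096, 0));
 *     io.submit(1);
 */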

/**
 * Shuts down all or part of a full-duplex connection on socket `fd`; `how` is one of
 * `SHUT_RD`, `SHUT_WR` or `SHUT_RDWR` (see man shutdown(2)).
 *
 * Note: Available from Linux 5.11
 */
ref SubmissionEntry prepShutdown(return ref SubmissionEntry entry, int fd, int how) @safe
{
    return entry.prepRW(Operation.SHUTDOWN, fd, null, how, 0);
}
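
/*
 * Example (sketch; `SHUT_WR` comes from the already imported core.sys.posix.sys.socket):
 *
 *     io.putWith!((ref SubmissionEntry e) => e.prepShutdown(sock, SHUT_WR));
 *     io.submit(1);
 */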

/**
 * Note: Available from Linux 5.11
 */
ref SubmissionEntry prepRenameat(return ref SubmissionEntry entry,
    int olddfd, const(char)* oldpath, int newfd, const(char)* newpath, int flags) @trusted
{
    entry.prepRW(Operation.RENAMEAT, olddfd, cast(void*)oldpath, newfd, cast(ulong)cast(void*)newpath);
    entry.rename_flags = flags;
    return entry;
}

/**
 * Note: Available from Linux 5.11
 */
ref SubmissionEntry prepUnlinkat(return ref SubmissionEntry entry, int dirfd, const(char)* path, int flags) @trusted
{
    entry.prepRW(Operation.UNLINKAT, dirfd, cast(void*)path, 0, 0);
    entry.unlink_flags = flags;
    return entry;
}

/**
 * Note: Available from Linux 5.15
 */
ref SubmissionEntry prepMkdirat(return ref SubmissionEntry entry, int dirfd, const(char)* path, mode_t mode) @trusted
{
    entry.prepRW(Operation.MKDIRAT, dirfd, cast(void*)path, mode, 0);
    return entry;
}

/**
 * Note: Available from Linux 5.15
 */
ref SubmissionEntry prepSymlinkat(return ref SubmissionEntry entry, const(char)* target, int newdirfd, const(char)* linkpath) @trusted
{
    entry.prepRW(Operation.SYMLINKAT, newdirfd, cast(void*)target, 0, cast(ulong)cast(void*)linkpath);
    return entry;
}

/**
 * Note: Available from Linux 5.15
 */
ref SubmissionEntry prepLinkat(return ref SubmissionEntry entry,
    int olddirfd, const(char)* oldpath,
    int newdirfd, const(char)* newpath, int flags) @trusted
{
    entry.prepRW(Operation.LINKAT, olddirfd, cast(void*)oldpath, newdirfd, cast(ulong)cast(void*)newpath);
    entry.hardlink_flags = flags;
    return entry;
}
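
/*
 * Example (sketch; assumes `AT_FDCWD` from core.sys.posix.fcntl and an initialized `Uring io`):
 *
 *     io.putWith!((ref SubmissionEntry e)
 *         => e.prepRenameat(AT_FDCWD, "old.txt", AT_FDCWD, "new.txt", 0));
 *     io.submit(1);
 *
 * D string literals are zero terminated, so they convert implicitly to `const(char)*` here.
 */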

private:

// uring cleanup
void dispose(ref Uring uring) @trusted
{
    if (uring.payload is null) return;
    // debug printf("uring(%d): dispose(%d)\n", uring.payload.fd, uring.payload.refs);
    if (--uring.payload.refs == 0)
    {
        import std.traits : hasElaborateDestructor;
        // debug printf("uring(%d): free\n", uring.payload.fd);
        static if (hasElaborateDestructor!UringDesc)
            destroy(*uring.payload); // call possible destructors
        free(cast(void*)uring.payload);
    }
    uring.payload = null;
}

// system fields descriptor
struct UringDesc
{
    nothrow @nogc:

    int fd;
    size_t refs;
    SetupParameters params;
    SubmissionQueue sq;
    CompletionQueue cq;

    iovec[] regBuffers;

    ~this() @trusted
    {
        if (regBuffers) free(cast(void*)&regBuffers[0]);
        if (sq.ring) munmap(sq.ring, sq.ringSize);
        if (sq.sqes) munmap(cast(void*)&sq.sqes[0], sq.sqes.length * SubmissionEntry.sizeof);
        if (cq.ring && cq.ring != sq.ring) munmap(cq.ring, cq.ringSize);
        close(fd);
    }

    private auto mapRings() @trusted
    {
        sq.ringSize = params.sq_off.array + params.sq_entries * uint.sizeof;
        cq.ringSize = params.cq_off.cqes + params.cq_entries * CompletionEntry.sizeof;

        if (params.features & SetupFeatures.SINGLE_MMAP)
        {
            if (cq.ringSize > sq.ringSize) sq.ringSize = cq.ringSize;
            cq.ringSize = sq.ringSize;
        }

        sq.ring = mmap(null, sq.ringSize,
            PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
            fd, SetupParameters.SUBMISSION_QUEUE_RING_OFFSET
        );

        if (sq.ring == MAP_FAILED)
        {
            sq.ring = null;
            return -errno;
        }

        if (params.features & SetupFeatures.SINGLE_MMAP)
            cq.ring = sq.ring;
        else
        {
            cq.ring = mmap(null, cq.ringSize,
                PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
                fd, SetupParameters.COMPLETION_QUEUE_RING_OFFSET
            );

            if (cq.ring == MAP_FAILED)
            {
                cq.ring = null;
                return -errno; // cleanup is done in struct destructors
            }
        }

        uint entries    = *cast(uint*)(sq.ring + params.sq_off.ring_entries);
        sq.khead        = cast(uint*)(sq.ring + params.sq_off.head);
        sq.ktail        = cast(uint*)(sq.ring + params.sq_off.tail);
        sq.localTail    = *sq.ktail;
        sq.ringMask     = *cast(uint*)(sq.ring + params.sq_off.ring_mask);
        sq.kflags       = cast(uint*)(sq.ring + params.sq_off.flags);
        sq.kdropped     = cast(uint*)(sq.ring + params.sq_off.dropped);

        // Indirection array of indexes to the sqes array (head and tail point to this array).
        // As we don't need any fancy mapping, we just initialize it with constant indexes once
        // and forget about it. That way, head and tail are effectively indexes into our sqes array.
        foreach (i; 0..entries)
        {
            *((cast(uint*)(sq.ring + params.sq_off.array)) + i) = i;
        }

        auto psqes = mmap(
            null, entries * SubmissionEntry.sizeof,
            PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
            fd, SetupParameters.SUBMISSION_QUEUE_ENTRIES_OFFSET
        );

        if (psqes == MAP_FAILED) return -errno;
        sq.sqes = (cast(SubmissionEntry*)psqes)[0..entries];

        entries = *cast(uint*)(cq.ring + params.cq_off.ring_entries);
        cq.khead        = cast(uint*)(cq.ring + params.cq_off.head);
        cq.localHead    = *cq.khead;
        cq.ktail        = cast(uint*)(cq.ring + params.cq_off.tail);
        cq.ringMask     = *cast(uint*)(cq.ring + params.cq_off.ring_mask);
        cq.koverflow    = cast(uint*)(cq.ring + params.cq_off.overflow);
        cq.cqes         = (cast(CompletionEntry*)(cq.ring + params.cq_off.cqes))[0..entries];
        cq.kflags       = cast(uint*)(cq.ring + params.cq_off.flags);
        return 0;
    }
}

/// Wrapper for the `SubmissionEntry` queue
struct SubmissionQueue
{
    nothrow @nogc:

    // mmapped fields
    uint* khead; // controlled by kernel
    uint* ktail; // controlled by us
    uint* kflags; // controlled by kernel (e.g. IORING_SQ_NEED_WAKEUP)
    uint* kdropped; // counter of invalid submissions (out-of-bounds index)
    uint ringMask; // constant mask used to determine the array index from head/tail
    // (ring entries are a power of two, so the mask is entries - 1; e.g. 128 entries => mask 127)

    // mmap details (for cleanup)
    void* ring; // pointer to the mmapped region
    size_t ringSize; // size of the mmapped memory block

    // mmapped list of entries (fixed length)
    SubmissionEntry[] sqes;

    uint localTail; // used for batch submission

    uint head() const @safe pure { return atomicLoad!(MemoryOrder.acq)(*khead); }
    uint tail() const @safe pure { return localTail; }

    void flushTail() @safe pure
    {
        pragma(inline, true);
        // debug printf("SQ updating tail: %d\n", localTail);
        atomicStore!(MemoryOrder.rel)(*ktail, localTail);
    }

    SubmissionQueueFlags flags() const @safe pure
    {
        return cast(SubmissionQueueFlags)atomicLoad!(MemoryOrder.raw)(*kflags);
    }

    bool full() const @safe pure { return sqes.length == length; }

    size_t length() const @safe pure { return tail - head; }

    size_t capacity() const @safe pure { return sqes.length - length; }

    ref SubmissionEntry next()() @safe pure return
    {
        assert(!full, "SubmissionQueue is full");
        return sqes[localTail++ & ringMask];
    }

    void put()(auto ref SubmissionEntry entry) @safe pure
    {
        assert(!full, "SubmissionQueue is full");
        sqes[localTail++ & ringMask] = entry;
    }

    void put(OP)(auto ref OP op)
        if (!is(OP == SubmissionEntry))
    {
        assert(!full, "SubmissionQueue is full");
        sqes[localTail++ & ringMask].fill(op);
    }

    private void putWith(alias FN, ARGS...)(auto ref ARGS args)
    {
        import std.traits : Parameters, ParameterStorageClass, ParameterStorageClassTuple;

        static assert(
            Parameters!FN.length >= 1
            && is(Parameters!FN[0] == SubmissionEntry)
            && ParameterStorageClassTuple!FN[0] == ParameterStorageClass.ref_,
            "Alias function must accept at least `ref SubmissionEntry`");

        static assert(
            is(typeof(FN(sqes[localTail & ringMask], args))),
            "Provided function is not callable with " ~ (Parameters!((ref SubmissionEntry e, ARGS args) {})).stringof);

        assert(!full, "SubmissionQueue is full");
        FN(sqes[localTail++ & ringMask], args);
    }

    uint dropped() const @safe pure { return atomicLoad!(MemoryOrder.raw)(*kdropped); }
}

struct CompletionQueue
{
    nothrow @nogc:

    // mmapped fields
    uint* khead; // controlled by us (incremented after the entry at head is read)
    uint* ktail; // updated by kernel
    uint* koverflow;
    uint* kflags;
    CompletionEntry[] cqes; // array of entries (fixed length)

    uint ringMask; // constant mask used to determine the array index from head/tail

    // mmap details (for cleanup)
    void* ring;
    size_t ringSize;

    uint localHead; // used for bulk reading

    uint head() const @safe pure { return localHead; }
    uint tail() const @safe pure { return atomicLoad!(MemoryOrder.acq)(*ktail); }

    void flushHead() @safe pure
    {
        pragma(inline, true);
        // debug printf("CQ updating head: %d\n", localHead);
        atomicStore!(MemoryOrder.rel)(*khead, localHead);
    }

    bool empty() const @safe pure { return head == tail; }

    ref CompletionEntry front() @safe pure return
    {
        assert(!empty, "CompletionQueue is empty");
        return cqes[localHead & ringMask];
    }

    void popFront() @safe pure
    {
        pragma(inline, true);
        assert(!empty, "CompletionQueue is empty");
        localHead++;
        flushHead();
    }

    size_t length() const @safe pure { return tail - localHead; }

    uint overflow() const @safe pure { return atomicLoad!(MemoryOrder.raw)(*koverflow); }

    /// Runtime CQ flags - written by the application, shouldn't be modified by the kernel.
    void flags(CQRingFlags flags) @safe pure { atomicStore!(MemoryOrder.raw)(*kflags, flags); }
}

// just a helper to use atomicStore more easily with older compilers
void atomicStore(MemoryOrder ms, T, V)(ref T val, V newVal) @trusted
{
    pragma(inline, true);
    import core.atomic : store = atomicStore;
    static if (__VERSION__ >= 2089) store!ms(val, newVal);
    else store!ms(*(cast(shared T*)&val), newVal);
}

// just a helper to use atomicLoad more easily with older compilers
T atomicLoad(MemoryOrder ms, T)(ref const T val) @trusted
{
    pragma(inline, true);
    import core.atomic : load = atomicLoad;
    static if (__VERSION__ >= 2089) return load!ms(val);
    else return load!ms(*(cast(const shared T*)&val));
}

version (assert)
{
    import std.range.primitives : ElementType, isInputRange, isOutputRange;
    static assert(isInputRange!Uring && is(ElementType!Uring == CompletionEntry));
    static assert(isOutputRange!(Uring, SubmissionEntry));
}

version (LDC)
{
    import ldc.intrinsics : llvm_expect;
    alias _expect = llvm_expect;
}
else
{
    // fallback for other compilers: no branch prediction hint, just pass the value through
    T _expect(T)(T val, T expected_val) if (__traits(isIntegral, T))
    {
        pragma(inline, true);
        return val;
    }
}