1 /**
2  * Simple idiomatic dlang wrapper around linux io_uring
3  * (see: https://kernel.dk/io_uring.pdf) asynchronous API.
4  */
5 module during;
6 
7 version (linux) {}
8 else static assert(0, "io_uring is available on linux only");
9 
10 public import during.io_uring;
11 
12 import core.atomic : MemoryOrder;
13 debug import core.stdc.stdio;
14 import core.stdc.stdlib;
15 import core.sys.linux.errno;
16 import core.sys.linux.sys.mman;
17 import core.sys.linux.unistd;
18 import core.sys.posix.signal;
19 import core.sys.posix.sys.socket;
20 import core.sys.posix.sys.uio;
21 import std.algorithm.comparison : among;
22 
23 @trusted: //TODO: remove this and use selectively where it makes sense
24 nothrow @nogc:
25 
26 /**
27  * Setup new instance of io_uring into provided `Uring` structure.
28  *
29  * Params:
30  *     uring = `Uring` structure to be initialized (must not be already initialized)
31  *     entries = Number of entries to initialize uring with
32  *     flags = `SetupFlags` to use to initialize uring.
33  *
 * Returns: On success it returns 0, `-errno` otherwise.
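 *
 * Example:
 * A minimal sketch of creating and tearing down a ring (error handling reduced to a bare check):
 * ---
 * Uring io;
 * immutable ret = io.setup(32);     // ring with 32 submission queue entries
 * assert(ret == 0, "io_uring setup failed");
 * // ... queue and submit operations ...
 * // resources are released when `io` (and all its copies) go out of scope
 * ---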
35  */
36 int setup(ref Uring uring, uint entries = 128, SetupFlags flags = SetupFlags.NONE)
37 {
38     assert(uring.payload is null, "Uring is already initialized");
39     uring.payload = cast(UringDesc*)calloc(1, UringDesc.sizeof);
40     if (uring.payload is null) return -errno;
41 
42     uring.payload.params.flags = flags;
43     uring.payload.refs = 1;
44     auto r = io_uring_setup(entries, uring.payload.params);
45     if (r < 0) return -errno;
46 
47     uring.payload.fd = r;
48 
49     if (uring.payload.mapRings() < 0)
50     {
51         dispose(uring);
52         return -errno;
53     }
54 
55     // debug printf("uring(%d): setup\n", uring.payload.fd);
56 
57     return 0;
58 }
59 
60 /**
61  * Main entry point to work with io_uring.
62  *
63  * It hides `SubmissionQueue` and `CompletionQueue` behind standard range interface.
64  * We put in `SubmissionEntry` entries and take out `CompletionEntry` entries.
65  *
66  * Use predefined `prepXX` methods to fill required fields of `SubmissionEntry` before `put` or during `putWith`.
67  *
 * Note: `prepXX` functions don't touch the previous entry state; they just fill in the operation's properties. The
 * entry is cleared automatically when it's prepared using `putWith`, which makes that interface less error prone.
 * So when using your own `SubmissionEntry` (outside the submission queue) that is later added with `put`, be sure
 * to clear it if it's reused for multiple operations.
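 *
 * Example:
 * A minimal sketch of the whole submit/complete round trip (assumes `setup` succeeds):
 * ---
 * Uring io;
 * if (io.setup() != 0) assert(0, "setup failed");
 *
 * io.putWith!prepNop();             // queue a no-op operation
 * io.submit(1);                     // submit it and wait for one completion
 *
 * foreach (ref cqe; io)             // Uring is an input range of CompletionEntry
 * {
 *     // cqe.res holds the operation result (negative errno on failure)
 * }
 * ---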
72  */
73 struct Uring
74 {
75     nothrow @nogc:
76 
77     private UringDesc* payload;
78     private void checkInitialized() const
79     {
80         assert(payload !is null, "Uring hasn't been initialized yet");
81     }
82 
83     /// Copy constructor
84     this(ref return scope Uring rhs)
85     {
86         assert(rhs.payload !is null, "rhs payload is null");
87         // debug printf("uring(%d): copy\n", rhs.payload.fd);
88         this.payload = rhs.payload;
89         this.payload.refs++;
90     }
91 
92     /// Destructor
93     ~this()
94     {
95         dispose(this);
96     }
97 
98     /// Native io_uring file descriptor
99     auto fd() const
100     {
101         checkInitialized();
102         return payload.fd;
103     }
104 
105     /// io_uring parameters
106     SetupParameters params() const return
107     {
108         checkInitialized();
109         return payload.params;
110     }
111 
112     /// Check if there is some `CompletionEntry` to process.
113     bool empty() const
114     {
115         checkInitialized();
116         return payload.cq.empty;
117     }
118 
119     /// Check if there is space for another `SubmissionEntry` to submit.
120     bool full() const
121     {
122         checkInitialized();
123         return payload.sq.full;
124     }
125 
126     /// Available space in submission queue before it becomes full
127     size_t capacity() const
128     {
129         checkInitialized();
130         return payload.sq.capacity;
131     }
132 
133     /// Number of entries in completion queue
134     size_t length() const
135     {
136         checkInitialized();
137         return payload.cq.length;
138     }
139 
140     /// Get first `CompletionEntry` from cq ring
141     ref CompletionEntry front() return
142     {
143         checkInitialized();
144         return payload.cq.front;
145     }
146 
147     /// Move to next `CompletionEntry`
148     void popFront()
149     {
150         checkInitialized();
151         return payload.cq.popFront;
152     }
153 
154     /**
155      * Adds new entry to the `SubmissionQueue`.
156      *
     * Note that this just adds the entry to the queue and doesn't advance the tail
     * marker the kernel sees. That happens when `submit()` is called, which is also
     * what actually enters the new entries to the kernel.
162      *
163      * Params:
     *     FN    = Function that fills the next entry in the queue by `ref` (should be faster).
     *             It is expected to be of the form `void function(ARGS)(ref SubmissionEntry, auto ref ARGS)`.
     *             Note that in this case the queue entry is cleared before the function is called.
     *     entry = Custom-built `SubmissionEntry` to be posted as is.
     *             Note that in this case it is copied whole over the one in the `SubmissionQueue`.
169      *     args  = Optional arguments passed to the function
170      *
171      * Returns: reference to `Uring` structure so it's possible to chain multiple commands.
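     *
     * Example:
     * A minimal sketch of both variants (`io` is an initialized `Uring`; `fd` and `iov` are assumed to be
     * a valid file descriptor and `iovec`, both alive until the operations complete):
     * ---
     * SubmissionEntry entry;
     * entry.prepReadv(fd, iov, 0);
     * io.put(entry);                // copied whole into the queue
     *
     * io.putWith!prepFsync(fd);     // next queue entry is cleared and filled in place
     *
     * io.submit(2);                 // nothing reaches the kernel until submit is called
     * ---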
172      */
173     ref Uring put()(auto ref SubmissionEntry entry) return
174     {
175         checkInitialized();
176         payload.sq.put(entry);
177         return this;
178     }
179 
180     /// ditto
181     ref Uring putWith(alias FN, ARGS...)(auto ref ARGS args) return
182     {
183         import std.functional : forward;
184         checkInitialized();
185         payload.sq.putWith!FN(forward!args);
186         return this;
187     }
188 
189     /**
     * Similar to `put(SubmissionEntry)`, but here we can provide our own custom type whose fields are
     * copied into the next `SubmissionEntry` in the queue.
192      *
193      * Fields in the provided type must use the same names as in `SubmissionEntry` to be automagically copied.
194      *
195      * Params:
196      *   op = Custom operation definition.
     * Returns: Reference to `Uring` structure so it's possible to chain multiple commands.
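     *
     * Example:
     * A minimal sketch of a custom operation type (`io` is an initialized `Uring`; field names and types
     * must match `SubmissionEntry`):
     * ---
     * struct MyNop
     * {
     *     Operation opcode = Operation.NOP;
     *     int fd = -1;
     * }
     *
     * io.put(MyNop());
     * ---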
198      */
199     ref Uring put(OP)(auto ref OP op) return
200         if (!is(OP == SubmissionEntry))
201     {
202         checkInitialized();
203         payload.sq.put(op);
204         return this;
205     }
206 
207     /**
     * If the completion queue is full, new events may be dropped.
     * This value records the number of dropped events.
210      */
211     uint overflow() const
212     {
213         checkInitialized();
214         return payload.cq.overflow;
215     }
216 
217     /// Counter of invalid submissions (out-of-bound index in submission array)
218     uint dropped() const
219     {
220         checkInitialized();
221         return payload.sq.dropped;
222     }
223 
224     /**
     * Submits queued `SubmissionEntry` entries to be processed by the kernel.
226      *
227      * Params:
228      *     want  = number of `CompletionEntries` to wait for.
229      *             If 0, this just submits queued entries and returns.
     *             If > 0, it blocks until at least the wanted number of entries has completed.
231      *     sig   = See io_uring_enter(2) man page
232      *
233      * Returns: Number of submitted entries on success, `-errno` on error
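     *
     * Example:
     * A minimal sketch of batched submission (`io` is an initialized `Uring`):
     * ---
     * io.putWith!prepNop()
     *   .putWith!prepNop();
     * immutable n = io.submit(2);   // enter both entries and wait until both complete
     * // n holds the number of submitted entries (2) or a negative errno
     * ---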
234      */
235     auto submit(uint want = 0, const sigset_t* sig = null) @trusted
236     {
237         checkInitialized();
238 
239         auto len = cast(uint)payload.sq.length;
240         if (len > 0) // anything to submit?
241         {
242             EnterFlags flags;
243             if (want > 0) flags |= EnterFlags.GETEVENTS;
244 
245             payload.sq.flushTail(); // advance queue index
246 
247             if (payload.params.flags & SetupFlags.SQPOLL)
248             {
249                 if (payload.sq.flags & SubmissionQueueFlags.NEED_WAKEUP)
250                     flags |= EnterFlags.SQ_WAKEUP;
251                 else if (want == 0) return len; // fast poll
252             }
253             auto r = io_uring_enter(payload.fd, len, want, flags, sig);
254             if (r < 0) return -errno;
255             return r;
256         }
257         else if (want > 0) return wait(want); // just simple wait
258         return 0;
259     }
260 
261     /**
     * Similar to `submit`, but this method just waits for the required number
263      * of `CompletionEntries`.
264      *
265      * Returns: `0` on success, `-errno` on error
266      */
267     auto wait(uint want = 1, const sigset_t* sig = null) @trusted
268     {
269         pragma(inline);
270         checkInitialized();
271         assert(want > 0, "Invalid want value");
272 
273         if (payload.cq.length >= want) return 0; // we don't need to syscall
274 
275         auto r = io_uring_enter(payload.fd, 0, want, EnterFlags.GETEVENTS, sig);
276         if (r < 0) return -errno;
277         return 0;
278     }
279 
280     /**
281      * Register single buffer to be mapped into the kernel for faster buffered operations.
282      *
     * To use the buffers, the application must specify the fixed variants of the operations,
     * `READ_FIXED` or `WRITE_FIXED`, in the `SubmissionEntry`, with the used `buf_index` set
     * in the entry's extra data.
286      *
287      * An application can increase or decrease the size or number of registered buffers by first
288      * unregistering the existing buffers, and then issuing a new call to io_uring_register() with
289      * the new buffers.
290      *
291      * Params:
     *   buffers = Buffers to be registered
293      *
294      * Returns: On success, returns 0.  On error, `-errno` is returned.
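     *
     * Example:
     * A minimal sketch (`io` is an initialized `Uring`, `fd` an open readable file descriptor; the
     * registered buffer must stay alive while it is registered):
     * ---
     * ubyte[4096] buf;
     * if (io.registerBuffers(buf[]) != 0) assert(0, "buffer registration failed");
     *
     * SubmissionEntry e;
     * e.prepReadFixed(fd, 0, buf[], 0);     // READ_FIXED using registered buffer index 0
     * io.put(e).submit(1);
     * ---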
295      */
296     auto registerBuffers(T)(T buffers) @trusted
297         if (is(T == ubyte[]) || is(T == ubyte[][])) // TODO: something else?
298     {
299         checkInitialized();
300         assert(buffers.length, "Empty buffer");
301 
302         if (payload.regBuffers !is null)
303             return -EBUSY; // buffers were already registered
304 
305         static if (is(T == ubyte[]))
306         {
307             auto p = malloc(iovec.sizeof);
308             if (p is null) return -errno;
309             payload.regBuffers = (cast(iovec*)p)[0..1];
310             payload.regBuffers[0].iov_base = cast(void*)&buffers[0];
311             payload.regBuffers[0].iov_len = buffers.length;
312         }
313         else static if (is(T == ubyte[][]))
314         {
315             auto p = malloc(buffers.length * iovec.sizeof);
316             if (p is null) return -errno;
317             payload.regBuffers = (cast(iovec*)p)[0..buffers.length];
318 
319             foreach (i, b; buffers)
320             {
321                 assert(b.length, "Empty buffer");
322                 payload.regBuffers[i].iov_base = cast(void*)&b[0];
323                 payload.regBuffers[i].iov_len = b.length;
324             }
325         }
326 
327         auto r = io_uring_register(
328                 payload.fd,
329                 RegisterOpCode.REGISTER_BUFFERS,
                cast(const(void)*)&payload.regBuffers[0], cast(uint)payload.regBuffers.length
331             );
332 
333         if (r < 0) return -errno;
334         return 0;
335     }
336 
337     /**
338      * Releases all previously registered buffers associated with the `io_uring` instance.
339      *
340      * An application need not unregister buffers explicitly before shutting down the io_uring instance.
341      *
342      * Returns: On success, returns 0. On error, `-errno` is returned.
343      */
344     auto unregisterBuffers() @trusted
345     {
346         checkInitialized();
347 
348         if (payload.regBuffers is null)
349             return -ENXIO; // no buffers were registered
350 
351         free(cast(void*)&payload.regBuffers[0]);
352         payload.regBuffers = null;
353 
354         auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_BUFFERS, null, 0);
355         if (r < 0) return -errno;
356         return 0;
357     }
358 
359     /**
360      * Register files for I/O.
361      *
362      * To make use of the registered files, the `IOSQE_FIXED_FILE` flag must be set in the flags
363      * member of the `SubmissionEntry`, and the `fd` member is set to the index of the file in the
364      * file descriptor array.
365      *
366      * Files are automatically unregistered when the `io_uring` instance is torn down. An application
367      * need only unregister if it wishes to register a new set of fds.
368      *
     * Use `-1` as a file descriptor to mark it as reserved in the array.
     *
     * Params: fds = array of file descriptors to be registered
371      *
372      * Returns: On success, returns 0. On error, `-errno` is returned.
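     *
     * Example:
     * A minimal sketch (`io` is an initialized `Uring`; `sock0` and `sock1` are assumed to be open
     * file descriptors):
     * ---
     * int[2] fds = [sock0, sock1];
     * if (io.registerFiles(fds[]) != 0) assert(0, "file registration failed");
     * // operations may now refer to these files by index 0 and 1
     * // (with the fixed-file flag set in the entry flags)
     * ---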
373      */
374     auto registerFiles(const(int)[] fds) @trusted
375     {
376         checkInitialized();
377         assert(fds.length, "No file descriptors provided");
378         assert(fds.length < uint.max, "Too many file descriptors");
379 
380         // arg contains a pointer to an array of nr_args file descriptors (signed 32 bit integers).
381         auto r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_FILES, &fds[0], cast(uint)fds.length);
382         if (r < 0) return -errno;
383         return 0;
384     }
385 
    /**
387      * Register an update for an existing file set. The updates will start at
388      * `off` in the original array.
389      *
390      * Use `-1` as a file descriptor to mark it as reserved in the array.
391      *
392      * Params:
393      *      off = offset to the original registered files to be updated
     *      fds = array of file descriptors to update with
395      *
     * Returns: On success, returns 0. On error, `-errno` is returned.
397      */
398     auto registerFilesUpdate(uint off, const(int)[] fds) @trusted
399     {
400         struct Update
401         {
402             uint off;
403             const(int)* fds;
404         }
405 
406         checkInitialized();
407         assert(fds.length, "No file descriptors provided to update");
408         assert(fds.length < uint.max, "Too many file descriptors");
409 
410         Update u = Update(off, &fds[0]);
411         auto r = io_uring_register(
412             payload.fd,
413             RegisterOpCode.REGISTER_FILES_UPDATE,
414             &u, cast(uint)fds.length);
415         if (r < 0) return -errno;
416         return 0;
417     }
418 
419     /**
420      * All previously registered files associated with the `io_uring` instance will be unregistered.
421      *
422      * Files are automatically unregistered when the `io_uring` instance is torn down. An application
423      * need only unregister if it wishes to register a new set of fds.
424      *
425      * Returns: On success, returns 0. On error, `-errno` is returned.
426      */
427     auto unregisterFiles() @trusted
428     {
429         checkInitialized();
430         auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_FILES, null, 0);
431         if (r < 0) return -errno;
432         return 0;
433     }
434 
435     /**
     * Registers an `eventfd(2)` file descriptor to be notified of completion events on this
     * `io_uring` instance.
     *
     * Params: eventFD = `eventfd(2)` file descriptor to be registered
439      *
440      * Returns: On success, returns 0. On error, `-errno` is returned.
441      */
442     auto registerEventFD(int eventFD) @trusted
443     {
444         checkInitialized();
445         auto r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_EVENTFD, &eventFD, 1);
446         if (r < 0) return -errno;
447         return 0;
448     }
449 
450     /**
     * Unregisters a previously registered `eventfd(2)` file descriptor.
452      *
453      * Returns: On success, returns 0. On error, `-errno` is returned.
454      */
455     auto unregisterEventFD() @trusted
456     {
457         checkInitialized();
458         auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_EVENTFD, null, 0);
459         if (r < 0) return -errno;
460         return 0;
461     }
462 }
463 
464 /**
465  * Uses custom operation definition to fill fields of `SubmissionEntry`.
 * Can be used in cases when the builtin prep* functions aren't enough.
467  *
468  * Custom definition fields must correspond to fields of `SubmissionEntry` for this to work.
469  *
 * Note: This doesn't touch the previous state of the entry, it just fills the corresponding fields.
 *       So it might be necessary to call `clear` on the entry first (depending on usage).
472  *
473  * Params:
474  *   entry = entry to set parameters to
475  *   op = operation to fill entry with (can be custom type)
476  */
477 void fill(E)(ref SubmissionEntry entry, auto ref E op)
478 {
479     pragma(inline);
480     import std.traits : hasMember, FieldNameTuple;
481 
482     // fill entry from provided operation fields (they must have same name as in SubmissionEntry)
483     foreach (m; FieldNameTuple!E)
484     {
485         static assert(hasMember!(SubmissionEntry, m), "unknown member: " ~ E.stringof ~ "." ~ m);
486         __traits(getMember, entry, m) = __traits(getMember, op, m);
487     }
488 }
489 
490 /**
491  * Template function to help set `SubmissionEntry` `user_data` field.
492  *
493  * Params:
494  *      entry = `SubmissionEntry` to prepare
495  *      data = data to set to the `SubmissionEntry`
496  *
 * Note: `data` is passed by `ref` and must stay alive for the whole duration of the operation.
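 *
 * Example:
 * A minimal sketch (`io` is an initialized `Uring`; the kernel echoes `user_data` back in the
 * matching completion entry):
 * ---
 * static int ctx;                   // must outlive the operation
 * SubmissionEntry e;
 * e.prepNop();
 * e.setUserData(ctx);
 * io.put(e).submit(1);
 * // the completion's user_data now holds cast(ulong)&ctx
 * ---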
498  */
499 void setUserData(D)(ref SubmissionEntry entry, ref D data) @trusted
500 {
501     entry.user_data = cast(ulong)(cast(void*)&data);
502 }
503 
504 /**
505  * Prepares `nop` operation.
506  *
507  * Params:
508  *      entry = `SubmissionEntry` to prepare
509  */
510 void prepNop(ref SubmissionEntry entry) @safe
511 {
512     entry.opcode = Operation.NOP;
513     entry.fd = -1;
514 }
515 
516 // TODO: check offset behavior, preadv2/pwritev2 should accept -1 to work on the current file offset,
517 // but it doesn't seem to work here.
518 
519 /**
520  * Prepares `readv` operation.
521  *
522  * Params:
523  *      entry = `SubmissionEntry` to prepare
524  *      fd = file descriptor of file we are operating on
 *      offset = file offset at which the operation starts
526  *      buffer = iovec buffers to be used by the operation
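 *
 * Example:
 * A minimal sketch (`io` is an initialized `Uring`, `fd` an open readable file descriptor; `v` and
 * `buf` must stay alive until the read completes):
 * ---
 * ubyte[512] buf;
 * iovec v = iovec(cast(void*)&buf[0], buf.length);
 *
 * SubmissionEntry e;
 * e.prepReadv(fd, v, 0);
 * io.put(e).submit(1);
 * ---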
527  */
528 void prepReadv(V)(ref SubmissionEntry entry, int fd, ref const V buffer, ulong offset) @trusted
529     if (is(V == iovec[]) || is(V == iovec))
530 {
531     entry.opcode = Operation.READV;
532     entry.fd = fd;
533     entry.off = offset;
534     static if (is(V == iovec[]))
535     {
536         assert(buffer.length, "Empty buffer");
537         assert(buffer.length < uint.max, "Too many iovec buffers");
538         entry.addr = cast(ulong)(cast(void*)&buffer[0]);
539         entry.len = cast(uint)buffer.length;
540     }
541     else
542     {
543         entry.addr = cast(ulong)(cast(void*)&buffer);
544         entry.len = 1;
545     }
546 }
547 
548 /**
549  * Prepares `writev` operation.
550  *
551  * Params:
552  *      entry = `SubmissionEntry` to prepare
553  *      fd = file descriptor of file we are operating on
 *      offset = file offset at which the operation starts
555  *      buffer = iovec buffers to be used by the operation
556  */
557 void prepWritev(V)(ref SubmissionEntry entry, int fd, ref const V buffer, ulong offset) @trusted
558     if (is(V == iovec[]) || is(V == iovec))
559 {
560     entry.opcode = Operation.WRITEV;
561     entry.fd = fd;
562     entry.off = offset;
563     static if (is(V == iovec[]))
564     {
565         assert(buffer.length, "Empty buffer");
566         assert(buffer.length < uint.max, "Too many iovec buffers");
567         entry.addr = cast(ulong)(cast(void*)&buffer[0]);
568         entry.len = cast(uint)buffer.length;
569     }
570     else
571     {
572         entry.addr = cast(ulong)(cast(void*)&buffer);
573         entry.len = 1;
574     }
575 }
576 
577 /**
578  * Prepares `read_fixed` operation.
579  *
580  * Params:
581  *      entry = `SubmissionEntry` to prepare
582  *      fd = file descriptor of file we are operating on
 *      offset = file offset at which the operation starts
584  *      buffer = slice to preregistered buffer
585  *      bufferIndex = index to the preregistered buffers array buffer belongs to
586  */
587 void prepReadFixed(ref SubmissionEntry entry, int fd, ulong offset, ubyte[] buffer, ushort bufferIndex) @trusted
588 {
589     assert(buffer.length, "Empty buffer");
590     assert(buffer.length < uint.max, "Buffer too large");
591     entry.opcode = Operation.READ_FIXED;
592     entry.fd = fd;
593     entry.off = offset;
594     entry.addr = cast(ulong)(cast(void*)&buffer[0]);
595     entry.len = cast(uint)buffer.length;
596     entry.buf_index = bufferIndex;
597 }
598 
599 /**
600  * Prepares `write_fixed` operation.
601  *
602  * Params:
603  *      entry = `SubmissionEntry` to prepare
604  *      fd = file descriptor of file we are operating on
 *      offset = file offset at which the operation starts
606  *      buffer = slice to preregistered buffer
607  *      bufferIndex = index to the preregistered buffers array buffer belongs to
608  */
609 void prepWriteFixed(ref SubmissionEntry entry, int fd, ulong offset, ubyte[] buffer, ushort bufferIndex) @trusted
610 {
611     assert(buffer.length, "Empty buffer");
612     assert(buffer.length < uint.max, "Buffer too large");
613     entry.opcode = Operation.WRITE_FIXED;
614     entry.fd = fd;
615     entry.off = offset;
616     entry.addr = cast(ulong)(cast(void*)&buffer[0]);
617     entry.len = cast(uint)buffer.length;
618     entry.buf_index = bufferIndex;
619 }
620 
621 /**
622  * Prepares `sendmsg(2)` operation.
623  *
624  * Params:
625  *      entry = `SubmissionEntry` to prepare
626  *      fd = file descriptor of file we are operating on
627  *      msg = message to operate with
628  *      flags = `sendmsg` operation flags
629  *
630  * Note: Available from Linux 5.3
631  *
632  * See_Also: `sendmsg(2)` man page for details.
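 *
 * Example:
 * A minimal sketch (`io` is an initialized `Uring`, `sock` a connected socket; `msg`, `v` and `data`
 * must stay alive until the operation completes):
 * ---
 * ubyte[64] data;
 * iovec v = iovec(cast(void*)&data[0], data.length);
 *
 * msghdr msg;
 * msg.msg_iov = &v;
 * msg.msg_iovlen = 1;
 *
 * SubmissionEntry e;
 * e.prepSendMsg(sock, msg);
 * io.put(e).submit(1);
 * ---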
633  */
634 void prepSendMsg(ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE) @trusted
635 {
636     entry.opcode = Operation.SENDMSG;
637     entry.fd = fd;
638     entry.addr = cast(ulong)(cast(void*)&msg);
639     entry.msg_flags = flags;
640 }
641 
642 /**
643  * Prepares `recvmsg(2)` operation.
644  *
645  * Params:
646  *      entry = `SubmissionEntry` to prepare
647  *      fd = file descriptor of file we are operating on
648  *      msg = message to operate with
649  *      flags = `recvmsg` operation flags
650  *
651  * Note: Available from Linux 5.3
652  *
653  * See_Also: `recvmsg(2)` man page for details.
654  */
655 void prepRecvMsg(ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE) @trusted
656 {
657     entry.opcode = Operation.RECVMSG;
658     entry.fd = fd;
659     entry.addr = cast(ulong)(cast(void*)&msg);
660     entry.msg_flags = flags;
661 }
662 
663 /**
664  * Prepares `fsync` operation.
665  *
666  * Params:
667  *      entry = `SubmissionEntry` to prepare
668  *      fd = file descriptor of a file to call `fsync` on
669  *      flags = `fsync` operation flags
670  */
671 void prepFsync(ref SubmissionEntry entry, int fd, FsyncFlags flags = FsyncFlags.NORMAL) @safe
672 {
673     entry.opcode = Operation.FSYNC;
674     entry.fd = fd;
675     entry.fsync_flags = flags;
676 }
677 
678 /**
679  * Poll the fd specified in the submission queue entry for the events specified in the poll_events
680  * field. Unlike poll or epoll without `EPOLLONESHOT`, this interface always works in one shot mode.
681  * That is, once the poll operation is completed, it will have to be resubmitted.
682  *
683  * Params:
684  *      entry = `SubmissionEntry` to prepare
685  *      fd = file descriptor to poll
686  *      events = events to poll on the FD
687  */
688 void prepPollAdd(ref SubmissionEntry entry, int fd, PollEvents events) @safe
689 {
690     entry.opcode = Operation.POLL_ADD;
691     entry.fd = fd;
692     entry.poll_events = events;
693 }
694 
695 /**
696  * Remove an existing poll request. If found, the res field of the `CompletionEntry` will contain
697  * `0`.  If not found, res will contain `-ENOENT`.
698  *
699  * Params:
700  *      entry = `SubmissionEntry` to prepare
701  *      userData = data with the previously issued poll operation
702  */
703 void prepPollRemove(D)(ref SubmissionEntry entry, ref D userData) @trusted
704 {
705     entry.opcode = Operation.POLL_REMOVE;
706     entry.fd = -1;
707     entry.addr = cast(ulong)(cast(void*)&userData);
708 }
709 
710 /**
711  * Prepares `sync_file_range(2)` operation.
712  *
 * Sync a file segment with disk; permits fine control when synchronizing the open file referred to
 * by the file descriptor `fd` with disk.
715  *
716  * If `len` is 0, then all bytes from `offset` through to the end of file are synchronized.
717  *
718  * Params:
719  *      entry = `SubmissionEntry` to prepare
720  *      fd = is the file descriptor to sync
721  *      offset = the starting byte of the file range to be synchronized
722  *      len = the length of the range to be synchronized, in bytes
723  *      flags = the flags for the command.
724  *
725  * See_Also: `sync_file_range(2)` for the general description of the related system call.
726  *
727  * Note: available from Linux 5.2
728  */
729 void prepSyncFileRange(ref SubmissionEntry entry, int fd, ulong offset, uint len, SyncFileRangeFlags flags) @safe
730 {
731     entry.opcode = Operation.SYNC_FILE_RANGE;
732     entry.fd = fd;
733     entry.off = offset;
734     entry.len = len;
735     entry.sync_range_flags = flags;
736 }
737 
738 /**
739  * This command will register a timeout operation.
740  *
741  * A timeout will trigger a wakeup event on the completion ring for anyone waiting for events. A
742  * timeout condition is met when either the specified timeout expires, or the specified number of
743  * events have completed. Either condition will trigger the event. The request will complete with
744  * `-ETIME` if the timeout got completed through expiration of the timer, or `0` if the timeout got
745  * completed through requests completing on their own. If the timeout was cancelled before it
746  * expired, the request will complete with `-ECANCELED`.
747  *
748  * Applications may delete existing timeouts before they occur with `TIMEOUT_REMOVE` operation.
749  *
750  * Params:
751  *      entry = `SubmissionEntry` to prepare
752  *      time = reference to `time64` data structure
753  *      count = completion event count
754  *      flags = define if it's a relative or absolute time
755  *
756  * Note: Available from Linux 5.4
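 *
 * Example:
 * A minimal sketch of a 1 second relative timeout (`io` is an initialized `Uring`; assumes
 * `KernelTimespec` exposes the usual `tv_sec`/`tv_nsec` members; `ts` must stay alive until the
 * timeout completes):
 * ---
 * KernelTimespec ts;
 * ts.tv_sec = 1;
 *
 * SubmissionEntry e;
 * e.prepTimeout(ts, 0, TimeoutFlags.REL);
 * io.put(e).submit(1);              // completes with -ETIME when the timer fires
 * ---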
757  */
758 void prepTimeout(ref SubmissionEntry entry, ref KernelTimespec time,
759     ulong count = 0, TimeoutFlags flags = TimeoutFlags.REL) @trusted
760 {
761     entry.opcode = Operation.TIMEOUT;
762     entry.fd = -1;
763     entry.addr = cast(ulong)(cast(void*)&time);
764     entry.len = 1;
765     entry.off = count;
766     entry.timeout_flags = flags;
767 }
768 
769 /**
 * Prepares operation to remove an existing timeout registered using the `TIMEOUT` operation.
771  *
772  * Attempt to remove an existing timeout operation. If the specified timeout request is found and
773  * cancelled successfully, this request will terminate with a result value of `-ECANCELED`. If the
774  * timeout request was found but expiration was already in progress, this request will terminate
775  * with a result value of `-EALREADY`. If the timeout request wasn't found, the request will
776  * terminate with a result value of `-ENOENT`.
777  *
778  * Params:
779  *      entry = `SubmissionEntry` to prepare
780  *      userData = user data provided with the previously issued timeout operation
781  *
782  * Note: Available from Linux 5.5
783  */
784 void prepTimeoutRemove(D)(ref SubmissionEntry entry, ref D userData) @trusted
785 {
786     entry.opcode = Operation.TIMEOUT_REMOVE;
787     entry.fd = -1;
788     entry.addr = cast(ulong)(cast(void*)&userData);
789 }
790 
791 /**
792  * Prepares `accept4(2)` operation.
793  *
 * See_Also: `accept4(2)` for the general description of the related system call.
795  *
796  * Params:
797  *      entry = `SubmissionEntry` to prepare
798  *      fd = socket file descriptor
 *      addr = reference to one of the sockaddr structures to be filled with the accepted client's address
 *      addrlen = reference to the addrlen field that will be filled with the accepted client's address length
 *      flags = `accept4(2)` operation flags
801  *
802  * Note: Available from Linux 5.5
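 *
 * Example:
 * A minimal sketch (`io` is an initialized `Uring`, `listenFd` a listening socket; `addr` and `len`
 * must stay alive until the accept completes):
 * ---
 * import core.sys.posix.netinet.in_ : sockaddr_in;
 *
 * sockaddr_in addr;
 * socklen_t len = addr.sizeof;
 *
 * SubmissionEntry e;
 * e.prepAccept(listenFd, addr, len);
 * io.put(e).submit(1);              // res of the completion holds the accepted socket descriptor
 * ---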
803  */
804 void prepAccept(ADDR)(ref SubmissionEntry entry, int fd, ref ADDR addr, ref socklen_t addrlen,
805     AcceptFlags flags = AcceptFlags.NONE) @trusted
806 {
807     entry.opcode = Operation.ACCEPT;
808     entry.fd = fd;
809     entry.addr = cast(ulong)(cast(void*)&addr);
810     entry.addr2 = cast(ulong)(cast(void*)&addrlen);
811     entry.accept_flags = flags;
812 }
813 
814 /**
815  * Prepares operation that cancels existing async work.
816  *
 * This works with any read/write request, accept, send/recvmsg, etc. There’s an important
818  * distinction to make here with the different kinds of commands. A read/write on a regular file
819  * will generally be waiting for IO completion in an uninterruptible state. This means it’ll ignore
820  * any signals or attempts to cancel it, as these operations are uncancellable. io_uring can cancel
821  * these operations if they haven’t yet been started. If they have been started, cancellations on
822  * these will fail. Network IO will generally be waiting interruptibly, and can hence be cancelled
823  * at any time. The completion event for this request will have a result of 0 if done successfully,
824  * `-EALREADY` if the operation is already in progress, and `-ENOENT` if the original request
825  * specified cannot be found. For cancellation requests that return `-EALREADY`, io_uring may or may
826  * not cause this request to be stopped sooner. For blocking IO, the original request will complete
827  * as it originally would have. For IO that is cancellable, it will terminate sooner if at all
828  * possible.
829  *
830  * Params:
831  *      entry = `SubmissionEntry` to prepare
832  *      userData = `user_data` field of the request that should be cancelled
833  *
834  * Note: Available from Linux 5.5
835  */
836 void prepCancel(D)(ref SubmissionEntry entry, ref D userData/*, uint flags = 0*/) @trusted
837 {
838     entry.opcode = Operation.ASYNC_CANCEL;
839     entry.fd = -1;
840     entry.addr = cast(ulong)(cast(void*)&userData);
841     // entry.cancel_flags = flags; // TODO: there are none yet
842 }
843 
844 /**
845  * Prepares linked timeout operation.
846  *
 * This request must be linked with another request through the `IOSQE_IO_LINK` flag.
848  * Unlike `IORING_OP_TIMEOUT`, `IORING_OP_LINK_TIMEOUT` acts on the linked request, not the completion
849  * queue. The format of the command is otherwise like `IORING_OP_TIMEOUT`, except there's no
850  * completion event count as it's tied to a specific request. If used, the timeout specified in the
851  * command will cancel the linked command, unless the linked command completes before the
 * timeout. The timeout will complete with `-ETIME` if the timer expired and cancellation of the linked
 * request was attempted, or `-ECANCELED` if the timer got cancelled because of completion of the linked
854  * request.
855  *
856  * Note: Available from Linux 5.5
857  *
858  * Params:
859  *      entry = `SubmissionEntry` to prepare
860  *      time = time specification
861  *      flags = define if it's a relative or absolute time
862  */
863 void prepLinkTimeout(ref SubmissionEntry entry, ref KernelTimespec time, TimeoutFlags flags = TimeoutFlags.REL) @trusted
864 {
865     entry.opcode = Operation.LINK_TIMEOUT;
866     entry.fd = -1;
867     entry.addr = cast(ulong)(cast(void*)&time);
868     entry.len = 1;
869     entry.timeout_flags = flags;
870 }
871 
/**
 * Prepares `connect(2)` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = socket file descriptor
 *      addr = `sockaddr` structure holding the address to connect to
 *
 * Note: Available from Linux 5.5
 *
 * See_Also: `connect(2)` man page for details.
 */
875 void prepConnect(ADDR)(ref SubmissionEntry entry, int fd, ref const(ADDR) addr) @trusted
876 {
877     entry.opcode = Operation.CONNECT;
878     entry.fd = fd;
879     entry.addr = cast(ulong)(cast(void*)&addr);
880     entry.off = ADDR.sizeof;
881 }
882 
883 private:
884 
885 // uring cleanup
886 void dispose(ref Uring uring)
887 {
888     if (uring.payload is null) return;
889     // debug printf("uring(%d): dispose(%d)\n", uring.payload.fd, uring.payload.refs);
890     if (--uring.payload.refs == 0)
891     {
892         import std.traits : hasElaborateDestructor;
893         // debug printf("uring(%d): free\n", uring.payload.fd);
894         static if (hasElaborateDestructor!UringDesc)
895             destroy(*uring.payload); // call possible destructors
896         free(cast(void*)uring.payload);
897     }
898     uring.payload = null;
899 }
900 
901 // system fields descriptor
902 struct UringDesc
903 {
904     nothrow @nogc:
905 
906     int fd;
907     size_t refs;
908     SetupParameters params;
909     SubmissionQueue sq;
910     CompletionQueue cq;
911 
912     iovec[] regBuffers;
913 
914     ~this()
915     {
916         if (regBuffers) free(cast(void*)&regBuffers[0]);
917         if (sq.ring) munmap(sq.ring, sq.ringSize);
918         if (sq.sqes) munmap(cast(void*)&sq.sqes[0], sq.sqes.length * SubmissionEntry.sizeof);
919         if (cq.ring && cq.ring != sq.ring) munmap(cq.ring, cq.ringSize);
920         close(fd);
921     }
922 
923     private auto mapRings()
924     {
925         sq.ringSize = params.sq_off.array + params.sq_entries * uint.sizeof;
926         cq.ringSize = params.cq_off.cqes + params.cq_entries * CompletionEntry.sizeof;
927 
928         if (params.features & SetupFeatures.SINGLE_MMAP)
929         {
930             if (cq.ringSize > sq.ringSize) sq.ringSize = cq.ringSize;
931             cq.ringSize = sq.ringSize;
932         }
933 
934         sq.ring = mmap(null, sq.ringSize,
935             PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
936             fd, SetupParameters.SUBMISSION_QUEUE_RING_OFFSET
937         );
938 
939         if (sq.ring == MAP_FAILED)
940         {
941             sq.ring = null;
942             return -errno;
943         }
944 
945         if (params.features & SetupFeatures.SINGLE_MMAP)
946             cq.ring = sq.ring;
947         else
948         {
949             cq.ring = mmap(null, cq.ringSize,
950                 PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
951                 fd, SetupParameters.COMPLETION_QUEUE_RING_OFFSET
952             );
953 
954             if (cq.ring == MAP_FAILED)
955             {
956                 cq.ring = null;
957                 return -errno; // cleanup is done in struct destructors
958             }
959         }
960 
961         uint entries = *cast(uint*)(sq.ring + params.sq_off.ring_entries);
962         sq.khead        = cast(uint*)(sq.ring + params.sq_off.head);
963         sq.ktail        = cast(uint*)(sq.ring + params.sq_off.tail);
964         sq.localTail    = *sq.ktail;
965         sq.ringMask     = *cast(uint*)(sq.ring + params.sq_off.ring_mask);
966         sq.kflags       = cast(uint*)(sq.ring + params.sq_off.flags);
967         sq.kdropped     = cast(uint*)(sq.ring + params.sq_off.dropped);
968 
969         // Indirection array of indexes to the sqes array (head and tail are pointing to this array).
970         // As we don't need some fancy mappings, just initialize it with constant indexes and forget about it.
971         // That way, head and tail are actually indexes to our sqes array.
972         foreach (i; 0..entries)
973         {
974             *((cast(uint*)(sq.ring + params.sq_off.array)) + i) = i;
975         }
976 
977         auto psqes = mmap(
978             null, entries * SubmissionEntry.sizeof,
979             PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
980             fd, SetupParameters.SUBMISSION_QUEUE_ENTRIES_OFFSET
981         );
982 
983         if (psqes == MAP_FAILED) return -errno;
984         sq.sqes = (cast(SubmissionEntry*)psqes)[0..entries];
985 
986         entries = *cast(uint*)(cq.ring + params.cq_off.ring_entries);
987         cq.khead        = cast(uint*)(cq.ring + params.cq_off.head);
988         cq.localHead    = *cq.khead;
989         cq.ktail        = cast(uint*)(cq.ring + params.cq_off.tail);
990         cq.ringMask     = *cast(uint*)(cq.ring + params.cq_off.ring_mask);
991         cq.koverflow    = cast(uint*)(cq.ring + params.cq_off.overflow);
992         cq.cqes         = (cast(CompletionEntry*)(cq.ring + params.cq_off.cqes))[0..entries];
993         return 0;
994     }
995 }
996 
/// Wrapper for `SubmissionEntry` queue
998 struct SubmissionQueue
999 {
1000     nothrow @nogc:
1001 
1002     // mmaped fields
1003     uint* khead; // controlled by kernel
1004     uint* ktail; // controlled by us
1005     uint* kflags; // controlled by kernel (ie IORING_SQ_NEED_WAKEUP)
1006     uint* kdropped; // counter of invalid submissions (out of bound index)
1007     uint ringMask; // constant mask used to determine array index from head/tail
1008 
1009     // mmap details (for cleanup)
1010     void* ring; // pointer to the mmaped region
1011     size_t ringSize; // size of mmaped memory block
1012 
1013     // mmapped list of entries (fixed length)
1014     SubmissionEntry[] sqes;
1015 
1016     uint localTail; // used for batch submission
1017 
1018     uint head() const { return atomicLoad!(MemoryOrder.acq)(*khead); }
1019     uint tail() const { return localTail; }
1020 
1021     void flushTail()
1022     {
1023         pragma(inline);
1024         // debug printf("SQ updating tail: %d\n", localTail);
1025         atomicStore!(MemoryOrder.rel)(*ktail, localTail);
1026     }
1027 
1028     SubmissionQueueFlags flags() const
1029     {
1030         return cast(SubmissionQueueFlags)atomicLoad!(MemoryOrder.raw)(*kflags);
1031     }
1032 
1033     bool full() const { return sqes.length == length; }
1034 
1035     size_t length() const { return tail - head; }
1036 
1037     size_t capacity() const { return sqes.length - length; }
1038 
1039     void put()(auto ref SubmissionEntry entry)
1040     {
        assert(!full, "SubmissionQueue is full");
1042         sqes[tail & ringMask] = entry;
1043         localTail++;
1044     }
1045 
1046     void put(OP)(auto ref OP op)
1047         if (!is(OP == SubmissionEntry))
1048     {
        assert(!full, "SubmissionQueue is full");
1050         sqes[tail & ringMask].clear();
1051         sqes[tail & ringMask].fill(op);
1052         localTail++;
1053     }
1054 
1055     private void putWith(alias FN, ARGS...)(auto ref ARGS args)
1056     {
1057         import std.traits : Parameters, ParameterStorageClass, ParameterStorageClassTuple;
1058 
1059         static assert(
1060             Parameters!FN.length >= 1
1061             && is(Parameters!FN[0] == SubmissionEntry)
1062             && ParameterStorageClassTuple!FN[0] == ParameterStorageClass.ref_,
1063             "Alias function must accept at least `ref SubmissionEntry`");
1064 
1065         static assert(
1066             is(typeof(FN(sqes[tail & ringMask], args))),
1067             "Provided function is not callable with " ~ (Parameters!((ref SubmissionEntry e, ARGS args) {})).stringof);
1068 
        assert(!full, "SubmissionQueue is full");
1070         sqes[tail & ringMask].clear();
1071         FN(sqes[tail & ringMask], args);
1072         localTail++;
1073     }
1074 
1075     uint dropped() const { return atomicLoad!(MemoryOrder.raw)(*kdropped); }
1076 }
1077 
1078 struct CompletionQueue
1079 {
1080     nothrow @nogc:
1081 
1082     // mmaped fields
1083     uint* khead; // controlled by us (increment after entry at head was read)
1084     uint* ktail; // updated by kernel
1085     uint* koverflow;
1086     CompletionEntry[] cqes; // array of entries (fixed length)
1087 
1088     uint ringMask; // constant mask used to determine array index from head/tail
1089 
1090     // mmap details (for cleanup)
1091     void* ring;
1092     size_t ringSize;
1093 
1094     uint localHead; // used for bulk reading
1095 
1096     uint head() const { return localHead; }
1097     uint tail() const { return atomicLoad!(MemoryOrder.acq)(*ktail); }
1098 
1099     void flushHead()
1100     {
1101         pragma(inline);
1102         // debug printf("CQ updating head: %d\n", localHead);
1103         atomicStore!(MemoryOrder.rel)(*khead, localHead);
1104     }
1105 
1106     bool empty() const { return head == tail; }
1107 
1108     ref CompletionEntry front() return
1109     {
1110         assert(!empty, "CompletionQueue is empty");
1111         return cqes[localHead & ringMask];
1112     }
1113 
1114     void popFront()
1115     {
1116         pragma(inline);
1117         assert(!empty, "CompletionQueue is empty");
1118         localHead++;
1119         flushHead();
1120     }
1121 
1122     size_t length() const { return tail - localHead; }
1123 
1124     uint overflow() const { return atomicLoad!(MemoryOrder.raw)(*koverflow); }
1125 }
1126 
1127 // just a helper to use atomicStore more easily with older compilers
1128 void atomicStore(MemoryOrder ms, T, V)(ref T val, V newVal) @trusted
1129 {
1130     pragma(inline, true);
1131     import core.atomic : store = atomicStore;
1132     static if (__VERSION__ >= 2089) store!ms(val, newVal);
1133     else store!ms(*(cast(shared T*)&val), newVal);
1134 }
1135 
1136 // just a helper to use atomicLoad more easily with older compilers
1137 T atomicLoad(MemoryOrder ms, T)(ref const T val) @trusted
1138 {
1139     pragma(inline, true);
1140     import core.atomic : load = atomicLoad;
1141     static if (__VERSION__ >= 2089) return load!ms(val);
1142     else return load!ms(*(cast(const shared T*)&val));
1143 }
1144 
1145 version (assert)
1146 {
1147     import std.range.primitives : ElementType, isInputRange, isOutputRange;
1148     static assert(isInputRange!Uring && is(ElementType!Uring == CompletionEntry));
1149     static assert(isOutputRange!(Uring, SubmissionEntry));
1150 }