1 /**
 * Simple idiomatic dlang wrapper around the Linux io_uring asynchronous API
 * (see: https://kernel.dk/io_uring.pdf).
4  */
5 module during;
6 
7 version (linux) {}
8 else static assert(0, "io_uring is available on linux only");
9 
10 public import during.io_uring;
11 
12 import core.atomic : MemoryOrder;
13 debug import core.stdc.stdio;
14 import core.stdc.stdlib;
15 import core.sys.linux.errno;
16 import core.sys.linux.sys.mman;
17 import core.sys.linux.unistd;
18 import core.sys.posix.signal;
19 import core.sys.posix.sys.socket;
20 import core.sys.posix.sys.uio;
21 import std.algorithm.comparison : among;
22 
23 nothrow @nogc:
24 
25 /**
 * Sets up a new io_uring instance in the provided `Uring` structure.
27  *
28  * Params:
29  *     uring = `Uring` structure to be initialized (must not be already initialized)
30  *     entries = Number of entries to initialize uring with
31  *     flags = `SetupFlags` to use to initialize uring.
32  *
 * Returns: On success it returns 0, `-errno` otherwise.
34  */
35 int setup(ref Uring uring, uint entries = 128, SetupFlags flags = SetupFlags.NONE)
36 {
37     assert(uring.payload is null, "Uring is already initialized");
38     uring.payload = cast(UringDesc*)calloc(1, UringDesc.sizeof);
39     if (uring.payload is null) return -errno;
40 
41     uring.payload.params.flags = flags;
42     uring.payload.refs = 1;
43     auto r = io_uring_setup(entries, uring.payload.params);
44     if (r < 0) return -errno;
45 
46     uring.payload.fd = r;
47 
48     if (uring.payload.mapRings() < 0)
49     {
50         dispose(uring);
51         return -errno;
52     }
53 
54     // debug printf("uring(%d): setup\n", uring.payload.fd);
55 
56     return 0;
57 }
58 
59 /**
60  * Main entry point to work with io_uring.
61  *
 * It hides `SubmissionQueue` and `CompletionQueue` behind a standard range interface.
63  * We put in `SubmissionEntry` entries and take out `CompletionEntry` entries.
64  *
65  * Use predefined `prepXX` methods to fill required fields of `SubmissionEntry` before `put` or during `putWith`.
66  *
 * Note: `prepXX` functions don't touch the previous entry state; they only fill in the operation's
 * properties. To keep the interface less error prone, the entry is cleared automatically when it is
 * prepared through `putWith`. When you build your own `SubmissionEntry` (outside the submission queue)
 * and add it with `put`, make sure it is cleared if it's reused for multiple operations.
71  */
72 struct Uring
73 {
74     nothrow @nogc:
75 
76     private UringDesc* payload;
77     private void checkInitialized() const
78     {
79         assert(payload !is null, "Uring hasn't been initialized yet");
80     }
81 
82     /// Copy constructor
83     this(ref return scope Uring rhs)
84     {
85         assert(rhs.payload !is null, "rhs payload is null");
86         // debug printf("uring(%d): copy\n", rhs.payload.fd);
87         this.payload = rhs.payload;
88         this.payload.refs++;
89     }
90 
91     /// Destructor
92     ~this()
93     {
94         dispose(this);
95     }
96 
97     /// Native io_uring file descriptor
98     auto fd() const
99     {
100         checkInitialized();
101         return payload.fd;
102     }
103 
104     /// io_uring parameters
105     SetupParameters params() const return
106     {
107         checkInitialized();
108         return payload.params;
109     }
110 
    /// Check whether the completion queue is empty (no `CompletionEntry` to process).
112     bool empty() const
113     {
114         checkInitialized();
115         return payload.cq.empty;
116     }
117 
    /// Check whether the submission queue is full (no room for another `SubmissionEntry`).
119     bool full() const
120     {
121         checkInitialized();
122         return payload.sq.full;
123     }
124 
125     /// Available space in submission queue before it becomes full
126     size_t capacity() const
127     {
128         checkInitialized();
129         return payload.sq.capacity;
130     }
131 
132     /// Number of entries in completion queue
133     size_t length() const
134     {
135         checkInitialized();
136         return payload.cq.length;
137     }
138 
139     /// Get first `CompletionEntry` from cq ring
140     ref CompletionEntry front() return
141     {
142         checkInitialized();
143         return payload.cq.front;
144     }
145 
146     /// Move to next `CompletionEntry`
147     void popFront()
148     {
149         checkInitialized();
150         return payload.cq.popFront;
151     }
152 
153     /**
     * Adds a new entry to the `SubmissionQueue`.
     *
     * Note that this only adds the entry to the queue; the tail marker the kernel sees
     * is not advanced yet.
     *
     * To actually hand the new entries over to the kernel, `submit()` must be called;
     * it flushes the queue tail and enters the kernel as needed.
161      *
162      * Params:
163      *     FN    = Function to fill next entry in queue by `ref` (should be faster).
164      *             It is expected to be in a form of `void function(ARGS)(ref SubmissionEntry, auto ref ARGS)`.
165      *             Note that in this case queue entry is cleaned first before function is called.
166      *     entry = Custom built `SubmissionEntry` to be posted as is.
167      *             Note that in this case it is copied whole over one in the `SubmissionQueue`.
168      *     args  = Optional arguments passed to the function
169      *
170      * Returns: reference to `Uring` structure so it's possible to chain multiple commands.
171      */
172     ref Uring put()(auto ref SubmissionEntry entry) return
173     {
174         checkInitialized();
175         payload.sq.put(entry);
176         return this;
177     }
178 
179     /// ditto
180     ref Uring putWith(alias FN, ARGS...)(auto ref ARGS args) return
181     {
182         import std.functional : forward;
183         checkInitialized();
184         payload.sq.putWith!FN(forward!args);
185         return this;
186     }
187 
188     /**
     * Similar to `put(SubmissionEntry)`, but here a custom type can be provided whose fields are
     * copied into the next `SubmissionEntry` in the queue.
191      *
192      * Fields in the provided type must use the same names as in `SubmissionEntry` to be automagically copied.
193      *
194      * Params:
195      *   op = Custom operation definition.
     * Returns: reference to `Uring` structure so it's possible to chain multiple commands.
197      */
198     ref Uring put(OP)(auto ref OP op) return
199         if (!is(OP == SubmissionEntry))
200     {
201         checkInitialized();
202         payload.sq.put(op);
203         return this;
204     }
205 
206     /**
     * If the completion queue is full, new events may be dropped.
     * This value records the number of dropped events.
209      */
210     uint overflow() const
211     {
212         checkInitialized();
213         return payload.cq.overflow;
214     }
215 
216     /// Counter of invalid submissions (out-of-bound index in submission array)
217     uint dropped() const
218     {
219         checkInitialized();
220         return payload.sq.dropped;
221     }
222 
223     /**
     * Submits queued `SubmissionEntry` entries to be processed by the kernel.
225      *
226      * Params:
227      *     want  = number of `CompletionEntries` to wait for.
228      *             If 0, this just submits queued entries and returns.
229      *             If > 0, it blocks until at least wanted number of entries were completed.
230      *     sig   = See io_uring_enter(2) man page
231      *
232      * Returns: Number of submitted entries on success, `-errno` on error
233      */
234     auto submit(uint want = 0, const sigset_t* sig = null) @trusted
235     {
236         checkInitialized();
237 
238         auto len = cast(uint)payload.sq.length;
239         if (len > 0) // anything to submit?
240         {
241             EnterFlags flags;
242             if (want > 0) flags |= EnterFlags.GETEVENTS;
243 
244             payload.sq.flushTail(); // advance queue index
245 
246             if (payload.params.flags & SetupFlags.SQPOLL)
247             {
248                 if (payload.sq.flags & SubmissionQueueFlags.NEED_WAKEUP)
249                     flags |= EnterFlags.SQ_WAKEUP;
250                 else if (want == 0) return len; // fast poll
251             }
252             auto r = io_uring_enter(payload.fd, len, want, flags, sig);
253             if (r < 0) return -errno;
254             return r;
255         }
256         else if (want > 0) return wait(want); // just simple wait
257         return 0;
258     }
259 
260     /**
     * Similar to `submit`, but this method just waits for the required number
     * of `CompletionEntry` entries.
263      *
264      * Returns: `0` on success, `-errno` on error
265      */
266     auto wait(uint want = 1, const sigset_t* sig = null) @trusted
267     {
268         pragma(inline);
269         checkInitialized();
270         assert(want > 0, "Invalid want value");
271 
272         if (payload.cq.length >= want) return 0; // we don't need to syscall
273 
274         auto r = io_uring_enter(payload.fd, 0, want, EnterFlags.GETEVENTS, sig);
275         if (r < 0) return -errno;
276         return 0;
277     }
278 
279     /**
     * Registers one or more buffers to be mapped into the kernel for faster buffered operations.
281      *
     * To use the buffers, the application must use the fixed operation variants,
     * `READ_FIXED` or `WRITE_FIXED`, in the `SubmissionEntry`, with `buf_index` set
     * to the index of the registered buffer.
285      *
286      * An application can increase or decrease the size or number of registered buffers by first
287      * unregistering the existing buffers, and then issuing a new call to io_uring_register() with
288      * the new buffers.
289      *
290      * Params:
291      *   buffer = Buffers to be registered
292      *
293      * Returns: On success, returns 0.  On error, `-errno` is returned.
294      */
295     auto registerBuffers(T)(T buffers)
296         if (is(T == ubyte[]) || is(T == ubyte[][])) // TODO: something else?
297     {
298         checkInitialized();
299         assert(buffers.length, "Empty buffer");
300 
301         if (payload.regBuffers !is null)
302             return -EBUSY; // buffers were already registered
303 
304         static if (is(T == ubyte[]))
305         {
306             auto p = malloc(iovec.sizeof);
307             if (p is null) return -errno;
308             payload.regBuffers = (cast(iovec*)p)[0..1];
309             payload.regBuffers[0].iov_base = cast(void*)&buffers[0];
310             payload.regBuffers[0].iov_len = buffers.length;
311         }
312         else static if (is(T == ubyte[][]))
313         {
314             auto p = malloc(buffers.length * iovec.sizeof);
315             if (p is null) return -errno;
316             payload.regBuffers = (cast(iovec*)p)[0..buffers.length];
317 
318             foreach (i, b; buffers)
319             {
320                 assert(b.length, "Empty buffer");
321                 payload.regBuffers[i].iov_base = cast(void*)&b[0];
322                 payload.regBuffers[i].iov_len = b.length;
323             }
324         }
325 
326         auto r = io_uring_register(
327                 payload.fd,
328                 RegisterOpCode.REGISTER_BUFFERS,
                cast(const(void)*)&payload.regBuffers[0], cast(uint)payload.regBuffers.length
330             );
331 
332         if (r < 0) return -errno;
333         return 0;
334     }
335 
336     /**
337      * Releases all previously registered buffers associated with the `io_uring` instance.
338      *
339      * An application need not unregister buffers explicitly before shutting down the io_uring instance.
340      *
341      * Returns: On success, returns 0. On error, `-errno` is returned.
342      */
343     auto unregisterBuffers() @trusted
344     {
345         checkInitialized();
346 
347         if (payload.regBuffers is null)
348             return -ENXIO; // no buffers were registered
349 
350         free(cast(void*)&payload.regBuffers[0]);
351         payload.regBuffers = null;
352 
353         auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_BUFFERS, null, 0);
354         if (r < 0) return -errno;
355         return 0;
356     }
357 
358     /**
359      * Register files for I/O.
360      *
361      * To make use of the registered files, the `IOSQE_FIXED_FILE` flag must be set in the flags
362      * member of the `SubmissionEntry`, and the `fd` member is set to the index of the file in the
363      * file descriptor array.
364      *
365      * Files are automatically unregistered when the `io_uring` instance is torn down. An application
366      * need only unregister if it wishes to register a new set of fds.
367      *
     * Use `-1` as a file descriptor to mark it as reserved in the array.
     *
     * Params: fds = array of file descriptors to be registered
370      *
371      * Returns: On success, returns 0. On error, `-errno` is returned.
372      */
373     auto registerFiles(const(int)[] fds)
374     {
375         checkInitialized();
376         assert(fds.length, "No file descriptors provided");
377         assert(fds.length < uint.max, "Too many file descriptors");
378 
379         // arg contains a pointer to an array of nr_args file descriptors (signed 32 bit integers).
380         auto r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_FILES, &fds[0], cast(uint)fds.length);
381         if (r < 0) return -errno;
382         return 0;
383     }
384 
385     /*
386      * Register an update for an existing file set. The updates will start at
387      * `off` in the original array.
388      *
389      * Use `-1` as a file descriptor to mark it as reserved in the array.
390      *
391      * Params:
392      *      off = offset to the original registered files to be updated
393      *      files = array of file descriptors to update with
394      *
     * Returns: On success, returns 0. On error, `-errno` is returned.
396      */
397     auto registerFilesUpdate(uint off, const(int)[] fds) @trusted
398     {
399         struct Update
400         {
401             uint off;
402             const(int)* fds;
403         }
404 
405         checkInitialized();
406         assert(fds.length, "No file descriptors provided to update");
407         assert(fds.length < uint.max, "Too many file descriptors");
408 
409         Update u = Update(off, &fds[0]);
410         auto r = io_uring_register(
411             payload.fd,
412             RegisterOpCode.REGISTER_FILES_UPDATE,
413             &u, cast(uint)fds.length);
414         if (r < 0) return -errno;
415         return 0;
416     }
417 
418     /**
419      * All previously registered files associated with the `io_uring` instance will be unregistered.
420      *
421      * Files are automatically unregistered when the `io_uring` instance is torn down. An application
422      * need only unregister if it wishes to register a new set of fds.
423      *
424      * Returns: On success, returns 0. On error, `-errno` is returned.
425      */
426     auto unregisterFiles() @trusted
427     {
428         checkInitialized();
429         auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_FILES, null, 0);
430         if (r < 0) return -errno;
431         return 0;
432     }
433 
434     /**
     * Registers an eventfd that will be notified of completion events on this io_uring instance.
     *
     * Params: eventFD = event file descriptor (see `eventfd(2)`) that gets signalled on completions
438      *
439      * Returns: On success, returns 0. On error, `-errno` is returned.
440      */
441     auto registerEventFD(int eventFD) @trusted
442     {
443         checkInitialized();
444         auto r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_EVENTFD, &eventFD, 1);
445         if (r < 0) return -errno;
446         return 0;
447     }
448 
449     /**
     * Unregisters a previously registered eventfd.
451      *
452      * Returns: On success, returns 0. On error, `-errno` is returned.
453      */
454     auto unregisterEventFD() @trusted
455     {
456         checkInitialized();
457         auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_EVENTFD, null, 0);
458         if (r < 0) return -errno;
459         return 0;
460     }
461 }
462 
463 /**
464  * Uses custom operation definition to fill fields of `SubmissionEntry`.
 * Can be used in cases when the builtin prep* functions aren't enough.
466  *
467  * Custom definition fields must correspond to fields of `SubmissionEntry` for this to work.
468  *
469  * Note: This doesn't touch previous state of the entry, just fills the corresponding fields.
470  *       So it might be needed to call `clear` first on the entry (depends on usage).
471  *
472  * Params:
473  *   entry = entry to set parameters to
474  *   op = operation to fill entry with (can be custom type)
475  */
476 void fill(E)(ref SubmissionEntry entry, auto ref E op)
477 {
478     pragma(inline);
479     import std.traits : hasMember, FieldNameTuple;
480 
481     // fill entry from provided operation fields (they must have same name as in SubmissionEntry)
482     foreach (m; FieldNameTuple!E)
483     {
484         static assert(hasMember!(SubmissionEntry, m), "unknown member: " ~ E.stringof ~ "." ~ m);
485         __traits(getMember, entry, m) = __traits(getMember, op, m);
486     }
487 }
488 
489 /**
490  * Template function to help set `SubmissionEntry` `user_data` field.
491  *
492  * Params:
493  *      entry = `SubmissionEntry` to prepare
494  *      data = data to set to the `SubmissionEntry`
495  *
 * Note: `data` is passed by ref and must stay alive for the whole duration of the operation.
497  */
498 void setUserData(D)(ref SubmissionEntry entry, ref D data)
499 {
500     entry.user_data = cast(ulong)(cast(void*)&data);
501 }
502 
503 /**
504  * Prepares `nop` operation.
505  *
506  * Params:
507  *      entry = `SubmissionEntry` to prepare
508  */
509 void prepNop(ref SubmissionEntry entry) @safe
510 {
511     entry.opcode = Operation.NOP;
512     entry.fd = -1;
513 }
514 
515 // TODO: check offset behavior, preadv2/pwritev2 should accept -1 to work on the current file offset,
516 // but it doesn't seem to work here.
517 
518 /**
519  * Prepares `readv` operation.
520  *
521  * Params:
522  *      entry = `SubmissionEntry` to prepare
523  *      fd = file descriptor of file we are operating on
524  *      offset = offset
525  *      buffer = iovec buffers to be used by the operation
526  */
527 void prepReadv(V)(ref SubmissionEntry entry, int fd, ref const V buffer, ulong offset)
528     if (is(V == iovec[]) || is(V == iovec))
529 {
530     entry.opcode = Operation.READV;
531     entry.fd = fd;
532     entry.off = offset;
533     static if (is(V == iovec[]))
534     {
535         assert(buffer.length, "Empty buffer");
536         assert(buffer.length < uint.max, "Too many iovec buffers");
537         entry.addr = cast(ulong)(cast(void*)&buffer[0]);
538         entry.len = cast(uint)buffer.length;
539     }
540     else
541     {
542         entry.addr = cast(ulong)(cast(void*)&buffer);
543         entry.len = 1;
544     }
545 }
546 
547 /**
548  * Prepares `writev` operation.
549  *
550  * Params:
551  *      entry = `SubmissionEntry` to prepare
552  *      fd = file descriptor of file we are operating on
553  *      offset = offset
554  *      buffer = iovec buffers to be used by the operation
555  */
556 void prepWritev(V)(ref SubmissionEntry entry, int fd, ref const V buffer, ulong offset)
557     if (is(V == iovec[]) || is(V == iovec))
558 {
559     entry.opcode = Operation.WRITEV;
560     entry.fd = fd;
561     entry.off = offset;
562     static if (is(V == iovec[]))
563     {
564         assert(buffer.length, "Empty buffer");
565         assert(buffer.length < uint.max, "Too many iovec buffers");
566         entry.addr = cast(ulong)(cast(void*)&buffer[0]);
567         entry.len = cast(uint)buffer.length;
568     }
569     else
570     {
571         entry.addr = cast(ulong)(cast(void*)&buffer);
572         entry.len = 1;
573     }
574 }
575 
576 /**
577  * Prepares `read_fixed` operation.
578  *
579  * Params:
580  *      entry = `SubmissionEntry` to prepare
581  *      fd = file descriptor of file we are operating on
582  *      offset = offset
583  *      buffer = slice to preregistered buffer
584  *      bufferIndex = index to the preregistered buffers array buffer belongs to
585  */
586 void prepReadFixed(ref SubmissionEntry entry, int fd, ulong offset, ubyte[] buffer, ushort bufferIndex)
587 {
588     assert(buffer.length, "Empty buffer");
589     assert(buffer.length < uint.max, "Buffer too large");
590     entry.opcode = Operation.READ_FIXED;
591     entry.fd = fd;
592     entry.off = offset;
593     entry.addr = cast(ulong)(cast(void*)&buffer[0]);
594     entry.len = cast(uint)buffer.length;
595     entry.buf_index = bufferIndex;
596 }
597 
598 /**
599  * Prepares `write_fixed` operation.
600  *
601  * Params:
602  *      entry = `SubmissionEntry` to prepare
603  *      fd = file descriptor of file we are operating on
604  *      offset = offset
605  *      buffer = slice to preregistered buffer
606  *      bufferIndex = index to the preregistered buffers array buffer belongs to
607  */
608 void prepWriteFixed(ref SubmissionEntry entry, int fd, ulong offset, ubyte[] buffer, ushort bufferIndex)
609 {
610     assert(buffer.length, "Empty buffer");
611     assert(buffer.length < uint.max, "Buffer too large");
612     entry.opcode = Operation.WRITE_FIXED;
613     entry.fd = fd;
614     entry.off = offset;
615     entry.addr = cast(ulong)(cast(void*)&buffer[0]);
616     entry.len = cast(uint)buffer.length;
617     entry.buf_index = bufferIndex;
618 }
619 
620 /**
621  * Prepares `sendmsg(2)` operation.
622  *
623  * Params:
624  *      entry = `SubmissionEntry` to prepare
625  *      fd = file descriptor of file we are operating on
626  *      msg = message to operate with
627  *      flags = `sendmsg` operation flags
628  *
629  * Note: Available from Linux 5.3
630  *
631  * See_Also: `sendmsg(2)` man page for details.
632  */
633 void prepSendMsg(ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE)
634 {
635     entry.opcode = Operation.SENDMSG;
636     entry.fd = fd;
637     entry.addr = cast(ulong)(cast(void*)&msg);
638     entry.msg_flags = flags;
639 }
640 
641 /**
642  * Prepares `recvmsg(2)` operation.
643  *
644  * Params:
645  *      entry = `SubmissionEntry` to prepare
646  *      fd = file descriptor of file we are operating on
647  *      msg = message to operate with
648  *      flags = `recvmsg` operation flags
649  *
650  * Note: Available from Linux 5.3
651  *
652  * See_Also: `recvmsg(2)` man page for details.
653  */
654 void prepRecvMsg(ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE)
655 {
656     entry.opcode = Operation.RECVMSG;
657     entry.fd = fd;
658     entry.addr = cast(ulong)(cast(void*)&msg);
659     entry.msg_flags = flags;
660 }
661 
662 /**
663  * Prepares `fsync` operation.
664  *
665  * Params:
666  *      entry = `SubmissionEntry` to prepare
667  *      fd = file descriptor of a file to call `fsync` on
668  *      flags = `fsync` operation flags
669  */
670 void prepFsync(ref SubmissionEntry entry, int fd, FsyncFlags flags = FsyncFlags.NORMAL) @safe
671 {
672     entry.opcode = Operation.FSYNC;
673     entry.fd = fd;
674     entry.fsync_flags = flags;
675 }
676 
677 /**
678  * Poll the fd specified in the submission queue entry for the events specified in the poll_events
679  * field. Unlike poll or epoll without `EPOLLONESHOT`, this interface always works in one shot mode.
680  * That is, once the poll operation is completed, it will have to be resubmitted.
681  *
682  * Params:
683  *      entry = `SubmissionEntry` to prepare
684  *      fd = file descriptor to poll
685  *      events = events to poll on the FD
686  */
687 void prepPollAdd(ref SubmissionEntry entry, int fd, PollEvents events) @safe
688 {
689     entry.opcode = Operation.POLL_ADD;
690     entry.fd = fd;
691     entry.poll_events = events;
692 }
693 
694 /**
695  * Remove an existing poll request. If found, the res field of the `CompletionEntry` will contain
696  * `0`.  If not found, res will contain `-ENOENT`.
697  *
698  * Params:
699  *      entry = `SubmissionEntry` to prepare
700  *      userData = data with the previously issued poll operation
701  */
702 void prepPollRemove(D)(ref SubmissionEntry entry, ref D userData)
703 {
704     entry.opcode = Operation.POLL_REMOVE;
705     entry.fd = -1;
706     entry.addr = cast(ulong)(cast(void*)&userData);
707 }
708 
709 /**
710  * Prepares `sync_file_range(2)` operation.
711  *
 * Syncs a file segment with disk. It permits fine control when synchronizing the open file
 * referred to by the file descriptor `fd` with disk.
714  *
715  * If `len` is 0, then all bytes from `offset` through to the end of file are synchronized.
716  *
717  * Params:
718  *      entry = `SubmissionEntry` to prepare
719  *      fd = is the file descriptor to sync
720  *      offset = the starting byte of the file range to be synchronized
721  *      len = the length of the range to be synchronized, in bytes
722  *      flags = the flags for the command.
723  *
724  * See_Also: `sync_file_range(2)` for the general description of the related system call.
725  *
726  * Note: available from Linux 5.2
727  */
728 void prepSyncFileRange(ref SubmissionEntry entry, int fd, ulong offset, uint len, SyncFileRangeFlags flags) @safe
729 {
730     entry.opcode = Operation.SYNC_FILE_RANGE;
731     entry.fd = fd;
732     entry.off = offset;
733     entry.len = len;
734     entry.sync_range_flags = flags;
735 }
736 
737 /**
738  * This command will register a timeout operation.
739  *
740  * A timeout will trigger a wakeup event on the completion ring for anyone waiting for events. A
741  * timeout condition is met when either the specified timeout expires, or the specified number of
742  * events have completed. Either condition will trigger the event. The request will complete with
743  * `-ETIME` if the timeout got completed through expiration of the timer, or `0` if the timeout got
744  * completed through requests completing on their own. If the timeout was cancelled before it
745  * expired, the request will complete with `-ECANCELED`.
746  *
747  * Applications may delete existing timeouts before they occur with `TIMEOUT_REMOVE` operation.
748  *
749  * Params:
750  *      entry = `SubmissionEntry` to prepare
751  *      time = reference to `time64` data structure
752  *      count = completion event count
753  *      flags = define if it's a relative or absolute time
754  *
755  * Note: Available from Linux 5.4
756  */
757 void prepTimeout(ref SubmissionEntry entry, ref KernelTimespec time,
758     ulong count = 0, TimeoutFlags flags = TimeoutFlags.REL)
759 {
760     entry.opcode = Operation.TIMEOUT;
761     entry.fd = -1;
762     entry.addr = cast(ulong)(cast(void*)&time);
763     entry.len = 1;
764     entry.off = count;
765     entry.timeout_flags = flags;
766 }
767 
768 /**
 * Prepares an operation that removes an existing timeout registered with the `TIMEOUT` operation.
770  *
771  * Attempt to remove an existing timeout operation. If the specified timeout request is found and
772  * cancelled successfully, this request will terminate with a result value of `-ECANCELED`. If the
773  * timeout request was found but expiration was already in progress, this request will terminate
774  * with a result value of `-EALREADY`. If the timeout request wasn't found, the request will
775  * terminate with a result value of `-ENOENT`.
776  *
777  * Params:
778  *      entry = `SubmissionEntry` to prepare
779  *      userData = user data provided with the previously issued timeout operation
780  *
781  * Note: Available from Linux 5.5
782  */
783 void prepTimeoutRemove(D)(ref SubmissionEntry entry, ref D userData)
784 {
785     entry.opcode = Operation.TIMEOUT_REMOVE;
786     entry.fd = -1;
787     entry.addr = cast(ulong)(cast(void*)&userData);
788 }
789 
790 /**
791  * Prepares `accept4(2)` operation.
792  *
 * See_Also: `accept4(2)` for the general description of the related system call.
794  *
795  * Params:
796  *      entry = `SubmissionEntry` to prepare
797  *      fd = socket file descriptor
 *      addr = reference to one of the sockaddr structures to be filled with the accepted client address
 *      addrlen = reference to the addrlen field that will be filled with the accepted client address length
 *      flags = `accept4(2)` flags
800  *
801  * Note: Available from Linux 5.5
802  */
803 void prepAccept(ADDR)(ref SubmissionEntry entry, int fd, ref ADDR addr, ref socklen_t addrlen,
804     AcceptFlags flags = AcceptFlags.NONE)
805 {
806     entry.opcode = Operation.ACCEPT;
807     entry.fd = fd;
808     entry.addr = cast(ulong)(cast(void*)&addr);
809     entry.addr2 = cast(ulong)(cast(void*)&addrlen);
810     entry.accept_flags = flags;
811 }
812 
813 /**
814  * Prepares operation that cancels existing async work.
815  *
 * This works with any read/write request, accept, send/recvmsg, etc. There’s an important
817  * distinction to make here with the different kinds of commands. A read/write on a regular file
818  * will generally be waiting for IO completion in an uninterruptible state. This means it’ll ignore
819  * any signals or attempts to cancel it, as these operations are uncancellable. io_uring can cancel
820  * these operations if they haven’t yet been started. If they have been started, cancellations on
821  * these will fail. Network IO will generally be waiting interruptibly, and can hence be cancelled
822  * at any time. The completion event for this request will have a result of 0 if done successfully,
823  * `-EALREADY` if the operation is already in progress, and `-ENOENT` if the original request
824  * specified cannot be found. For cancellation requests that return `-EALREADY`, io_uring may or may
825  * not cause this request to be stopped sooner. For blocking IO, the original request will complete
826  * as it originally would have. For IO that is cancellable, it will terminate sooner if at all
827  * possible.
828  *
829  * Params:
830  *      entry = `SubmissionEntry` to prepare
831  *      userData = `user_data` field of the request that should be cancelled
832  *
833  * Note: Available from Linux 5.5
834  */
835 void prepCancel(D)(ref SubmissionEntry entry, ref D userData/*, uint flags = 0*/)
836 {
837     entry.opcode = Operation.ASYNC_CANCEL;
838     entry.fd = -1;
839     entry.addr = cast(ulong)(cast(void*)&userData);
840     // entry.cancel_flags = flags; // TODO: there are none yet
841 }
842 
843 /**
844  * Prepares linked timeout operation.
845  *
 * This request must be linked with another request through the `IOSQE_IO_LINK` flag.
847  * Unlike `IORING_OP_TIMEOUT`, `IORING_OP_LINK_TIMEOUT` acts on the linked request, not the completion
848  * queue. The format of the command is otherwise like `IORING_OP_TIMEOUT`, except there's no
849  * completion event count as it's tied to a specific request. If used, the timeout specified in the
850  * command will cancel the linked command, unless the linked command completes before the
 * timeout. The timeout will complete with `-ETIME` if the timer expired and cancellation of the linked
 * request was attempted, or `-ECANCELED` if the timer got cancelled because of completion of the linked
853  * request.
854  *
855  * Note: Available from Linux 5.5
856  *
857  * Params:
858  *      entry = `SubmissionEntry` to prepare
859  *      time = time specification
860  *      flags = define if it's a relative or absolute time
861  */
862 void prepLinkTimeout(ref SubmissionEntry entry, ref KernelTimespec time, TimeoutFlags flags = TimeoutFlags.REL)
863 {
864     entry.opcode = Operation.LINK_TIMEOUT;
865     entry.fd = -1;
866     entry.addr = cast(ulong)(cast(void*)&time);
867     entry.len = 1;
868     entry.timeout_flags = flags;
869 }
870 
871 /**
872  * Note: Available from Linux 5.5
873  */
874 void prepConnect(ADDR)(ref SubmissionEntry entry, int fd, ref const(ADDR) addr)
875 {
876     entry.opcode = Operation.CONNECT;
877     entry.fd = fd;
878     entry.addr = cast(ulong)(cast(void*)&addr);
879     entry.off = ADDR.sizeof;
880 }
881 
882 private:
883 
884 // uring cleanup
885 void dispose(ref Uring uring)
886 {
887     if (uring.payload is null) return;
888     // debug printf("uring(%d): dispose(%d)\n", uring.payload.fd, uring.payload.refs);
889     if (--uring.payload.refs == 0)
890     {
891         import std.traits : hasElaborateDestructor;
892         // debug printf("uring(%d): free\n", uring.payload.fd);
893         static if (hasElaborateDestructor!UringDesc)
894             destroy(*uring.payload); // call possible destructors
895         free(cast(void*)uring.payload);
896     }
897     uring.payload = null;
898 }
899 
900 // system fields descriptor
901 struct UringDesc
902 {
903     nothrow @nogc:
904 
905     int fd;
906     size_t refs;
907     SetupParameters params;
908     SubmissionQueue sq;
909     CompletionQueue cq;
910 
911     iovec[] regBuffers;
912 
913     ~this()
914     {
915         if (regBuffers) free(cast(void*)&regBuffers[0]);
916         if (sq.ring) munmap(sq.ring, sq.ringSize);
917         if (sq.sqes) munmap(cast(void*)&sq.sqes[0], sq.sqes.length * SubmissionEntry.sizeof);
918         if (cq.ring && cq.ring != sq.ring) munmap(cq.ring, cq.ringSize);
919         close(fd);
920     }
921 
922     private auto mapRings()
923     {
924         sq.ringSize = params.sq_off.array + params.sq_entries * uint.sizeof;
925         cq.ringSize = params.cq_off.cqes + params.cq_entries * CompletionEntry.sizeof;
926 
927         if (params.features & SetupFeatures.SINGLE_MMAP)
928         {
929             if (cq.ringSize > sq.ringSize) sq.ringSize = cq.ringSize;
930             cq.ringSize = sq.ringSize;
931         }
932 
933         sq.ring = mmap(null, sq.ringSize,
934             PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
935             fd, SetupParameters.SUBMISSION_QUEUE_RING_OFFSET
936         );
937 
938         if (sq.ring == MAP_FAILED)
939         {
940             sq.ring = null;
941             return -errno;
942         }
943 
944         if (params.features & SetupFeatures.SINGLE_MMAP)
945             cq.ring = sq.ring;
946         else
947         {
948             cq.ring = mmap(null, cq.ringSize,
949                 PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
950                 fd, SetupParameters.COMPLETION_QUEUE_RING_OFFSET
951             );
952 
953             if (cq.ring == MAP_FAILED)
954             {
955                 cq.ring = null;
956                 return -errno; // cleanup is done in struct destructors
957             }
958         }
959 
960         uint entries = *cast(uint*)(sq.ring + params.sq_off.ring_entries);
961         sq.khead        = cast(uint*)(sq.ring + params.sq_off.head);
962         sq.ktail        = cast(uint*)(sq.ring + params.sq_off.tail);
963         sq.localTail    = *sq.ktail;
964         sq.ringMask     = *cast(uint*)(sq.ring + params.sq_off.ring_mask);
965         sq.kflags       = cast(uint*)(sq.ring + params.sq_off.flags);
966         sq.kdropped     = cast(uint*)(sq.ring + params.sq_off.dropped);
967 
968         // Indirection array of indexes to the sqes array (head and tail are pointing to this array).
969         // As we don't need some fancy mappings, just initialize it with constant indexes and forget about it.
970         // That way, head and tail are actually indexes to our sqes array.
971         foreach (i; 0..entries)
972         {
973             *((cast(uint*)(sq.ring + params.sq_off.array)) + i) = i;
974         }
975 
976         auto psqes = mmap(
977             null, entries * SubmissionEntry.sizeof,
978             PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
979             fd, SetupParameters.SUBMISSION_QUEUE_ENTRIES_OFFSET
980         );
981 
982         if (psqes == MAP_FAILED) return -errno;
983         sq.sqes = (cast(SubmissionEntry*)psqes)[0..entries];
984 
985         entries = *cast(uint*)(cq.ring + params.cq_off.ring_entries);
986         cq.khead        = cast(uint*)(cq.ring + params.cq_off.head);
987         cq.localHead    = *cq.khead;
988         cq.ktail        = cast(uint*)(cq.ring + params.cq_off.tail);
989         cq.ringMask     = *cast(uint*)(cq.ring + params.cq_off.ring_mask);
990         cq.koverflow    = cast(uint*)(cq.ring + params.cq_off.overflow);
991         cq.cqes         = (cast(CompletionEntry*)(cq.ring + params.cq_off.cqes))[0..entries];
992         return 0;
993     }
994 }
995 
/// Wrapper for `SubmissionEntry` queue
997 struct SubmissionQueue
998 {
999     nothrow @nogc:
1000 
1001     // mmaped fields
1002     uint* khead; // controlled by kernel
1003     uint* ktail; // controlled by us
1004     uint* kflags; // controlled by kernel (ie IORING_SQ_NEED_WAKEUP)
1005     uint* kdropped; // counter of invalid submissions (out of bound index)
1006     uint ringMask; // constant mask used to determine array index from head/tail
1007 
1008     // mmap details (for cleanup)
1009     void* ring; // pointer to the mmaped region
1010     size_t ringSize; // size of mmaped memory block
1011 
1012     // mmapped list of entries (fixed length)
1013     SubmissionEntry[] sqes;
1014 
1015     uint localTail; // used for batch submission
1016 
1017     uint head() const { return atomicLoad!(MemoryOrder.acq)(*khead); }
1018     uint tail() const { return localTail; }
1019 
1020     void flushTail()
1021     {
1022         pragma(inline);
1023         // debug printf("SQ updating tail: %d\n", localTail);
1024         atomicStore!(MemoryOrder.rel)(*ktail, localTail);
1025     }
1026 
1027     SubmissionQueueFlags flags() const
1028     {
1029         return cast(SubmissionQueueFlags)atomicLoad!(MemoryOrder.raw)(*kflags);
1030     }
1031 
1032     bool full() const { return sqes.length == length; }
1033 
1034     size_t length() const { return tail - head; }
1035 
1036     size_t capacity() const { return sqes.length - length; }
1037 
1038     void put()(auto ref SubmissionEntry entry)
1039     {
        assert(!full, "SubmissionQueue is full");
1041         sqes[tail & ringMask] = entry;
1042         localTail++;
1043     }
1044 
1045     void put(OP)(auto ref OP op)
1046         if (!is(OP == SubmissionEntry))
1047     {
        assert(!full, "SubmissionQueue is full");
1049         sqes[tail & ringMask].clear();
1050         sqes[tail & ringMask].fill(op);
1051         localTail++;
1052     }
1053 
1054     private void putWith(alias FN, ARGS...)(auto ref ARGS args)
1055     {
1056         import std.traits : Parameters, ParameterStorageClass, ParameterStorageClassTuple;
1057 
1058         static assert(
1059             Parameters!FN.length >= 1
1060             && is(Parameters!FN[0] == SubmissionEntry)
1061             && ParameterStorageClassTuple!FN[0] == ParameterStorageClass.ref_,
1062             "Alias function must accept at least `ref SubmissionEntry`");
1063 
1064         static assert(
1065             is(typeof(FN(sqes[tail & ringMask], args))),
1066             "Provided function is not callable with " ~ (Parameters!((ref SubmissionEntry e, ARGS args) {})).stringof);
1067 
        assert(!full, "SubmissionQueue is full");
1069         sqes[tail & ringMask].clear();
1070         FN(sqes[tail & ringMask], args);
1071         localTail++;
1072     }
1073 
1074     uint dropped() const { return atomicLoad!(MemoryOrder.raw)(*kdropped); }
1075 }
1076 
1077 struct CompletionQueue
1078 {
1079     nothrow @nogc:
1080 
1081     // mmaped fields
1082     uint* khead; // controlled by us (increment after entry at head was read)
1083     uint* ktail; // updated by kernel
1084     uint* koverflow;
1085     CompletionEntry[] cqes; // array of entries (fixed length)
1086 
1087     uint ringMask; // constant mask used to determine array index from head/tail
1088 
1089     // mmap details (for cleanup)
1090     void* ring;
1091     size_t ringSize;
1092 
1093     uint localHead; // used for bulk reading
1094 
1095     uint head() const { return localHead; }
1096     uint tail() const { return atomicLoad!(MemoryOrder.acq)(*ktail); }
1097 
1098     void flushHead()
1099     {
1100         pragma(inline);
1101         // debug printf("CQ updating head: %d\n", localHead);
1102         atomicStore!(MemoryOrder.rel)(*khead, localHead);
1103     }
1104 
1105     bool empty() const { return head == tail; }
1106 
1107     ref CompletionEntry front() return
1108     {
1109         assert(!empty, "CompletionQueue is empty");
1110         return cqes[localHead & ringMask];
1111     }
1112 
1113     void popFront()
1114     {
1115         pragma(inline);
1116         assert(!empty, "CompletionQueue is empty");
1117         localHead++;
1118         flushHead();
1119     }
1120 
1121     size_t length() const { return tail - localHead; }
1122 
1123     uint overflow() const { return atomicLoad!(MemoryOrder.raw)(*koverflow); }
1124 }
1125 
1126 // just a helper to use atomicStore more easily with older compilers
1127 void atomicStore(MemoryOrder ms, T, V)(ref T val, V newVal) @trusted
1128 {
1129     pragma(inline, true);
1130     import core.atomic : store = atomicStore;
1131     static if (__VERSION__ >= 2089) store!ms(val, newVal);
1132     else store!ms(*(cast(shared T*)&val), newVal);
1133 }
1134 
1135 // just a helper to use atomicLoad more easily with older compilers
1136 T atomicLoad(MemoryOrder ms, T)(ref const T val) @trusted
1137 {
1138     pragma(inline, true);
1139     import core.atomic : load = atomicLoad;
1140     static if (__VERSION__ >= 2089) return load!ms(val);
1141     else return load!ms(*(cast(const shared T*)&val));
1142 }
1143 
1144 version (assert)
1145 {
1146     import std.range.primitives : ElementType, isInputRange, isOutputRange;
1147     static assert(isInputRange!Uring && is(ElementType!Uring == CompletionEntry));
1148     static assert(isOutputRange!(Uring, SubmissionEntry));
1149 }