/**
 * Simple idiomatic dlang wrapper around linux io_uring
 * (see: https://kernel.dk/io_uring.pdf) asynchronous API.
 */
module during;

version (linux) {}
else static assert(0, "io_uring is available on linux only");

public import during.io_uring;
import during.openat2;

import core.atomic : MemoryOrder;
debug import core.stdc.stdio;
import core.stdc.stdlib;
import core.sys.linux.epoll;
import core.sys.linux.errno;
import core.sys.linux.sys.mman;
import core.sys.linux.unistd;
import core.sys.posix.signal;
import core.sys.posix.sys.socket;
import core.sys.posix.sys.uio;
import std.algorithm.comparison : among;

nothrow @nogc:

/**
 * Setup new instance of io_uring into provided `Uring` structure.
 *
 * Params:
 *     uring = `Uring` structure to be initialized (must not be already initialized)
 *     entries = Number of entries to initialize uring with
 *     flags = `SetupFlags` to use to initialize uring.
 *
 * Returns: On success it returns 0, `-errno` otherwise.
 */
int setup(ref Uring uring, uint entries = 128, SetupFlags flags = SetupFlags.NONE)
{
    assert(uring.payload is null, "Uring is already initialized");
    uring.payload = cast(UringDesc*)calloc(1, UringDesc.sizeof);
    if (uring.payload is null) return -errno;

    uring.payload.params.flags = flags;
    uring.payload.refs = 1;
    auto r = io_uring_setup(entries, uring.payload.params);
    if (r < 0) return -errno;

    uring.payload.fd = r;

    if (uring.payload.mapRings() < 0)
    {
        dispose(uring);
        return -errno;
    }

    // debug printf("uring(%d): setup\n", uring.payload.fd);

    return 0;
}
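
// A minimal end-to-end sketch of the API in this module: set up a ring, queue
// a no-op, submit it and wait for its completion. The entry count and asserts
// are illustrative only; a kernel with io_uring support (5.1+) is assumed.
unittest
{
    Uring io;
    auto res = io.setup(4);
    assert(res == 0, "failed to set up io_uring");

    // queue a NOP and enter the kernel, blocking until it completes
    auto submitted = io.putWith!prepNop.submit(1);
    assert(submitted == 1);

    assert(!io.empty);
    assert(io.front.res == 0); // NOP completes with res == 0
    io.popFront();
} // the ring is torn down by Uring's destructor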

/**
 * Main entry point to work with io_uring.
 *
 * It hides `SubmissionQueue` and `CompletionQueue` behind a standard range interface.
 * We put in `SubmissionEntry` entries and take out `CompletionEntry` entries.
 *
 * Use the predefined `prepXX` methods to fill the required fields of a `SubmissionEntry` before `put` or during `putWith`.
 *
 * Note: The `prepXX` functions don't touch the entry's previous state, they just fill in the operation's properties.
 * To keep the interface less error prone, the entry is cleared automatically when prepared using `putWith`. So when
 * using your own `SubmissionEntry` (outside the submission queue) that is added to the submission queue using `put`,
 * be sure it's cleared if it's reused for multiple operations.
 */
struct Uring
{
    nothrow @nogc:

    private UringDesc* payload;
    private void checkInitialized() const
    {
        assert(payload !is null, "Uring hasn't been initialized yet");
    }

    /// Copy constructor
    this(ref return scope Uring rhs)
    {
        assert(rhs.payload !is null, "rhs payload is null");
        // debug printf("uring(%d): copy\n", rhs.payload.fd);
        this.payload = rhs.payload;
        this.payload.refs++;
    }

    /// Destructor
    ~this()
    {
        dispose(this);
    }

    /// Native io_uring file descriptor
    auto fd() const
    {
        checkInitialized();
        return payload.fd;
    }

    /// io_uring parameters
    SetupParameters params() const return
    {
        checkInitialized();
        return payload.params;
    }

    /// Check if there is some `CompletionEntry` to process.
    bool empty() const
    {
        checkInitialized();
        return payload.cq.empty;
    }

    /// Check if there is space for another `SubmissionEntry` to submit.
    bool full() const
    {
        checkInitialized();
        return payload.sq.full;
    }

    /// Available space in submission queue before it becomes full
    size_t capacity() const
    {
        checkInitialized();
        return payload.sq.capacity;
    }

    /// Number of entries in completion queue
    size_t length() const
    {
        checkInitialized();
        return payload.cq.length;
    }

    /// Get first `CompletionEntry` from cq ring
    ref CompletionEntry front() return
    {
        checkInitialized();
        return payload.cq.front;
    }

    /// Move to next `CompletionEntry`
    void popFront()
    {
        checkInitialized();
        return payload.cq.popFront;
    }

    /**
     * Adds new entry to the `SubmissionQueue`.
     *
     * Note that this just adds the entry to the queue and doesn't advance the tail
     * marker the kernel sees. The tail is advanced (and the new entries are actually
     * entered into the kernel) only when `submit()` is called.
     *
     * Params:
     *     FN    = Function to fill next entry in queue by `ref` (should be faster).
     *             It is expected to be in a form of `void function(ARGS)(ref SubmissionEntry, auto ref ARGS)`.
     *             Note that in this case the queue entry is cleared first before the function is called.
     *     entry = Custom built `SubmissionEntry` to be posted as is.
     *             Note that in this case it is copied whole over one in the `SubmissionQueue`.
     *     args  = Optional arguments passed to the function
     *
     * Returns: reference to `Uring` structure so it's possible to chain multiple commands.
     */
    ref Uring put()(auto ref SubmissionEntry entry) return
    {
        checkInitialized();
        payload.sq.put(entry);
        return this;
    }

    /// ditto
    ref Uring putWith(alias FN, ARGS...)(auto ref ARGS args) return
    {
        import std.functional : forward;
        checkInitialized();
        payload.sq.putWith!FN(forward!args);
        return this;
    }

    /**
     * Similar to `put(SubmissionEntry)` but in this case we can provide our custom type (args) to be filled
     * to next `SubmissionEntry` in queue.
     *
     * Fields in the provided type must use the same names as in `SubmissionEntry` to be automagically copied.
     *
     * Params:
     *   op = Custom operation definition.
     *
     * Returns: reference to `Uring` structure so it's possible to chain multiple commands.
     */
    ref Uring put(OP)(auto ref OP op) return
        if (!is(OP == SubmissionEntry))
    {
        checkInitialized();
        payload.sq.put(op);
        return this;
    }

    /**
     * If the completion queue is full, new events may be dropped.
     * This value records the number of dropped events.
     */
    uint overflow() const
    {
        checkInitialized();
        return payload.cq.overflow;
    }

    /// Counter of invalid submissions (out-of-bound index in submission array)
    uint dropped() const
    {
        checkInitialized();
        return payload.sq.dropped;
    }

    /**
     * Submits queued `SubmissionEntry` entries to be processed by the kernel.
     *
     * Params:
     *     want  = number of `CompletionEntries` to wait for.
     *             If 0, this just submits queued entries and returns.
     *             If > 0, it blocks until at least the wanted number of entries were completed.
     *     sig   = See io_uring_enter(2) man page
     *
     * Returns: Number of submitted entries on success, `-errno` on error
     */
    auto submit(uint want = 0, const sigset_t* sig = null) @trusted
    {
        checkInitialized();

        auto len = cast(uint)payload.sq.length;
        if (len > 0) // anything to submit?
        {
            EnterFlags flags;
            if (want > 0) flags |= EnterFlags.GETEVENTS;

            payload.sq.flushTail(); // advance queue index

            if (payload.params.flags & SetupFlags.SQPOLL)
            {
                if (payload.sq.flags & SubmissionQueueFlags.NEED_WAKEUP)
                    flags |= EnterFlags.SQ_WAKEUP;
                else if (want == 0) return len; // fast poll
            }
            auto r = io_uring_enter(payload.fd, len, want, flags, sig);
            if (r < 0) return -errno;
            return r;
        }
        else if (want > 0) return wait(want); // just simple wait
        return 0;
    }

    /**
     * Similar to `submit` but with this method we just wait for the required number
     * of `CompletionEntries`.
     *
     * Returns: `0` on success, `-errno` on error
     */
    auto wait(uint want = 1, const sigset_t* sig = null) @trusted
    {
        pragma(inline);
        checkInitialized();
        assert(want > 0, "Invalid want value");

        if (payload.cq.length >= want) return 0; // we don't need to syscall

        auto r = io_uring_enter(payload.fd, 0, want, EnterFlags.GETEVENTS, sig);
        if (r < 0) return -errno;
        return 0;
    }
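
    // A short runnable sketch of batching (everything used here is from this
    // module): queue several no-ops, enter the kernel once with `submit(0)`,
    // then collect the completions with `wait`.
    unittest
    {
        Uring io;
        assert(io.setup(8) == 0);

        io.putWith!prepNop.putWith!prepNop.putWith!prepNop.submit(0); // one syscall for all three
        assert(io.wait(3) == 0);
        assert(io.length == 3);
        foreach (_; 0 .. 3) io.popFront();
        assert(io.empty);
    }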

    /**
     * Register a single buffer (or an array of buffers) to be mapped into the kernel
     * for faster buffered operations.
     *
     * To use the buffers, the application must use the fixed variants of the operations,
     * `READ_FIXED` or `WRITE_FIXED`, in the `SubmissionEntry`, with the used `buf_index`
     * set in the entry's extra data.
     *
     * An application can increase or decrease the size or number of registered buffers by first
     * unregistering the existing buffers, and then issuing a new call to io_uring_register() with
     * the new buffers.
     *
     * Params:
     *   buffers = Buffers to be registered
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    auto registerBuffers(T)(T buffers)
        if (is(T == ubyte[]) || is(T == ubyte[][])) // TODO: something else?
    {
        checkInitialized();
        assert(buffers.length, "Empty buffer");

        if (payload.regBuffers !is null)
            return -EBUSY; // buffers were already registered

        static if (is(T == ubyte[]))
        {
            auto p = malloc(iovec.sizeof);
            if (p is null) return -errno;
            payload.regBuffers = (cast(iovec*)p)[0..1];
            payload.regBuffers[0].iov_base = cast(void*)&buffers[0];
            payload.regBuffers[0].iov_len = buffers.length;
        }
        else static if (is(T == ubyte[][]))
        {
            auto p = malloc(buffers.length * iovec.sizeof);
            if (p is null) return -errno;
            payload.regBuffers = (cast(iovec*)p)[0..buffers.length];

            foreach (i, b; buffers)
            {
                assert(b.length, "Empty buffer");
                payload.regBuffers[i].iov_base = cast(void*)&b[0];
                payload.regBuffers[i].iov_len = b.length;
            }
        }

        auto r = io_uring_register(
                payload.fd,
                RegisterOpCode.REGISTER_BUFFERS,
                cast(const(void)*)&payload.regBuffers[0],
                cast(uint)payload.regBuffers.length // nr_args is the number of iovecs, not 1
            );

        if (r < 0) return -errno;
        return 0;
    }

    /**
     * Releases all previously registered buffers associated with the `io_uring` instance.
     *
     * An application need not unregister buffers explicitly before shutting down the io_uring instance.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    auto unregisterBuffers() @trusted
    {
        checkInitialized();

        if (payload.regBuffers is null)
            return -ENXIO; // no buffers were registered

        free(cast(void*)&payload.regBuffers[0]);
        payload.regBuffers = null;

        auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_BUFFERS, null, 0);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * Register files for I/O.
     *
     * To make use of the registered files, the `IOSQE_FIXED_FILE` flag must be set in the flags
     * member of the `SubmissionEntry`, and the `fd` member is set to the index of the file in the
     * file descriptor array.
     *
     * Files are automatically unregistered when the `io_uring` instance is torn down. An application
     * need only unregister if it wishes to register a new set of fds.
     *
     * Use `-1` as a file descriptor to mark it as reserved in the array.
     *
     * Params: fds = array of file descriptors to be registered
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    auto registerFiles(const(int)[] fds)
    {
        checkInitialized();
        assert(fds.length, "No file descriptors provided");
        assert(fds.length < uint.max, "Too many file descriptors");

        // arg contains a pointer to an array of nr_args file descriptors (signed 32 bit integers).
        auto r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_FILES, &fds[0], cast(uint)fds.length);
        if (r < 0) return -errno;
        return 0;
    }
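
    // A hedged sketch of using registered files: after `registerFiles`, an
    // operation can refer to the index in the registered array instead of the
    // raw descriptor. The flag is assumed to be `SubmissionEntryFlags.FIXED_FILE`
    // from during.io_uring; `fd0`, `fd1` and `buf` are placeholders:
    // ---
    // int[2] fds = [fd0, fd1];
    // assert(io.registerFiles(fds[]) == 0);
    //
    // SubmissionEntry e;
    // e.prepRead(0, buf, 0); // 0 = index into the registered files
    // e.flags = SubmissionEntryFlags.FIXED_FILE;
    // io.put(e).submit(1);
    // ---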

    /**
     * Register an update for an existing file set. The updates will start at
     * `off` in the original array.
     *
     * Use `-1` as a file descriptor to mark it as reserved in the array.
     *
     * Params:
     *      off = offset to the original registered files to be updated
     *      fds = array of file descriptors to update with
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    auto registerFilesUpdate(uint off, const(int)[] fds) @trusted
    {
        struct Update // mirrors the kernel's struct io_uring_files_update
        {
            uint offset;
            uint _resv;
            ulong pfds;
        }

        static assert (Update.sizeof == 16);

        checkInitialized();
        assert(fds.length, "No file descriptors provided to update");
        assert(fds.length < uint.max, "Too many file descriptors");

        Update u = { offset: off, pfds: cast(ulong)&fds[0] };
        auto r = io_uring_register(
            payload.fd,
            RegisterOpCode.REGISTER_FILES_UPDATE,
            &u, cast(uint)fds.length);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * All previously registered files associated with the `io_uring` instance will be unregistered.
     *
     * Files are automatically unregistered when the `io_uring` instance is torn down. An application
     * need only unregister if it wishes to register a new set of fds.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    auto unregisterFiles() @trusted
    {
        checkInitialized();
        auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_FILES, null, 0);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * Registers event file descriptor that would be used as a notification mechanism on completion
     * queue change.
     *
     * Params: eventFD = event file descriptor to be notified about change
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    auto registerEventFD(int eventFD) @trusted
    {
        checkInitialized();
        auto r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_EVENTFD, &eventFD, 1);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * Unregister previously registered notification event file descriptor.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    auto unregisterEventFD() @trusted
    {
        checkInitialized();
        auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_EVENTFD, null, 0);
        if (r < 0) return -errno;
        return 0;
    }
}

/**
 * Uses custom operation definition to fill fields of `SubmissionEntry`.
 * Can be used in cases when the builtin prep* functions aren't enough.
 *
 * Custom definition fields must correspond to fields of `SubmissionEntry` for this to work.
 *
 * Note: This doesn't touch the previous state of the entry, it just fills the corresponding fields.
 *       So it might be needed to call `clear` first on the entry (depends on usage).
 *
 * Params:
 *   entry = entry to set parameters to
 *   op = operation to fill entry with (can be custom type)
 */
void fill(E)(ref SubmissionEntry entry, auto ref E op)
{
    pragma(inline);
    import std.traits : hasMember, FieldNameTuple;

    // fill entry from provided operation fields (they must have same name as in SubmissionEntry)
    foreach (m; FieldNameTuple!E)
    {
        static assert(hasMember!(SubmissionEntry, m), "unknown member: " ~ E.stringof ~ "." ~ m);
        __traits(getMember, entry, m) = __traits(getMember, op, m);
    }
}
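
// A runnable sketch of a custom operation type consumed by `fill` (and thus by
// `Uring.put`): field names mirror `SubmissionEntry` members, so they are
// copied over one by one. `MyNop` is hypothetical, not part of the API.
unittest
{
    static struct MyNop
    {
        Operation opcode = Operation.NOP;
        ulong user_data = 42;
    }

    SubmissionEntry entry;
    entry.clear(); // start from a clean state, as the note above advises
    entry.fill(MyNop.init);
    assert(entry.opcode == Operation.NOP);
    assert(entry.user_data == 42);
}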

/**
 * Template function to help set `SubmissionEntry` `user_data` field.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      data = data to set to the `SubmissionEntry`
 *
 * Note: data is passed by ref and must stay alive for the whole duration of the operation.
 */
void setUserData(D)(ref SubmissionEntry entry, ref D data)
{
    entry.user_data = cast(ulong)(cast(void*)&data);
}
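
// A runnable sketch of round-tripping `user_data`: the pointer stored with the
// submission comes back unchanged in the matching `CompletionEntry`, so it can
// be cast back to recover per-operation state (`IOCtx` is illustrative only).
unittest
{
    static struct IOCtx { int id; }
    IOCtx ctx = IOCtx(7);

    SubmissionEntry entry;
    entry.setUserData(ctx);
    assert(cast(IOCtx*)cast(void*)entry.user_data is &ctx);
}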

private void prepRW(ref SubmissionEntry entry, Operation op,
    int fd = -1, const void* addr = null, uint len = 0, ulong offset = 0) @safe
{
    pragma(inline, true);
    entry.opcode = op;
    entry.fd = fd;
    entry.off = offset;
    entry.flags = SubmissionEntryFlags.NONE;
    entry.ioprio = 0;
    entry.addr = cast(ulong)addr;
    entry.len = len;
    entry.rw_flags = ReadWriteFlags.NONE;
    entry.user_data = 0;
    entry.__pad2[0] = entry.__pad2[1] = entry.__pad2[2] = 0;
}

/**
 * Prepares `nop` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 */
void prepNop(ref SubmissionEntry entry) @safe
{
    entry.prepRW(Operation.NOP);
}

/**
 * Prepares `readv` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor of file we are operating on
 *      buffer = iovec buffers to be used by the operation
 *      offset = file offset the operation starts at
 */
void prepReadv(V)(ref SubmissionEntry entry, int fd, ref const V buffer, long offset)
    if (is(V == iovec[]) || is(V == iovec))
{
    static if (is(V == iovec[]))
    {
        assert(buffer.length, "Empty buffer");
        assert(buffer.length < uint.max, "Too many iovec buffers");
        entry.prepRW(Operation.READV, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    }
    else entry.prepRW(Operation.READV, fd, cast(void*)&buffer, 1, offset);
}

/**
 * Prepares `writev` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor of file we are operating on
 *      buffer = iovec buffers to be used by the operation
 *      offset = file offset the operation starts at
 */
void prepWritev(V)(ref SubmissionEntry entry, int fd, ref const V buffer, long offset)
    if (is(V == iovec[]) || is(V == iovec))
{
    static if (is(V == iovec[]))
    {
        assert(buffer.length, "Empty buffer");
        assert(buffer.length < uint.max, "Too many iovec buffers");
        entry.prepRW(Operation.WRITEV, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    }
    else entry.prepRW(Operation.WRITEV, fd, cast(void*)&buffer, 1, offset);
}

/**
 * Prepares `read_fixed` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor of file we are operating on
 *      offset = file offset the operation starts at
 *      buffer = slice to preregistered buffer
 *      bufferIndex = index to the preregistered buffers array buffer belongs to
 */
void prepReadFixed(ref SubmissionEntry entry, int fd, long offset, ubyte[] buffer, ushort bufferIndex)
{
    assert(buffer.length, "Empty buffer");
    assert(buffer.length < uint.max, "Buffer too large");
    entry.prepRW(Operation.READ_FIXED, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    entry.buf_index = bufferIndex;
}
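
// A hedged sketch tying `Uring.registerBuffers` to `READ_FIXED` (assumes `io`
// is a set up `Uring` and `fd` an open, readable file descriptor from elsewhere):
// ---
// ubyte[4096] buf;
// assert(io.registerBuffers(buf[]) == 0);
// io.putWith!((ref SubmissionEntry e, int f, ubyte[] b)
//         => e.prepReadFixed(f, 0, b, 0))(fd, buf[]) // buf_index 0
//   .submit(1);
// ---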

/**
 * Prepares `write_fixed` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor of file we are operating on
 *      offset = file offset the operation starts at
 *      buffer = slice to preregistered buffer
 *      bufferIndex = index to the preregistered buffers array buffer belongs to
 */
void prepWriteFixed(ref SubmissionEntry entry, int fd, long offset, ubyte[] buffer, ushort bufferIndex)
{
    assert(buffer.length, "Empty buffer");
    assert(buffer.length < uint.max, "Buffer too large");
    entry.prepRW(Operation.WRITE_FIXED, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    entry.buf_index = bufferIndex;
}

/**
 * Prepares `recvmsg(2)` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor of file we are operating on
 *      msg = message to operate with
 *      flags = `recvmsg` operation flags
 *
 * Note: Available from Linux 5.3
 *
 * See_Also: `recvmsg(2)` man page for details.
 */
void prepRecvMsg(ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE)
{
    entry.prepRW(Operation.RECVMSG, fd, cast(void*)&msg, 1, 0);
    entry.msg_flags = flags;
}

/**
 * Prepares `sendmsg(2)` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor of file we are operating on
 *      msg = message to operate with
 *      flags = `sendmsg` operation flags
 *
 * Note: Available from Linux 5.3
 *
 * See_Also: `sendmsg(2)` man page for details.
 */
void prepSendMsg(ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE)
{
    entry.prepRW(Operation.SENDMSG, fd, cast(void*)&msg, 1, 0);
    entry.msg_flags = flags;
}

/**
 * Prepares `fsync` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor of a file to call `fsync` on
 *      flags = `fsync` operation flags
 */
void prepFsync(ref SubmissionEntry entry, int fd, FsyncFlags flags = FsyncFlags.NORMAL) @safe
{
    entry.prepRW(Operation.FSYNC, fd);
    entry.fsync_flags = flags;
}

/**
 * Poll the fd specified in the submission queue entry for the events specified in the poll_events
 * field. Unlike poll or epoll without `EPOLLONESHOT`, this interface always works in one shot mode.
 * That is, once the poll operation is completed, it will have to be resubmitted.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = file descriptor to poll
 *      events = events to poll on the FD
 */
void prepPollAdd(ref SubmissionEntry entry, int fd, PollEvents events) @safe
{
    import std.system : endian, Endian;

    entry.prepRW(Operation.POLL_ADD, fd);
    static if (endian == Endian.bigEndian)
        entry.poll_events32 = (events & 0x0000ffffUL) << 16 | (events & 0xffff0000) >> 16;
    else
        entry.poll_events32 = events;
}

/**
 * Remove an existing poll request. If found, the res field of the `CompletionEntry` will contain
 * `0`. If not found, res will contain `-ENOENT`.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      userData = data with the previously issued poll operation
 */
void prepPollRemove(D)(ref SubmissionEntry entry, ref D userData)
{
    entry.prepRW(Operation.POLL_REMOVE, -1, cast(void*)&userData);
}

/**
 * Prepares `sync_file_range(2)` operation.
 *
 * Sync a file segment with disk, permits fine control when synchronizing the open file referred to
 * by the file descriptor fd with disk.
 *
 * If `len` is 0, then all bytes from `offset` through to the end of file are synchronized.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = is the file descriptor to sync
 *      offset = the starting byte of the file range to be synchronized
 *      len = the length of the range to be synchronized, in bytes
 *      flags = the flags for the command.
 *
 * See_Also: `sync_file_range(2)` for the general description of the related system call.
 *
 * Note: available from Linux 5.2
 */
void prepSyncFileRange(ref SubmissionEntry entry, int fd, ulong offset, uint len,
    SyncFileRangeFlags flags = SyncFileRangeFlags.WRITE_AND_WAIT) @safe
{
    // use prepRW so the rest of the entry is reset, consistent with the other prep functions
    entry.prepRW(Operation.SYNC_FILE_RANGE, fd, null, len, offset);
    entry.sync_range_flags = flags;
}

/**
 * This command will register a timeout operation.
 *
 * A timeout will trigger a wakeup event on the completion ring for anyone waiting for events. A
 * timeout condition is met when either the specified timeout expires, or the specified number of
 * events have completed. Either condition will trigger the event. The request will complete with
 * `-ETIME` if the timeout got completed through expiration of the timer, or `0` if the timeout got
 * completed through requests completing on their own. If the timeout was cancelled before it
 * expired, the request will complete with `-ECANCELED`.
 *
 * Applications may delete existing timeouts before they occur with `TIMEOUT_REMOVE` operation.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      time = reference to `time64` data structure
 *      count = completion event count
 *      flags = define if it's a relative or absolute time
 *
 * Note: Available from Linux 5.4
 */
void prepTimeout(ref SubmissionEntry entry, ref KernelTimespec time,
    ulong count = 0, TimeoutFlags flags = TimeoutFlags.REL)
{
    entry.prepRW(Operation.TIMEOUT, -1, cast(void*)&time, 1, count);
    entry.timeout_flags = flags;
}
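
// A runnable sketch of a standalone timeout (assumes a Linux 5.4+ kernel, as
// noted above, and that `KernelTimespec` mirrors the kernel's __kernel_timespec
// with `tv_sec`/`tv_nsec` fields): the request completes with `-ETIME` once
// the timer expires.
unittest
{
    Uring io;
    assert(io.setup(4) == 0);

    KernelTimespec ts;
    ts.tv_sec = 0;
    ts.tv_nsec = 1_000_000; // 1ms

    io.putWith!((ref SubmissionEntry e, ref KernelTimespec t) => e.prepTimeout(t))(ts)
      .submit(1);

    assert(!io.empty);
    assert(io.front.res == -ETIME);
    io.popFront();
}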

/**
 * Prepares operations to remove existing timeout registered using `TIMEOUT` operation.
 *
 * Attempt to remove an existing timeout operation. If the specified timeout request is found and
 * cancelled successfully, this request will terminate with a result value of `-ECANCELED`. If the
 * timeout request was found but expiration was already in progress, this request will terminate
 * with a result value of `-EALREADY`. If the timeout request wasn't found, the request will
 * terminate with a result value of `-ENOENT`.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      userData = user data provided with the previously issued timeout operation
 *
 * Note: Available from Linux 5.5
 */
void prepTimeoutRemove(D)(ref SubmissionEntry entry, ref D userData)
{
    entry.prepRW(Operation.TIMEOUT_REMOVE, -1, cast(void*)&userData);
}

/**
 * Prepares `accept4(2)` operation.
 *
 * See_Also: `accept4(2)` for the general description of the related system call.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      fd = socket file descriptor
 *      addr = reference to one of the sockaddr structures to be filled with the accepted client address
 *      addrlen = reference to addrlen field that would be filled with accepted client address length
 *      flags = `accept4` operation flags
 *
 * Note: Available from Linux 5.5
 */
void prepAccept(ADDR)(ref SubmissionEntry entry, int fd, ref ADDR addr, ref socklen_t addrlen,
    AcceptFlags flags = AcceptFlags.NONE)
{
    entry.prepRW(Operation.ACCEPT, fd, cast(void*)&addr, 0, cast(ulong)(cast(void*)&addrlen));
    entry.accept_flags = flags;
}

/**
 * Prepares operation that cancels existing async work.
 *
 * This works with any read/write request, accept, send/recvmsg, etc. There's an important
 * distinction to make here with the different kinds of commands. A read/write on a regular file
 * will generally be waiting for IO completion in an uninterruptible state. This means it'll ignore
 * any signals or attempts to cancel it, as these operations are uncancellable. io_uring can cancel
 * these operations if they haven't yet been started. If they have been started, cancellations on
 * these will fail. Network IO will generally be waiting interruptibly, and can hence be cancelled
 * at any time. The completion event for this request will have a result of 0 if done successfully,
 * `-EALREADY` if the operation is already in progress, and `-ENOENT` if the original request
 * specified cannot be found. For cancellation requests that return `-EALREADY`, io_uring may or may
 * not cause this request to be stopped sooner. For blocking IO, the original request will complete
 * as it originally would have. For IO that is cancellable, it will terminate sooner if at all
 * possible.
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      userData = `user_data` field of the request that should be cancelled
 *
 * Note: Available from Linux 5.5
 */
void prepCancel(D)(ref SubmissionEntry entry, ref D userData, uint flags = 0)
{
    entry.prepRW(Operation.ASYNC_CANCEL, -1, cast(void*)&userData);
    entry.cancel_flags = flags;
}

/**
 * Prepares linked timeout operation.
 *
 * This request must be linked with another request through `IOSQE_IO_LINK`.
 * Unlike `IORING_OP_TIMEOUT`, `IORING_OP_LINK_TIMEOUT` acts on the linked request, not the completion
 * queue. The format of the command is otherwise like `IORING_OP_TIMEOUT`, except there's no
 * completion event count as it's tied to a specific request. If used, the timeout specified in the
 * command will cancel the linked command, unless the linked command completes before the
 * timeout. The timeout will complete with `-ETIME` if the timer expired and cancellation of the
 * linked request was attempted, or `-ECANCELED` if the timer got cancelled because of completion
 * of the linked request.
 *
 * Note: Available from Linux 5.5
 *
 * Params:
 *      entry = `SubmissionEntry` to prepare
 *      time = time specification
 *      flags = define if it's a relative or absolute time
 */
void prepLinkTimeout(ref SubmissionEntry entry, ref KernelTimespec time, TimeoutFlags flags = TimeoutFlags.REL)
{
    entry.prepRW(Operation.LINK_TIMEOUT, -1, cast(void*)&time, 1, 0);
    entry.timeout_flags = flags;
}
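
// A hedged sketch of a linked timeout (assumes the link flag is exposed as
// `SubmissionEntryFlags.IO_LINK` in during.io_uring; `io`, `sock`, `buf` and
// `ts` are placeholders set up elsewhere): the timeout bounds the recv.
// ---
// SubmissionEntry op;
// op.prepRecv(sock, buf, MsgFlags.NONE);
// op.flags = SubmissionEntryFlags.IO_LINK; // timeout below applies to this op
// io.put(op)
//   .putWith!((ref SubmissionEntry e, ref KernelTimespec t) => e.prepLinkTimeout(t))(ts)
//   .submit(1);
// ---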

/**
 * Note: Available from Linux 5.5
 */
void prepConnect(ADDR)(ref SubmissionEntry entry, int fd, ref const(ADDR) addr)
{
    entry.prepRW(Operation.CONNECT, fd, cast(void*)&addr, 0, ADDR.sizeof);
}

/**
 * Note: Available from Linux 5.6
 */
void prepFilesUpdate(ref SubmissionEntry entry, int[] fds, int offset)
{
    entry.prepRW(Operation.FILES_UPDATE, -1, cast(void*)&fds[0], cast(uint)fds.length, offset);
}

/**
 * Note: Available from Linux 5.6
 */
void prepFallocate(ref SubmissionEntry entry, int fd, int mode, long offset, long len)
{
    // the kernel takes `len` through the `addr` field for this operation
    entry.prepRW(Operation.FALLOCATE, fd, cast(void*)len, mode, offset);
}

/**
 * Note: Available from Linux 5.6
 */
void prepOpenat(ref SubmissionEntry entry, int fd, const char* path, int flags, uint mode)
{
    entry.prepRW(Operation.OPENAT, fd, cast(void*)path, mode, 0);
    entry.open_flags = flags;
}

/**
 * Note: Available from Linux 5.6
 */
void prepClose(ref SubmissionEntry entry, int fd)
{
    entry.prepRW(Operation.CLOSE, fd);
}

/**
 * Note: Available from Linux 5.6
 */
void prepRead(ref SubmissionEntry entry, int fd, ubyte[] buffer, long offset)
{
    entry.prepRW(Operation.READ, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
}

/**
 * Note: Available from Linux 5.6
 */
void prepWrite(ref SubmissionEntry entry, int fd, const(ubyte)[] buffer, long offset)
{
    entry.prepRW(Operation.WRITE, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
}

/**
 * Note: Available from Linux 5.6
 */
void prepStatx(Statx)(ref SubmissionEntry entry, int fd, const char* path, int flags, uint mask, ref Statx statxbuf)
{
    entry.prepRW(Operation.STATX, fd, cast(void*)path, mask, cast(ulong)(cast(void*)&statxbuf));
    entry.statx_flags = flags;
}

/**
 * Note: Available from Linux 5.6
 */
void prepFadvise(ref SubmissionEntry entry, int fd, long offset, uint len, int advice)
{
    entry.prepRW(Operation.FADVISE, fd, null, len, offset);
    entry.fadvise_advice = advice;
}

/**
 * Note: Available from Linux 5.6
 */
void prepMadvise(ref SubmissionEntry entry, const(ubyte)[] block, int advice)
{
    entry.prepRW(Operation.MADVISE, -1, cast(void*)&block[0], cast(uint)block.length, 0);
    entry.fadvise_advice = advice;
}

/**
 * Note: Available from Linux 5.6
 */
void prepSend(ref SubmissionEntry entry, int sockfd, const(ubyte)[] buf, MsgFlags flags)
{
    entry.prepRW(Operation.SEND, sockfd, cast(void*)&buf[0], cast(uint)buf.length, 0);
    entry.msg_flags = flags;
}

/**
 * Note: Available from Linux 5.6
 */
void prepRecv(ref SubmissionEntry entry, int sockfd, ubyte[] buf, MsgFlags flags)
{
    entry.prepRW(Operation.RECV, sockfd, cast(void*)&buf[0], cast(uint)buf.length, 0);
    entry.msg_flags = flags;
}

/**
 * Note: Available from Linux 5.6
 */
void prepOpenat2(ref SubmissionEntry entry, int fd, const char *path, ref OpenHow how)
{
    entry.prepRW(Operation.OPENAT2, fd, cast(void*)path, cast(uint)OpenHow.sizeof, cast(ulong)(cast(void*)&how));
}

/**
 * Note: Available from Linux 5.6
 */
void prepEpollCtl(ref SubmissionEntry entry, int epfd, int fd, int op, ref epoll_event ev)
{
    entry.prepRW(Operation.EPOLL_CTL, epfd, cast(void*)&ev, op, fd);
}

/**
 * Note: Available from Linux 5.7
 */
void prepSplice(ref SubmissionEntry entry,
    int fd_in, ulong off_in,
    int fd_out, ulong off_out,
    uint nbytes, uint splice_flags)
{
    entry.prepRW(Operation.SPLICE, fd_out, null, nbytes, off_out);
    entry.splice_off_in = off_in;
    entry.splice_fd_in = fd_in;
    entry.splice_flags = splice_flags;
}

/**
 * Note: Available from Linux 5.7
 */
void prepProvideBuffers(ref SubmissionEntry entry, ubyte[] buf, int nr, ushort bgid, int bid)
{
    entry.prepRW(Operation.PROVIDE_BUFFERS, nr, cast(void*)&buf[0], cast(uint)buf.length, bid);
    entry.buf_group = bgid;
}

/**
 * Note: Available from Linux 5.7
 */
void prepRemoveBuffers(ref SubmissionEntry entry, int nr, ushort bgid)
{
    entry.prepRW(Operation.REMOVE_BUFFERS, nr);
    entry.buf_group = bgid;
}

/**
 * Note: Available from Linux 5.8
 */
void prepTee(ref SubmissionEntry entry, int fd_in, int fd_out, uint nbytes, uint flags)
{
    entry.prepRW(Operation.TEE, fd_out, null, nbytes, 0);
    entry.splice_off_in = 0;
    entry.splice_fd_in = fd_in;
    entry.splice_flags = flags;
}

private:

// uring cleanup
void dispose(ref Uring uring)
{
    if (uring.payload is null) return;
    // debug printf("uring(%d): dispose(%d)\n", uring.payload.fd, uring.payload.refs);
    if (--uring.payload.refs == 0)
    {
        import std.traits : hasElaborateDestructor;
        // debug printf("uring(%d): free\n", uring.payload.fd);
        static if (hasElaborateDestructor!UringDesc)
            destroy(*uring.payload); // call possible destructors
        free(cast(void*)uring.payload);
    }
    uring.payload = null;
}

// system fields descriptor
struct UringDesc
{
    nothrow @nogc:

    int fd;
    size_t refs;
    SetupParameters params;
    SubmissionQueue sq;
    CompletionQueue cq;

    iovec[] regBuffers;

    ~this()
    {
        if (regBuffers) free(cast(void*)&regBuffers[0]);
        if (sq.ring) munmap(sq.ring, sq.ringSize);
        if (sq.sqes) munmap(cast(void*)&sq.sqes[0], sq.sqes.length * SubmissionEntry.sizeof);
        if (cq.ring && cq.ring != sq.ring) munmap(cq.ring, cq.ringSize);
        close(fd);
    }

    private auto mapRings()
    {
        sq.ringSize = params.sq_off.array + params.sq_entries * uint.sizeof;
        cq.ringSize = params.cq_off.cqes + params.cq_entries * CompletionEntry.sizeof;

        if (params.features & SetupFeatures.SINGLE_MMAP)
        {
            if (cq.ringSize > sq.ringSize) sq.ringSize = cq.ringSize;
            cq.ringSize = sq.ringSize;
        }

        sq.ring = mmap(null, sq.ringSize,
            PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
            fd, SetupParameters.SUBMISSION_QUEUE_RING_OFFSET
        );

        if (sq.ring == MAP_FAILED)
        {
            sq.ring = null;
            return -errno;
        }

        if (params.features & SetupFeatures.SINGLE_MMAP)
            cq.ring = sq.ring;
        else
        {
            cq.ring = mmap(null, cq.ringSize,
                PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
                fd, SetupParameters.COMPLETION_QUEUE_RING_OFFSET
            );

            if (cq.ring == MAP_FAILED)
            {
                cq.ring = null;
                return -errno; // cleanup is done in struct destructors
            }
        }

        uint entries = *cast(uint*)(sq.ring + params.sq_off.ring_entries);
        sq.khead        = cast(uint*)(sq.ring + params.sq_off.head);
        sq.ktail        = cast(uint*)(sq.ring + params.sq_off.tail);
        sq.localTail    = *sq.ktail;
        sq.ringMask     = *cast(uint*)(sq.ring + params.sq_off.ring_mask);
        sq.kflags       = cast(uint*)(sq.ring + params.sq_off.flags);
        sq.kdropped     = cast(uint*)(sq.ring + params.sq_off.dropped);

        // Indirection array of indexes to the sqes array (head and tail are pointing to this array).
        // As we don't need some fancy mappings, just initialize it with constant indexes and forget about it.
        // That way, head and tail are actually indexes to our sqes array.
        foreach (i; 0..entries)
        {
            *((cast(uint*)(sq.ring + params.sq_off.array)) + i) = i;
        }

        auto psqes = mmap(
            null, entries * SubmissionEntry.sizeof,
            PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
            fd, SetupParameters.SUBMISSION_QUEUE_ENTRIES_OFFSET
        );

        if (psqes == MAP_FAILED) return -errno;
        sq.sqes = (cast(SubmissionEntry*)psqes)[0..entries];

        entries = *cast(uint*)(cq.ring + params.cq_off.ring_entries);
        cq.khead        = cast(uint*)(cq.ring + params.cq_off.head);
        cq.localHead    = *cq.khead;
        cq.ktail        = cast(uint*)(cq.ring + params.cq_off.tail);
        cq.ringMask     = *cast(uint*)(cq.ring + params.cq_off.ring_mask);
        cq.koverflow    = cast(uint*)(cq.ring + params.cq_off.overflow);
        cq.cqes         = (cast(CompletionEntry*)(cq.ring + params.cq_off.cqes))[0..entries];
        cq.kflags       = cast(uint*)(cq.ring + params.cq_off.flags);
        return 0;
    }
}

/// Wrapper for `SubmissionEntry` queue
struct SubmissionQueue
{
    nothrow @nogc:

    // mmaped fields
    uint* khead; // controlled by kernel
    uint* ktail; // controlled by us
    uint* kflags; // controlled by kernel (ie IORING_SQ_NEED_WAKEUP)
    uint* kdropped; // counter of invalid submissions (out of bound index)
    uint ringMask; // constant mask used to determine array index from head/tail

    // mmap details (for cleanup)
    void* ring; // pointer to the mmaped region
    size_t ringSize; // size of mmaped memory block

    // mmapped list of entries (fixed length)
    SubmissionEntry[] sqes;

    uint localTail; // used for batch submission

    uint head() const { return atomicLoad!(MemoryOrder.acq)(*khead); }
    uint tail() const { return localTail; }

    void flushTail()
    {
        pragma(inline);
        // debug printf("SQ updating tail: %d\n", localTail);
        atomicStore!(MemoryOrder.rel)(*ktail, localTail);
    }

    SubmissionQueueFlags flags() const
    {
        return cast(SubmissionQueueFlags)atomicLoad!(MemoryOrder.raw)(*kflags);
    }

    bool full() const { return sqes.length == length; }

    size_t length() const { return tail - head; }

    size_t capacity() const { return sqes.length - length; }

    void put()(auto ref SubmissionEntry entry)
    {
        assert(!full, "SubmissionQueue is full");
        sqes[tail & ringMask] = entry;
        localTail++;
    }

    void put(OP)(auto ref OP op)
        if (!is(OP == SubmissionEntry))
    {
        assert(!full, "SubmissionQueue is full");
        sqes[tail & ringMask].clear();
        sqes[tail & ringMask].fill(op);
        localTail++;
    }

    private void putWith(alias FN, ARGS...)(auto ref ARGS args)
    {
        import std.traits : Parameters, ParameterStorageClass, ParameterStorageClassTuple;

        static assert(
            Parameters!FN.length >= 1
            && is(Parameters!FN[0] == SubmissionEntry)
            && ParameterStorageClassTuple!FN[0] == ParameterStorageClass.ref_,
            "Alias function must accept at least `ref SubmissionEntry`");

        static assert(
            is(typeof(FN(sqes[tail & ringMask], args))),
            "Provided function is not callable with " ~ (Parameters!((ref SubmissionEntry e, ARGS args) {})).stringof);

        assert(!full, "SubmissionQueue is full");
        sqes[tail & ringMask].clear();
        FN(sqes[tail & ringMask], args);
        localTail++;
    }

    uint dropped() const { return atomicLoad!(MemoryOrder.raw)(*kdropped); }
}

struct CompletionQueue
{
    nothrow @nogc:

    // mmaped fields
    uint* khead; // controlled by us (increment after entry at head was read)
    uint* ktail; // updated by kernel
    uint* koverflow;
    uint* kflags;
    CompletionEntry[] cqes; // array of entries (fixed length)

    uint ringMask; // constant mask used to determine array index from head/tail

    // mmap details (for cleanup)
    void* ring;
    size_t ringSize;

    uint localHead; // used for bulk reading

    uint head() const { return localHead; }
    uint tail() const { return atomicLoad!(MemoryOrder.acq)(*ktail); }

    void flushHead()
    {
        pragma(inline);
        // debug printf("CQ updating head: %d\n", localHead);
        atomicStore!(MemoryOrder.rel)(*khead, localHead);
    }

    bool empty() const { return head == tail; }

    ref CompletionEntry front() return
    {
        assert(!empty, "CompletionQueue is empty");
        return cqes[localHead & ringMask];
    }

    void popFront()
    {
        pragma(inline);
        assert(!empty, "CompletionQueue is empty");
        localHead++;
        flushHead();
    }

    size_t length() const { return tail - localHead; }

    uint overflow() const { return atomicLoad!(MemoryOrder.raw)(*koverflow); }

    /// Runtime CQ flags - written by the application, shouldn't be modified by the kernel.
    void flags(CQRingFlags flags) { atomicStore!(MemoryOrder.raw)(*kflags, flags); }
}

// just a helper to use atomicStore more easily with older compilers
void atomicStore(MemoryOrder ms, T, V)(ref T val, V newVal) @trusted
{
    pragma(inline, true);
    import core.atomic : store = atomicStore;
    static if (__VERSION__ >= 2089) store!ms(val, newVal);
    else store!ms(*(cast(shared T*)&val), newVal);
}

// just a helper to use atomicLoad more easily with older compilers
T atomicLoad(MemoryOrder ms, T)(ref const T val) @trusted
{
    pragma(inline, true);
    import core.atomic : load = atomicLoad;
    static if (__VERSION__ >= 2089) return load!ms(val);
    else return load!ms(*(cast(const shared T*)&val));
}

version (assert)
{
    import std.range.primitives : ElementType, isInputRange, isOutputRange;
    static assert(isInputRange!Uring && is(ElementType!Uring == CompletionEntry));
    static assert(isOutputRange!(Uring, SubmissionEntry));
}