TOMOYO Linux Cross Reference
Linux/fs/io_uring.c

  1 // SPDX-License-Identifier: GPL-2.0
  2 /*
  3  * Shared application/kernel submission and completion ring pairs, for
  4  * supporting fast/efficient IO.
  5  *
  6  * A note on the read/write ordering memory barriers that are matched between
  7  * the application and kernel side.
  8  *
  9  * After the application reads the CQ ring tail, it must use an
 10  * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
 11  * before writing the tail (using smp_load_acquire to read the tail will
 12  * do). It also needs a smp_mb() before updating CQ head (ordering the
 13  * entry load(s) with the head store), pairing with an implicit barrier
 14  * through a control-dependency in io_get_cqring (smp_store_release to
 15  * store head will do). Failure to do so could lead to reading invalid
 16  * CQ entries.
 17  *
 18  * Likewise, the application must use an appropriate smp_wmb() before
 19  * writing the SQ tail (ordering SQ entry stores with the tail store),
 20  * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
 21  * to store the tail will do). And it needs a barrier ordering the SQ
 22  * head load before writing new SQ entries (smp_load_acquire to read
 23  * head will do).
 24  *
 25  * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
 26  * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
 27  * updating the SQ tail; a full memory barrier smp_mb() is needed
 28  * between.
 29  *
 30  * Also see the examples in the liburing library:
 31  *
 32  *      git://git.kernel.dk/liburing
 33  *
 34  * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
 35  * from data shared between the kernel and application. This is done both
 36  * for ordering purposes and to ensure that once a value is loaded from
 37  * data that the application could potentially modify, it remains stable.
 38  *
 39  * Copyright (C) 2018-2019 Jens Axboe
 40  * Copyright (c) 2018-2019 Christoph Hellwig
 41  */
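/*
 * Editor's note: a minimal userspace-side sketch of the CQ ring
 * consumption protocol described in the comment above. It is NOT part of
 * this file; khead, ktail, ring_mask and cqes are assumed to point into
 * the IORING_OFF_CQ_RING mmap (via the offsets returned by
 * io_uring_setup(2)), and handle_cqe() is a hypothetical consumer. The
 * C11/GCC acquire/release builtins stand in for the smp_load_acquire() /
 * smp_store_release() pairing noted above; see liburing for the
 * canonical implementation.
 *
 *	unsigned head = *khead;		// only the app writes head
 *	unsigned tail = __atomic_load_n(ktail, __ATOMIC_ACQUIRE);
 *
 *	while (head != tail) {
 *		struct io_uring_cqe *cqe = &cqes[head & *ring_mask];
 *
 *		handle_cqe(cqe);
 *		head++;
 *	}
 *	// Release store pairs with the control dependency in io_get_cqring()
 *	__atomic_store_n(khead, head, __ATOMIC_RELEASE);
 */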
 42 #include <linux/kernel.h>
 43 #include <linux/init.h>
 44 #include <linux/errno.h>
 45 #include <linux/syscalls.h>
 46 #include <linux/compat.h>
 47 #include <linux/refcount.h>
 48 #include <linux/uio.h>
 49 
 50 #include <linux/sched/signal.h>
 51 #include <linux/fs.h>
 52 #include <linux/file.h>
 53 #include <linux/fdtable.h>
 54 #include <linux/mm.h>
 55 #include <linux/mman.h>
 56 #include <linux/mmu_context.h>
 57 #include <linux/percpu.h>
 58 #include <linux/slab.h>
 59 #include <linux/workqueue.h>
 60 #include <linux/kthread.h>
 61 #include <linux/blkdev.h>
 62 #include <linux/bvec.h>
 63 #include <linux/net.h>
 64 #include <net/sock.h>
 65 #include <net/af_unix.h>
 66 #include <net/scm.h>
 67 #include <linux/anon_inodes.h>
 68 #include <linux/sched/mm.h>
 69 #include <linux/uaccess.h>
 70 #include <linux/nospec.h>
 71 #include <linux/sizes.h>
 72 #include <linux/hugetlb.h>
 73 
 74 #include <uapi/linux/io_uring.h>
 75 
 76 #include "internal.h"
 77 
 78 #define IORING_MAX_ENTRIES      4096
 79 #define IORING_MAX_FIXED_FILES  1024
 80 
 81 struct io_uring {
 82         u32 head ____cacheline_aligned_in_smp;
 83         u32 tail ____cacheline_aligned_in_smp;
 84 };
 85 
 86 /*
 87  * This data is shared with the application through the mmap at offset
 88  * IORING_OFF_SQ_RING.
 89  *
 90  * The offsets to the member fields are published through struct
 91  * io_sqring_offsets when calling io_uring_setup.
 92  */
 93 struct io_sq_ring {
 94         /*
 95          * Head and tail offsets into the ring; the offsets need to be
 96          * masked to get valid indices.
 97          *
 98          * The kernel controls head and the application controls tail.
 99          */
100         struct io_uring         r;
101         /*
102          * Bitmask to apply to head and tail offsets (constant, equals
103          * ring_entries - 1)
104          */
105         u32                     ring_mask;
106         /* Ring size (constant, power of 2) */
107         u32                     ring_entries;
108         /*
109          * Number of invalid entries dropped by the kernel due to
110          * an invalid index stored in the array
111          *
112          * Written by the kernel, shouldn't be modified by the
113          * application (i.e. get number of "new events" by comparing to
114          * cached value).
115          *
116          * After a new SQ head value was read by the application this
117          * counter includes all submissions that were dropped reaching
118          * the new SQ head (and possibly more).
119          */
120         u32                     dropped;
121         /*
122          * Runtime flags
123          *
124          * Written by the kernel, shouldn't be modified by the
125          * application.
126          *
127          * The application needs a full memory barrier before checking
128          * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
129          */
130         u32                     flags;
131         /*
132          * Ring buffer of indices into array of io_uring_sqe, which is
133          * mmapped by the application using the IORING_OFF_SQES offset.
134          *
135          * This indirection could e.g. be used to assign fixed
136          * io_uring_sqe entries to operations and only submit them to
137          * the queue when needed.
138          *
139          * The kernel modifies neither the indices array nor the entries
140          * array.
141          */
142         u32                     array[];
143 };
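/*
 * Editor's note: an illustrative userspace-side sketch of queueing one
 * SQE through the indirection array above. It is NOT part of this file;
 * khead, ktail, kflags, ring_mask, ring_entries, array, sqes and ring_fd
 * are assumed to come from the IORING_OFF_SQ_RING/IORING_OFF_SQES mmaps
 * and the io_uring_setup(2) return value, fill_sqe() is a hypothetical
 * helper, and io_uring_enter() abbreviates the io_uring_enter(2) syscall
 * (trailing sigset arguments omitted). The release store of the tail
 * provides the "smp_wmb() before writing the SQ tail" ordering from the
 * header comment, pairing with smp_load_acquire() in io_get_sqring().
 *
 *	unsigned head = __atomic_load_n(khead, __ATOMIC_ACQUIRE);
 *	unsigned tail = *ktail;		// only the app writes tail
 *
 *	if (tail - head < *ring_entries) {
 *		unsigned idx = tail & *ring_mask;
 *
 *		fill_sqe(&sqes[idx]);	// write the io_uring_sqe first...
 *		array[idx] = idx;	// ...then publish its index
 *		__atomic_store_n(ktail, tail + 1, __ATOMIC_RELEASE);
 *	}
 *
 *	// With IORING_SETUP_SQPOLL, a full barrier is needed between the
 *	// tail update and the IORING_SQ_NEED_WAKEUP check (see above).
 *	__atomic_thread_fence(__ATOMIC_SEQ_CST);
 *	if (*kflags & IORING_SQ_NEED_WAKEUP)
 *		io_uring_enter(ring_fd, 0, 0, IORING_ENTER_SQ_WAKEUP);
 */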
144 
145 /*
146  * This data is shared with the application through the mmap at offset
147  * IORING_OFF_CQ_RING.
148  *
149  * The offsets to the member fields are published through struct
150  * io_cqring_offsets when calling io_uring_setup.
151  */
152 struct io_cq_ring {
153         /*
154          * Head and tail offsets into the ring; the offsets need to be
155          * masked to get valid indices.
156          *
157          * The application controls head and the kernel tail.
158          */
159         struct io_uring         r;
160         /*
161          * Bitmask to apply to head and tail offsets (constant, equals
162          * ring_entries - 1)
163          */
164         u32                     ring_mask;
165         /* Ring size (constant, power of 2) */
166         u32                     ring_entries;
167         /*
168          * Number of completion events lost because the queue was full;
169          * this should be avoided by the application by making sure
 170          * there are not more requests pending than there is space in
171          * the completion queue.
172          *
173          * Written by the kernel, shouldn't be modified by the
174          * application (i.e. get number of "new events" by comparing to
175          * cached value).
176          *
177          * As completion events come in out of order this counter is not
178          * ordered with any other data.
179          */
180         u32                     overflow;
181         /*
182          * Ring buffer of completion events.
183          *
184          * The kernel writes completion events fresh every time they are
185          * produced, so the application is allowed to modify pending
186          * entries.
187          */
188         struct io_uring_cqe     cqes[];
189 };
190 
191 struct io_mapped_ubuf {
192         u64             ubuf;
193         size_t          len;
 194         struct bio_vec  *bvec;
195         unsigned int    nr_bvecs;
196 };
197 
198 struct async_list {
199         spinlock_t              lock;
200         atomic_t                cnt;
201         struct list_head        list;
202 
203         struct file             *file;
204         off_t                   io_end;
205         size_t                  io_len;
206 };
207 
208 struct io_ring_ctx {
209         struct {
210                 struct percpu_ref       refs;
211         } ____cacheline_aligned_in_smp;
212 
213         struct {
214                 unsigned int            flags;
215                 bool                    compat;
216                 bool                    account_mem;
217 
218                 /* SQ ring */
219                 struct io_sq_ring       *sq_ring;
220                 unsigned                cached_sq_head;
221                 unsigned                sq_entries;
222                 unsigned                sq_mask;
223                 unsigned                sq_thread_idle;
224                 unsigned                cached_sq_dropped;
225                 struct io_uring_sqe     *sq_sqes;
226 
227                 struct list_head        defer_list;
228         } ____cacheline_aligned_in_smp;
229 
230         /* IO offload */
231         struct workqueue_struct *sqo_wq;
232         struct task_struct      *sqo_thread;    /* if using sq thread polling */
233         struct mm_struct        *sqo_mm;
234         wait_queue_head_t       sqo_wait;
235         struct completion       sqo_thread_started;
236 
237         struct {
238                 /* CQ ring */
239                 struct io_cq_ring       *cq_ring;
240                 unsigned                cached_cq_tail;
241                 atomic_t                cached_cq_overflow;
242                 unsigned                cq_entries;
243                 unsigned                cq_mask;
244                 struct wait_queue_head  cq_wait;
245                 struct fasync_struct    *cq_fasync;
246                 struct eventfd_ctx      *cq_ev_fd;
247         } ____cacheline_aligned_in_smp;
248 
249         /*
250          * If used, fixed file set. Writers must ensure that ->refs is dead,
251          * readers must ensure that ->refs is alive as long as the file* is
252          * used. Only updated through io_uring_register(2).
253          */
254         struct file             **user_files;
255         unsigned                nr_user_files;
256 
257         /* if used, fixed mapped user buffers */
258         unsigned                nr_user_bufs;
259         struct io_mapped_ubuf   *user_bufs;
260 
261         struct user_struct      *user;
262 
263         struct cred             *creds;
264 
265         struct completion       ctx_done;
266 
267         struct {
268                 struct mutex            uring_lock;
269                 wait_queue_head_t       wait;
270         } ____cacheline_aligned_in_smp;
271 
272         struct {
273                 spinlock_t              completion_lock;
274                 bool                    poll_multi_file;
275                 /*
276                  * ->poll_list is protected by the ctx->uring_lock for
277                  * io_uring instances that don't use IORING_SETUP_SQPOLL.
278                  * For SQPOLL, only the single threaded io_sq_thread() will
279                  * manipulate the list, hence no extra locking is needed there.
280                  */
281                 struct list_head        poll_list;
282                 struct list_head        cancel_list;
283         } ____cacheline_aligned_in_smp;
284 
285         struct async_list       pending_async[2];
286 
287 #if defined(CONFIG_UNIX)
288         struct socket           *ring_sock;
289 #endif
290 };
291 
292 struct sqe_submit {
293         const struct io_uring_sqe       *sqe;
294         unsigned short                  index;
295         u32                             sequence;
296         bool                            has_user;
297         bool                            needs_lock;
298         bool                            needs_fixed_file;
299 };
300 
301 /*
302  * First field must be the file pointer in all the
303  * iocb unions! See also 'struct kiocb' in <linux/fs.h>
304  */
305 struct io_poll_iocb {
306         struct file                     *file;
307         struct wait_queue_head          *head;
308         __poll_t                        events;
309         bool                            done;
310         bool                            canceled;
311         struct wait_queue_entry         wait;
312 };
313 
314 /*
315  * NOTE! Each of the iocb union members has the file pointer
316  * as the first entry in their struct definition. So you can
317  * access the file pointer through any of the sub-structs,
318  * or directly as just 'ki_filp' in this struct.
319  */
320 struct io_kiocb {
321         union {
322                 struct file             *file;
323                 struct kiocb            rw;
324                 struct io_poll_iocb     poll;
325         };
326 
327         struct sqe_submit       submit;
328 
329         struct io_ring_ctx      *ctx;
330         struct list_head        list;
331         struct list_head        link_list;
332         unsigned int            flags;
333         refcount_t              refs;
334 #define REQ_F_NOWAIT            1       /* must not punt to workers */
335 #define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
336 #define REQ_F_FIXED_FILE        4       /* ctx owns file */
337 #define REQ_F_SEQ_PREV          8       /* sequential with previous */
338 #define REQ_F_IO_DRAIN          16      /* drain existing IO first */
339 #define REQ_F_IO_DRAINED        32      /* drain done */
340 #define REQ_F_LINK              64      /* linked sqes */
341 #define REQ_F_LINK_DONE         128     /* linked sqes done */
342 #define REQ_F_FAIL_LINK         256     /* fail rest of links */
343 #define REQ_F_ISREG             2048    /* regular file */
344 #define REQ_F_MUST_PUNT         4096    /* must be punted even for NONBLOCK */
345         u64                     user_data;
346         u32                     result;
347         u32                     sequence;
348 
349         struct work_struct      work;
350 };
351 
352 #define IO_PLUG_THRESHOLD               2
353 #define IO_IOPOLL_BATCH                 8
354 
355 struct io_submit_state {
356         struct blk_plug         plug;
357 
358         /*
359          * io_kiocb alloc cache
360          */
361         void                    *reqs[IO_IOPOLL_BATCH];
 362         unsigned int            free_reqs;
 363         unsigned int            cur_req;
364 
365         /*
366          * File reference cache
367          */
368         struct file             *file;
369         unsigned int            fd;
370         unsigned int            has_refs;
371         unsigned int            used_refs;
372         unsigned int            ios_left;
373 };
374 
375 static void io_sq_wq_submit_work(struct work_struct *work);
376 
377 static struct kmem_cache *req_cachep;
378 
379 static const struct file_operations io_uring_fops;
380 
381 struct sock *io_uring_get_socket(struct file *file)
382 {
383 #if defined(CONFIG_UNIX)
384         if (file->f_op == &io_uring_fops) {
385                 struct io_ring_ctx *ctx = file->private_data;
386 
387                 return ctx->ring_sock->sk;
388         }
389 #endif
390         return NULL;
391 }
392 EXPORT_SYMBOL(io_uring_get_socket);
393 
394 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
395 {
396         struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
397 
398         complete(&ctx->ctx_done);
399 }
400 
401 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
402 {
403         struct io_ring_ctx *ctx;
404         int i;
405 
406         ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
407         if (!ctx)
408                 return NULL;
409 
410         if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
411                             PERCPU_REF_ALLOW_REINIT, GFP_KERNEL)) {
412                 kfree(ctx);
413                 return NULL;
414         }
415 
416         ctx->flags = p->flags;
417         init_waitqueue_head(&ctx->cq_wait);
418         init_completion(&ctx->ctx_done);
419         init_completion(&ctx->sqo_thread_started);
420         mutex_init(&ctx->uring_lock);
421         init_waitqueue_head(&ctx->wait);
422         for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) {
423                 spin_lock_init(&ctx->pending_async[i].lock);
424                 INIT_LIST_HEAD(&ctx->pending_async[i].list);
425                 atomic_set(&ctx->pending_async[i].cnt, 0);
426         }
427         spin_lock_init(&ctx->completion_lock);
428         INIT_LIST_HEAD(&ctx->poll_list);
429         INIT_LIST_HEAD(&ctx->cancel_list);
430         INIT_LIST_HEAD(&ctx->defer_list);
431         return ctx;
432 }
433 
434 static inline bool io_sequence_defer(struct io_ring_ctx *ctx,
435                                      struct io_kiocb *req)
436 {
437         if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN)
438                 return false;
439 
440         return req->sequence != ctx->cached_cq_tail + ctx->sq_ring->dropped
441                                         + atomic_read(&ctx->cached_cq_overflow);
442 }
443 
444 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
445 {
446         struct io_kiocb *req;
447 
448         if (list_empty(&ctx->defer_list))
449                 return NULL;
450 
451         req = list_first_entry(&ctx->defer_list, struct io_kiocb, list);
452         if (!io_sequence_defer(ctx, req)) {
453                 list_del_init(&req->list);
454                 return req;
455         }
456 
457         return NULL;
458 }
459 
460 static void __io_commit_cqring(struct io_ring_ctx *ctx)
461 {
462         struct io_cq_ring *ring = ctx->cq_ring;
463 
464         if (ctx->cached_cq_tail != READ_ONCE(ring->r.tail)) {
465                 /* order cqe stores with ring update */
466                 smp_store_release(&ring->r.tail, ctx->cached_cq_tail);
467 
468                 if (wq_has_sleeper(&ctx->cq_wait)) {
469                         wake_up_interruptible(&ctx->cq_wait);
470                         kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
471                 }
472         }
473 }
474 
475 static void io_commit_cqring(struct io_ring_ctx *ctx)
476 {
477         struct io_kiocb *req;
478 
479         __io_commit_cqring(ctx);
480 
481         while ((req = io_get_deferred_req(ctx)) != NULL) {
482                 req->flags |= REQ_F_IO_DRAINED;
483                 queue_work(ctx->sqo_wq, &req->work);
484         }
485 }
486 
487 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
488 {
489         struct io_cq_ring *ring = ctx->cq_ring;
490         unsigned tail;
491 
492         tail = ctx->cached_cq_tail;
493         /*
494          * writes to the cq entry need to come after reading head; the
495          * control dependency is enough as we're using WRITE_ONCE to
496          * fill the cq entry
497          */
498         if (tail - READ_ONCE(ring->r.head) == ring->ring_entries)
499                 return NULL;
500 
501         ctx->cached_cq_tail++;
502         return &ring->cqes[tail & ctx->cq_mask];
503 }
504 
505 static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
506                                  long res)
507 {
508         struct io_uring_cqe *cqe;
509 
510         /*
511          * If we can't get a cq entry, userspace overflowed the
512          * submission (by quite a lot). Increment the overflow count in
513          * the ring.
514          */
515         cqe = io_get_cqring(ctx);
516         if (cqe) {
517                 WRITE_ONCE(cqe->user_data, ki_user_data);
518                 WRITE_ONCE(cqe->res, res);
519                 WRITE_ONCE(cqe->flags, 0);
520         } else {
521                 WRITE_ONCE(ctx->cq_ring->overflow,
522                                 atomic_inc_return(&ctx->cached_cq_overflow));
523         }
524 }
525 
526 static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
527 {
528         if (waitqueue_active(&ctx->wait))
529                 wake_up(&ctx->wait);
530         if (waitqueue_active(&ctx->sqo_wait))
531                 wake_up(&ctx->sqo_wait);
532         if (ctx->cq_ev_fd)
533                 eventfd_signal(ctx->cq_ev_fd, 1);
534 }
535 
536 static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data,
537                                 long res)
538 {
539         unsigned long flags;
540 
541         spin_lock_irqsave(&ctx->completion_lock, flags);
542         io_cqring_fill_event(ctx, user_data, res);
543         io_commit_cqring(ctx);
544         spin_unlock_irqrestore(&ctx->completion_lock, flags);
545 
546         io_cqring_ev_posted(ctx);
547 }
548 
549 static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs)
550 {
551         percpu_ref_put_many(&ctx->refs, refs);
552 
553         if (waitqueue_active(&ctx->wait))
554                 wake_up(&ctx->wait);
555 }
556 
557 static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
558                                    struct io_submit_state *state)
559 {
560         gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
561         struct io_kiocb *req;
562 
563         if (!percpu_ref_tryget(&ctx->refs))
564                 return NULL;
565 
566         if (!state) {
567                 req = kmem_cache_alloc(req_cachep, gfp);
568                 if (unlikely(!req))
569                         goto out;
570         } else if (!state->free_reqs) {
571                 size_t sz;
572                 int ret;
573 
574                 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
575                 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
576 
577                 /*
578                  * Bulk alloc is all-or-nothing. If we fail to get a batch,
579                  * retry single alloc to be on the safe side.
580                  */
581                 if (unlikely(ret <= 0)) {
582                         state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
583                         if (!state->reqs[0])
584                                 goto out;
585                         ret = 1;
586                 }
587                 state->free_reqs = ret - 1;
588                 state->cur_req = 1;
589                 req = state->reqs[0];
590         } else {
591                 req = state->reqs[state->cur_req];
592                 state->free_reqs--;
593                 state->cur_req++;
594         }
595 
596         req->file = NULL;
597         req->ctx = ctx;
598         req->flags = 0;
599         /* one is dropped after submission, the other at completion */
600         refcount_set(&req->refs, 2);
601         req->result = 0;
602         return req;
603 out:
604         io_ring_drop_ctx_refs(ctx, 1);
605         return NULL;
606 }
607 
608 static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
609 {
610         if (*nr) {
611                 kmem_cache_free_bulk(req_cachep, *nr, reqs);
612                 io_ring_drop_ctx_refs(ctx, *nr);
613                 *nr = 0;
614         }
615 }
616 
617 static void __io_free_req(struct io_kiocb *req)
618 {
619         if (req->file && !(req->flags & REQ_F_FIXED_FILE))
620                 fput(req->file);
621         io_ring_drop_ctx_refs(req->ctx, 1);
622         kmem_cache_free(req_cachep, req);
623 }
624 
625 static void io_req_link_next(struct io_kiocb *req)
626 {
627         struct io_kiocb *nxt;
628 
629         /*
 630  * The list should never be empty when we are called here. But it could
 631  * potentially happen if the chain is messed up, so check to be on the
632          * safe side.
633          */
634         nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb, list);
635         if (nxt) {
636                 list_del(&nxt->list);
637                 if (!list_empty(&req->link_list)) {
638                         INIT_LIST_HEAD(&nxt->link_list);
639                         list_splice(&req->link_list, &nxt->link_list);
640                         nxt->flags |= REQ_F_LINK;
641                 }
642 
643                 nxt->flags |= REQ_F_LINK_DONE;
644                 INIT_WORK(&nxt->work, io_sq_wq_submit_work);
645                 queue_work(req->ctx->sqo_wq, &nxt->work);
646         }
647 }
648 
649 /*
650  * Called if REQ_F_LINK is set, and we fail the head request
651  */
652 static void io_fail_links(struct io_kiocb *req)
653 {
654         struct io_kiocb *link;
655 
656         while (!list_empty(&req->link_list)) {
657                 link = list_first_entry(&req->link_list, struct io_kiocb, list);
658                 list_del(&link->list);
659 
660                 io_cqring_add_event(req->ctx, link->user_data, -ECANCELED);
661                 __io_free_req(link);
662         }
663 }
664 
665 static void io_free_req(struct io_kiocb *req)
666 {
667         /*
668          * If LINK is set, we have dependent requests in this chain. If we
669          * didn't fail this request, queue the first one up, moving any other
670          * dependencies to the next request. In case of failure, fail the rest
671          * of the chain.
672          */
673         if (req->flags & REQ_F_LINK) {
674                 if (req->flags & REQ_F_FAIL_LINK)
675                         io_fail_links(req);
676                 else
677                         io_req_link_next(req);
678         }
679 
680         __io_free_req(req);
681 }
682 
683 static void io_put_req(struct io_kiocb *req)
684 {
685         if (refcount_dec_and_test(&req->refs))
686                 io_free_req(req);
687 }
688 
689 static unsigned io_cqring_events(struct io_cq_ring *ring)
690 {
691         /* See comment at the top of this file */
692         smp_rmb();
693         return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head);
694 }
695 
696 static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
697 {
698         struct io_sq_ring *ring = ctx->sq_ring;
699 
700         /* make sure SQ entry isn't read before tail */
701         return smp_load_acquire(&ring->r.tail) - ctx->cached_sq_head;
702 }
703 
704 /*
705  * Find and free completed poll iocbs
706  */
707 static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
708                                struct list_head *done)
709 {
710         void *reqs[IO_IOPOLL_BATCH];
711         struct io_kiocb *req;
712         int to_free;
713 
714         to_free = 0;
715         while (!list_empty(done)) {
716                 req = list_first_entry(done, struct io_kiocb, list);
717                 list_del(&req->list);
718 
719                 io_cqring_fill_event(ctx, req->user_data, req->result);
720                 (*nr_events)++;
721 
722                 if (refcount_dec_and_test(&req->refs)) {
723                         /* If we're not using fixed files, we have to pair the
724                          * completion part with the file put. Use regular
725                          * completions for those, only batch free for fixed
726                          * file and non-linked commands.
727                          */
728                         if ((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) ==
729                             REQ_F_FIXED_FILE) {
730                                 reqs[to_free++] = req;
731                                 if (to_free == ARRAY_SIZE(reqs))
732                                         io_free_req_many(ctx, reqs, &to_free);
733                         } else {
734                                 io_free_req(req);
735                         }
736                 }
737         }
738 
739         io_commit_cqring(ctx);
740         io_free_req_many(ctx, reqs, &to_free);
741 }
742 
743 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
744                         long min)
745 {
746         struct io_kiocb *req, *tmp;
747         LIST_HEAD(done);
748         bool spin;
749         int ret;
750 
751         /*
752          * Only spin for completions if we don't have multiple devices hanging
753          * off our complete list, and we're under the requested amount.
754          */
755         spin = !ctx->poll_multi_file && *nr_events < min;
756 
757         ret = 0;
758         list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
759                 struct kiocb *kiocb = &req->rw;
760 
761                 /*
762                  * Move completed entries to our local list. If we find a
763                  * request that requires polling, break out and complete
764                  * the done list first, if we have entries there.
765                  */
766                 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
767                         list_move_tail(&req->list, &done);
768                         continue;
769                 }
770                 if (!list_empty(&done))
771                         break;
772 
773                 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
774                 if (ret < 0)
775                         break;
776 
777                 if (ret && spin)
778                         spin = false;
779                 ret = 0;
780         }
781 
782         if (!list_empty(&done))
783                 io_iopoll_complete(ctx, nr_events, &done);
784 
785         return ret;
786 }
787 
788 /*
 789  * Poll for a minimum of 'min' events. Note that if min == 0 we consider that a
790  * non-spinning poll check - we'll still enter the driver poll loop, but only
791  * as a non-spinning completion check.
792  */
793 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
794                                 long min)
795 {
796         while (!list_empty(&ctx->poll_list) && !need_resched()) {
797                 int ret;
798 
799                 ret = io_do_iopoll(ctx, nr_events, min);
800                 if (ret < 0)
801                         return ret;
802                 if (!min || *nr_events >= min)
803                         return 0;
804         }
805 
806         return 1;
807 }
808 
809 /*
810  * We can't just wait for polled events to come to us, we have to actively
811  * find and complete them.
812  */
813 static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
814 {
815         if (!(ctx->flags & IORING_SETUP_IOPOLL))
816                 return;
817 
818         mutex_lock(&ctx->uring_lock);
819         while (!list_empty(&ctx->poll_list)) {
820                 unsigned int nr_events = 0;
821 
822                 io_iopoll_getevents(ctx, &nr_events, 1);
823 
824                 /*
825                  * Ensure we allow local-to-the-cpu processing to take place,
826                  * in this case we need to ensure that we reap all events.
827                  */
828                 cond_resched();
829         }
830         mutex_unlock(&ctx->uring_lock);
831 }
832 
833 static int __io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
834                             long min)
835 {
836         int iters = 0, ret = 0;
837 
838         do {
839                 int tmin = 0;
840 
841                 /*
842                  * Don't enter poll loop if we already have events pending.
843                  * If we do, we can potentially be spinning for commands that
844                  * already triggered a CQE (e.g. in error).
845                  */
846                 if (io_cqring_events(ctx->cq_ring))
847                         break;
848 
849                 /*
850                  * If a submit got punted to a workqueue, we can have the
851                  * application entering polling for a command before it gets
852                  * issued. That app will hold the uring_lock for the duration
853                  * of the poll right here, so we need to take a breather every
854                  * now and then to ensure that the issue has a chance to add
855                  * the poll to the issued list. Otherwise we can spin here
856                  * forever, while the workqueue is stuck trying to acquire the
857                  * very same mutex.
858                  */
859                 if (!(++iters & 7)) {
860                         mutex_unlock(&ctx->uring_lock);
861                         mutex_lock(&ctx->uring_lock);
862                 }
863 
864                 if (*nr_events < min)
865                         tmin = min - *nr_events;
866 
867                 ret = io_iopoll_getevents(ctx, nr_events, tmin);
868                 if (ret <= 0)
869                         break;
870                 ret = 0;
871         } while (min && !*nr_events && !need_resched());
872 
873         return ret;
874 }
875 
876 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
877                            long min)
878 {
879         int ret;
880 
881         /*
882          * We disallow the app entering submit/complete with polling, but we
883          * still need to lock the ring to prevent racing with polled issue
884          * that got punted to a workqueue.
885          */
886         mutex_lock(&ctx->uring_lock);
887         ret = __io_iopoll_check(ctx, nr_events, min);
888         mutex_unlock(&ctx->uring_lock);
889         return ret;
890 }
891 
892 static void kiocb_end_write(struct io_kiocb *req)
893 {
894         /*
895          * Tell lockdep we inherited freeze protection from submission
896          * thread.
897          */
898         if (req->flags & REQ_F_ISREG) {
899                 struct inode *inode = file_inode(req->file);
900 
901                 __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
902         }
903         file_end_write(req->file);
904 }
905 
906 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
907 {
908         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
909 
910         if (kiocb->ki_flags & IOCB_WRITE)
911                 kiocb_end_write(req);
912 
913         if ((req->flags & REQ_F_LINK) && res != req->result)
914                 req->flags |= REQ_F_FAIL_LINK;
915         io_cqring_add_event(req->ctx, req->user_data, res);
916         io_put_req(req);
917 }
918 
919 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
920 {
921         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
922 
923         if (kiocb->ki_flags & IOCB_WRITE)
924                 kiocb_end_write(req);
925 
926         if ((req->flags & REQ_F_LINK) && res != req->result)
927                 req->flags |= REQ_F_FAIL_LINK;
928         req->result = res;
929         if (res != -EAGAIN)
930                 req->flags |= REQ_F_IOPOLL_COMPLETED;
931 }
932 
933 /*
934  * After the iocb has been issued, it's safe to be found on the poll list.
935  * Adding the kiocb to the list AFTER submission ensures that we don't
 936  * find it from an io_iopoll_getevents() thread before the issuer is done
937  * accessing the kiocb cookie.
938  */
939 static void io_iopoll_req_issued(struct io_kiocb *req)
940 {
941         struct io_ring_ctx *ctx = req->ctx;
942 
943         /*
944          * Track whether we have multiple files in our lists. This will impact
945          * how we do polling eventually, not spinning if we're on potentially
946          * different devices.
947          */
948         if (list_empty(&ctx->poll_list)) {
949                 ctx->poll_multi_file = false;
950         } else if (!ctx->poll_multi_file) {
951                 struct io_kiocb *list_req;
952 
953                 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
954                                                 list);
955                 if (list_req->rw.ki_filp != req->rw.ki_filp)
956                         ctx->poll_multi_file = true;
957         }
958 
959         /*
960          * For fast devices, IO may have already completed. If it has, add
961          * it to the front so we find it first.
962          */
963         if (req->flags & REQ_F_IOPOLL_COMPLETED)
964                 list_add(&req->list, &ctx->poll_list);
965         else
966                 list_add_tail(&req->list, &ctx->poll_list);
967 }
968 
969 static void io_file_put(struct io_submit_state *state)
970 {
971         if (state->file) {
972                 int diff = state->has_refs - state->used_refs;
973 
974                 if (diff)
975                         fput_many(state->file, diff);
976                 state->file = NULL;
977         }
978 }
979 
980 /*
981  * Get as many references to a file as we have IOs left in this submission,
982  * assuming most submissions are for one file, or at least that each file
983  * has more than one submission.
984  */
985 static struct file *io_file_get(struct io_submit_state *state, int fd)
986 {
987         if (!state)
988                 return fget(fd);
989 
990         if (state->file) {
991                 if (state->fd == fd) {
992                         state->used_refs++;
993                         state->ios_left--;
994                         return state->file;
995                 }
996                 io_file_put(state);
997         }
998         state->file = fget_many(fd, state->ios_left);
999         if (!state->file)
1000                 return NULL;
1001 
1002         state->fd = fd;
1003         state->has_refs = state->ios_left;
1004         state->used_refs = 1;
1005         state->ios_left--;
1006         return state->file;
1007 }
1008 
1009 /*
1010  * If we tracked the file through the SCM inflight mechanism, we could support
1011  * any file. For now, just ensure that anything potentially problematic is done
1012  * inline.
1013  */
1014 static bool io_file_supports_async(struct file *file)
1015 {
1016         umode_t mode = file_inode(file)->i_mode;
1017 
1018         if (S_ISBLK(mode) || S_ISCHR(mode))
1019                 return true;
1020         if (S_ISREG(mode) && file->f_op != &io_uring_fops)
1021                 return true;
1022 
1023         return false;
1024 }
1025 
1026 static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
1027                       bool force_nonblock)
1028 {
1029         const struct io_uring_sqe *sqe = s->sqe;
1030         struct io_ring_ctx *ctx = req->ctx;
1031         struct kiocb *kiocb = &req->rw;
1032         unsigned ioprio;
1033         int ret;
1034 
1035         if (!req->file)
1036                 return -EBADF;
1037 
1038         if (S_ISREG(file_inode(req->file)->i_mode))
1039                 req->flags |= REQ_F_ISREG;
1040 
1041         /*
1042          * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
1043          * we know to async punt it even if it was opened O_NONBLOCK
1044          */
1045         if (force_nonblock && !io_file_supports_async(req->file)) {
1046                 req->flags |= REQ_F_MUST_PUNT;
1047                 return -EAGAIN;
1048         }
1049 
1050         kiocb->ki_pos = READ_ONCE(sqe->off);
1051         kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
1052         kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
1053 
1054         ioprio = READ_ONCE(sqe->ioprio);
1055         if (ioprio) {
1056                 ret = ioprio_check_cap(ioprio);
1057                 if (ret)
1058                         return ret;
1059 
1060                 kiocb->ki_ioprio = ioprio;
1061         } else
1062                 kiocb->ki_ioprio = get_current_ioprio();
1063 
1064         ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
1065         if (unlikely(ret))
1066                 return ret;
1067 
1068         /* don't allow async punt if RWF_NOWAIT was requested */
1069         if ((kiocb->ki_flags & IOCB_NOWAIT) ||
1070             (req->file->f_flags & O_NONBLOCK))
1071                 req->flags |= REQ_F_NOWAIT;
1072 
1073         if (force_nonblock)
1074                 kiocb->ki_flags |= IOCB_NOWAIT;
1075 
1076         if (ctx->flags & IORING_SETUP_IOPOLL) {
1077                 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
1078                     !kiocb->ki_filp->f_op->iopoll)
1079                         return -EOPNOTSUPP;
1080 
1081                 kiocb->ki_flags |= IOCB_HIPRI;
1082                 kiocb->ki_complete = io_complete_rw_iopoll;
1083                 req->result = 0;
1084         } else {
1085                 if (kiocb->ki_flags & IOCB_HIPRI)
1086                         return -EINVAL;
1087                 kiocb->ki_complete = io_complete_rw;
1088         }
1089         return 0;
1090 }
1091 
1092 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
1093 {
1094         switch (ret) {
1095         case -EIOCBQUEUED:
1096                 break;
1097         case -ERESTARTSYS:
1098         case -ERESTARTNOINTR:
1099         case -ERESTARTNOHAND:
1100         case -ERESTART_RESTARTBLOCK:
1101                 /*
1102                  * We can't just restart the syscall, since previously
1103                  * submitted sqes may already be in progress. Just fail this
1104                  * IO with EINTR.
1105                  */
1106                 ret = -EINTR;
1107                 /* fall through */
1108         default:
1109                 kiocb->ki_complete(kiocb, ret, 0);
1110         }
1111 }
1112 
1113 static int io_import_fixed(struct io_ring_ctx *ctx, int rw,
1114                            const struct io_uring_sqe *sqe,
1115                            struct iov_iter *iter)
1116 {
1117         size_t len = READ_ONCE(sqe->len);
1118         struct io_mapped_ubuf *imu;
1119         unsigned index, buf_index;
1120         size_t offset;
1121         u64 buf_addr;
1122 
1123         /* attempt to use fixed buffers without having provided iovecs */
1124         if (unlikely(!ctx->user_bufs))
1125                 return -EFAULT;
1126 
1127         buf_index = READ_ONCE(sqe->buf_index);
1128         if (unlikely(buf_index >= ctx->nr_user_bufs))
1129                 return -EFAULT;
1130 
1131         index = array_index_nospec(buf_index, ctx->nr_user_bufs);
1132         imu = &ctx->user_bufs[index];
1133         buf_addr = READ_ONCE(sqe->addr);
1134 
1135         /* overflow */
1136         if (buf_addr + len < buf_addr)
1137                 return -EFAULT;
1138         /* not inside the mapped region */
1139         if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
1140                 return -EFAULT;
1141 
1142         /*
1143          * May not be the start of the buffer, set size appropriately
1144          * and advance us to the beginning.
1145          */
1146         offset = buf_addr - imu->ubuf;
1147         iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
1148 
1149         if (offset) {
1150                 /*
1151                  * Don't use iov_iter_advance() here, as it's really slow for
1152                  * using the latter parts of a big fixed buffer - it iterates
1153                  * over each segment manually. We can cheat a bit here, because
1154                  * we know that:
1155                  *
1156                  * 1) it's a BVEC iter, we set it up
1157                  * 2) all bvecs are PAGE_SIZE in size, except potentially the
1158                  *    first and last bvec
1159                  *
1160                  * So just find our index, and adjust the iterator afterwards.
1161                  * If the offset is within the first bvec (or the whole first
1162                  * bvec), just use iov_iter_advance(). This makes it easier
1163                  * since we can just skip the first segment, which may not
1164                  * be PAGE_SIZE aligned.
1165                  */
1166                 const struct bio_vec *bvec = imu->bvec;
1167 
1168                 if (offset <= bvec->bv_len) {
1169                         iov_iter_advance(iter, offset);
1170                 } else {
1171                         unsigned long seg_skip;
1172 
1173                         /* skip first vec */
1174                         offset -= bvec->bv_len;
1175                         seg_skip = 1 + (offset >> PAGE_SHIFT);
1176 
1177                         iter->bvec = bvec + seg_skip;
1178                         iter->nr_segs -= seg_skip;
1179                         iter->count -= bvec->bv_len + offset;
1180                         iter->iov_offset = offset & ~PAGE_MASK;
1181                 }
1182         }
1183 
1184         return len;
1185 }
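/*
 * Editor's note: a worked example of the offset math above, assuming a
 * page-aligned registered buffer with 4K pages (so the first bvec covers
 * a full PAGE_SIZE; the numbers are illustrative only). For a buf_addr
 * 9000 bytes into the buffer and len = 1000: offset starts at 9000, the
 * first bvec (4096 bytes) is skipped leaving offset = 4904, seg_skip =
 * 1 + (4904 >> 12) = 2, so the iterator starts at bvec[2] with
 * iov_offset = 4904 & 4095 = 808 and count = (9000 + 1000) -
 * (4096 + 4904) = 1000 = len. That matches byte 9000 = 2 * 4096 + 808.
 */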
1186 
1187 static ssize_t io_import_iovec(struct io_ring_ctx *ctx, int rw,
1188                                const struct sqe_submit *s, struct iovec **iovec,
1189                                struct iov_iter *iter)
1190 {
1191         const struct io_uring_sqe *sqe = s->sqe;
1192         void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr));
1193         size_t sqe_len = READ_ONCE(sqe->len);
1194         u8 opcode;
1195 
1196         /*
1197          * We're reading ->opcode for the second time, but the first read
1198          * doesn't care whether it's _FIXED or not, so it doesn't matter
1199          * whether ->opcode changes concurrently. The first read does care
1200          * about whether it is a READ or a WRITE, so we don't trust this read
1201          * for that purpose and instead let the caller pass in the read/write
1202          * flag.
1203          */
1204         opcode = READ_ONCE(sqe->opcode);
1205         if (opcode == IORING_OP_READ_FIXED ||
1206             opcode == IORING_OP_WRITE_FIXED) {
1207                 ssize_t ret = io_import_fixed(ctx, rw, sqe, iter);
1208                 *iovec = NULL;
1209                 return ret;
1210         }
1211 
1212         if (!s->has_user)
1213                 return -EFAULT;
1214 
1215 #ifdef CONFIG_COMPAT
1216         if (ctx->compat)
1217                 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
1218                                                 iovec, iter);
1219 #endif
1220 
1221         return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
1222 }
1223 
1224 /*
1225  * Make a note of the last file/offset/direction we punted to async
1226  * context. We'll use this information to see if we can piggy back a
1227  * sequential request onto the previous one, if it still hasn't been
1228  * completed by the async worker.
1229  */
1230 static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
1231 {
1232         struct async_list *async_list = &req->ctx->pending_async[rw];
1233         struct kiocb *kiocb = &req->rw;
1234         struct file *filp = kiocb->ki_filp;
1235         off_t io_end = kiocb->ki_pos + len;
1236 
1237         if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) {
1238                 unsigned long max_bytes;
1239 
1240                 /* Use 8x RA size as a decent limiter for both reads/writes */
1241                 max_bytes = filp->f_ra.ra_pages << (PAGE_SHIFT + 3);
1242                 if (!max_bytes)
1243                         max_bytes = VM_READAHEAD_PAGES << (PAGE_SHIFT + 3);
1244 
1245                 /* If max len is exceeded, reset the state */
1246                 if (async_list->io_len + len <= max_bytes) {
1247                         req->flags |= REQ_F_SEQ_PREV;
1248                         async_list->io_len += len;
1249                 } else {
1250                         io_end = 0;
1251                         async_list->io_len = 0;
1252                 }
1253         }
1254 
1255         /* New file? Reset state. */
1256         if (async_list->file != filp) {
1257                 async_list->io_len = 0;
1258                 async_list->file = filp;
1259         }
1260         async_list->io_end = io_end;
1261 }
1262 
1263 static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
1264                    bool force_nonblock)
1265 {
1266         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1267         struct kiocb *kiocb = &req->rw;
1268         struct iov_iter iter;
1269         struct file *file;
1270         size_t iov_count;
1271         ssize_t read_size, ret;
1272 
1273         ret = io_prep_rw(req, s, force_nonblock);
1274         if (ret)
1275                 return ret;
1276         file = kiocb->ki_filp;
1277 
1278         if (unlikely(!(file->f_mode & FMODE_READ)))
1279                 return -EBADF;
1280         if (unlikely(!file->f_op->read_iter))
1281                 return -EINVAL;
1282 
1283         ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter);
1284         if (ret < 0)
1285                 return ret;
1286 
1287         read_size = ret;
1288         if (req->flags & REQ_F_LINK)
1289                 req->result = read_size;
1290 
1291         iov_count = iov_iter_count(&iter);
1292         ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
1293         if (!ret) {
1294                 ssize_t ret2;
1295 
1296                 ret2 = call_read_iter(file, kiocb, &iter);
1297                 /*
1298                  * In case of a short read, punt to async. This can happen
1299                  * if we have data partially cached. Alternatively we can
1300                  * return the short read, in which case the application will
1301                  * need to issue another SQE and wait for it. That SQE will
1302                  * need async punt anyway, so it's more efficient to do it
1303                  * here.
1304                  */
1305                 if (force_nonblock && !(req->flags & REQ_F_NOWAIT) &&
1306                     (req->flags & REQ_F_ISREG) &&
1307                     ret2 > 0 && ret2 < read_size)
1308                         ret2 = -EAGAIN;
1309                 /* Catch -EAGAIN return for forced non-blocking submission */
1310                 if (!force_nonblock || ret2 != -EAGAIN) {
1311                         io_rw_done(kiocb, ret2);
1312                 } else {
1313                         /*
1314                          * If ->needs_lock is true, we're already in async
1315                          * context.
1316                          */
1317                         if (!s->needs_lock)
1318                                 io_async_list_note(READ, req, iov_count);
1319                         ret = -EAGAIN;
1320                 }
1321         }
1322         kfree(iovec);
1323         return ret;
1324 }
1325 
1326 static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
1327                     bool force_nonblock)
1328 {
1329         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1330         struct kiocb *kiocb = &req->rw;
1331         struct iov_iter iter;
1332         struct file *file;
1333         size_t iov_count;
1334         ssize_t ret;
1335 
1336         ret = io_prep_rw(req, s, force_nonblock);
1337         if (ret)
1338                 return ret;
1339 
1340         file = kiocb->ki_filp;
1341         if (unlikely(!(file->f_mode & FMODE_WRITE)))
1342                 return -EBADF;
1343         if (unlikely(!file->f_op->write_iter))
1344                 return -EINVAL;
1345 
1346         ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter);
1347         if (ret < 0)
1348                 return ret;
1349 
1350         if (req->flags & REQ_F_LINK)
1351                 req->result = ret;
1352 
1353         iov_count = iov_iter_count(&iter);
1354 
1355         ret = -EAGAIN;
1356         if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) {
1357                 /* If ->needs_lock is true, we're already in async context. */
1358                 if (!s->needs_lock)
1359                         io_async_list_note(WRITE, req, iov_count);
1360                 goto out_free;
1361         }
1362 
1363         ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
1364         if (!ret) {
1365                 ssize_t ret2;
1366 
1367                 /*
1368                  * Open-code file_start_write here to grab freeze protection,
1369                  * which will be released by another thread in
1370                  * io_complete_rw().  Fool lockdep by telling it the lock got
1371                  * released so that it doesn't complain about the held lock when
1372                  * we return to userspace.
1373                  */
1374                 if (req->flags & REQ_F_ISREG) {
1375                         __sb_start_write(file_inode(file)->i_sb,
1376                                                 SB_FREEZE_WRITE, true);
1377                         __sb_writers_release(file_inode(file)->i_sb,
1378                                                 SB_FREEZE_WRITE);
1379                 }
1380                 kiocb->ki_flags |= IOCB_WRITE;
1381 
1382                 ret2 = call_write_iter(file, kiocb, &iter);
1383                 if (!force_nonblock || ret2 != -EAGAIN) {
1384                         io_rw_done(kiocb, ret2);
1385                 } else {
1386                         /*
1387                          * If ->needs_lock is true, we're already in async
1388                          * context.
1389                          */
1390                         if (!s->needs_lock)
1391                                 io_async_list_note(WRITE, req, iov_count);
1392                         ret = -EAGAIN;
1393                 }
1394         }
1395 out_free:
1396         kfree(iovec);
1397         return ret;
1398 }
1399 
1400 /*
1401  * IORING_OP_NOP just posts a completion event, nothing else.
1402  */
1403 static int io_nop(struct io_kiocb *req, u64 user_data)
1404 {
1405         struct io_ring_ctx *ctx = req->ctx;
1406         long err = 0;
1407 
1408         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1409                 return -EINVAL;
1410 
1411         io_cqring_add_event(ctx, user_data, err);
1412         io_put_req(req);
1413         return 0;
1414 }
1415 
1416 static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1417 {
1418         struct io_ring_ctx *ctx = req->ctx;
1419 
1420         if (!req->file)
1421                 return -EBADF;
1422 
1423         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1424                 return -EINVAL;
1425         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1426                 return -EINVAL;
1427 
1428         return 0;
1429 }
1430 
1431 static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1432                     bool force_nonblock)
1433 {
1434         loff_t sqe_off = READ_ONCE(sqe->off);
1435         loff_t sqe_len = READ_ONCE(sqe->len);
1436         loff_t end = sqe_off + sqe_len;
1437         unsigned fsync_flags;
1438         int ret;
1439 
1440         fsync_flags = READ_ONCE(sqe->fsync_flags);
1441         if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC))
1442                 return -EINVAL;
1443 
1444         ret = io_prep_fsync(req, sqe);
1445         if (ret)
1446                 return ret;
1447 
1448         /* fsync always requires a blocking context */
1449         if (force_nonblock)
1450                 return -EAGAIN;
1451 
1452         ret = vfs_fsync_range(req->rw.ki_filp, sqe_off,
1453                                 end > 0 ? end : LLONG_MAX,
1454                                 fsync_flags & IORING_FSYNC_DATASYNC);
1455 
1456         if (ret < 0 && (req->flags & REQ_F_LINK))
1457                 req->flags |= REQ_F_FAIL_LINK;
1458         io_cqring_add_event(req->ctx, sqe->user_data, ret);
1459         io_put_req(req);
1460         return 0;
1461 }
1462 
1463 static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1464 {
1465         struct io_ring_ctx *ctx = req->ctx;
1466         int ret = 0;
1467 
1468         if (!req->file)
1469                 return -EBADF;
1470 
1471         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1472                 return -EINVAL;
1473         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1474                 return -EINVAL;
1475 
1476         return ret;
1477 }
1478 
1479 static int io_sync_file_range(struct io_kiocb *req,
1480                               const struct io_uring_sqe *sqe,
1481                               bool force_nonblock)
1482 {
1483         loff_t sqe_off;
1484         loff_t sqe_len;
1485         unsigned flags;
1486         int ret;
1487 
1488         ret = io_prep_sfr(req, sqe);
1489         if (ret)
1490                 return ret;
1491 
1492         /* sync_file_range always requires a blocking context */
1493         if (force_nonblock)
1494                 return -EAGAIN;
1495 
1496         sqe_off = READ_ONCE(sqe->off);
1497         sqe_len = READ_ONCE(sqe->len);
1498         flags = READ_ONCE(sqe->sync_range_flags);
1499 
1500         ret = sync_file_range(req->rw.ki_filp, sqe_off, sqe_len, flags);
1501 
1502         if (ret < 0 && (req->flags & REQ_F_LINK))
1503                 req->flags |= REQ_F_FAIL_LINK;
1504         io_cqring_add_event(req->ctx, sqe->user_data, ret);
1505         io_put_req(req);
1506         return 0;
1507 }
1508 
1509 #if defined(CONFIG_NET)
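     /*
      * Common helper for IORING_OP_SENDMSG/RECVMSG: resolve the socket, apply
      * MSG_DONTWAIT for nonblocking attempts, and call the supplied
      * __sys_{send,recv}msg_sock() variant. -EAGAIN is returned to the caller
      * so the request can be retried from a blocking context.
      */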
1510 static int io_send_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1511                            bool force_nonblock,
1512                    long (*fn)(struct socket *, struct user_msghdr __user *,
1513                                 unsigned int))
1514 {
1515         struct socket *sock;
1516         int ret;
1517 
1518         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1519                 return -EINVAL;
1520 
1521         sock = sock_from_file(req->file, &ret);
1522         if (sock) {
1523                 struct user_msghdr __user *msg;
1524                 unsigned flags;
1525 
1526                 flags = READ_ONCE(sqe->msg_flags);
1527                 if (flags & MSG_DONTWAIT)
1528                         req->flags |= REQ_F_NOWAIT;
1529                 else if (force_nonblock)
1530                         flags |= MSG_DONTWAIT;
1531 
1532                 msg = (struct user_msghdr __user *) (unsigned long)
1533                         READ_ONCE(sqe->addr);
1534 
1535                 ret = fn(sock, msg, flags);
1536                 if (force_nonblock && ret == -EAGAIN)
1537                         return ret;
1538                 if (ret == -ERESTARTSYS)
1539                         ret = -EINTR;
1540         }
1541 
1542         io_cqring_add_event(req->ctx, sqe->user_data, ret);
1543         io_put_req(req);
1544         return 0;
1545 }
1546 #endif
1547 
1548 static int io_sendmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1549                       bool force_nonblock)
1550 {
1551 #if defined(CONFIG_NET)
1552         return io_send_recvmsg(req, sqe, force_nonblock, __sys_sendmsg_sock);
1553 #else
1554         return -EOPNOTSUPP;
1555 #endif
1556 }
1557 
1558 static int io_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1559                       bool force_nonblock)
1560 {
1561 #if defined(CONFIG_NET)
1562         return io_send_recvmsg(req, sqe, force_nonblock, __sys_recvmsg_sock);
1563 #else
1564         return -EOPNOTSUPP;
1565 #endif
1566 }
1567 
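     /*
      * Cancel one armed poll request: mark it canceled and unhook it from the
      * file's waitqueue. If it was still queued there, punt the completion to
      * the workqueue. Called with ctx->completion_lock held.
      */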
1568 static void io_poll_remove_one(struct io_kiocb *req)
1569 {
1570         struct io_poll_iocb *poll = &req->poll;
1571 
1572         spin_lock(&poll->head->lock);
1573         WRITE_ONCE(poll->canceled, true);
1574         if (!list_empty(&poll->wait.entry)) {
1575                 list_del_init(&poll->wait.entry);
1576                 queue_work(req->ctx->sqo_wq, &req->work);
1577         }
1578         spin_unlock(&poll->head->lock);
1579 
1580         list_del_init(&req->list);
1581 }
1582 
1583 static void io_poll_remove_all(struct io_ring_ctx *ctx)
1584 {
1585         struct io_kiocb *req;
1586 
1587         spin_lock_irq(&ctx->completion_lock);
1588         while (!list_empty(&ctx->cancel_list)) {
1589                 req = list_first_entry(&ctx->cancel_list, struct io_kiocb, list);
1590                 io_poll_remove_one(req);
1591         }
1592         spin_unlock_irq(&ctx->completion_lock);
1593 }
1594 
1595 /*
1596  * Find a running poll command that matches one specified in sqe->addr,
1597  * and remove it if found.
1598  */
1599 static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1600 {
1601         struct io_ring_ctx *ctx = req->ctx;
1602         struct io_kiocb *poll_req, *next;
1603         int ret = -ENOENT;
1604 
1605         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1606                 return -EINVAL;
1607         if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
1608             sqe->poll_events)
1609                 return -EINVAL;
1610 
1611         spin_lock_irq(&ctx->completion_lock);
1612         list_for_each_entry_safe(poll_req, next, &ctx->cancel_list, list) {
1613                 if (READ_ONCE(sqe->addr) == poll_req->user_data) {
1614                         io_poll_remove_one(poll_req);
1615                         ret = 0;
1616                         break;
1617                 }
1618         }
1619         spin_unlock_irq(&ctx->completion_lock);
1620 
1621         io_cqring_add_event(req->ctx, sqe->user_data, ret);
1622         io_put_req(req);
1623         return 0;
1624 }
1625 
1626 static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req,
1627                              __poll_t mask)
1628 {
1629         req->poll.done = true;
1630         io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask));
1631         io_commit_cqring(ctx);
1632 }
1633 
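     /*
      * Workqueue handler for poll requests: re-poll the file and, if nothing
      * is pending and the request wasn't canceled, re-arm the wait entry.
      * Otherwise remove the request from the cancel list and post its
      * completion.
      */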
1634 static void io_poll_complete_work(struct work_struct *work)
1635 {
1636         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1637         struct io_poll_iocb *poll = &req->poll;
1638         struct poll_table_struct pt = { ._key = poll->events };
1639         struct io_ring_ctx *ctx = req->ctx;
1640         const struct cred *old_cred;
1641         __poll_t mask = 0;
1642 
1643         old_cred = override_creds(ctx->creds);
1644 
1645         if (!READ_ONCE(poll->canceled))
1646                 mask = vfs_poll(poll->file, &pt) & poll->events;
1647 
1648         /*
1649          * Note that ->ki_cancel callers also delete iocb from active_reqs after
1650          * calling ->ki_cancel.  We need the ctx_lock roundtrip here to
1651          * synchronize with them.  In the cancellation case the list_del_init
1652          * itself is not actually needed, but harmless so we keep it in to
1653          * avoid further branches in the fast path.
1654          */
1655         spin_lock_irq(&ctx->completion_lock);
1656         if (!mask && !READ_ONCE(poll->canceled)) {
1657                 add_wait_queue(poll->head, &poll->wait);
1658                 spin_unlock_irq(&ctx->completion_lock);
1659                 goto out;
1660         }
1661         list_del_init(&req->list);
1662         io_poll_complete(ctx, req, mask);
1663         spin_unlock_irq(&ctx->completion_lock);
1664 
1665         io_cqring_ev_posted(ctx);
1666         io_put_req(req);
1667 out:
1668         revert_creds(old_cred);
1669 }
1670 
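     /*
      * Waitqueue callback, run when the polled file signals an event. If the
      * event matches and we can take the completion lock, complete the
      * request inline; otherwise punt completion to the workqueue.
      */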
1671 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
1672                         void *key)
1673 {
1674         struct io_poll_iocb *poll = container_of(wait, struct io_poll_iocb,
1675                                                         wait);
1676         struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
1677         struct io_ring_ctx *ctx = req->ctx;
1678         __poll_t mask = key_to_poll(key);
1679         unsigned long flags;
1680 
1681         /* for instances that support it, check for an event match first: */
1682         if (mask && !(mask & poll->events))
1683                 return 0;
1684 
1685         list_del_init(&poll->wait.entry);
1686 
1687         if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
1688                 list_del(&req->list);
1689                 io_poll_complete(ctx, req, mask);
1690                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
1691 
1692                 io_cqring_ev_posted(ctx);
1693                 io_put_req(req);
1694         } else {
1695                 queue_work(ctx->sqo_wq, &req->work);
1696         }
1697 
1698         return 1;
1699 }
1700 
1701 struct io_poll_table {
1702         struct poll_table_struct pt;
1703         struct io_kiocb *req;
1704         int error;
1705 };
1706 
1707 static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
1708                                struct poll_table_struct *p)
1709 {
1710         struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
1711 
1712         if (unlikely(pt->req->poll.head)) {
1713                 pt->error = -EINVAL;
1714                 return;
1715         }
1716 
1717         pt->error = 0;
1718         pt->req->poll.head = head;
1719         add_wait_queue(head, &pt->req->poll.wait);
1720 }
1721 
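     /*
      * Arm IORING_OP_POLL_ADD: a single-shot poll on req->file for the events
      * in sqe->poll_events. If the poll mask is already satisfied, complete
      * inline; otherwise the request sits on ctx->cancel_list until
      * io_poll_wake() fires or it is canceled via IORING_OP_POLL_REMOVE,
      * which matches on the original user_data passed in sqe->addr.
      */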
1722 static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1723 {
1724         struct io_poll_iocb *poll = &req->poll;
1725         struct io_ring_ctx *ctx = req->ctx;
1726         struct io_poll_table ipt;
1727         bool cancel = false;
1728         __poll_t mask;
1729         u16 events;
1730 
1731         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1732                 return -EINVAL;
1733         if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
1734                 return -EINVAL;
1735         if (!poll->file)
1736                 return -EBADF;
1737 
1738         INIT_WORK(&req->work, io_poll_complete_work);
1739         events = READ_ONCE(sqe->poll_events);
1740         poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
1741 
1742         poll->head = NULL;
1743         poll->done = false;
1744         poll->canceled = false;
1745 
1746         ipt.pt._qproc = io_poll_queue_proc;
1747         ipt.pt._key = poll->events;
1748         ipt.req = req;
1749         ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
1750 
1751         /* initialize the list so that we can do list_empty checks */
1752         INIT_LIST_HEAD(&poll->wait.entry);
1753         init_waitqueue_func_entry(&poll->wait, io_poll_wake);
1754 
1755         INIT_LIST_HEAD(&req->list);
1756 
1757         mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
1758 
1759         spin_lock_irq(&ctx->completion_lock);
1760         if (likely(poll->head)) {
1761                 spin_lock(&poll->head->lock);
1762                 if (unlikely(list_empty(&poll->wait.entry))) {
1763                         if (ipt.error)
1764                                 cancel = true;
1765                         ipt.error = 0;
1766                         mask = 0;
1767                 }
1768                 if (mask || ipt.error)
1769                         list_del_init(&poll->wait.entry);
1770                 else if (cancel)
1771                         WRITE_ONCE(poll->canceled, true);
1772                 else if (!poll->done) /* actually waiting for an event */
1773                         list_add_tail(&req->list, &ctx->cancel_list);
1774                 spin_unlock(&poll->head->lock);
1775         }
1776         if (mask) { /* no async, we've stolen it */
1777                 ipt.error = 0;
1778                 io_poll_complete(ctx, req, mask);
1779         }
1780         spin_unlock_irq(&ctx->completion_lock);
1781 
1782         if (mask) {
1783                 io_cqring_ev_posted(ctx);
1784                 io_put_req(req);
1785         }
1786         return ipt.error;
1787 }
1788 
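     /*
      * Defer a request that must be sequenced behind earlier submissions
      * (IOSQE_IO_DRAIN) or that arrives while the defer list is non-empty:
      * the SQE is copied and the request parked on ctx->defer_list.
      * Returns -EIOCBQUEUED when deferred, 0 when it may run now, and
      * -EAGAIN if the SQE copy could not be allocated.
      */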
1789 static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req,
1790                         struct sqe_submit *s)
1791 {
1792         struct io_uring_sqe *sqe_copy;
1793 
1794         if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list))
1795                 return 0;
1796 
1797         sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
1798         if (!sqe_copy)
1799                 return -EAGAIN;
1800 
1801         spin_lock_irq(&ctx->completion_lock);
1802         if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list)) {
1803                 spin_unlock_irq(&ctx->completion_lock);
1804                 kfree(sqe_copy);
1805                 return 0;
1806         }
1807 
1808         memcpy(&req->submit, s, sizeof(*s));
1809         memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
1810         req->submit.sqe = sqe_copy;
1811 
1812         INIT_WORK(&req->work, io_sq_wq_submit_work);
1813         list_add_tail(&req->list, &ctx->defer_list);
1814         spin_unlock_irq(&ctx->completion_lock);
1815         return -EIOCBQUEUED;
1816 }
1817 
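     /*
      * Dispatch one SQE to its opcode handler. On success, IOPOLL rings also
      * add the request to the poll list, taking uring_lock first when we are
      * running from the async workqueue.
      */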
1818 static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
1819                            const struct sqe_submit *s, bool force_nonblock)
1820 {
1821         int ret, opcode;
1822 
1823         req->user_data = READ_ONCE(s->sqe->user_data);
1824 
1825         if (unlikely(s->index >= ctx->sq_entries))
1826                 return -EINVAL;
1827 
1828         opcode = READ_ONCE(s->sqe->opcode);
1829         switch (opcode) {
1830         case IORING_OP_NOP:
1831                 ret = io_nop(req, req->user_data);
1832                 break;
1833         case IORING_OP_READV:
1834                 if (unlikely(s->sqe->buf_index))
1835                         return -EINVAL;
1836                 ret = io_read(req, s, force_nonblock);
1837                 break;
1838         case IORING_OP_WRITEV:
1839                 if (unlikely(s->sqe->buf_index))
1840                         return -EINVAL;
1841                 ret = io_write(req, s, force_nonblock);
1842                 break;
1843         case IORING_OP_READ_FIXED:
1844                 ret = io_read(req, s, force_nonblock);
1845                 break;
1846         case IORING_OP_WRITE_FIXED:
1847                 ret = io_write(req, s, force_nonblock);
1848                 break;
1849         case IORING_OP_FSYNC:
1850                 ret = io_fsync(req, s->sqe, force_nonblock);
1851                 break;
1852         case IORING_OP_POLL_ADD:
1853                 ret = io_poll_add(req, s->sqe);
1854                 break;
1855         case IORING_OP_POLL_REMOVE:
1856                 ret = io_poll_remove(req, s->sqe);
1857                 break;
1858         case IORING_OP_SYNC_FILE_RANGE:
1859                 ret = io_sync_file_range(req, s->sqe, force_nonblock);
1860                 break;
1861         case IORING_OP_SENDMSG:
1862                 ret = io_sendmsg(req, s->sqe, force_nonblock);
1863                 break;
1864         case IORING_OP_RECVMSG:
1865                 ret = io_recvmsg(req, s->sqe, force_nonblock);
1866                 break;
1867         default:
1868                 ret = -EINVAL;
1869                 break;
1870         }
1871 
1872         if (ret)
1873                 return ret;
1874 
1875         if (ctx->flags & IORING_SETUP_IOPOLL) {
1876                 if (req->result == -EAGAIN)
1877                         return -EAGAIN;
1878 
1879                 /* workqueue context doesn't hold uring_lock, grab it now */
1880                 if (s->needs_lock)
1881                         mutex_lock(&ctx->uring_lock);
1882                 io_iopoll_req_issued(req);
1883                 if (s->needs_lock)
1884                         mutex_unlock(&ctx->uring_lock);
1885         }
1886 
1887         return 0;
1888 }
1889 
1890 static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx,
1891                                                  const struct io_uring_sqe *sqe)
1892 {
1893         switch (sqe->opcode) {
1894         case IORING_OP_READV:
1895         case IORING_OP_READ_FIXED:
1896                 return &ctx->pending_async[READ];
1897         case IORING_OP_WRITEV:
1898         case IORING_OP_WRITE_FIXED:
1899                 return &ctx->pending_async[WRITE];
1900         default:
1901                 return NULL;
1902         }
1903 }
1904 
1905 static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
1906 {
1907         u8 opcode = READ_ONCE(sqe->opcode);
1908 
1909         return !(opcode == IORING_OP_READ_FIXED ||
1910                  opcode == IORING_OP_WRITE_FIXED);
1911 }
1912 
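     /*
      * Async punt worker: runs a request that couldn't be completed without
      * blocking. It borrows the submitter's mm (unless the op uses fixed
      * buffers), issues the request synchronously, then keeps draining any
      * further requests queued on the same async list before dropping the
      * list's work count.
      */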
1913 static void io_sq_wq_submit_work(struct work_struct *work)
1914 {
1915         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1916         struct io_ring_ctx *ctx = req->ctx;
1917         struct mm_struct *cur_mm = NULL;
1918         struct async_list *async_list;
1919         const struct cred *old_cred;
1920         LIST_HEAD(req_list);
1921         mm_segment_t old_fs;
1922         int ret;
1923 
1924         old_cred = override_creds(ctx->creds);
1925         async_list = io_async_list_from_sqe(ctx, req->submit.sqe);
1926 restart:
1927         do {
1928                 struct sqe_submit *s = &req->submit;
1929                 const struct io_uring_sqe *sqe = s->sqe;
1930                 unsigned int flags = req->flags;
1931 
1932                 /* Ensure we clear previously set non-block flag */
1933                 req->rw.ki_flags &= ~IOCB_NOWAIT;
1934 
1935                 ret = 0;
1936                 if (io_sqe_needs_user(sqe) && !cur_mm) {
1937                         if (!mmget_not_zero(ctx->sqo_mm)) {
1938                                 ret = -EFAULT;
1939                         } else {
1940                                 cur_mm = ctx->sqo_mm;
1941                                 use_mm(cur_mm);
1942                                 old_fs = get_fs();
1943                                 set_fs(USER_DS);
1944                         }
1945                 }
1946 
1947                 if (!ret) {
1948                         s->has_user = cur_mm != NULL;
1949                         s->needs_lock = true;
1950                         do {
1951                                 ret = __io_submit_sqe(ctx, req, s, false);
1952                                 /*
1953                                  * We can get EAGAIN for polled IO even though
1954                                  * we're forcing a sync submission from here,
1955                                  * since we can't wait for request slots on the
1956                                  * block side.
1957                                  */
1958                                 if (ret != -EAGAIN)
1959                                         break;
1960                                 cond_resched();
1961                         } while (1);
1962                 }
1963 
1964                 /* drop submission reference */
1965                 io_put_req(req);
1966 
1967                 if (ret) {
1968                         io_cqring_add_event(ctx, sqe->user_data, ret);
1969                         io_put_req(req);
1970                 }
1971 
1972                 /* async context always uses a copy of the sqe */
1973                 kfree(sqe);
1974 
1975                 /* reqs from the defer and link lists needn't decrease the async cnt */
1976                 if (flags & (REQ_F_IO_DRAINED | REQ_F_LINK_DONE))
1977                         goto out;
1978 
1979                 if (!async_list)
1980                         break;
1981                 if (!list_empty(&req_list)) {
1982                         req = list_first_entry(&req_list, struct io_kiocb,
1983                                                 list);
1984                         list_del(&req->list);
1985                         continue;
1986                 }
1987                 if (list_empty(&async_list->list))
1988                         break;
1989 
1990                 req = NULL;
1991                 spin_lock(&async_list->lock);
1992                 if (list_empty(&async_list->list)) {
1993                         spin_unlock(&async_list->lock);
1994                         break;
1995                 }
1996                 list_splice_init(&async_list->list, &req_list);
1997                 spin_unlock(&async_list->lock);
1998 
1999                 req = list_first_entry(&req_list, struct io_kiocb, list);
2000                 list_del(&req->list);
2001         } while (req);
2002 
2003         /*
2004          * Rare case of racing with a submitter. If we find the count has
2005          * dropped to zero AND we have pending work items, then restart
2006          * the processing. This is a tiny race window.
2007          */
2008         if (async_list) {
2009                 ret = atomic_dec_return(&async_list->cnt);
2010                 while (!ret && !list_empty(&async_list->list)) {
2011                         spin_lock(&async_list->lock);
2012                         atomic_inc(&async_list->cnt);
2013                         list_splice_init(&async_list->list, &req_list);
2014                         spin_unlock(&async_list->lock);
2015 
2016                         if (!list_empty(&req_list)) {
2017                                 req = list_first_entry(&req_list,
2018                                                         struct io_kiocb, list);
2019                                 list_del(&req->list);
2020                                 goto restart;
2021                         }
2022                         ret = atomic_dec_return(&async_list->cnt);
2023                 }
2024         }
2025 
2026 out:
2027         if (cur_mm) {
2028                 set_fs(old_fs);
2029                 unuse_mm(cur_mm);
2030                 mmput(cur_mm);
2031         }
2032         revert_creds(old_cred);
2033 }
2034 
2035 /*
2036  * See if we can piggyback onto previously submitted work that is still
2037  * running. We currently only allow this if the new request is sequential
2038  * to the previous one we punted.
2039  */
2040 static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
2041 {
2042         bool ret = false;
2043 
2044         if (!list)
2045                 return false;
2046         if (!(req->flags & REQ_F_SEQ_PREV))
2047                 return false;
2048         if (!atomic_read(&list->cnt))
2049                 return false;
2050 
2051         ret = true;
2052         spin_lock(&list->lock);
2053         list_add_tail(&req->list, &list->list);
2054         /*
2055          * Ensure we see a simultaneous modification from io_sq_wq_submit_work()
2056          */
2057         smp_mb();
2058         if (!atomic_read(&list->cnt)) {
2059                 list_del_init(&req->list);
2060                 ret = false;
2061         }
2062         spin_unlock(&list->lock);
2063         return ret;
2064 }
2065 
2066 static bool io_op_needs_file(const struct io_uring_sqe *sqe)
2067 {
2068         int op = READ_ONCE(sqe->opcode);
2069 
2070         switch (op) {
2071         case IORING_OP_NOP:
2072         case IORING_OP_POLL_REMOVE:
2073                 return false;
2074         default:
2075                 return true;
2076         }
2077 }
2078 
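     /*
      * Resolve the file for a request: IOSQE_FIXED_FILE indexes into the
      * registered file table; otherwise the fd is looked up and referenced
      * via io_file_get(). Also latches the drain sequence for IOSQE_IO_DRAIN.
      */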
2079 static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
2080                            struct io_submit_state *state, struct io_kiocb *req)
2081 {
2082         unsigned flags;
2083         int fd;
2084 
2085         flags = READ_ONCE(s->sqe->flags);
2086         fd = READ_ONCE(s->sqe->fd);
2087 
2088         if (flags & IOSQE_IO_DRAIN) {
2089                 req->flags |= REQ_F_IO_DRAIN;
2090                 req->sequence = s->sequence;
2091         }
2092 
2093         if (!io_op_needs_file(s->sqe))
2094                 return 0;
2095 
2096         if (flags & IOSQE_FIXED_FILE) {
2097                 if (unlikely(!ctx->user_files ||
2098                     (unsigned) fd >= ctx->nr_user_files))
2099                         return -EBADF;
2100                 req->file = ctx->user_files[fd];
2101                 req->flags |= REQ_F_FIXED_FILE;
2102         } else {
2103                 if (s->needs_fixed_file)
2104                         return -EBADF;
2105                 req->file = io_file_get(state, fd);
2106                 if (unlikely(!req->file))
2107                         return -EBADF;
2108         }
2109 
2110         return 0;
2111 }
2112 
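     /*
      * Issue a request: check the defer path first, then attempt a
      * nonblocking submit. If that returns -EAGAIN (and punting is allowed),
      * copy the SQE and hand the request to the async workqueue, piggybacking
      * onto in-flight async work when possible.
      */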
2113 static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
2114                         struct sqe_submit *s)
2115 {
2116         int ret;
2117 
2118         ret = io_req_defer(ctx, req, s);
2119         if (ret) {
2120                 if (ret != -EIOCBQUEUED) {
2121                         io_free_req(req);
2122                         io_cqring_add_event(ctx, s->sqe->user_data, ret);
2123                 }
2124                 return 0;
2125         }
2126 
2127         ret = __io_submit_sqe(ctx, req, s, true);
2128 
2129         /*
2130          * We async punt it if the file wasn't marked NOWAIT, or if the file
2131          * doesn't support non-blocking read/write attempts
2132          */
2133         if (ret == -EAGAIN && (!(req->flags & REQ_F_NOWAIT) ||
2134             (req->flags & REQ_F_MUST_PUNT))) {
2135                 struct io_uring_sqe *sqe_copy;
2136 
2137                 sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
2138                 if (sqe_copy) {
2139                         struct async_list *list;
2140 
2141                         memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
2142                         s->sqe = sqe_copy;
2143 
2144                         memcpy(&req->submit, s, sizeof(*s));
2145                         list = io_async_list_from_sqe(ctx, s->sqe);
2146                         if (!io_add_to_prev_work(list, req)) {
2147                                 if (list)
2148                                         atomic_inc(&list->cnt);
2149                                 INIT_WORK(&req->work, io_sq_wq_submit_work);
2150                                 queue_work(ctx->sqo_wq, &req->work);
2151                         }
2152 
2153                         /*
2154                          * Queued up for async execution; the worker will release the
2155                          * submission reference when the iocb is actually submitted.
2156                          */
2157                         return 0;
2158                 }
2159         }
2160 
2161         /* drop submission reference */
2162         io_put_req(req);
2163 
2164         /* and drop final reference, if we failed */
2165         if (ret) {
2166                 io_cqring_add_event(ctx, req->user_data, ret);
2167                 if (req->flags & REQ_F_LINK)
2168                         req->flags |= REQ_F_FAIL_LINK;
2169                 io_put_req(req);
2170         }
2171 
2172         return ret;
2173 }
2174 
2175 #define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK)
2176 
2177 static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
2178                           struct io_submit_state *state, struct io_kiocb **link)
2179 {
2180         struct io_uring_sqe *sqe_copy;
2181         struct io_kiocb *req;
2182         int ret;
2183 
2184         /* enforce forwards compatibility on users */
2185         if (unlikely(s->sqe->flags & ~SQE_VALID_FLAGS)) {
2186                 ret = -EINVAL;
2187                 goto err;
2188         }
2189 
2190         req = io_get_req(ctx, state);
2191         if (unlikely(!req)) {
2192                 ret = -EAGAIN;
2193                 goto err;
2194         }
2195 
2196         ret = io_req_set_file(ctx, s, state, req);
2197         if (unlikely(ret)) {
2198 err_req:
2199                 io_free_req(req);
2200 err:
2201                 io_cqring_add_event(ctx, s->sqe->user_data, ret);
2202                 return;
2203         }
2204 
2205         req->user_data = s->sqe->user_data;
2206 
2207         /*
2208          * If we already have a head request, queue this one for async
2209          * submittal once the head completes. If we don't have a head but
2210          * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
2211          * submitted sync once the chain is complete. If none of those
2212          * conditions are true (normal request), then just queue it.
2213          */
2214         if (*link) {
2215                 struct io_kiocb *prev = *link;
2216 
2217                 sqe_copy = kmemdup(s->sqe, sizeof(*sqe_copy), GFP_KERNEL);
2218                 if (!sqe_copy) {
2219                         ret = -EAGAIN;
2220                         goto err_req;
2221                 }
2222 
2223                 s->sqe = sqe_copy;
2224                 memcpy(&req->submit, s, sizeof(*s));
2225                 list_add_tail(&req->list, &prev->link_list);
2226         } else if (s->sqe->flags & IOSQE_IO_LINK) {
2227                 req->flags |= REQ_F_LINK;
2228 
2229                 memcpy(&req->submit, s, sizeof(*s));
2230                 INIT_LIST_HEAD(&req->link_list);
2231                 *link = req;
2232         } else {
2233                 io_queue_sqe(ctx, req, s);
2234         }
2235 }
2236 
2237 /*
2238  * Batched submission is done; ensure local IO is flushed out.
2239  */
2240 static void io_submit_state_end(struct io_submit_state *state)
2241 {
2242         blk_finish_plug(&state->plug);
2243         io_file_put(state);
2244         if (state->free_reqs)
2245                 kmem_cache_free_bulk(req_cachep, state->free_reqs,
2246                                         &state->reqs[state->cur_req]);
2247 }
2248 
2249 /*
2250  * Start submission side cache.
2251  */
2252 static void io_submit_state_start(struct io_submit_state *state,
2253                                   struct io_ring_ctx *ctx, unsigned max_ios)
2254 {
2255         blk_start_plug(&state->plug);
2256         state->free_reqs = 0;
2257         state->file = NULL;
2258         state->ios_left = max_ios;
2259 }
2260 
2261 static void io_commit_sqring(struct io_ring_ctx *ctx)
2262 {
2263         struct io_sq_ring *ring = ctx->sq_ring;
2264 
2265         if (ctx->cached_sq_head != READ_ONCE(ring->r.head)) {
2266                 /*
2267                  * Ensure any loads from the SQEs are done at this point,
2268                  * since once we write the new head, the application could
2269                  * write new data to them.
2270                  */
2271                 smp_store_release(&ring->r.head, ctx->cached_sq_head);
2272         }
2273 }
2274 
2275 /*
2276  * Fetch an sqe, if one is available. Note that s->sqe will point to memory
2277  * that is mapped by userspace. This means that care needs to be taken to
2278  * ensure that reads are stable, as we cannot rely on userspace always
2279  * being a good citizen. If members of the sqe are validated and then later
2280  * used, it's important that those reads are done through READ_ONCE() to
2281  * prevent a re-load down the line.
2282  */
2283 static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
2284 {
2285         struct io_sq_ring *ring = ctx->sq_ring;
2286         unsigned head;
2287 
2288         /*
2289          * The cached sq head (or cq tail) serves two purposes:
2290          *
2291          * 1) allows us to batch the cost of updating the user-visible
2292          *    head.
2293          * 2) allows the kernel side to track the head on its own, even
2294          *    though the application is the one updating it.
2295          */
2296         head = ctx->cached_sq_head;
2297         /* make sure SQ entry isn't read before tail */
2298         if (head == smp_load_acquire(&ring->r.tail))
2299                 return false;
2300 
2301         head = READ_ONCE(ring->array[head & ctx->sq_mask]);
2302         if (head < ctx->sq_entries) {
2303                 s->index = head;
2304                 s->sqe = &ctx->sq_sqes[head];
2305                 s->sequence = ctx->cached_sq_head;
2306                 ctx->cached_sq_head++;
2307                 return true;
2308         }
2309 
2310         /* drop invalid entries */
2311         ctx->cached_sq_head++;
2312         ctx->cached_sq_dropped++;
2313         WRITE_ONCE(ring->dropped, ctx->cached_sq_dropped);
2314         return false;
2315 }
2316 
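     /*
      * Submit up to @nr SQEs on behalf of the SQ poll thread. Link chains are
      * collected and flushed as they terminate; if the mm couldn't be
      * grabbed, each SQE is failed with -EFAULT instead of being issued.
      */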
2317 static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
2318                           bool has_user, bool mm_fault)
2319 {
2320         struct io_submit_state state, *statep = NULL;
2321         struct io_kiocb *link = NULL;
2322         bool prev_was_link = false;
2323         int i, submitted = 0;
2324 
2325         if (nr > IO_PLUG_THRESHOLD) {
2326                 io_submit_state_start(&state, ctx, nr);
2327                 statep = &state;
2328         }
2329 
2330         for (i = 0; i < nr; i++) {
2331                 struct sqe_submit s;
2332 
2333                 if (!io_get_sqring(ctx, &s))
2334                         break;
2335 
2336                 /*
2337                  * If the previous SQE wasn't linked and we still have a pending
2338                  * link, that's the end of the chain. Submit the previous link.
2339                  */
2340                 if (!prev_was_link && link) {
2341                         io_queue_sqe(ctx, link, &link->submit);
2342                         link = NULL;
2343                 }
2344                 prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0;
2345 
2346                 if (unlikely(mm_fault)) {
2347                         io_cqring_add_event(ctx, s.sqe->user_data,
2348                                                 -EFAULT);
2349                 } else {
2350                         s.has_user = has_user;
2351                         s.needs_lock = true;
2352                         s.needs_fixed_file = true;
2353                         io_submit_sqe(ctx, &s, statep, &link);
2354                         submitted++;
2355                 }
2356         }
2357 
2358         if (link)
2359                 io_queue_sqe(ctx, link, &link->submit);
2360         if (statep)
2361                 io_submit_state_end(&state);
2362 
2363         return submitted;
2364 }
2365 
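     /*
      * The IORING_SETUP_SQPOLL kernel thread: reaps completions for IOPOLL
      * rings, submits new SQEs as they appear, and busy-spins for
      * sq_thread_idle jiffies once idle before setting IORING_SQ_NEED_WAKEUP
      * and sleeping until it is woken.
      */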
2366 static int io_sq_thread(void *data)
2367 {
2368         struct io_ring_ctx *ctx = data;
2369         struct mm_struct *cur_mm = NULL;
2370         const struct cred *old_cred;
2371         mm_segment_t old_fs;
2372         DEFINE_WAIT(wait);
2373         unsigned inflight;
2374         unsigned long timeout;
2375 
2376         complete(&ctx->sqo_thread_started);
2377 
2378         old_fs = get_fs();
2379         set_fs(USER_DS);
2380         old_cred = override_creds(ctx->creds);
2381 
2382         timeout = inflight = 0;
2383         while (!kthread_should_park()) {
2384                 bool mm_fault = false;
2385                 unsigned int to_submit;
2386 
2387                 if (inflight) {
2388                         unsigned nr_events = 0;
2389 
2390                         if (ctx->flags & IORING_SETUP_IOPOLL) {
2391                                 /*
2392                                  * inflight is the count of the maximum possible
2393                                  * entries we submitted, but it can be smaller
2394                                  * if we dropped some of them. If we don't have
2395                                  * poll entries available, then we know that we
2396                                  * have nothing left to poll for. Reset the
2397                                  * inflight count to zero in that case.
2398                                  */
2399                                 mutex_lock(&ctx->uring_lock);
2400                                 if (!list_empty(&ctx->poll_list))
2401                                         __io_iopoll_check(ctx, &nr_events, 0);
2402                                 else
2403                                         inflight = 0;
2404                                 mutex_unlock(&ctx->uring_lock);
2405                         } else {
2406                                 /*
2407                                  * Normal IO, just pretend everything completed.
2408                                  * We don't have to poll completions for that.
2409                                  */
2410                                 nr_events = inflight;
2411                         }
2412 
2413                         inflight -= nr_events;
2414                         if (!inflight)
2415                                 timeout = jiffies + ctx->sq_thread_idle;
2416                 }
2417 
2418                 to_submit = io_sqring_entries(ctx);
2419                 if (!to_submit) {
2420                         /*
2421                          * We're polling. If we're within the defined idle
2422                          * period, then let us spin without work before going
2423                          * to sleep.
2424                          */
2425                         if (inflight || !time_after(jiffies, timeout)) {
2426                                 cpu_relax();
2427                                 continue;
2428                         }
2429 
2430                         /*
2431                          * Drop cur_mm before scheduling, we can't hold it for
2432                          * long periods (or over schedule()). Do this before
2433                          * adding ourselves to the waitqueue, as the unuse/drop
2434                          * may sleep.
2435                          */
2436                         if (cur_mm) {
2437                                 unuse_mm(cur_mm);
2438                                 mmput(cur_mm);
2439                                 cur_mm = NULL;
2440                         }
2441 
2442                         prepare_to_wait(&ctx->sqo_wait, &wait,
2443                                                 TASK_INTERRUPTIBLE);
2444 
2445                         /* Tell userspace we may need a wakeup call */
2446                         ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP;
2447                         /* make sure to read SQ tail after writing flags */
2448                         smp_mb();
2449 
2450                         to_submit = io_sqring_entries(ctx);
2451                         if (!to_submit) {
2452                                 if (kthread_should_park()) {
2453                                         finish_wait(&ctx->sqo_wait, &wait);
2454                                         break;
2455                                 }
2456                                 if (signal_pending(current))
2457                                         flush_signals(current);
2458                                 schedule();
2459                                 finish_wait(&ctx->sqo_wait, &wait);
2460 
2461                                 ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
2462                                 continue;
2463                         }
2464                         finish_wait(&ctx->sqo_wait, &wait);
2465 
2466                         ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
2467                 }
2468 
2469                 /* Unless all new commands use fixed buffers, grab the mm */
2470                 if (!cur_mm) {
2471                         mm_fault = !mmget_not_zero(ctx->sqo_mm);
2472                         if (!mm_fault) {
2473                                 use_mm(ctx->sqo_mm);
2474                                 cur_mm = ctx->sqo_mm;
2475                         }
2476                 }
2477 
2478                 to_submit = min(to_submit, ctx->sq_entries);
2479                 inflight += io_submit_sqes(ctx, to_submit, cur_mm != NULL,
2480                                            mm_fault);
2481 
2482                 /* Commit SQ ring head once we've consumed all SQEs */
2483                 io_commit_sqring(ctx);
2484         }
2485 
2486         set_fs(old_fs);
2487         if (cur_mm) {
2488                 unuse_mm(cur_mm);
2489                 mmput(cur_mm);
2490         }
2491         revert_creds(old_cred);
2492 
2493         kthread_parkme();
2494 
2495         return 0;
2496 }
2497 
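     /*
      * Submit SQEs from the io_uring_enter() caller's own context: the
      * caller's mm is live (has_user), needs_lock is false because the
      * syscall path already holds uring_lock, and normal (non-fixed) files
      * are allowed.
      */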
2498 static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
2499 {
2500         struct io_submit_state state, *statep = NULL;
2501         struct io_kiocb *link = NULL;
2502         bool prev_was_link = false;
2503         int i, submit = 0;
2504 
2505         if (to_submit > IO_PLUG_THRESHOLD) {
2506                 io_submit_state_start(&state, ctx, to_submit);
2507                 statep = &state;
2508         }
2509 
2510         for (i = 0; i < to_submit; i++) {
2511                 struct sqe_submit s;
2512 
2513                 if (!io_get_sqring(ctx, &s))
2514                         break;
2515 
2516                 /*
2517                  * If the previous SQE wasn't linked and we still have a pending
2518                  * link, that's the end of the chain. Submit the previous link.
2519                  */
2520                 if (!prev_was_link && link) {
2521                         io_queue_sqe(ctx, link, &link->submit);
2522                         link = NULL;
2523                 }
2524                 prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0;
2525 
2526                 s.has_user = true;
2527                 s.needs_lock = false;
2528                 s.needs_fixed_file = false;
2529                 submit++;
2530                 io_submit_sqe(ctx, &s, statep, &link);
2531         }
2532 
2533         if (link)
2534                 io_queue_sqe(ctx, link, &link->submit);
2535         if (statep)
2536                 io_submit_state_end(statep);
2537 
2538         io_commit_sqring(ctx);
2539 
2540         return submit;
2541 }
2542 
2543 /*
2544  * Wait until events become available, if we don't already have some. The
2545  * application must reap them itself, as they reside on the shared cq ring.
2546  */
2547 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
2548                           const sigset_t __user *sig, size_t sigsz)
2549 {
2550         struct io_cq_ring *ring = ctx->cq_ring;
2551         int ret;
2552 
2553         if (io_cqring_events(ring) >= min_events)
2554                 return 0;
2555 
2556         if (sig) {
2557 #ifdef CONFIG_COMPAT
2558                 if (in_compat_syscall())
2559                         ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
2560                                                       sigsz);
2561                 else
2562 #endif
2563                         ret = set_user_sigmask(sig, sigsz);
2564 
2565                 if (ret)
2566                         return ret;
2567         }
2568 
2569         ret = wait_event_interruptible(ctx->wait, io_cqring_events(ring) >= min_events);
2570         restore_saved_sigmask_unless(ret == -ERESTARTSYS);
2571         if (ret == -ERESTARTSYS)
2572                 ret = -EINTR;
2573 
2574         return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0;
2575 }
2576 
2577 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
2578 {
2579 #if defined(CONFIG_UNIX)
2580         if (ctx->ring_sock) {
2581                 struct sock *sock = ctx->ring_sock->sk;
2582                 struct sk_buff *skb;
2583 
2584                 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
2585                         kfree_skb(skb);
2586         }
2587 #else
2588         int i;
2589 
2590         for (i = 0; i < ctx->nr_user_files; i++)
2591                 fput(ctx->user_files[i]);
2592 #endif
2593 }
2594 
2595 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
2596 {
2597         if (!ctx->user_files)
2598                 return -ENXIO;
2599 
2600         __io_sqe_files_unregister(ctx);
2601         kfree(ctx->user_files);
2602         ctx->user_files = NULL;
2603         ctx->nr_user_files = 0;
2604         return 0;
2605 }
2606 
2607 static void io_sq_thread_stop(struct io_ring_ctx *ctx)
2608 {
2609         if (ctx->sqo_thread) {
2610                 wait_for_completion(&ctx->sqo_thread_started);
2611                 /*
2612                  * The park is a bit of a workaround; without it we get
2613                  * warning spew on shutdown with SQPOLL set and affinity
2614                  * set to a single CPU.
2615                  */
2616                 kthread_park(ctx->sqo_thread);
2617                 kthread_stop(ctx->sqo_thread);
2618                 ctx->sqo_thread = NULL;
2619         }
2620 }
2621 
2622 static void io_finish_async(struct io_ring_ctx *ctx)
2623 {
2624         io_sq_thread_stop(ctx);
2625 
2626         if (ctx->sqo_wq) {
2627                 destroy_workqueue(ctx->sqo_wq);
2628                 ctx->sqo_wq = NULL;
2629         }
2630 }
2631 
2632 #if defined(CONFIG_UNIX)
2633 static void io_destruct_skb(struct sk_buff *skb)
2634 {
2635         struct io_ring_ctx *ctx = skb->sk->sk_user_data;
2636 
2637         if (ctx->sqo_wq)
2638                 flush_workqueue(ctx->sqo_wq);
2639         unix_destruct_scm(skb);
2640 }
2641 
2642 /*
2643  * Ensure the UNIX gc is aware of our file set, so we are certain that
2644  * the io_uring can be safely unregistered on process exit, even if we have
2645  * loops in the file referencing.
2646  */
2647 static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
2648 {
2649         struct sock *sk = ctx->ring_sock->sk;
2650         struct scm_fp_list *fpl;
2651         struct sk_buff *skb;
2652         int i;
2653 
2654         if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
2655                 unsigned long inflight = ctx->user->unix_inflight + nr;
2656 
2657                 if (inflight > task_rlimit(current, RLIMIT_NOFILE))
2658                         return -EMFILE;
2659         }
2660 
2661         fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
2662         if (!fpl)
2663                 return -ENOMEM;
2664 
2665         skb = alloc_skb(0, GFP_KERNEL);
2666         if (!skb) {
2667                 kfree(fpl);
2668                 return -ENOMEM;
2669         }
2670 
2671         skb->sk = sk;
2672         skb->destructor = io_destruct_skb;
2673 
2674         fpl->user = get_uid(ctx->user);
2675         for (i = 0; i < nr; i++) {
2676                 fpl->fp[i] = get_file(ctx->user_files[i + offset]);
2677                 unix_inflight(fpl->user, fpl->fp[i]);
2678         }
2679 
2680         fpl->max = fpl->count = nr;
2681         UNIXCB(skb).fp = fpl;
2682         refcount_add(skb->truesize, &sk->sk_wmem_alloc);
2683         skb_queue_head(&sk->sk_receive_queue, skb);
2684 
2685         for (i = 0; i < nr; i++)
2686                 fput(fpl->fp[i]);
2687 
2688         return 0;
2689 }
2690 
2691 /*
2692  * If UNIX sockets are enabled, fd passing can cause a reference cycle which
2693  * causes regular reference counting to break down. We rely on the UNIX
2694  * garbage collection to take care of this problem for us.
2695  */
2696 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2697 {
2698         unsigned left, total;
2699         int ret = 0;
2700 
2701         total = 0;
2702         left = ctx->nr_user_files;
2703         while (left) {
2704                 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
2705 
2706                 ret = __io_sqe_files_scm(ctx, this_files, total);
2707                 if (ret)
2708                         break;
2709                 left -= this_files;
2710                 total += this_files;
2711         }
2712 
2713         if (!ret)
2714                 return 0;
2715 
2716         while (total < ctx->nr_user_files) {
2717                 fput(ctx->user_files[total]);
2718                 total++;
2719         }
2720 
2721         return ret;
2722 }
2723 #else
2724 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2725 {
2726         return 0;
2727 }
2728 #endif
2729 
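     /*
      * Register a fixed file set: copy the fd array from userspace, take a
      * reference on each file (rejecting other io_uring fds), then hand the
      * set over to the SCM/UNIX-gc bookkeeping.
      */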
2730 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
2731                                  unsigned nr_args)
2732 {
2733         __s32 __user *fds = (__s32 __user *) arg;
2734         int fd, ret = 0;
2735         unsigned i;
2736 
2737         if (ctx->user_files)
2738                 return -EBUSY;
2739         if (!nr_args)
2740                 return -EINVAL;
2741         if (nr_args > IORING_MAX_FIXED_FILES)
2742                 return -EMFILE;
2743 
2744         ctx->user_files = kcalloc(nr_args, sizeof(struct file *), GFP_KERNEL);
2745         if (!ctx->user_files)
2746                 return -ENOMEM;
2747 
2748         for (i = 0; i < nr_args; i++) {
2749                 ret = -EFAULT;
2750                 if (copy_from_user(&fd, &fds[i], sizeof(fd)))
2751                         break;
2752 
2753                 ctx->user_files[i] = fget(fd);
2754 
2755                 ret = -EBADF;
2756                 if (!ctx->user_files[i])
2757                         break;
2758                 /*
2759                  * Don't allow io_uring instances to be registered. If UNIX
2760                  * isn't enabled, then this causes a reference cycle and this
2761                  * instance can never get freed. If UNIX is enabled we'll
2762                  * handle it just fine, but there's still no point in allowing
2763                  * a ring fd as it doesn't support regular read/write anyway.
2764                  */
2765                 if (ctx->user_files[i]->f_op == &io_uring_fops) {
2766                         fput(ctx->user_files[i]);
2767                         break;
2768                 }
2769                 ctx->nr_user_files++;
2770                 ret = 0;
2771         }
2772 
2773         if (ret) {
2774                 for (i = 0; i < ctx->nr_user_files; i++)
2775                         fput(ctx->user_files[i]);
2776 
2777                 kfree(ctx->user_files);
2778                 ctx->user_files = NULL;
2779                 ctx->nr_user_files = 0;
2780                 return ret;
2781         }
2782 
2783         ret = io_sqe_files_scm(ctx);
2784         if (ret)
2785                 io_sqe_files_unregister(ctx);
2786 
2787         return ret;
2788 }
2789 
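     /*
      * Set up the submission offload machinery: grab the submitter's mm,
      * create the SQPOLL kthread if requested (CAP_SYS_ADMIN only, optionally
      * bound to sq_thread_cpu), and create the workqueue used for async
      * punts.
      */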
2790 static int io_sq_offload_start(struct io_ring_ctx *ctx,
2791                                struct io_uring_params *p)
2792 {
2793         int ret;
2794 
2795         init_waitqueue_head(&ctx->sqo_wait);
2796         mmgrab(current->mm);
2797         ctx->sqo_mm = current->mm;
2798 
2799         if (ctx->flags & IORING_SETUP_SQPOLL) {
2800                 ret = -EPERM;
2801                 if (!capable(CAP_SYS_ADMIN))
2802                         goto err;
2803 
2804                 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
2805                 if (!ctx->sq_thread_idle)
2806                         ctx->sq_thread_idle = HZ;
2807 
2808                 if (p->flags & IORING_SETUP_SQ_AFF) {
2809                         int cpu = p->sq_thread_cpu;
2810 
2811                         ret = -EINVAL;
2812                         if (cpu >= nr_cpu_ids)
2813                                 goto err;
2814                         if (!cpu_online(cpu))
2815                                 goto err;
2816 
2817                         ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
2818                                                         ctx, cpu,
2819                                                         "io_uring-sq");
2820                 } else {
2821                         ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
2822                                                         "io_uring-sq");
2823                 }
2824                 if (IS_ERR(ctx->sqo_thread)) {
2825                         ret = PTR_ERR(ctx->sqo_thread);
2826                         ctx->sqo_thread = NULL;
2827                         goto err;
2828                 }
2829                 wake_up_process(ctx->sqo_thread);
2830         } else if (p->flags & IORING_SETUP_SQ_AFF) {
2831                 /* Can't have SQ_AFF without SQPOLL */
2832                 ret = -EINVAL;
2833                 goto err;
2834         }
2835 
2836         /* Do QD, or 2 * CPUS, whichever is smallest */
2837         ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
2838                         min(ctx->sq_entries - 1, 2 * num_online_cpus()));
2839         if (!ctx->sqo_wq) {
2840                 ret = -ENOMEM;
2841                 goto err;
2842         }
2843 
2844         return 0;
2845 err:
2846         io_sq_thread_stop(ctx);
2847         mmdrop(ctx->sqo_mm);
2848         ctx->sqo_mm = NULL;
2849         return ret;
2850 }
2851 
2852 static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
2853 {
2854         atomic_long_sub(nr_pages, &user->locked_vm);
2855 }
2856 
2857 static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
2858 {
2859         unsigned long page_limit, cur_pages, new_pages;
2860 
2861         /* Don't allow more pages than we can safely lock */
2862         page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
2863 
2864         do {
2865                 cur_pages = atomic_long_read(&user->locked_vm);
2866                 new_pages = cur_pages + nr_pages;
2867                 if (new_pages > page_limit)
2868                         return -ENOMEM;
2869         } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
2870                                         new_pages) != cur_pages);
2871 
2872         return 0;
2873 }
2874 
2875 static void io_mem_free(void *ptr)
2876 {
2877         struct page *page;
2878 
2879         if (!ptr)
2880                 return;
2881 
2882         page = virt_to_head_page(ptr);
2883         if (put_page_testzero(page))
2884                 free_compound_page(page);
2885 }
2886 
2887 static void *io_mem_alloc(size_t size)
2888 {
2889         gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
2890                                 __GFP_NORETRY;
2891 
2892         return (void *) __get_free_pages(gfp_flags, get_order(size));
2893 }
2894 
2895 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
2896 {
2897         struct io_sq_ring *sq_ring;
2898         struct io_cq_ring *cq_ring;
2899         size_t bytes;
2900 
2901         bytes = struct_size(sq_ring, array, sq_entries);
2902         bytes += array_size(sizeof(struct io_uring_sqe), sq_entries);
2903         bytes += struct_size(cq_ring, cqes, cq_entries);
2904 
2905         return (bytes + PAGE_SIZE - 1) / PAGE_SIZE;
2906 }
2907 
2908 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
2909 {
2910         int i, j;
2911 
2912         if (!ctx->user_bufs)
2913                 return -ENXIO;
2914 
2915         for (i = 0; i < ctx->nr_user_bufs; i++) {
2916                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2917 
2918                 for (j = 0; j < imu->nr_bvecs; j++)
2919                         put_page(imu->bvec[j].bv_page);
2920 
2921                 if (ctx->account_mem)
2922                         io_unaccount_mem(ctx->user, imu->nr_bvecs);
2923                 kvfree(imu->bvec);
2924                 imu->nr_bvecs = 0;
2925         }
2926 
2927         kfree(ctx->user_bufs);
2928         ctx->user_bufs = NULL;
2929         ctx->nr_user_bufs = 0;
2930         return 0;
2931 }
2932 
2933 static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
2934                        void __user *arg, unsigned index)
2935 {
2936         struct iovec __user *src;
2937 
2938 #ifdef CONFIG_COMPAT
2939         if (ctx->compat) {
2940                 struct compat_iovec __user *ciovs;
2941                 struct compat_iovec ciov;
2942 
2943                 ciovs = (struct compat_iovec __user *) arg;
2944                 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
2945                         return -EFAULT;
2946 
2947                 dst->iov_base = (void __user *) (unsigned long) ciov.iov_base;
2948                 dst->iov_len = ciov.iov_len;
2949                 return 0;
2950         }
2951 #endif
2952         src = (struct iovec __user *) arg;
2953         if (copy_from_user(dst, &src[index], sizeof(*dst)))
2954                 return -EFAULT;
2955         return 0;
2956 }
2957 
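     /*
      * Register fixed buffers: for each iovec, pin the user pages with
      * FOLL_WRITE | FOLL_LONGTERM, reject file-backed (non-hugetlb) mappings,
      * account the pages against RLIMIT_MEMLOCK if required, and build the
      * bio_vec array used for READ_FIXED/WRITE_FIXED.
      */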
2958 static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
2959                                   unsigned nr_args)
2960 {
2961         struct vm_area_struct **vmas = NULL;
2962         struct page **pages = NULL;
2963         int i, j, got_pages = 0;
2964         int ret = -EINVAL;
2965 
2966         if (ctx->user_bufs)
2967                 return -EBUSY;
2968         if (!nr_args || nr_args > UIO_MAXIOV)
2969                 return -EINVAL;
2970 
2971         ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
2972                                         GFP_KERNEL);
2973         if (!ctx->user_bufs)
2974                 return -ENOMEM;
2975 
2976         for (i = 0; i < nr_args; i++) {
2977                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2978                 unsigned long off, start, end, ubuf;
2979                 int pret, nr_pages;
2980                 struct iovec iov;
2981                 size_t size;
2982 
2983                 ret = io_copy_iov(ctx, &iov, arg, i);
2984                 if (ret)
2985                         goto err;
2986 
2987                 /*
2988                  * Don't impose further limits on the size and buffer
2989                  * constraints here; we'll -EINVAL later when IO is
2990                  * submitted if they are wrong.
2991                  */
2992                 ret = -EFAULT;
2993                 if (!iov.iov_base || !iov.iov_len)
2994                         goto err;
2995 
2996                 /* arbitrary limit, but we need something */
2997                 if (iov.iov_len > SZ_1G)
2998                         goto err;
2999 
3000                 ubuf = (unsigned long) iov.iov_base;
3001                 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
3002                 start = ubuf >> PAGE_SHIFT;
3003                 nr_pages = end - start;
3004 
3005                 if (ctx->account_mem) {
3006                         ret = io_account_mem(ctx->user, nr_pages);
3007                         if (ret)
3008                                 goto err;
3009                 }
3010 
3011                 ret = 0;
3012                 if (!pages || nr_pages > got_pages) {
3013                         kfree(vmas);
3014                         kfree(pages);
3015                         pages = kvmalloc_array(nr_pages, sizeof(struct page *),
3016                                                 GFP_KERNEL);
3017                         vmas = kvmalloc_array(nr_pages,
3018                                         sizeof(struct vm_area_struct *),
3019                                         GFP_KERNEL);
3020                         if (!pages || !vmas) {
3021                                 ret = -ENOMEM;
3022                                 if (ctx->account_mem)
3023                                         io_unaccount_mem(ctx->user, nr_pages);
3024                                 goto err;
3025                         }
3026                         got_pages = nr_pages;
3027                 }
3028 
3029                 imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
3030                                                 GFP_KERNEL);
3031                 ret = -ENOMEM;
3032                 if (!imu->bvec) {
3033                         if (ctx->account_mem)
3034                                 io_unaccount_mem(ctx->user, nr_pages);
3035                         goto err;
3036                 }
3037 
3038                 ret = 0;
3039                 down_read(&current->mm->mmap_sem);
3040                 pret = get_user_pages(ubuf, nr_pages,
3041                                       FOLL_WRITE | FOLL_LONGTERM,
3042                                       pages, vmas);
3043                 if (pret == nr_pages) {
3044                         /* don't support file-backed memory */
3045                         for (j = 0; j < nr_pages; j++) {
3046                                 struct vm_area_struct *vma = vmas[j];
3047 
3048                                 if (vma->vm_file &&
3049                                     !is_file_hugepages(vma->vm_file)) {
3050                                         ret = -EOPNOTSUPP;
3051                                         break;
3052                                 }
3053                         }
3054                 } else {
3055                         ret = pret < 0 ? pret : -EFAULT;
3056                 }
3057                 up_read(&current->mm->mmap_sem);
3058                 if (ret) {
3059                         /*
3060                          * if we did a partial map, or found file-backed vmas,
3061                          * release any pages we did get
3062                          */
3063                         if (pret > 0) {
3064                                 for (j = 0; j < pret; j++)
3065                                         put_page(pages[j]);
3066                         }
3067                         if (ctx->account_mem)
3068                                 io_unaccount_mem(ctx->user, nr_pages);
3069                         kvfree(imu->bvec);
3070                         goto err;
3071                 }
3072 
3073                 off = ubuf & ~PAGE_MASK;
3074                 size = iov.iov_len;
3075                 for (j = 0; j < nr_pages; j++) {
3076                         size_t vec_len;
3077 
3078                         vec_len = min_t(size_t, size, PAGE_SIZE - off);
3079                         imu->bvec[j].bv_page = pages[j];
3080                         imu->bvec[j].bv_len = vec_len;
3081                         imu->bvec[j].bv_offset = off;
3082                         off = 0;
3083                         size -= vec_len;
3084                 }
3085                 /* store original address for later verification */
3086                 imu->ubuf = ubuf;
3087                 imu->len = iov.iov_len;
3088                 imu->nr_bvecs = nr_pages;
3089 
3090                 ctx->nr_user_bufs++;
3091         }
3092         kvfree(pages);
3093         kvfree(vmas);
3094         return 0;
3095 err:
3096         kvfree(pages);
3097         kvfree(vmas);
3098         io_sqe_buffer_unregister(ctx);
3099         return ret;
3100 }
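
/*
 * Editor's sketch (illustrative, not part of the kernel source): how an
 * application might register one fixed buffer against the checks above,
 * using the raw io_uring_register(2) syscall. Assumes <linux/io_uring.h>
 * and __NR_io_uring_register from the installed headers; "ring_fd" is a
 * hypothetical fd from io_uring_setup(2) and error handling is elided.
 * The pinned pages are charged to the caller's locked-memory limit unless
 * it has CAP_IPC_LOCK.
 *
 *	void *buf;
 *	struct iovec iov;
 *
 *	posix_memalign(&buf, 4096, 64 * 1024);
 *	iov.iov_base = buf;		// must be non-NULL
 *	iov.iov_len  = 64 * 1024;	// must be > 0 and <= 1 GiB
 *	syscall(__NR_io_uring_register, ring_fd,
 *		IORING_REGISTER_BUFFERS, &iov, 1);
 */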
3101 
3102 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
3103 {
3104         __s32 __user *fds = arg;
3105         int fd;
3106 
3107         if (ctx->cq_ev_fd)
3108                 return -EBUSY;
3109 
3110         if (copy_from_user(&fd, fds, sizeof(*fds)))
3111                 return -EFAULT;
3112 
3113         ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
3114         if (IS_ERR(ctx->cq_ev_fd)) {
3115                 int ret = PTR_ERR(ctx->cq_ev_fd);
3116                 ctx->cq_ev_fd = NULL;
3117                 return ret;
3118         }
3119 
3120         return 0;
3121 }
3122 
3123 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
3124 {
3125         if (ctx->cq_ev_fd) {
3126                 eventfd_ctx_put(ctx->cq_ev_fd);
3127                 ctx->cq_ev_fd = NULL;
3128                 return 0;
3129         }
3130 
3131         return -ENXIO;
3132 }
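
/*
 * Editor's sketch (illustrative, not part of the kernel source): wiring a
 * completion eventfd to the ring via the two opcodes handled above. Assumes
 * the raw __NR_io_uring_register number is available; "ring_fd" is
 * hypothetical and error handling is elided.
 *
 *	int efd = eventfd(0, EFD_CLOEXEC);
 *
 *	syscall(__NR_io_uring_register, ring_fd,
 *		IORING_REGISTER_EVENTFD, &efd, 1);	// nr_args must be 1
 *	...
 *	syscall(__NR_io_uring_register, ring_fd,
 *		IORING_UNREGISTER_EVENTFD, NULL, 0);	// arg/nr_args must be 0
 */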
3133 
3134 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
3135 {
3136         io_finish_async(ctx);
3137         if (ctx->sqo_mm)
3138                 mmdrop(ctx->sqo_mm);
3139 
3140         io_iopoll_reap_events(ctx);
3141         io_sqe_buffer_unregister(ctx);
3142         io_sqe_files_unregister(ctx);
3143         io_eventfd_unregister(ctx);
3144 
3145 #if defined(CONFIG_UNIX)
3146         if (ctx->ring_sock) {
3147                 ctx->ring_sock->file = NULL; /* so that iput() is called */
3148                 sock_release(ctx->ring_sock);
3149         }
3150 #endif
3151 
3152         io_mem_free(ctx->sq_ring);
3153         io_mem_free(ctx->sq_sqes);
3154         io_mem_free(ctx->cq_ring);
3155 
3156         percpu_ref_exit(&ctx->refs);
3157         if (ctx->account_mem)
3158                 io_unaccount_mem(ctx->user,
3159                                 ring_pages(ctx->sq_entries, ctx->cq_entries));
3160         free_uid(ctx->user);
3161         if (ctx->creds)
3162                 put_cred(ctx->creds);
3163         kfree(ctx);
3164 }
3165 
3166 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
3167 {
3168         struct io_ring_ctx *ctx = file->private_data;
3169         __poll_t mask = 0;
3170 
3171         poll_wait(file, &ctx->cq_wait, wait);
3172         /*
3173          * synchronizes with the barrier from the wq_has_sleeper() call in
3174          * io_commit_cqring()
3175          */
3176         smp_rmb();
3177         if (READ_ONCE(ctx->sq_ring->r.tail) - ctx->cached_sq_head !=
3178             ctx->sq_ring->ring_entries)
3179                 mask |= EPOLLOUT | EPOLLWRNORM;
3180         if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail)
3181                 mask |= EPOLLIN | EPOLLRDNORM;
3182 
3183         return mask;
3184 }
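
/*
 * Editor's sketch (illustrative, not part of the kernel source): the ring fd
 * can be polled like any other file; per the checks above, EPOLLIN means CQEs
 * are pending and EPOLLOUT means the SQ ring has free slots. A minimal
 * poll(2) wait, assuming a hypothetical "ring_fd":
 *
 *	struct pollfd pfd = { .fd = ring_fd, .events = POLLIN };
 *
 *	if (poll(&pfd, 1, -1) > 0 && (pfd.revents & POLLIN))
 *		;	// at least one completion can be reaped from the CQ ring
 */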
3185 
3186 static int io_uring_fasync(int fd, struct file *file, int on)
3187 {
3188         struct io_ring_ctx *ctx = file->private_data;
3189 
3190         return fasync_helper(fd, file, on, &ctx->cq_fasync);
3191 }
3192 
3193 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
3194 {
3195         mutex_lock(&ctx->uring_lock);
3196         percpu_ref_kill(&ctx->refs);
3197         mutex_unlock(&ctx->uring_lock);
3198 
3199         io_poll_remove_all(ctx);
3200         io_iopoll_reap_events(ctx);
3201         wait_for_completion(&ctx->ctx_done);
3202         io_ring_ctx_free(ctx);
3203 }
3204 
3205 static int io_uring_release(struct inode *inode, struct file *file)
3206 {
3207         struct io_ring_ctx *ctx = file->private_data;
3208 
3209         file->private_data = NULL;
3210         io_ring_ctx_wait_and_kill(ctx);
3211         return 0;
3212 }
3213 
3214 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
3215 {
3216         loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT;
3217         unsigned long sz = vma->vm_end - vma->vm_start;
3218         struct io_ring_ctx *ctx = file->private_data;
3219         unsigned long pfn;
3220         struct page *page;
3221         void *ptr;
3222 
3223         switch (offset) {
3224         case IORING_OFF_SQ_RING:
3225                 ptr = ctx->sq_ring;
3226                 break;
3227         case IORING_OFF_SQES:
3228                 ptr = ctx->sq_sqes;
3229                 break;
3230         case IORING_OFF_CQ_RING:
3231                 ptr = ctx->cq_ring;
3232                 break;
3233         default:
3234                 return -EINVAL;
3235         }
3236 
3237         page = virt_to_head_page(ptr);
3238         if (sz > (PAGE_SIZE << compound_order(page)))
3239                 return -EINVAL;
3240 
3241         pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
3242         return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
3243 }
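
/*
 * Editor's sketch (illustrative, not part of the kernel source): how an
 * application maps the three regions handled above, using the offsets that
 * io_uring_setup(2) returns in struct io_uring_params (here the hypothetical
 * local "p"); error handling is elided.
 *
 *	size_t sq_sz  = p.sq_off.array + p.sq_entries * sizeof(__u32);
 *	size_t sqe_sz = p.sq_entries * sizeof(struct io_uring_sqe);
 *	size_t cq_sz  = p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe);
 *
 *	void *sq  = mmap(NULL, sq_sz, PROT_READ | PROT_WRITE,
 *			 MAP_SHARED | MAP_POPULATE, ring_fd, IORING_OFF_SQ_RING);
 *	void *sqe = mmap(NULL, sqe_sz, PROT_READ | PROT_WRITE,
 *			 MAP_SHARED | MAP_POPULATE, ring_fd, IORING_OFF_SQES);
 *	void *cq  = mmap(NULL, cq_sz, PROT_READ | PROT_WRITE,
 *			 MAP_SHARED | MAP_POPULATE, ring_fd, IORING_OFF_CQ_RING);
 *
 *	unsigned *sq_tail = sq + p.sq_off.tail;	// likewise for the other
 *	unsigned *cq_head = cq + p.cq_off.head;	// ring fields
 */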
3244 
3245 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
3246                 u32, min_complete, u32, flags, const sigset_t __user *, sig,
3247                 size_t, sigsz)
3248 {
3249         struct io_ring_ctx *ctx;
3250         long ret = -EBADF;
3251         int submitted = 0;
3252         struct fd f;
3253 
3254         if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
3255                 return -EINVAL;
3256 
3257         f = fdget(fd);
3258         if (!f.file)
3259                 return -EBADF;
3260 
3261         ret = -EOPNOTSUPP;
3262         if (f.file->f_op != &io_uring_fops)
3263                 goto out_fput;
3264 
3265         ret = -ENXIO;
3266         ctx = f.file->private_data;
3267         if (!percpu_ref_tryget(&ctx->refs))
3268                 goto out_fput;
3269 
3270         /*
3271          * For SQ polling, the thread will do all submissions and completions.
3272          * Just return the requested submit count, and wake the thread if
3273          * we were asked to.
3274          */
3275         if (ctx->flags & IORING_SETUP_SQPOLL) {
3276                 if (flags & IORING_ENTER_SQ_WAKEUP)
3277                         wake_up(&ctx->sqo_wait);
3278                 submitted = to_submit;
3279                 goto out_ctx;
3280         }
3281 
3282         ret = 0;
3283         if (to_submit) {
3284                 to_submit = min(to_submit, ctx->sq_entries);
3285 
3286                 mutex_lock(&ctx->uring_lock);
3287                 submitted = io_ring_submit(ctx, to_submit);
3288                 mutex_unlock(&ctx->uring_lock);
3289         }
3290         if (flags & IORING_ENTER_GETEVENTS) {
3291                 unsigned nr_events = 0;
3292 
3293                 min_complete = min(min_complete, ctx->cq_entries);
3294 
3295                 if (ctx->flags & IORING_SETUP_IOPOLL) {
3296                         ret = io_iopoll_check(ctx, &nr_events, min_complete);
3297                 } else {
3298                         ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
3299                 }
3300         }
3301 
3302 out_ctx:
3303         io_ring_drop_ctx_refs(ctx, 1);
3304 out_fput:
3305         fdput(f);
3306         return submitted ? submitted : ret;
3307 }
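
/*
 * Editor's sketch (illustrative, not part of the kernel source): driving the
 * syscall above from user space. In SQPOLL mode the call degenerates to a
 * wakeup, as the branch above shows, so the application only needs to enter
 * the kernel when IORING_SQ_NEED_WAKEUP is set in the mapped SQ flags.
 * Assumes __NR_io_uring_enter plus the hypothetical "ring_fd" and mapped
 * "sq_flags" pointer from the earlier sketches.
 *
 *	// default mode: submit what was queued and wait for one completion
 *	int ret = syscall(__NR_io_uring_enter, ring_fd, to_submit, 1,
 *			  IORING_ENTER_GETEVENTS, NULL, 0);
 *
 *	// SQPOLL mode: only kick the poll thread once it has gone to sleep
 *	if (*sq_flags & IORING_SQ_NEED_WAKEUP)
 *		syscall(__NR_io_uring_enter, ring_fd, 0, 0,
 *			IORING_ENTER_SQ_WAKEUP, NULL, 0);
 */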
3308 
3309 static const struct file_operations io_uring_fops = {
3310         .release        = io_uring_release,
3311         .mmap           = io_uring_mmap,
3312         .poll           = io_uring_poll,
3313         .fasync         = io_uring_fasync,
3314 };
3315 
3316 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
3317                                   struct io_uring_params *p)
3318 {
3319         struct io_sq_ring *sq_ring;
3320         struct io_cq_ring *cq_ring;
3321         size_t size;
3322 
3323         sq_ring = io_mem_alloc(struct_size(sq_ring, array, p->sq_entries));
3324         if (!sq_ring)
3325                 return -ENOMEM;
3326 
3327         ctx->sq_ring = sq_ring;
3328         sq_ring->ring_mask = p->sq_entries - 1;
3329         sq_ring->ring_entries = p->sq_entries;
3330         ctx->sq_mask = sq_ring->ring_mask;
3331         ctx->sq_entries = sq_ring->ring_entries;
3332 
3333         size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
3334         if (size == SIZE_MAX)
3335                 return -EOVERFLOW;
3336 
3337         ctx->sq_sqes = io_mem_alloc(size);
3338         if (!ctx->sq_sqes)
3339                 return -ENOMEM;
3340 
3341         cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries));
3342         if (!cq_ring)
3343                 return -ENOMEM;
3344 
3345         ctx->cq_ring = cq_ring;
3346         cq_ring->ring_mask = p->cq_entries - 1;
3347         cq_ring->ring_entries = p->cq_entries;
3348         ctx->cq_mask = cq_ring->ring_mask;
3349         ctx->cq_entries = cq_ring->ring_entries;
3350         return 0;
3351 }
3352 
3353 /*
3354  * Allocate an anonymous fd; this is what constitutes the application-
3355  * visible backing of an io_uring instance. The application mmaps this
3356  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
3357  * we have to tie this fd to a socket for file garbage collection purposes.
3358  */
3359 static int io_uring_get_fd(struct io_ring_ctx *ctx)
3360 {
3361         struct file *file;
3362         int ret;
3363 
3364 #if defined(CONFIG_UNIX)
3365         ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
3366                                 &ctx->ring_sock);
3367         if (ret)
3368                 return ret;
3369 #endif
3370 
3371         ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
3372         if (ret < 0)
3373                 goto err;
3374 
3375         file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
3376                                         O_RDWR | O_CLOEXEC);
3377         if (IS_ERR(file)) {
3378                 put_unused_fd(ret);
3379                 ret = PTR_ERR(file);
3380                 goto err;
3381         }
3382 
3383 #if defined(CONFIG_UNIX)
3384         ctx->ring_sock->file = file;
3385         ctx->ring_sock->sk->sk_user_data = ctx;
3386 #endif
3387         fd_install(ret, file);
3388         return ret;
3389 err:
3390 #if defined(CONFIG_UNIX)
3391         sock_release(ctx->ring_sock);
3392         ctx->ring_sock = NULL;
3393 #endif
3394         return ret;
3395 }
3396 
3397 static int io_uring_create(unsigned entries, struct io_uring_params *p)
3398 {
3399         struct user_struct *user = NULL;
3400         struct io_ring_ctx *ctx;
3401         bool account_mem;
3402         int ret;
3403 
3404         if (!entries || entries > IORING_MAX_ENTRIES)
3405                 return -EINVAL;
3406 
3407         /*
3408          * Use twice as many entries for the CQ ring. It's possible for the
3409          * application to drive a higher depth than the size of the SQ ring,
3410          * since the sqes are only used at submission time. This allows for
3411          * some flexibility in overcommitting a bit.
3412          */
3413         p->sq_entries = roundup_pow_of_two(entries);
3414         p->cq_entries = 2 * p->sq_entries;
3415 
3416         user = get_uid(current_user());
3417         account_mem = !capable(CAP_IPC_LOCK);
3418 
3419         if (account_mem) {
3420                 ret = io_account_mem(user,
3421                                 ring_pages(p->sq_entries, p->cq_entries));
3422                 if (ret) {
3423                         free_uid(user);
3424                         return ret;
3425                 }
3426         }
3427 
3428         ctx = io_ring_ctx_alloc(p);
3429         if (!ctx) {
3430                 if (account_mem)
3431                         io_unaccount_mem(user, ring_pages(p->sq_entries,
3432                                                                 p->cq_entries));
3433                 free_uid(user);
3434                 return -ENOMEM;
3435         }
3436         ctx->compat = in_compat_syscall();
3437         ctx->account_mem = account_mem;
3438         ctx->user = user;
3439 
3440         ctx->creds = prepare_creds();
3441         if (!ctx->creds) {
3442                 ret = -ENOMEM;
3443                 goto err;
3444         }
3445 
3446         ret = io_allocate_scq_urings(ctx, p);
3447         if (ret)
3448                 goto err;
3449 
3450         ret = io_sq_offload_start(ctx, p);
3451         if (ret)
3452                 goto err;
3453 
3454         ret = io_uring_get_fd(ctx);
3455         if (ret < 0)
3456                 goto err;
3457 
3458         memset(&p->sq_off, 0, sizeof(p->sq_off));
3459         p->sq_off.head = offsetof(struct io_sq_ring, r.head);
3460         p->sq_off.tail = offsetof(struct io_sq_ring, r.tail);
3461         p->sq_off.ring_mask = offsetof(struct io_sq_ring, ring_mask);
3462         p->sq_off.ring_entries = offsetof(struct io_sq_ring, ring_entries);
3463         p->sq_off.flags = offsetof(struct io_sq_ring, flags);
3464         p->sq_off.dropped = offsetof(struct io_sq_ring, dropped);
3465         p->sq_off.array = offsetof(struct io_sq_ring, array);
3466 
3467         memset(&p->cq_off, 0, sizeof(p->cq_off));
3468         p->cq_off.head = offsetof(struct io_cq_ring, r.head);
3469         p->cq_off.tail = offsetof(struct io_cq_ring, r.tail);
3470         p->cq_off.ring_mask = offsetof(struct io_cq_ring, ring_mask);
3471         p->cq_off.ring_entries = offsetof(struct io_cq_ring, ring_entries);
3472         p->cq_off.overflow = offsetof(struct io_cq_ring, overflow);
3473         p->cq_off.cqes = offsetof(struct io_cq_ring, cqes);
3474         return ret;
3475 err:
3476         io_ring_ctx_wait_and_kill(ctx);
3477         return ret;
3478 }
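
/*
 * Editor's worked example for the sizing above (illustrative, not part of
 * the kernel source): a request for entries = 100 gives
 * p->sq_entries = roundup_pow_of_two(100) = 128 and p->cq_entries = 256,
 * so up to 256 completions can be outstanding even though only 128 SQEs
 * can be queued at any one time.
 */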
3479 
3480 /*
3481  * Sets up an io_uring context and returns the fd. The application asks for a
3482  * ring size; we return the actual sq/cq ring sizes (among other things) in the
3483  * params structure passed in.
3484  */
3485 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
3486 {
3487         struct io_uring_params p;
3488         long ret;
3489         int i;
3490 
3491         if (copy_from_user(&p, params, sizeof(p)))
3492                 return -EFAULT;
3493         for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
3494                 if (p.resv[i])
3495                         return -EINVAL;
3496         }
3497 
3498         if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
3499                         IORING_SETUP_SQ_AFF))
3500                 return -EINVAL;
3501 
3502         ret = io_uring_create(entries, &p);
3503         if (ret < 0)
3504                 return ret;
3505 
3506         if (copy_to_user(params, &p, sizeof(p)))
3507                 return -EFAULT;
3508 
3509         return ret;
3510 }
3511 
3512 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
3513                 struct io_uring_params __user *, params)
3514 {
3515         return io_uring_setup(entries, params);
3516 }
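
/*
 * Editor's sketch (illustrative, not part of the kernel source): the minimal
 * user-space call sequence for the syscall above, assuming __NR_io_uring_setup
 * is exposed by the installed headers. "ring_fd" is the hypothetical name used
 * by the other sketches in this listing.
 *
 *	struct io_uring_params p;
 *
 *	memset(&p, 0, sizeof(p));		// resv[] must be zero
 *	int ring_fd = syscall(__NR_io_uring_setup, 8, &p);
 *	if (ring_fd < 0)
 *		perror("io_uring_setup");
 *	// p.sq_entries/p.cq_entries now hold the actual ring sizes, and
 *	// p.sq_off/p.cq_off the offsets used by the mmap() sketch near
 *	// io_uring_mmap() above.
 */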
3517 
3518 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
3519                                void __user *arg, unsigned nr_args)
3520         __releases(ctx->uring_lock)
3521         __acquires(ctx->uring_lock)
3522 {
3523         int ret;
3524 
3525         /*
3526          * We're inside the ring mutex; if the ref is already dying, then
3527          * someone else killed the ctx or is already going through
3528          * io_uring_register().
3529          */
3530         if (percpu_ref_is_dying(&ctx->refs))
3531                 return -ENXIO;
3532 
3533         percpu_ref_kill(&ctx->refs);
3534 
3535         /*
3536          * Drop uring mutex before waiting for references to exit. If another
3537          * thread is currently inside io_uring_enter() it might need to grab
3538          * the uring_lock to make progress. If we hold it here across the drain
3539          * wait, then we can deadlock. It's safe to drop the mutex here, since
3540          * no new references will come in after we've killed the percpu ref.
3541          */
3542         mutex_unlock(&ctx->uring_lock);
3543         wait_for_completion(&ctx->ctx_done);
3544         mutex_lock(&ctx->uring_lock);
3545 
3546         switch (opcode) {
3547         case IORING_REGISTER_BUFFERS:
3548                 ret = io_sqe_buffer_register(ctx, arg, nr_args);
3549                 break;
3550         case IORING_UNREGISTER_BUFFERS:
3551                 ret = -EINVAL;
3552                 if (arg || nr_args)
3553                         break;
3554                 ret = io_sqe_buffer_unregister(ctx);
3555                 break;
3556         case IORING_REGISTER_FILES:
3557                 ret = io_sqe_files_register(ctx, arg, nr_args);
3558                 break;
3559         case IORING_UNREGISTER_FILES:
3560                 ret = -EINVAL;
3561                 if (arg || nr_args)
3562                         break;
3563                 ret = io_sqe_files_unregister(ctx);
3564                 break;
3565         case IORING_REGISTER_EVENTFD:
3566                 ret = -EINVAL;
3567                 if (nr_args != 1)
3568                         break;
3569                 ret = io_eventfd_register(ctx, arg);
3570                 break;
3571         case IORING_UNREGISTER_EVENTFD:
3572                 ret = -EINVAL;
3573                 if (arg || nr_args)
3574                         break;
3575                 ret = io_eventfd_unregister(ctx);
3576                 break;
3577         default:
3578                 ret = -EINVAL;
3579                 break;
3580         }
3581 
3582         /* bring the ctx back to life */
3583         reinit_completion(&ctx->ctx_done);
3584         percpu_ref_reinit(&ctx->refs);
3585         return ret;
3586 }
3587 
3588 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
3589                 void __user *, arg, unsigned int, nr_args)
3590 {
3591         struct io_ring_ctx *ctx;
3592         long ret = -EBADF;
3593         struct fd f;
3594 
3595         f = fdget(fd);
3596         if (!f.file)
3597                 return -EBADF;
3598 
3599         ret = -EOPNOTSUPP;
3600         if (f.file->f_op != &io_uring_fops)
3601                 goto out_fput;
3602 
3603         ctx = f.file->private_data;
3604 
3605         mutex_lock(&ctx->uring_lock);
3606         ret = __io_uring_register(ctx, opcode, arg, nr_args);
3607         mutex_unlock(&ctx->uring_lock);
3608 out_fput:
3609         fdput(f);
3610         return ret;
3611 }
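
/*
 * Editor's sketch (illustrative, not part of the kernel source): registering
 * and later dropping a fixed file set through the syscall above. Each
 * io_uring_register(2) call quiesces the ring (__io_uring_register() kills
 * and re-inits the percpu ref), so it is relatively expensive and intended
 * for setup/teardown paths, not per-IO use. "ring_fd", "fd0" and "fd1" are
 * hypothetical; error handling is elided.
 *
 *	int fds[2] = { fd0, fd1 };
 *
 *	syscall(__NR_io_uring_register, ring_fd,
 *		IORING_REGISTER_FILES, fds, 2);
 *	...
 *	syscall(__NR_io_uring_register, ring_fd,
 *		IORING_UNREGISTER_FILES, NULL, 0);	// arg/nr_args must be 0
 */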
3612 
3613 static int __init io_uring_init(void)
3614 {
3615         req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
3616         return 0;
3617 }
3618 __initcall(io_uring_init);
3619 
