TOMOYO Linux Cross Reference
Linux/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c

  1 /*
  2  * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
  3  * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
  4  *
  5  * This software is available to you under a choice of one of two
  6  * licenses.  You may choose to be licensed under the terms of the GNU
  7  * General Public License (GPL) Version 2, available from the file
  8  * COPYING in the main directory of this source tree, or the BSD-type
  9  * license below:
 10  *
 11  * Redistribution and use in source and binary forms, with or without
 12  * modification, are permitted provided that the following conditions
 13  * are met:
 14  *
 15  *      Redistributions of source code must retain the above copyright
 16  *      notice, this list of conditions and the following disclaimer.
 17  *
 18  *      Redistributions in binary form must reproduce the above
 19  *      copyright notice, this list of conditions and the following
 20  *      disclaimer in the documentation and/or other materials provided
 21  *      with the distribution.
 22  *
 23  *      Neither the name of the Network Appliance, Inc. nor the names of
 24  *      its contributors may be used to endorse or promote products
 25  *      derived from this software without specific prior written
 26  *      permission.
 27  *
 28  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 29  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 30  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 31  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 32  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 33  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 34  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 35  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 36  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 37  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 38  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 39  *
 40  * Author: Tom Tucker <tom@opengridcomputing.com>
 41  */
 42 
 43 #include <linux/sunrpc/debug.h>
 44 #include <linux/sunrpc/rpc_rdma.h>
 45 #include <linux/spinlock.h>
 46 #include <asm/unaligned.h>
 47 #include <rdma/ib_verbs.h>
 48 #include <rdma/rdma_cm.h>
 49 #include <linux/sunrpc/svc_rdma.h>
 50 
 51 #define RPCDBG_FACILITY RPCDBG_SVCXPRT
 52 
 53 /*
 54  * Replace the pages in the rq_argpages array with the pages from the SGE in
 55  * the RDMA_RECV completion. The SGL should contain full pages up until the
 56  * last one.
 57  */
 58 static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
 59                                struct svc_rdma_op_ctxt *ctxt,
 60                                u32 byte_count)
 61 {
 62         struct rpcrdma_msg *rmsgp;
 63         struct page *page;
 64         u32 bc;
 65         int sge_no;
 66 
 67         /* Swap the page in the SGE with the page in argpages */
 68         page = ctxt->pages[0];
 69         put_page(rqstp->rq_pages[0]);
 70         rqstp->rq_pages[0] = page;
 71 
 72         /* Set up the XDR head */
 73         rqstp->rq_arg.head[0].iov_base = page_address(page);
 74         rqstp->rq_arg.head[0].iov_len =
 75                 min_t(size_t, byte_count, ctxt->sge[0].length);
 76         rqstp->rq_arg.len = byte_count;
 77         rqstp->rq_arg.buflen = byte_count;
 78 
 79         /* Compute bytes past head in the SGL */
 80         bc = byte_count - rqstp->rq_arg.head[0].iov_len;
 81 
 82         /* If data remains, store it in the pagelist */
 83         rqstp->rq_arg.page_len = bc;
 84         rqstp->rq_arg.page_base = 0;
 85 
 86         /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */
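             /* For RDMA_NOMSG the only inline bytes are the transport
              * header, and the RDMA READ payload lands directly behind
              * them, so the page list must start at page 0.  For RDMA_MSG,
              * page 0 is already covered by the head iovec and the page
              * list begins at page 1.
              */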
 87         rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
 88         if (rmsgp->rm_type == rdma_nomsg)
 89                 rqstp->rq_arg.pages = &rqstp->rq_pages[0];
 90         else
 91                 rqstp->rq_arg.pages = &rqstp->rq_pages[1];
 92 
 93         sge_no = 1;
 94         while (bc && sge_no < ctxt->count) {
 95                 page = ctxt->pages[sge_no];
 96                 put_page(rqstp->rq_pages[sge_no]);
 97                 rqstp->rq_pages[sge_no] = page;
 98                 bc -= min_t(u32, bc, ctxt->sge[sge_no].length);
 99                 rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
100                 sge_no++;
101         }
102         rqstp->rq_respages = &rqstp->rq_pages[sge_no];
103         rqstp->rq_next_page = rqstp->rq_respages + 1;
104 
105         /* If not all pages were used from the SGL, free the remaining ones */
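             /* "bc" is reused here to remember how many SGEs were actually
              * consumed, so ctxt->count can be trimmed to match once the
              * unused receive pages have been released.
              */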
106         bc = sge_no;
107         while (sge_no < ctxt->count) {
108                 page = ctxt->pages[sge_no++];
109                 put_page(page);
110         }
111         ctxt->count = bc;
112 
113         /* Set up tail */
114         rqstp->rq_arg.tail[0].iov_base = NULL;
115         rqstp->rq_arg.tail[0].iov_len = 0;
116 }
117 
118 static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
119 {
120         if (!rdma_cap_read_multi_sge(xprt->sc_cm_id->device,
121                                      xprt->sc_cm_id->port_num))
122                 return 1;
123         else
124                 return min_t(int, sge_count, xprt->sc_max_sge);
125 }
126 
127 /* Issue an RDMA_READ using the local lkey to map the data sink */
128 int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
129                         struct svc_rqst *rqstp,
130                         struct svc_rdma_op_ctxt *head,
131                         int *page_no,
132                         u32 *page_offset,
133                         u32 rs_handle,
134                         u32 rs_length,
135                         u64 rs_offset,
136                         bool last)
137 {
138         struct ib_send_wr read_wr;
139         int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
140         struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
141         int ret, read, pno;
142         u32 pg_off = *page_offset;
143         u32 pg_no = *page_no;
144 
145         ctxt->direction = DMA_FROM_DEVICE;
146         ctxt->read_hdr = head;
147         pages_needed =
148                 min_t(int, pages_needed, rdma_read_max_sge(xprt, pages_needed));
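             /* "read" below is the number of payload bytes this single RDMA
              * READ WR can cover given the SGE limit; the caller keeps
              * issuing reads until the whole chunk segment is pulled over.
              */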
149         read = min_t(int, (pages_needed << PAGE_SHIFT) - *page_offset,
150                      rs_length);
151 
152         for (pno = 0; pno < pages_needed; pno++) {
153                 int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
154 
155                 head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
156                 head->arg.page_len += len;
157                 head->arg.len += len;
158                 if (!pg_off)
159                         head->count++;
160                 rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
161                 rqstp->rq_next_page = rqstp->rq_respages + 1;
162                 ctxt->sge[pno].addr =
163                         ib_dma_map_page(xprt->sc_cm_id->device,
164                                         head->arg.pages[pg_no], pg_off,
165                                         PAGE_SIZE - pg_off,
166                                         DMA_FROM_DEVICE);
167                 ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
168                                            ctxt->sge[pno].addr);
169                 if (ret)
170                         goto err;
171                 atomic_inc(&xprt->sc_dma_used);
172 
173                 /* The lkey here is either a local dma lkey or a dma_mr lkey */
174                 ctxt->sge[pno].lkey = xprt->sc_dma_lkey;
175                 ctxt->sge[pno].length = len;
176                 ctxt->count++;
177 
178                 /* adjust offset and wrap to next page if needed */
179                 pg_off += len;
180                 if (pg_off == PAGE_SIZE) {
181                         pg_off = 0;
182                         pg_no++;
183                 }
184                 rs_length -= len;
185         }
186 
187         if (last && rs_length == 0)
188                 set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
189         else
190                 clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
191 
192         memset(&read_wr, 0, sizeof(read_wr));
193         read_wr.wr_id = (unsigned long)ctxt;
194         read_wr.opcode = IB_WR_RDMA_READ;
195         ctxt->wr_op = read_wr.opcode;
196         read_wr.send_flags = IB_SEND_SIGNALED;
197         read_wr.wr.rdma.rkey = rs_handle;
198         read_wr.wr.rdma.remote_addr = rs_offset;
199         read_wr.sg_list = ctxt->sge;
200         read_wr.num_sge = pages_needed;
201 
202         ret = svc_rdma_send(xprt, &read_wr);
203         if (ret) {
204                 pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
205                 set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
206                 goto err;
207         }
208 
209         /* return current location in page array */
210         *page_no = pg_no;
211         *page_offset = pg_off;
212         ret = read;
213         atomic_inc(&rdma_stat_read);
214         return ret;
215  err:
216         svc_rdma_unmap_dma(ctxt);
217         svc_rdma_put_context(ctxt, 0);
218         return ret;
219 }
220 
221 /* Issue an RDMA_READ using an FRMR to map the data sink */
222 int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
223                          struct svc_rqst *rqstp,
224                          struct svc_rdma_op_ctxt *head,
225                          int *page_no,
226                          u32 *page_offset,
227                          u32 rs_handle,
228                          u32 rs_length,
229                          u64 rs_offset,
230                          bool last)
231 {
232         struct ib_send_wr read_wr;
233         struct ib_send_wr inv_wr;
234         struct ib_send_wr fastreg_wr;
235         u8 key;
236         int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
237         struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
238         struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt);
239         int ret, read, pno;
240         u32 pg_off = *page_offset;
241         u32 pg_no = *page_no;
242 
243         if (IS_ERR(frmr))
244                 return -ENOMEM;
245 
246         ctxt->direction = DMA_FROM_DEVICE;
247         ctxt->frmr = frmr;
248         pages_needed = min_t(int, pages_needed, xprt->sc_frmr_pg_list_len);
249         read = min_t(int, (pages_needed << PAGE_SHIFT) - *page_offset,
250                      rs_length);
251 
252         frmr->kva = page_address(rqstp->rq_arg.pages[pg_no]);
253         frmr->direction = DMA_FROM_DEVICE;
254         frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
255         frmr->map_len = pages_needed << PAGE_SHIFT;
256         frmr->page_list_len = pages_needed;
257 
258         for (pno = 0; pno < pages_needed; pno++) {
259                 int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
260 
261                 head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
262                 head->arg.page_len += len;
263                 head->arg.len += len;
264                 if (!pg_off)
265                         head->count++;
266                 rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
267                 rqstp->rq_next_page = rqstp->rq_respages + 1;
268                 frmr->page_list->page_list[pno] =
269                         ib_dma_map_page(xprt->sc_cm_id->device,
270                                         head->arg.pages[pg_no], 0,
271                                         PAGE_SIZE, DMA_FROM_DEVICE);
272                 ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
273                                            frmr->page_list->page_list[pno]);
274                 if (ret)
275                         goto err;
276                 atomic_inc(&xprt->sc_dma_used);
277 
278                 /* adjust offset and wrap to next page if needed */
279                 pg_off += len;
280                 if (pg_off == PAGE_SIZE) {
281                         pg_off = 0;
282                         pg_no++;
283                 }
284                 rs_length -= len;
285         }
286 
287         if (last && rs_length == 0)
288                 set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
289         else
290                 clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
291 
292         /* Bump the key */
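             /* Incrementing the low-order key byte gives this registration
              * a fresh lkey/rkey value, so a key handed out by a previous
              * use of this MR can no longer match.
              */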
293         key = (u8)(frmr->mr->lkey & 0x000000FF);
294         ib_update_fast_reg_key(frmr->mr, ++key);
295 
296         ctxt->sge[0].addr = (unsigned long)frmr->kva + *page_offset;
297         ctxt->sge[0].lkey = frmr->mr->lkey;
298         ctxt->sge[0].length = read;
299         ctxt->count = 1;
300         ctxt->read_hdr = head;
301 
302         /* Prepare FASTREG WR */
303         memset(&fastreg_wr, 0, sizeof(fastreg_wr));
304         fastreg_wr.opcode = IB_WR_FAST_REG_MR;
305         fastreg_wr.send_flags = IB_SEND_SIGNALED;
306         fastreg_wr.wr.fast_reg.iova_start = (unsigned long)frmr->kva;
307         fastreg_wr.wr.fast_reg.page_list = frmr->page_list;
308         fastreg_wr.wr.fast_reg.page_list_len = frmr->page_list_len;
309         fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
310         fastreg_wr.wr.fast_reg.length = frmr->map_len;
311         fastreg_wr.wr.fast_reg.access_flags = frmr->access_flags;
312         fastreg_wr.wr.fast_reg.rkey = frmr->mr->lkey;
313         fastreg_wr.next = &read_wr;
314 
315         /* Prepare RDMA_READ */
316         memset(&read_wr, 0, sizeof(read_wr));
317         read_wr.send_flags = IB_SEND_SIGNALED;
318         read_wr.wr.rdma.rkey = rs_handle;
319         read_wr.wr.rdma.remote_addr = rs_offset;
320         read_wr.sg_list = ctxt->sge;
321         read_wr.num_sge = 1;
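             /* With Read-with-Invalidate the HCA invalidates the FRMR as
              * part of the READ itself; otherwise a separate fenced
              * LOCAL_INV WR must be chained behind the READ to retire the
              * registration.
              */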
322         if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
323                 read_wr.opcode = IB_WR_RDMA_READ_WITH_INV;
324                 read_wr.wr_id = (unsigned long)ctxt;
325                 read_wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
326         } else {
327                 read_wr.opcode = IB_WR_RDMA_READ;
328                 read_wr.next = &inv_wr;
329                 /* Prepare invalidate */
330                 memset(&inv_wr, 0, sizeof(inv_wr));
331                 inv_wr.wr_id = (unsigned long)ctxt;
332                 inv_wr.opcode = IB_WR_LOCAL_INV;
333                 inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
334                 inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
335         }
336         ctxt->wr_op = read_wr.opcode;
337 
338         /* Post the chain */
339         ret = svc_rdma_send(xprt, &fastreg_wr);
340         if (ret) {
341                 pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
342                 set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
343                 goto err;
344         }
345 
346         /* return current location in page array */
347         *page_no = pg_no;
348         *page_offset = pg_off;
349         ret = read;
350         atomic_inc(&rdma_stat_read);
351         return ret;
352  err:
353         svc_rdma_unmap_dma(ctxt);
354         svc_rdma_put_context(ctxt, 0);
355         svc_rdma_put_frmr(xprt, frmr);
356         return ret;
357 }
358 
359 static unsigned int
360 rdma_rcl_chunk_count(struct rpcrdma_read_chunk *ch)
361 {
362         unsigned int count;
363 
364         for (count = 0; ch->rc_discrim != xdr_zero; ch++)
365                 count++;
366         return count;
367 }
368 
369 /* If there was additional inline content, append it to the end of arg.pages.
370  * Tail copy has to be done after the reader function has determined how many
371  * pages are needed for RDMA READ.
372  */
373 static int
374 rdma_copy_tail(struct svc_rqst *rqstp, struct svc_rdma_op_ctxt *head,
375                u32 position, u32 byte_count, u32 page_offset, int page_no)
376 {
377         char *srcp, *destp;
378         int ret;
379 
380         ret = 0;
381         srcp = head->arg.head[0].iov_base + position;
382         byte_count = head->arg.head[0].iov_len - position;
383         if (byte_count > PAGE_SIZE) {
384                 dprintk("svcrdma: large tail unsupported\n");
385                 return 0;
386         }
387 
388         /* Fit as much of the tail on the current page as possible */
389         if (page_offset != PAGE_SIZE) {
390                 destp = page_address(rqstp->rq_arg.pages[page_no]);
391                 destp += page_offset;
392                 while (byte_count--) {
393                         *destp++ = *srcp++;
394                         page_offset++;
395                         if (page_offset == PAGE_SIZE && byte_count)
396                                 goto more;
397                 }
398                 goto done;
399         }
400 
401 more:
402         /* Fit the rest on the next page */
403         page_no++;
404         destp = page_address(rqstp->rq_arg.pages[page_no]);
405         while (byte_count--)
406                 *destp++ = *srcp++;
407 
408         rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
409         rqstp->rq_next_page = rqstp->rq_respages + 1;
410 
411 done:
412         byte_count = head->arg.head[0].iov_len - position;
413         head->arg.page_len += byte_count;
414         head->arg.len += byte_count;
415         head->arg.buflen += byte_count;
416         return 1;
417 }
418 
419 static int rdma_read_chunks(struct svcxprt_rdma *xprt,
420                             struct rpcrdma_msg *rmsgp,
421                             struct svc_rqst *rqstp,
422                             struct svc_rdma_op_ctxt *head)
423 {
424         int page_no, ret;
425         struct rpcrdma_read_chunk *ch;
426         u32 handle, page_offset, byte_count;
427         u32 position;
428         u64 rs_offset;
429         bool last;
430 
431         /* If no read list is present, return 0 */
432         ch = svc_rdma_get_read_chunk(rmsgp);
433         if (!ch)
434                 return 0;
435 
436         if (rdma_rcl_chunk_count(ch) > RPCSVC_MAXPAGES)
437                 return -EINVAL;
438 
439         /* The request is completed when the RDMA_READs complete. The
440          * head context keeps all the pages that comprise the
441          * request.
442          */
443         head->arg.head[0] = rqstp->rq_arg.head[0];
444         head->arg.tail[0] = rqstp->rq_arg.tail[0];
445         head->hdr_count = head->count;
446         head->arg.page_base = 0;
447         head->arg.page_len = 0;
448         head->arg.len = rqstp->rq_arg.len;
449         head->arg.buflen = rqstp->rq_arg.buflen;
450 
451         ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
452         position = be32_to_cpu(ch->rc_position);
453 
454         /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */
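             /* A zero chunk position means RDMA_NOMSG: the READ payload
              * lands right behind the already-received bytes, head->byte_len
              * into the first page.  A non-zero position means only part of
              * the RPC argument was moved to the read list, and that data
              * starts on a fresh page.
              */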
455         if (position == 0) {
456                 head->arg.pages = &head->pages[0];
457                 page_offset = head->byte_len;
458         } else {
459                 head->arg.pages = &head->pages[head->count];
460                 page_offset = 0;
461         }
462 
463         ret = 0;
464         page_no = 0;
465         for (; ch->rc_discrim != xdr_zero; ch++) {
466                 if (be32_to_cpu(ch->rc_position) != position)
467                         goto err;
468 
 469                 handle = be32_to_cpu(ch->rc_target.rs_handle);
470                 byte_count = be32_to_cpu(ch->rc_target.rs_length);
471                 xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,
472                                  &rs_offset);
473 
474                 while (byte_count > 0) {
475                         last = (ch + 1)->rc_discrim == xdr_zero;
476                         ret = xprt->sc_reader(xprt, rqstp, head,
477                                               &page_no, &page_offset,
478                                               handle, byte_count,
479                                               rs_offset, last);
480                         if (ret < 0)
481                                 goto err;
482                         byte_count -= ret;
483                         rs_offset += ret;
484                         head->arg.buflen += ret;
485                 }
486         }
487 
488         /* Read list may need XDR round-up (see RFC 5666, s. 3.7) */
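             /* e.g. a chunk ending at page_offset 5 gets pad = 3, restoring
              * 4-byte XDR alignment for anything that follows.
              */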
489         if (page_offset & 3) {
490                 u32 pad = 4 - (page_offset & 3);
491 
492                 head->arg.page_len += pad;
493                 head->arg.len += pad;
494                 head->arg.buflen += pad;
495                 page_offset += pad;
496         }
497 
498         ret = 1;
499         if (position && position < head->arg.head[0].iov_len)
500                 ret = rdma_copy_tail(rqstp, head, position,
501                                      byte_count, page_offset, page_no);
502         head->arg.head[0].iov_len = position;
503         head->position = position;
504 
505  err:
506         /* Detach arg pages. svc_recv will replenish them */
507         for (page_no = 0;
508              &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++)
509                 rqstp->rq_pages[page_no] = NULL;
510 
511         return ret;
512 }
513 
514 static int rdma_read_complete(struct svc_rqst *rqstp,
515                               struct svc_rdma_op_ctxt *head)
516 {
517         int page_no;
518         int ret;
519 
520         /* Copy RPC pages */
521         for (page_no = 0; page_no < head->count; page_no++) {
522                 put_page(rqstp->rq_pages[page_no]);
523                 rqstp->rq_pages[page_no] = head->pages[page_no];
524         }
525 
526         /* Adjustments made for RDMA_NOMSG type requests */
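             /* If the whole argument fits within the first receive SGE it
              * is exposed (minus the already-received transport header)
              * through the head iovec alone; otherwise the head covers the
              * rest of that first SGE and the remainder goes through the
              * page list.
              */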
527         if (head->position == 0) {
528                 if (head->arg.len <= head->sge[0].length) {
529                         head->arg.head[0].iov_len = head->arg.len -
530                                                         head->byte_len;
531                         head->arg.page_len = 0;
532                 } else {
533                         head->arg.head[0].iov_len = head->sge[0].length -
534                                                                 head->byte_len;
535                         head->arg.page_len = head->arg.len -
536                                                 head->sge[0].length;
537                 }
538         }
539 
540         /* Point rq_arg.pages past header */
541         rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count];
542         rqstp->rq_arg.page_len = head->arg.page_len;
543         rqstp->rq_arg.page_base = head->arg.page_base;
544 
545         /* rq_respages starts after the last arg page */
546         rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
547         rqstp->rq_next_page = rqstp->rq_respages + 1;
548 
549         /* Rebuild rq_arg head and tail. */
550         rqstp->rq_arg.head[0] = head->arg.head[0];
551         rqstp->rq_arg.tail[0] = head->arg.tail[0];
552         rqstp->rq_arg.len = head->arg.len;
553         rqstp->rq_arg.buflen = head->arg.buflen;
554 
555         /* Free the context */
556         svc_rdma_put_context(head, 0);
557 
558         /* XXX: What should this be? */
559         rqstp->rq_prot = IPPROTO_MAX;
560         svc_xprt_copy_addrs(rqstp, rqstp->rq_xprt);
561 
562         ret = rqstp->rq_arg.head[0].iov_len
563                 + rqstp->rq_arg.page_len
564                 + rqstp->rq_arg.tail[0].iov_len;
565         dprintk("svcrdma: deferred read ret=%d, rq_arg.len=%u, "
566                 "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zu\n",
567                 ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base,
568                 rqstp->rq_arg.head[0].iov_len);
569 
570         return ret;
571 }
572 
573 /*
574  * Set up the rqstp thread context to point to the RQ buffer. If
575  * necessary, pull additional data from the client with an RDMA_READ
576  * request.
577  */
578 int svc_rdma_recvfrom(struct svc_rqst *rqstp)
579 {
580         struct svc_xprt *xprt = rqstp->rq_xprt;
581         struct svcxprt_rdma *rdma_xprt =
582                 container_of(xprt, struct svcxprt_rdma, sc_xprt);
583         struct svc_rdma_op_ctxt *ctxt = NULL;
584         struct rpcrdma_msg *rmsgp;
585         int ret = 0;
586         int len;
587 
588         dprintk("svcrdma: rqstp=%p\n", rqstp);
589 
590         spin_lock_bh(&rdma_xprt->sc_rq_dto_lock);
591         if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
592                 ctxt = list_entry(rdma_xprt->sc_read_complete_q.next,
593                                   struct svc_rdma_op_ctxt,
594                                   dto_q);
595                 list_del_init(&ctxt->dto_q);
596                 spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
597                 return rdma_read_complete(rqstp, ctxt);
598         } else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
599                 ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
600                                   struct svc_rdma_op_ctxt,
601                                   dto_q);
602                 list_del_init(&ctxt->dto_q);
603         } else {
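                     /* Nothing is queued: count an RQ "starve" and clear
                      * XPT_DATA so this transport is not re-selected until
                      * another receive completion arrives and sets it again.
                      */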
604                 atomic_inc(&rdma_stat_rq_starve);
605                 clear_bit(XPT_DATA, &xprt->xpt_flags);
606                 ctxt = NULL;
607         }
608         spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
609         if (!ctxt) {
 610                 /* This is the EAGAIN path. The svc_recv routine will
 611                  * return -EAGAIN, the nfsd thread will call into
 612                  * svc_recv again, and we should not remain on the
 613                  * active transport list.
 614                  */
615                 if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
616                         goto close_out;
617 
618                 goto out;
619         }
620         dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
621                 ctxt, rdma_xprt, rqstp, ctxt->wc_status);
622         atomic_inc(&rdma_stat_recv);
623 
624         /* Build up the XDR from the receive buffers. */
625         rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);
626 
627         /* Decode the RDMA header. */
628         len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
629         rqstp->rq_xprt_hlen = len;
630 
631         /* If the request is invalid, reply with an error */
632         if (len < 0) {
633                 if (len == -ENOSYS)
634                         svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
635                 goto close_out;
636         }
637 
638         /* Read read-list data. */
639         ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt);
640         if (ret > 0) {
641                 /* read-list posted, defer until data received from client. */
642                 goto defer;
643         } else if (ret < 0) {
644                 /* Post of read-list failed, free context. */
645                 svc_rdma_put_context(ctxt, 1);
646                 return 0;
647         }
648 
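             /* No read list (ret == 0): the complete RPC call is already in
              * rq_arg, so report its length and let the RPC layer process it.
              */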
649         ret = rqstp->rq_arg.head[0].iov_len
650                 + rqstp->rq_arg.page_len
651                 + rqstp->rq_arg.tail[0].iov_len;
652         svc_rdma_put_context(ctxt, 0);
653  out:
654         dprintk("svcrdma: ret=%d, rq_arg.len=%u, "
655                 "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zd\n",
656                 ret, rqstp->rq_arg.len,
657                 rqstp->rq_arg.head[0].iov_base,
658                 rqstp->rq_arg.head[0].iov_len);
659         rqstp->rq_prot = IPPROTO_MAX;
660         svc_xprt_copy_addrs(rqstp, xprt);
661         return ret;
662 
663  close_out:
664         if (ctxt)
665                 svc_rdma_put_context(ctxt, 1);
666         dprintk("svcrdma: transport %p is closing\n", xprt);
667         /*
668          * Set the close bit and enqueue it. svc_recv will see the
669          * close bit and call svc_xprt_delete
670          */
671         set_bit(XPT_CLOSE, &xprt->xpt_flags);
672 defer:
673         return 0;
674 }
675 
