--- zzzz-none-000/linux-3.10.107/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c 2017-06-27 09:49:32.000000000 +0000 +++ scorpion-7490-727/linux-3.10.107/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c 2021-02-04 17:41:59.000000000 +0000 @@ -1,4 +1,5 @@ /* + * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved. * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved. * * This software is available to you under a choice of one of two @@ -58,6 +59,7 @@ struct svc_rdma_op_ctxt *ctxt, u32 byte_count) { + struct rpcrdma_msg *rmsgp; struct page *page; u32 bc; int sge_no; @@ -69,7 +71,8 @@ /* Set up the XDR head */ rqstp->rq_arg.head[0].iov_base = page_address(page); - rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length); + rqstp->rq_arg.head[0].iov_len = + min_t(size_t, byte_count, ctxt->sge[0].length); rqstp->rq_arg.len = byte_count; rqstp->rq_arg.buflen = byte_count; @@ -79,25 +82,25 @@ /* If data remains, store it in the pagelist */ rqstp->rq_arg.page_len = bc; rqstp->rq_arg.page_base = 0; - rqstp->rq_arg.pages = &rqstp->rq_pages[1]; + + /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */ + rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base; + if (rmsgp->rm_type == rdma_nomsg) + rqstp->rq_arg.pages = &rqstp->rq_pages[0]; + else + rqstp->rq_arg.pages = &rqstp->rq_pages[1]; + sge_no = 1; while (bc && sge_no < ctxt->count) { page = ctxt->pages[sge_no]; put_page(rqstp->rq_pages[sge_no]); rqstp->rq_pages[sge_no] = page; - bc -= min(bc, ctxt->sge[sge_no].length); + bc -= min_t(u32, bc, ctxt->sge[sge_no].length); rqstp->rq_arg.buflen += ctxt->sge[sge_no].length; sge_no++; } rqstp->rq_respages = &rqstp->rq_pages[sge_no]; - - /* We should never run out of SGE because the limit is defined to - * support the max allowed RPC data length - */ - BUG_ON(bc && (sge_no == ctxt->count)); - BUG_ON((rqstp->rq_arg.head[0].iov_len + rqstp->rq_arg.page_len) - != byte_count); - BUG_ON(rqstp->rq_arg.len != byte_count); + rqstp->rq_next_page = rqstp->rq_respages + 1; /* If not all pages were used from the SGL, free the remaining ones */ bc = sge_no; @@ -112,422 +115,395 @@ rqstp->rq_arg.tail[0].iov_len = 0; } -/* Encode a read-chunk-list as an array of IB SGE - * - * Assumptions: - * - chunk[0]->position points to pages[0] at an offset of 0 - * - pages[] is not physically or virtually contiguous and consists of - * PAGE_SIZE elements. - * - * Output: - * - sge array pointing into pages[] array. - * - chunk_sge array specifying sge index and count for each - * chunk in the read list - * - */ -static int map_read_chunks(struct svcxprt_rdma *xprt, - struct svc_rqst *rqstp, - struct svc_rdma_op_ctxt *head, - struct rpcrdma_msg *rmsgp, - struct svc_rdma_req_map *rpl_map, - struct svc_rdma_req_map *chl_map, - int ch_count, - int byte_count) +/* Issue an RDMA_READ using the local lkey to map the data sink */ +int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt, + struct svc_rqst *rqstp, + struct svc_rdma_op_ctxt *head, + int *page_no, + u32 *page_offset, + u32 rs_handle, + u32 rs_length, + u64 rs_offset, + bool last) { - int sge_no; - int sge_bytes; - int page_off; - int page_no; - int ch_bytes; - int ch_no; - struct rpcrdma_read_chunk *ch; + struct ib_rdma_wr read_wr; + int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT; + struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt); + int ret, read, pno; + u32 pg_off = *page_offset; + u32 pg_no = *page_no; - sge_no = 0; - page_no = 0; - page_off = 0; - ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0]; - ch_no = 0; - ch_bytes = ntohl(ch->rc_target.rs_length); - head->arg.head[0] = rqstp->rq_arg.head[0]; - head->arg.tail[0] = rqstp->rq_arg.tail[0]; - head->arg.pages = &head->pages[head->count]; - head->hdr_count = head->count; /* save count of hdr pages */ - head->arg.page_base = 0; - head->arg.page_len = ch_bytes; - head->arg.len = rqstp->rq_arg.len + ch_bytes; - head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes; - head->count++; - chl_map->ch[0].start = 0; - while (byte_count) { - rpl_map->sge[sge_no].iov_base = - page_address(rqstp->rq_arg.pages[page_no]) + page_off; - sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes); - rpl_map->sge[sge_no].iov_len = sge_bytes; - /* - * Don't bump head->count here because the same page - * may be used by multiple SGE. - */ - head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no]; - rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1]; + ctxt->direction = DMA_FROM_DEVICE; + ctxt->read_hdr = head; + pages_needed = min_t(int, pages_needed, xprt->sc_max_sge_rd); + read = min_t(int, (pages_needed << PAGE_SHIFT) - *page_offset, + rs_length); + + for (pno = 0; pno < pages_needed; pno++) { + int len = min_t(int, rs_length, PAGE_SIZE - pg_off); + + head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no]; + head->arg.page_len += len; + head->arg.len += len; + if (!pg_off) + head->count++; + rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1]; + rqstp->rq_next_page = rqstp->rq_respages + 1; + ctxt->sge[pno].addr = + ib_dma_map_page(xprt->sc_cm_id->device, + head->arg.pages[pg_no], pg_off, + PAGE_SIZE - pg_off, + DMA_FROM_DEVICE); + ret = ib_dma_mapping_error(xprt->sc_cm_id->device, + ctxt->sge[pno].addr); + if (ret) + goto err; + atomic_inc(&xprt->sc_dma_used); - byte_count -= sge_bytes; - ch_bytes -= sge_bytes; - sge_no++; - /* - * If all bytes for this chunk have been mapped to an - * SGE, move to the next SGE - */ - if (ch_bytes == 0) { - chl_map->ch[ch_no].count = - sge_no - chl_map->ch[ch_no].start; - ch_no++; - ch++; - chl_map->ch[ch_no].start = sge_no; - ch_bytes = ntohl(ch->rc_target.rs_length); - /* If bytes remaining account for next chunk */ - if (byte_count) { - head->arg.page_len += ch_bytes; - head->arg.len += ch_bytes; - head->arg.buflen += ch_bytes; - } + /* The lkey here is either a local dma lkey or a dma_mr lkey */ + ctxt->sge[pno].lkey = xprt->sc_dma_lkey; + ctxt->sge[pno].length = len; + ctxt->count++; + + /* adjust offset and wrap to next page if needed */ + pg_off += len; + if (pg_off == PAGE_SIZE) { + pg_off = 0; + pg_no++; } - /* - * If this SGE consumed all of the page, move to the - * next page - */ - if ((sge_bytes + page_off) == PAGE_SIZE) { - page_no++; - page_off = 0; - /* - * If there are still bytes left to map, bump - * the page count - */ - if (byte_count) - head->count++; - } else - page_off += sge_bytes; + rs_length -= len; } - BUG_ON(byte_count != 0); - return sge_no; + + if (last && rs_length == 0) + set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); + else + clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); + + memset(&read_wr, 0, sizeof(read_wr)); + read_wr.wr.wr_id = (unsigned long)ctxt; + read_wr.wr.opcode = IB_WR_RDMA_READ; + ctxt->wr_op = read_wr.wr.opcode; + read_wr.wr.send_flags = IB_SEND_SIGNALED; + read_wr.rkey = rs_handle; + read_wr.remote_addr = rs_offset; + read_wr.wr.sg_list = ctxt->sge; + read_wr.wr.num_sge = pages_needed; + + ret = svc_rdma_send(xprt, &read_wr.wr); + if (ret) { + pr_err("svcrdma: Error %d posting RDMA_READ\n", ret); + set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); + goto err; + } + + /* return current location in page array */ + *page_no = pg_no; + *page_offset = pg_off; + ret = read; + atomic_inc(&rdma_stat_read); + return ret; + err: + svc_rdma_unmap_dma(ctxt); + svc_rdma_put_context(ctxt, 0); + return ret; } -/* Map a read-chunk-list to an XDR and fast register the page-list. - * - * Assumptions: - * - chunk[0] position points to pages[0] at an offset of 0 - * - pages[] will be made physically contiguous by creating a one-off memory - * region using the fastreg verb. - * - byte_count is # of bytes in read-chunk-list - * - ch_count is # of chunks in read-chunk-list - * - * Output: - * - sge array pointing into pages[] array. - * - chunk_sge array specifying sge index and count for each - * chunk in the read list - */ -static int fast_reg_read_chunks(struct svcxprt_rdma *xprt, - struct svc_rqst *rqstp, - struct svc_rdma_op_ctxt *head, - struct rpcrdma_msg *rmsgp, - struct svc_rdma_req_map *rpl_map, - struct svc_rdma_req_map *chl_map, - int ch_count, - int byte_count) +/* Issue an RDMA_READ using an FRMR to map the data sink */ +int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt, + struct svc_rqst *rqstp, + struct svc_rdma_op_ctxt *head, + int *page_no, + u32 *page_offset, + u32 rs_handle, + u32 rs_length, + u64 rs_offset, + bool last) { - int page_no; - int ch_no; - u32 offset; - struct rpcrdma_read_chunk *ch; - struct svc_rdma_fastreg_mr *frmr; - int ret = 0; + struct ib_rdma_wr read_wr; + struct ib_send_wr inv_wr; + struct ib_reg_wr reg_wr; + u8 key; + int nents = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT; + struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt); + struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt); + int ret, read, pno, dma_nents, n; + u32 pg_off = *page_offset; + u32 pg_no = *page_no; - frmr = svc_rdma_get_frmr(xprt); if (IS_ERR(frmr)) return -ENOMEM; - head->frmr = frmr; - head->arg.head[0] = rqstp->rq_arg.head[0]; - head->arg.tail[0] = rqstp->rq_arg.tail[0]; - head->arg.pages = &head->pages[head->count]; - head->hdr_count = head->count; /* save count of hdr pages */ - head->arg.page_base = 0; - head->arg.page_len = byte_count; - head->arg.len = rqstp->rq_arg.len + byte_count; - head->arg.buflen = rqstp->rq_arg.buflen + byte_count; + ctxt->direction = DMA_FROM_DEVICE; + ctxt->frmr = frmr; + nents = min_t(unsigned int, nents, xprt->sc_frmr_pg_list_len); + read = min_t(int, (nents << PAGE_SHIFT) - *page_offset, rs_length); - /* Fast register the page list */ - frmr->kva = page_address(rqstp->rq_arg.pages[0]); frmr->direction = DMA_FROM_DEVICE; frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE); - frmr->map_len = byte_count; - frmr->page_list_len = PAGE_ALIGN(byte_count) >> PAGE_SHIFT; - for (page_no = 0; page_no < frmr->page_list_len; page_no++) { - frmr->page_list->page_list[page_no] = - ib_dma_map_page(xprt->sc_cm_id->device, - rqstp->rq_arg.pages[page_no], 0, - PAGE_SIZE, DMA_FROM_DEVICE); - if (ib_dma_mapping_error(xprt->sc_cm_id->device, - frmr->page_list->page_list[page_no])) - goto fatal_err; - atomic_inc(&xprt->sc_dma_used); - head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no]; - } - head->count += page_no; + frmr->sg_nents = nents; - /* rq_respages points one past arg pages */ - rqstp->rq_respages = &rqstp->rq_arg.pages[page_no]; + for (pno = 0; pno < nents; pno++) { + int len = min_t(int, rs_length, PAGE_SIZE - pg_off); - /* Create the reply and chunk maps */ - offset = 0; - ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0]; - for (ch_no = 0; ch_no < ch_count; ch_no++) { - int len = ntohl(ch->rc_target.rs_length); - rpl_map->sge[ch_no].iov_base = frmr->kva + offset; - rpl_map->sge[ch_no].iov_len = len; - chl_map->ch[ch_no].count = 1; - chl_map->ch[ch_no].start = ch_no; - offset += len; - ch++; - } - - ret = svc_rdma_fastreg(xprt, frmr); - if (ret) - goto fatal_err; + head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no]; + head->arg.page_len += len; + head->arg.len += len; + if (!pg_off) + head->count++; + + sg_set_page(&frmr->sg[pno], rqstp->rq_arg.pages[pg_no], + len, pg_off); + + rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1]; + rqstp->rq_next_page = rqstp->rq_respages + 1; + + /* adjust offset and wrap to next page if needed */ + pg_off += len; + if (pg_off == PAGE_SIZE) { + pg_off = 0; + pg_no++; + } + rs_length -= len; + } - return ch_no; + if (last && rs_length == 0) + set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); + else + clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); - fatal_err: - printk("svcrdma: error fast registering xdr for xprt %p", xprt); + dma_nents = ib_dma_map_sg(xprt->sc_cm_id->device, + frmr->sg, frmr->sg_nents, + frmr->direction); + if (!dma_nents) { + pr_err("svcrdma: failed to dma map sg %p\n", + frmr->sg); + return -ENOMEM; + } + atomic_inc(&xprt->sc_dma_used); + + n = ib_map_mr_sg(frmr->mr, frmr->sg, frmr->sg_nents, PAGE_SIZE); + if (unlikely(n != frmr->sg_nents)) { + pr_err("svcrdma: failed to map mr %p (%d/%d elements)\n", + frmr->mr, n, frmr->sg_nents); + return n < 0 ? n : -EINVAL; + } + + /* Bump the key */ + key = (u8)(frmr->mr->lkey & 0x000000FF); + ib_update_fast_reg_key(frmr->mr, ++key); + + ctxt->sge[0].addr = frmr->mr->iova; + ctxt->sge[0].lkey = frmr->mr->lkey; + ctxt->sge[0].length = frmr->mr->length; + ctxt->count = 1; + ctxt->read_hdr = head; + + /* Prepare REG WR */ + reg_wr.wr.opcode = IB_WR_REG_MR; + reg_wr.wr.wr_id = 0; + reg_wr.wr.send_flags = IB_SEND_SIGNALED; + reg_wr.wr.num_sge = 0; + reg_wr.mr = frmr->mr; + reg_wr.key = frmr->mr->lkey; + reg_wr.access = frmr->access_flags; + reg_wr.wr.next = &read_wr.wr; + + /* Prepare RDMA_READ */ + memset(&read_wr, 0, sizeof(read_wr)); + read_wr.wr.send_flags = IB_SEND_SIGNALED; + read_wr.rkey = rs_handle; + read_wr.remote_addr = rs_offset; + read_wr.wr.sg_list = ctxt->sge; + read_wr.wr.num_sge = 1; + if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) { + read_wr.wr.opcode = IB_WR_RDMA_READ_WITH_INV; + read_wr.wr.wr_id = (unsigned long)ctxt; + read_wr.wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey; + } else { + read_wr.wr.opcode = IB_WR_RDMA_READ; + read_wr.wr.next = &inv_wr; + /* Prepare invalidate */ + memset(&inv_wr, 0, sizeof(inv_wr)); + inv_wr.wr_id = (unsigned long)ctxt; + inv_wr.opcode = IB_WR_LOCAL_INV; + inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE; + inv_wr.ex.invalidate_rkey = frmr->mr->lkey; + } + ctxt->wr_op = read_wr.wr.opcode; + + /* Post the chain */ + ret = svc_rdma_send(xprt, ®_wr.wr); + if (ret) { + pr_err("svcrdma: Error %d posting RDMA_READ\n", ret); + set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); + goto err; + } + + /* return current location in page array */ + *page_no = pg_no; + *page_offset = pg_off; + ret = read; + atomic_inc(&rdma_stat_read); + return ret; + err: + svc_rdma_put_context(ctxt, 0); svc_rdma_put_frmr(xprt, frmr); - return -EIO; + return ret; } -static int rdma_set_ctxt_sge(struct svcxprt_rdma *xprt, - struct svc_rdma_op_ctxt *ctxt, - struct svc_rdma_fastreg_mr *frmr, - struct kvec *vec, - u64 *sgl_offset, - int count) +static unsigned int +rdma_rcl_chunk_count(struct rpcrdma_read_chunk *ch) { - int i; - unsigned long off; + unsigned int count; - ctxt->count = count; - ctxt->direction = DMA_FROM_DEVICE; - for (i = 0; i < count; i++) { - ctxt->sge[i].length = 0; /* in case map fails */ - if (!frmr) { - BUG_ON(!virt_to_page(vec[i].iov_base)); - off = (unsigned long)vec[i].iov_base & ~PAGE_MASK; - ctxt->sge[i].addr = - ib_dma_map_page(xprt->sc_cm_id->device, - virt_to_page(vec[i].iov_base), - off, - vec[i].iov_len, - DMA_FROM_DEVICE); - if (ib_dma_mapping_error(xprt->sc_cm_id->device, - ctxt->sge[i].addr)) - return -EINVAL; - ctxt->sge[i].lkey = xprt->sc_dma_lkey; - atomic_inc(&xprt->sc_dma_used); - } else { - ctxt->sge[i].addr = (unsigned long)vec[i].iov_base; - ctxt->sge[i].lkey = frmr->mr->lkey; - } - ctxt->sge[i].length = vec[i].iov_len; - *sgl_offset = *sgl_offset + vec[i].iov_len; - } - return 0; + for (count = 0; ch->rc_discrim != xdr_zero; ch++) + count++; + return count; } -static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count) +/* If there was additional inline content, append it to the end of arg.pages. + * Tail copy has to be done after the reader function has determined how many + * pages are needed for RDMA READ. + */ +static int +rdma_copy_tail(struct svc_rqst *rqstp, struct svc_rdma_op_ctxt *head, + u32 position, u32 byte_count, u32 page_offset, int page_no) { - if ((rdma_node_get_transport(xprt->sc_cm_id->device->node_type) == - RDMA_TRANSPORT_IWARP) && - sge_count > 1) - return 1; - else - return min_t(int, sge_count, xprt->sc_max_sge); + char *srcp, *destp; + int ret; + + ret = 0; + srcp = head->arg.head[0].iov_base + position; + byte_count = head->arg.head[0].iov_len - position; + if (byte_count > PAGE_SIZE) { + dprintk("svcrdma: large tail unsupported\n"); + return 0; + } + + /* Fit as much of the tail on the current page as possible */ + if (page_offset != PAGE_SIZE) { + destp = page_address(rqstp->rq_arg.pages[page_no]); + destp += page_offset; + while (byte_count--) { + *destp++ = *srcp++; + page_offset++; + if (page_offset == PAGE_SIZE && byte_count) + goto more; + } + goto done; + } + +more: + /* Fit the rest on the next page */ + page_no++; + destp = page_address(rqstp->rq_arg.pages[page_no]); + while (byte_count--) + *destp++ = *srcp++; + + rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1]; + rqstp->rq_next_page = rqstp->rq_respages + 1; + +done: + byte_count = head->arg.head[0].iov_len - position; + head->arg.page_len += byte_count; + head->arg.len += byte_count; + head->arg.buflen += byte_count; + return 1; } -/* - * Use RDMA_READ to read data from the advertised client buffer into the - * XDR stream starting at rq_arg.head[0].iov_base. - * Each chunk in the array - * contains the following fields: - * discrim - '1', This isn't used for data placement - * position - The xdr stream offset (the same for every chunk) - * handle - RMR for client memory region - * length - data transfer length - * offset - 64 bit tagged offset in remote memory region - * - * On our side, we need to read into a pagelist. The first page immediately - * follows the RPC header. - * - * This function returns: - * 0 - No error and no read-list found. - * - * 1 - Successful read-list processing. The data is not yet in - * the pagelist and therefore the RPC request must be deferred. The - * I/O completion will enqueue the transport again and - * svc_rdma_recvfrom will complete the request. - * - * <0 - Error processing/posting read-list. - * - * NOTE: The ctxt must not be touched after the last WR has been posted - * because the I/O completion processing may occur on another - * processor and free / modify the context. Ne touche pas! - */ -static int rdma_read_xdr(struct svcxprt_rdma *xprt, - struct rpcrdma_msg *rmsgp, - struct svc_rqst *rqstp, - struct svc_rdma_op_ctxt *hdr_ctxt) +static int rdma_read_chunks(struct svcxprt_rdma *xprt, + struct rpcrdma_msg *rmsgp, + struct svc_rqst *rqstp, + struct svc_rdma_op_ctxt *head) { - struct ib_send_wr read_wr; - struct ib_send_wr inv_wr; - int err = 0; - int ch_no; - int ch_count; - int byte_count; - int sge_count; - u64 sgl_offset; + int page_no, ret; struct rpcrdma_read_chunk *ch; - struct svc_rdma_op_ctxt *ctxt = NULL; - struct svc_rdma_req_map *rpl_map; - struct svc_rdma_req_map *chl_map; + u32 handle, page_offset, byte_count; + u32 position; + u64 rs_offset; + bool last; /* If no read list is present, return 0 */ ch = svc_rdma_get_read_chunk(rmsgp); if (!ch) return 0; - svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count); - if (ch_count > RPCSVC_MAXPAGES) + if (rdma_rcl_chunk_count(ch) > RPCSVC_MAXPAGES) return -EINVAL; - /* Allocate temporary reply and chunk maps */ - rpl_map = svc_rdma_get_req_map(); - chl_map = svc_rdma_get_req_map(); - - if (!xprt->sc_frmr_pg_list_len) - sge_count = map_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp, - rpl_map, chl_map, ch_count, - byte_count); - else - sge_count = fast_reg_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp, - rpl_map, chl_map, ch_count, - byte_count); - if (sge_count < 0) { - err = -EIO; - goto out; - } + /* The request is completed when the RDMA_READs complete. The + * head context keeps all the pages that comprise the + * request. + */ + head->arg.head[0] = rqstp->rq_arg.head[0]; + head->arg.tail[0] = rqstp->rq_arg.tail[0]; + head->hdr_count = head->count; + head->arg.page_base = 0; + head->arg.page_len = 0; + head->arg.len = rqstp->rq_arg.len; + head->arg.buflen = rqstp->rq_arg.buflen; - sgl_offset = 0; - ch_no = 0; + ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0]; + position = be32_to_cpu(ch->rc_position); - for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0]; - ch->rc_discrim != 0; ch++, ch_no++) { - u64 rs_offset; -next_sge: - ctxt = svc_rdma_get_context(xprt); - ctxt->direction = DMA_FROM_DEVICE; - ctxt->frmr = hdr_ctxt->frmr; - ctxt->read_hdr = NULL; - clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); - clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags); + /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */ + if (position == 0) { + head->arg.pages = &head->pages[0]; + page_offset = head->byte_len; + } else { + head->arg.pages = &head->pages[head->count]; + page_offset = 0; + } + + ret = 0; + page_no = 0; + for (; ch->rc_discrim != xdr_zero; ch++) { + if (be32_to_cpu(ch->rc_position) != position) + goto err; - /* Prepare READ WR */ - memset(&read_wr, 0, sizeof read_wr); - read_wr.wr_id = (unsigned long)ctxt; - read_wr.opcode = IB_WR_RDMA_READ; - ctxt->wr_op = read_wr.opcode; - read_wr.send_flags = IB_SEND_SIGNALED; - read_wr.wr.rdma.rkey = ntohl(ch->rc_target.rs_handle); + handle = be32_to_cpu(ch->rc_target.rs_handle), + byte_count = be32_to_cpu(ch->rc_target.rs_length); xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset, &rs_offset); - read_wr.wr.rdma.remote_addr = rs_offset + sgl_offset; - read_wr.sg_list = ctxt->sge; - read_wr.num_sge = - rdma_read_max_sge(xprt, chl_map->ch[ch_no].count); - err = rdma_set_ctxt_sge(xprt, ctxt, hdr_ctxt->frmr, - &rpl_map->sge[chl_map->ch[ch_no].start], - &sgl_offset, - read_wr.num_sge); - if (err) { - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 0); - goto out; - } - if (((ch+1)->rc_discrim == 0) && - (read_wr.num_sge == chl_map->ch[ch_no].count)) { - /* - * Mark the last RDMA_READ with a bit to - * indicate all RPC data has been fetched from - * the client and the RPC needs to be enqueued. - */ - set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); - if (hdr_ctxt->frmr) { - set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags); - /* - * Invalidate the local MR used to map the data - * sink. - */ - if (xprt->sc_dev_caps & - SVCRDMA_DEVCAP_READ_W_INV) { - read_wr.opcode = - IB_WR_RDMA_READ_WITH_INV; - ctxt->wr_op = read_wr.opcode; - read_wr.ex.invalidate_rkey = - ctxt->frmr->mr->lkey; - } else { - /* Prepare INVALIDATE WR */ - memset(&inv_wr, 0, sizeof inv_wr); - inv_wr.opcode = IB_WR_LOCAL_INV; - inv_wr.send_flags = IB_SEND_SIGNALED; - inv_wr.ex.invalidate_rkey = - hdr_ctxt->frmr->mr->lkey; - read_wr.next = &inv_wr; - } - } - ctxt->read_hdr = hdr_ctxt; - } - /* Post the read */ - err = svc_rdma_send(xprt, &read_wr); - if (err) { - printk(KERN_ERR "svcrdma: Error %d posting RDMA_READ\n", - err); - set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 0); - goto out; - } - atomic_inc(&rdma_stat_read); - if (read_wr.num_sge < chl_map->ch[ch_no].count) { - chl_map->ch[ch_no].count -= read_wr.num_sge; - chl_map->ch[ch_no].start += read_wr.num_sge; - goto next_sge; + while (byte_count > 0) { + last = (ch + 1)->rc_discrim == xdr_zero; + ret = xprt->sc_reader(xprt, rqstp, head, + &page_no, &page_offset, + handle, byte_count, + rs_offset, last); + if (ret < 0) + goto err; + byte_count -= ret; + rs_offset += ret; + head->arg.buflen += ret; } - sgl_offset = 0; - err = 1; } - out: - svc_rdma_put_req_map(rpl_map); - svc_rdma_put_req_map(chl_map); + /* Read list may need XDR round-up (see RFC 5666, s. 3.7) */ + if (page_offset & 3) { + u32 pad = 4 - (page_offset & 3); + + head->arg.page_len += pad; + head->arg.len += pad; + head->arg.buflen += pad; + page_offset += pad; + } + + ret = 1; + if (position && position < head->arg.head[0].iov_len) + ret = rdma_copy_tail(rqstp, head, position, + byte_count, page_offset, page_no); + head->arg.head[0].iov_len = position; + head->position = position; + err: /* Detach arg pages. svc_recv will replenish them */ - for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++) - rqstp->rq_pages[ch_no] = NULL; - - /* - * Detach res pages. If svc_release sees any it will attempt to - * put them. - */ - while (rqstp->rq_next_page != rqstp->rq_respages) - *(--rqstp->rq_next_page) = NULL; + for (page_no = 0; + &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++) + rqstp->rq_pages[page_no] = NULL; - return err; + return ret; } static int rdma_read_complete(struct svc_rqst *rqstp, @@ -536,21 +512,34 @@ int page_no; int ret; - BUG_ON(!head); - /* Copy RPC pages */ for (page_no = 0; page_no < head->count; page_no++) { put_page(rqstp->rq_pages[page_no]); rqstp->rq_pages[page_no] = head->pages[page_no]; } + + /* Adjustments made for RDMA_NOMSG type requests */ + if (head->position == 0) { + if (head->arg.len <= head->sge[0].length) { + head->arg.head[0].iov_len = head->arg.len - + head->byte_len; + head->arg.page_len = 0; + } else { + head->arg.head[0].iov_len = head->sge[0].length - + head->byte_len; + head->arg.page_len = head->arg.len - + head->sge[0].length; + } + } + /* Point rq_arg.pages past header */ rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count]; rqstp->rq_arg.page_len = head->arg.page_len; rqstp->rq_arg.page_base = head->arg.page_base; /* rq_respages starts after the last arg page */ - rqstp->rq_respages = &rqstp->rq_arg.pages[page_no]; - rqstp->rq_next_page = &rqstp->rq_arg.pages[page_no]; + rqstp->rq_respages = &rqstp->rq_pages[page_no]; + rqstp->rq_next_page = rqstp->rq_respages + 1; /* Rebuild rq_arg head and tail. */ rqstp->rq_arg.head[0] = head->arg.head[0]; @@ -568,8 +557,8 @@ ret = rqstp->rq_arg.head[0].iov_len + rqstp->rq_arg.page_len + rqstp->rq_arg.tail[0].iov_len; - dprintk("svcrdma: deferred read ret=%d, rq_arg.len =%d, " - "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n", + dprintk("svcrdma: deferred read ret=%d, rq_arg.len=%u, " + "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zu\n", ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base, rqstp->rq_arg.head[0].iov_len); @@ -599,13 +588,9 @@ struct svc_rdma_op_ctxt, dto_q); list_del_init(&ctxt->dto_q); - } - if (ctxt) { spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock); return rdma_read_complete(rqstp, ctxt); - } - - if (!list_empty(&rdma_xprt->sc_rq_dto_q)) { + } else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) { ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next, struct svc_rdma_op_ctxt, dto_q); @@ -625,12 +610,10 @@ if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) goto close_out; - BUG_ON(ret); goto out; } dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n", ctxt, rdma_xprt, rqstp, ctxt->wc_status); - BUG_ON(ctxt->wc_status != IB_WC_SUCCESS); atomic_inc(&rdma_stat_recv); /* Build up the XDR from the receive buffers. */ @@ -648,12 +631,11 @@ } /* Read read-list data. */ - ret = rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt); + ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt); if (ret > 0) { /* read-list posted, defer until data received from client. */ goto defer; - } - if (ret < 0) { + } else if (ret < 0) { /* Post of read-list failed, free context. */ svc_rdma_put_context(ctxt, 1); return 0; @@ -664,8 +646,8 @@ + rqstp->rq_arg.tail[0].iov_len; svc_rdma_put_context(ctxt, 0); out: - dprintk("svcrdma: ret = %d, rq_arg.len =%d, " - "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n", + dprintk("svcrdma: ret=%d, rq_arg.len=%u, " + "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zd\n", ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base, rqstp->rq_arg.head[0].iov_len);