--- zzzz-none-000/linux-3.10.107/net/sunrpc/xprtrdma/rpc_rdma.c 2017-06-27 09:49:32.000000000 +0000 +++ scorpion-7490-727/linux-3.10.107/net/sunrpc/xprtrdma/rpc_rdma.c 2021-02-04 17:41:59.000000000 +0000 @@ -49,7 +49,7 @@ #include -#ifdef RPC_DEBUG +#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) # define RPCDBG_FACILITY RPCDBG_TRANS #endif @@ -61,7 +61,7 @@ rpcrdma_replych }; -#ifdef RPC_DEBUG +#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) static const char transfertypes[][12] = { "pure inline", /* no chunks */ " read chunk", /* some argument via rdma read */ @@ -71,6 +71,67 @@ }; #endif +/* The client can send a request inline as long as the RPCRDMA header + * plus the RPC call fit under the transport's inline limit. If the + * combined call message size exceeds that limit, the client must use + * the read chunk list for this operation. + */ +static bool rpcrdma_args_inline(struct rpc_rqst *rqst) +{ + unsigned int callsize = RPCRDMA_HDRLEN_MIN + rqst->rq_snd_buf.len; + + return callsize <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst); +} + +/* The client can't know how large the actual reply will be. Thus it + * plans for the largest possible reply for that particular ULP + * operation. If the maximum combined reply message size exceeds that + * limit, the client must provide a write list or a reply chunk for + * this request. + */ +static bool rpcrdma_results_inline(struct rpc_rqst *rqst) +{ + unsigned int repsize = RPCRDMA_HDRLEN_MIN + rqst->rq_rcv_buf.buflen; + + return repsize <= RPCRDMA_INLINE_READ_THRESHOLD(rqst); +} + +static int +rpcrdma_tail_pullup(struct xdr_buf *buf) +{ + size_t tlen = buf->tail[0].iov_len; + size_t skip = tlen & 3; + + /* Do not include the tail if it is only an XDR pad */ + if (tlen < 4) + return 0; + + /* xdr_write_pages() adds a pad at the beginning of the tail + * if the content in "buf->pages" is unaligned. Force the + * tail's actual content to land at the next XDR position + * after the head instead. + */ + if (skip) { + unsigned char *src, *dst; + unsigned int count; + + src = buf->tail[0].iov_base; + dst = buf->head[0].iov_base; + dst += buf->head[0].iov_len; + + src += skip; + tlen -= skip; + + dprintk("RPC: %s: skip=%zu, memmove(%p, %p, %zu)\n", + __func__, skip, dst, src, tlen); + + for (count = tlen; count; count--) + *dst++ = *src++; + } + + return tlen; +} + /* * Chunk assembly from upper layer xdr_buf. * @@ -78,8 +139,7 @@ * elements. Segments are then coalesced when registered, if possible * within the selected memreg mode. * - * Note, this routine is never called if the connection's memory - * registration strategy is 0 (bounce buffers). + * Returns positive number of segments converted, or a negative errno. */ static int @@ -102,10 +162,17 @@ page_base = xdrbuf->page_base & ~PAGE_MASK; p = 0; while (len && n < nsegs) { + if (!ppages[p]) { + /* alloc the pagelist for receiving buffer */ + ppages[p] = alloc_page(GFP_ATOMIC); + if (!ppages[p]) + return -ENOMEM; + } seg[n].mr_page = ppages[p]; seg[n].mr_offset = (void *)(unsigned long) page_base; seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len); - BUG_ON(seg[n].mr_len > PAGE_SIZE); + if (seg[n].mr_len > PAGE_SIZE) + return -EIO; len -= seg[n].mr_len; ++n; ++p; @@ -114,7 +181,11 @@ /* Message overflows the seg array */ if (len && n == nsegs) - return 0; + return -EIO; + + /* When encoding the read list, the tail is always sent inline */ + if (type == rpcrdma_readch) + return n; if (xdrbuf->tail[0].iov_len) { /* the rpcrdma protocol allows us to omit any trailing @@ -123,7 +194,7 @@ return n; if (n == nsegs) /* Tail remains, but we're out of segments */ - return 0; + return -EIO; seg[n].mr_page = NULL; seg[n].mr_offset = xdrbuf->tail[0].iov_base; seg[n].mr_len = xdrbuf->tail[0].iov_len; @@ -164,21 +235,24 @@ * Reply chunk (a counted array): * N elements: * 1 - N - HLOO - HLOO - ... - HLOO + * + * Returns positive RPC/RDMA header size, or negative errno. */ -static unsigned int +static ssize_t rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target, struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type) { struct rpcrdma_req *req = rpcr_to_rdmar(rqst); struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); - int nsegs, nchunks = 0; + int n, nsegs, nchunks = 0; unsigned int pos; struct rpcrdma_mr_seg *seg = req->rl_segments; struct rpcrdma_read_chunk *cur_rchunk = NULL; struct rpcrdma_write_array *warray = NULL; struct rpcrdma_write_chunk *cur_wchunk = NULL; __be32 *iptr = headerp->rm_body.rm_chunks; + int (*map)(struct rpcrdma_xprt *, struct rpcrdma_mr_seg *, int, bool); if (type == rpcrdma_readch || type == rpcrdma_areadch) { /* a read chunk - server will RDMA Read our memory */ @@ -198,21 +272,22 @@ pos = target->head[0].iov_len; nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS); - if (nsegs == 0) - return 0; + if (nsegs < 0) + return nsegs; + map = r_xprt->rx_ia.ri_ops->ro_map; do { - /* bind/register the memory, then build chunk from result. */ - int n = rpcrdma_register_external(seg, nsegs, - cur_wchunk != NULL, r_xprt); + n = map(r_xprt, seg, nsegs, cur_wchunk != NULL); if (n <= 0) goto out; if (cur_rchunk) { /* read */ cur_rchunk->rc_discrim = xdr_one; /* all read chunks have the same "position" */ - cur_rchunk->rc_position = htonl(pos); - cur_rchunk->rc_target.rs_handle = htonl(seg->mr_rkey); - cur_rchunk->rc_target.rs_length = htonl(seg->mr_len); + cur_rchunk->rc_position = cpu_to_be32(pos); + cur_rchunk->rc_target.rs_handle = + cpu_to_be32(seg->mr_rkey); + cur_rchunk->rc_target.rs_length = + cpu_to_be32(seg->mr_len); xdr_encode_hyper( (__be32 *)&cur_rchunk->rc_target.rs_offset, seg->mr_base); @@ -223,8 +298,10 @@ cur_rchunk++; r_xprt->rx_stats.read_chunk_count++; } else { /* write/reply */ - cur_wchunk->wc_target.rs_handle = htonl(seg->mr_rkey); - cur_wchunk->wc_target.rs_length = htonl(seg->mr_len); + cur_wchunk->wc_target.rs_handle = + cpu_to_be32(seg->mr_rkey); + cur_wchunk->wc_target.rs_length = + cpu_to_be32(seg->mr_len); xdr_encode_hyper( (__be32 *)&cur_wchunk->wc_target.rs_offset, seg->mr_base); @@ -248,10 +325,6 @@ /* success. all failures return above */ req->rl_nchunks = nchunks; - BUG_ON(nchunks == 0); - BUG_ON((r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_FRMR) - && (nchunks > 3)); - /* * finish off header. If write, marshal discrim and nchunks. */ @@ -262,7 +335,7 @@ *iptr++ = xdr_zero; /* encode a NULL reply chunk */ } else { warray->wc_discrim = xdr_one; - warray->wc_nchunks = htonl(nchunks); + warray->wc_nchunks = cpu_to_be32(nchunks); iptr = (__be32 *) cur_wchunk; if (type == rpcrdma_writech) { *iptr++ = xdr_zero; /* finish the write chunk list */ @@ -277,9 +350,9 @@ out: for (pos = 0; nchunks--;) - pos += rpcrdma_deregister_external( - &req->rl_segments[pos], r_xprt, NULL); - return 0; + pos += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt, + &req->rl_segments[pos]); + return n; } /* @@ -289,8 +362,7 @@ * pre-registered memory buffer for this request. For small amounts * of data, this is efficient. The cutoff value is tunable. */ -static int -rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad) +static void rpcrdma_inline_pullup(struct rpc_rqst *rqst) { int i, npages, curlen; int copy_len; @@ -302,16 +374,9 @@ destp = rqst->rq_svec[0].iov_base; curlen = rqst->rq_svec[0].iov_len; destp += curlen; - /* - * Do optional padding where it makes sense. Alignment of write - * payload can help the server, if our setting is accurate. - */ - pad -= (curlen + 36/*sizeof(struct rpcrdma_msg_padded)*/); - if (pad < 0 || rqst->rq_slen - curlen < RPCRDMA_INLINE_PAD_THRESH) - pad = 0; /* don't pad this request */ - dprintk("RPC: %s: pad %d destp 0x%p len %d hdrlen %d\n", - __func__, pad, destp, rqst->rq_slen, curlen); + dprintk("RPC: %s: destp 0x%p len %d hdrlen %d\n", + __func__, destp, rqst->rq_slen, curlen); copy_len = rqst->rq_snd_buf.page_len; @@ -347,7 +412,6 @@ page_base = 0; } /* header now contains entire send message */ - return pad; } /* @@ -361,6 +425,8 @@ * [1] -- the RPC header/data, marshaled by RPC and the NFS protocol. * [2] -- optional padding. * [3] -- if padded, header only in [1] and data here. + * + * Returns zero on success, otherwise a negative errno. */ int @@ -370,10 +436,16 @@ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); struct rpcrdma_req *req = rpcr_to_rdmar(rqst); char *base; - size_t hdrlen, rpclen, padlen; + size_t rpclen; + ssize_t hdrlen; enum rpcrdma_chunktype rtype, wtype; struct rpcrdma_msg *headerp; +#if defined(CONFIG_SUNRPC_BACKCHANNEL) + if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state)) + return rpcrdma_bc_marshal_reply(rqst); +#endif + /* * rpclen gets amount of data in first buffer, which is the * pre-registered buffer. @@ -381,39 +453,25 @@ base = rqst->rq_svec[0].iov_base; rpclen = rqst->rq_svec[0].iov_len; - /* build RDMA header in private area at front */ - headerp = (struct rpcrdma_msg *) req->rl_base; - /* don't htonl XID, it's already done in request */ + headerp = rdmab_to_msg(req->rl_rdmabuf); + /* don't byte-swap XID, it's already done in request */ headerp->rm_xid = rqst->rq_xid; - headerp->rm_vers = xdr_one; - headerp->rm_credit = htonl(r_xprt->rx_buf.rb_max_requests); - headerp->rm_type = htonl(RDMA_MSG); + headerp->rm_vers = rpcrdma_version; + headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests); + headerp->rm_type = rdma_msg; /* * Chunks needed for results? * + * o Read ops return data as write chunk(s), header as inline. * o If the expected result is under the inline threshold, all ops - * return as inline (but see later). + * return as inline. * o Large non-read ops return as a single reply chunk. - * o Large read ops return data as write chunk(s), header as inline. - * - * Note: the NFS code sending down multiple result segments implies - * the op is one of read, readdir[plus], readlink or NFSv4 getacl. - */ - - /* - * This code can handle read chunks, write chunks OR reply - * chunks -- only one type. If the request is too big to fit - * inline, then we will choose read chunks. If the request is - * a READ, then use write chunks to separate the file data - * into pages; otherwise use reply chunks. */ - if (rqst->rq_rcv_buf.buflen <= RPCRDMA_INLINE_READ_THRESHOLD(rqst)) - wtype = rpcrdma_noch; - else if (rqst->rq_rcv_buf.page_len == 0) - wtype = rpcrdma_replych; - else if (rqst->rq_rcv_buf.flags & XDRBUF_READ) + if (rqst->rq_rcv_buf.flags & XDRBUF_READ) wtype = rpcrdma_writech; + else if (rpcrdma_results_inline(rqst)) + wtype = rpcrdma_noch; else wtype = rpcrdma_replych; @@ -422,37 +480,36 @@ * * o If the total request is under the inline threshold, all ops * are sent as inline. - * o Large non-write ops are sent with the entire message as a - * single read chunk (protocol 0-position special case). * o Large write ops transmit data as read chunk(s), header as * inline. + * o Large non-write ops are sent with the entire message as a + * single read chunk (protocol 0-position special case). * - * Note: the NFS code sending down multiple argument segments - * implies the op is a write. - * TBD check NFSv4 setacl + * This assumes that the upper layer does not present a request + * that both has a data payload, and whose non-data arguments + * by themselves are larger than the inline threshold. */ - if (rqst->rq_snd_buf.len <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst)) + if (rpcrdma_args_inline(rqst)) { rtype = rpcrdma_noch; - else if (rqst->rq_snd_buf.page_len == 0) - rtype = rpcrdma_areadch; - else + } else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) { rtype = rpcrdma_readch; + } else { + r_xprt->rx_stats.nomsg_call_count++; + headerp->rm_type = htonl(RDMA_NOMSG); + rtype = rpcrdma_areadch; + rpclen = 0; + } /* The following simplification is not true forever */ if (rtype != rpcrdma_noch && wtype == rpcrdma_replych) wtype = rpcrdma_noch; - BUG_ON(rtype != rpcrdma_noch && wtype != rpcrdma_noch); - - if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS && - (rtype != rpcrdma_noch || wtype != rpcrdma_noch)) { - /* forced to "pure inline"? */ - dprintk("RPC: %s: too much data (%d/%d) for inline\n", - __func__, rqst->rq_rcv_buf.len, rqst->rq_snd_buf.len); - return -1; + if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) { + dprintk("RPC: %s: cannot marshal multiple chunk lists\n", + __func__); + return -EIO; } - hdrlen = 28; /*sizeof *headerp;*/ - padlen = 0; + hdrlen = RPCRDMA_HDRLEN_MIN; /* * Pull up any extra send data into the preregistered buffer. @@ -461,64 +518,31 @@ */ if (rtype == rpcrdma_noch) { - padlen = rpcrdma_inline_pullup(rqst, - RPCRDMA_INLINE_PAD_VALUE(rqst)); - - if (padlen) { - headerp->rm_type = htonl(RDMA_MSGP); - headerp->rm_body.rm_padded.rm_align = - htonl(RPCRDMA_INLINE_PAD_VALUE(rqst)); - headerp->rm_body.rm_padded.rm_thresh = - htonl(RPCRDMA_INLINE_PAD_THRESH); - headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero; - headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero; - headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero; - hdrlen += 2 * sizeof(u32); /* extra words in padhdr */ - BUG_ON(wtype != rpcrdma_noch); + rpcrdma_inline_pullup(rqst); - } else { - headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero; - headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero; - headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero; - /* new length after pullup */ - rpclen = rqst->rq_svec[0].iov_len; - /* - * Currently we try to not actually use read inline. - * Reply chunks have the desirable property that - * they land, packed, directly in the target buffers - * without headers, so they require no fixup. The - * additional RDMA Write op sends the same amount - * of data, streams on-the-wire and adds no overhead - * on receive. Therefore, we request a reply chunk - * for non-writes wherever feasible and efficient. - */ - if (wtype == rpcrdma_noch && - r_xprt->rx_ia.ri_memreg_strategy > RPCRDMA_REGISTER) - wtype = rpcrdma_replych; - } - } - - /* - * Marshal chunks. This routine will return the header length - * consumed by marshaling. - */ + headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero; + headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero; + headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero; + /* new length after pullup */ + rpclen = rqst->rq_svec[0].iov_len; + } else if (rtype == rpcrdma_readch) + rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf); if (rtype != rpcrdma_noch) { - hdrlen = rpcrdma_create_chunks(rqst, - &rqst->rq_snd_buf, headerp, rtype); + hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf, + headerp, rtype); wtype = rtype; /* simplify dprintk */ } else if (wtype != rpcrdma_noch) { - hdrlen = rpcrdma_create_chunks(rqst, - &rqst->rq_rcv_buf, headerp, wtype); + hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_rcv_buf, + headerp, wtype); } + if (hdrlen < 0) + return hdrlen; - if (hdrlen == 0) - return -1; - - dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd padlen %zd" + dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd" " headerp 0x%p base 0x%p lkey 0x%x\n", - __func__, transfertypes[wtype], hdrlen, rpclen, padlen, - headerp, base, req->rl_iov.lkey); + __func__, transfertypes[wtype], hdrlen, rpclen, + headerp, base, rdmab_lkey(req->rl_rdmabuf)); /* * initialize send_iov's - normally only two: rdma chunk header and @@ -527,30 +551,19 @@ * header and any write data. In all non-rdma cases, any following * data has been copied into the RPC header buffer. */ - req->rl_send_iov[0].addr = req->rl_iov.addr; + req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf); req->rl_send_iov[0].length = hdrlen; - req->rl_send_iov[0].lkey = req->rl_iov.lkey; + req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf); - req->rl_send_iov[1].addr = req->rl_iov.addr + (base - req->rl_base); + req->rl_niovs = 1; + if (rtype == rpcrdma_areadch) + return 0; + + req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf); req->rl_send_iov[1].length = rpclen; - req->rl_send_iov[1].lkey = req->rl_iov.lkey; + req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf); req->rl_niovs = 2; - - if (padlen) { - struct rpcrdma_ep *ep = &r_xprt->rx_ep; - - req->rl_send_iov[2].addr = ep->rep_pad.addr; - req->rl_send_iov[2].length = padlen; - req->rl_send_iov[2].lkey = ep->rep_pad.lkey; - - req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen; - req->rl_send_iov[3].length = rqst->rq_slen - rpclen; - req->rl_send_iov[3].lkey = req->rl_iov.lkey; - - req->rl_niovs = 4; - } - return 0; } @@ -563,8 +576,9 @@ { unsigned int i, total_len; struct rpcrdma_write_chunk *cur_wchunk; + char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf); - i = ntohl(**iptrp); /* get array count */ + i = be32_to_cpu(**iptrp); if (i > max) return -1; cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1); @@ -576,11 +590,11 @@ xdr_decode_hyper((__be32 *)&seg->rs_offset, &off); dprintk("RPC: %s: chunk %d@0x%llx:0x%x\n", __func__, - ntohl(seg->rs_length), + be32_to_cpu(seg->rs_length), (unsigned long long)off, - ntohl(seg->rs_handle)); + be32_to_cpu(seg->rs_handle)); } - total_len += ntohl(seg->rs_length); + total_len += be32_to_cpu(seg->rs_length); ++cur_wchunk; } /* check and adjust for properly terminated write chunk */ @@ -590,7 +604,7 @@ return -1; cur_wchunk = (struct rpcrdma_write_chunk *) w; } - if ((char *) cur_wchunk > rep->rr_base + rep->rr_len) + if ((char *)cur_wchunk > base + rep->rr_len) return -1; *iptrp = (__be32 *) cur_wchunk; @@ -649,9 +663,7 @@ break; page_base = 0; } - rqst->rq_rcv_buf.page_len = olen - copy_len; - } else - rqst->rq_rcv_buf.page_len = 0; + } if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) { curlen = copy_len; @@ -682,16 +694,14 @@ rqst->rq_private_buf = rqst->rq_rcv_buf; } -/* - * This function is called when an async event is posted to - * the connection which changes the connection state. All it - * does at this point is mark the connection up/down, the rpc - * timers do the rest. - */ void -rpcrdma_conn_func(struct rpcrdma_ep *ep) +rpcrdma_connect_worker(struct work_struct *work) { - struct rpc_xprt *xprt = ep->rep_xprt; + struct rpcrdma_ep *ep = + container_of(work, struct rpcrdma_ep, rep_connect_worker.work); + struct rpcrdma_xprt *r_xprt = + container_of(ep, struct rpcrdma_xprt, rx_ep); + struct rpc_xprt *xprt = &r_xprt->rx_xprt; spin_lock_bh(&xprt->transport_lock); if (++xprt->connect_cookie == 0) /* maintain a reserved value */ @@ -706,18 +716,51 @@ spin_unlock_bh(&xprt->transport_lock); } -/* - * This function is called when memory window unbind which we are waiting - * for completes. Just use rr_func (zeroed by upcall) to signal completion. +#if defined(CONFIG_SUNRPC_BACKCHANNEL) +/* By convention, backchannel calls arrive via rdma_msg type + * messages, and never populate the chunk lists. This makes + * the RPC/RDMA header small and fixed in size, so it is + * straightforward to check the RPC header's direction field. */ -static void -rpcrdma_unbind_func(struct rpcrdma_rep *rep) +static bool +rpcrdma_is_bcall(struct rpcrdma_msg *headerp) { - wake_up(&rep->rr_unbind); + __be32 *p = (__be32 *)headerp; + + if (headerp->rm_type != rdma_msg) + return false; + if (headerp->rm_body.rm_chunks[0] != xdr_zero) + return false; + if (headerp->rm_body.rm_chunks[1] != xdr_zero) + return false; + if (headerp->rm_body.rm_chunks[2] != xdr_zero) + return false; + + /* sanity */ + if (p[7] != headerp->rm_xid) + return false; + /* call direction */ + if (p[8] != cpu_to_be32(RPC_CALL)) + return false; + + return true; } +#endif /* CONFIG_SUNRPC_BACKCHANNEL */ /* - * Called as a tasklet to do req/reply match and complete a request + * This function is called when an async event is posted to + * the connection which changes the connection state. All it + * does at this point is mark the connection up/down, the rpc + * timers do the rest. + */ +void +rpcrdma_conn_func(struct rpcrdma_ep *ep) +{ + schedule_delayed_work(&ep->rep_connect_worker, 0); +} + +/* Process received RPC/RDMA messages. + * * Errors must result in the RPC task either being awakened, or * allowed to timeout, to discover the errors at that time. */ @@ -727,69 +770,53 @@ struct rpcrdma_msg *headerp; struct rpcrdma_req *req; struct rpc_rqst *rqst; - struct rpc_xprt *xprt = rep->rr_xprt; - struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + struct rpcrdma_xprt *r_xprt = rep->rr_rxprt; + struct rpc_xprt *xprt = &r_xprt->rx_xprt; __be32 *iptr; - int i, rdmalen, status; - - /* Check status. If bad, signal disconnect and return rep to pool */ - if (rep->rr_len == ~0U) { - rpcrdma_recv_buffer_put(rep); - if (r_xprt->rx_ep.rep_connected == 1) { - r_xprt->rx_ep.rep_connected = -EIO; - rpcrdma_conn_func(&r_xprt->rx_ep); - } - return; - } - if (rep->rr_len < 28) { - dprintk("RPC: %s: short/invalid reply\n", __func__); - goto repost; - } - headerp = (struct rpcrdma_msg *) rep->rr_base; - if (headerp->rm_vers != xdr_one) { - dprintk("RPC: %s: invalid version %d\n", - __func__, ntohl(headerp->rm_vers)); - goto repost; - } + int rdmalen, status; + unsigned long cwnd; + u32 credits; + + dprintk("RPC: %s: incoming rep %p\n", __func__, rep); + + if (rep->rr_len == RPCRDMA_BAD_LEN) + goto out_badstatus; + if (rep->rr_len < RPCRDMA_HDRLEN_MIN) + goto out_shortreply; + + headerp = rdmab_to_msg(rep->rr_rdmabuf); + if (headerp->rm_vers != rpcrdma_version) + goto out_badversion; +#if defined(CONFIG_SUNRPC_BACKCHANNEL) + if (rpcrdma_is_bcall(headerp)) + goto out_bcall; +#endif - /* Get XID and try for a match. */ - spin_lock(&xprt->transport_lock); + /* Match incoming rpcrdma_rep to an rpcrdma_req to + * get context for handling any incoming chunks. + */ + spin_lock_bh(&xprt->transport_lock); rqst = xprt_lookup_rqst(xprt, headerp->rm_xid); - if (rqst == NULL) { - spin_unlock(&xprt->transport_lock); - dprintk("RPC: %s: reply 0x%p failed " - "to match any request xid 0x%08x len %d\n", - __func__, rep, headerp->rm_xid, rep->rr_len); -repost: - r_xprt->rx_stats.bad_reply_count++; - rep->rr_func = rpcrdma_reply_handler; - if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep)) - rpcrdma_recv_buffer_put(rep); - - return; - } + if (!rqst) + goto out_nomatch; - /* get request object */ req = rpcr_to_rdmar(rqst); - if (req->rl_reply) { - spin_unlock(&xprt->transport_lock); - dprintk("RPC: %s: duplicate reply 0x%p to RPC " - "request 0x%p: xid 0x%08x\n", __func__, rep, req, - headerp->rm_xid); - goto repost; - } + if (req->rl_reply) + goto out_duplicate; dprintk("RPC: %s: reply 0x%p completes request 0x%p\n" " RPC request 0x%p xid 0x%08x\n", - __func__, rep, req, rqst, headerp->rm_xid); + __func__, rep, req, rqst, + be32_to_cpu(headerp->rm_xid)); /* from here on, the reply is no longer an orphan */ req->rl_reply = rep; + xprt->reestablish_timeout = 0; /* check for expected message types */ /* The order of some of these tests is important. */ switch (headerp->rm_type) { - case htonl(RDMA_MSG): + case rdma_msg: /* never expect read chunks */ /* never expect reply chunks (two ways to check) */ /* never expect write chunks without having offered RDMA */ @@ -820,22 +847,24 @@ } else { /* else ordinary inline */ rdmalen = 0; - iptr = (__be32 *)((unsigned char *)headerp + 28); - rep->rr_len -= 28; /*sizeof *headerp;*/ + iptr = (__be32 *)((unsigned char *)headerp + + RPCRDMA_HDRLEN_MIN); + rep->rr_len -= RPCRDMA_HDRLEN_MIN; status = rep->rr_len; } /* Fix up the rpc results for upper layer */ rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, rdmalen); break; - case htonl(RDMA_NOMSG): + case rdma_nomsg: /* never expect read or write chunks, always reply chunks */ if (headerp->rm_body.rm_chunks[0] != xdr_zero || headerp->rm_body.rm_chunks[1] != xdr_zero || headerp->rm_body.rm_chunks[2] != xdr_one || req->rl_nchunks == 0) goto badheader; - iptr = (__be32 *)((unsigned char *)headerp + 28); + iptr = (__be32 *)((unsigned char *)headerp + + RPCRDMA_HDRLEN_MIN); rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr); if (rdmalen < 0) goto badheader; @@ -849,7 +878,7 @@ dprintk("%s: invalid rpcrdma reply header (type %d):" " chunks[012] == %d %d %d" " expected chunks <= %d\n", - __func__, ntohl(headerp->rm_type), + __func__, be32_to_cpu(headerp->rm_type), headerp->rm_body.rm_chunks[0], headerp->rm_body.rm_chunks[1], headerp->rm_body.rm_chunks[2], @@ -859,29 +888,61 @@ break; } - /* If using mw bind, start the deregister process now. */ - /* (Note: if mr_free(), cannot perform it here, in tasklet context) */ - if (req->rl_nchunks) switch (r_xprt->rx_ia.ri_memreg_strategy) { - case RPCRDMA_MEMWINDOWS: - for (i = 0; req->rl_nchunks-- > 1;) - i += rpcrdma_deregister_external( - &req->rl_segments[i], r_xprt, NULL); - /* Optionally wait (not here) for unbinds to complete */ - rep->rr_func = rpcrdma_unbind_func; - (void) rpcrdma_deregister_external(&req->rl_segments[i], - r_xprt, rep); - break; - case RPCRDMA_MEMWINDOWS_ASYNC: - for (i = 0; req->rl_nchunks--;) - i += rpcrdma_deregister_external(&req->rl_segments[i], - r_xprt, NULL); - break; - default: - break; - } + credits = be32_to_cpu(headerp->rm_credit); + if (credits == 0) + credits = 1; /* don't deadlock */ + else if (credits > r_xprt->rx_buf.rb_max_requests) + credits = r_xprt->rx_buf.rb_max_requests; + + cwnd = xprt->cwnd; + xprt->cwnd = credits << RPC_CWNDSHIFT; + if (xprt->cwnd > cwnd) + xprt_release_rqst_cong(rqst->rq_task); + xprt_complete_rqst(rqst->rq_task, status); + spin_unlock_bh(&xprt->transport_lock); dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n", __func__, xprt, rqst, status); - xprt_complete_rqst(rqst->rq_task, status); - spin_unlock(&xprt->transport_lock); + return; + +out_badstatus: + rpcrdma_recv_buffer_put(rep); + if (r_xprt->rx_ep.rep_connected == 1) { + r_xprt->rx_ep.rep_connected = -EIO; + rpcrdma_conn_func(&r_xprt->rx_ep); + } + return; + +#if defined(CONFIG_SUNRPC_BACKCHANNEL) +out_bcall: + rpcrdma_bc_receive_call(r_xprt, rep); + return; +#endif + +out_shortreply: + dprintk("RPC: %s: short/invalid reply\n", __func__); + goto repost; + +out_badversion: + dprintk("RPC: %s: invalid version %d\n", + __func__, be32_to_cpu(headerp->rm_vers)); + goto repost; + +out_nomatch: + spin_unlock_bh(&xprt->transport_lock); + dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n", + __func__, be32_to_cpu(headerp->rm_xid), + rep->rr_len); + goto repost; + +out_duplicate: + spin_unlock_bh(&xprt->transport_lock); + dprintk("RPC: %s: " + "duplicate reply %p to RPC request %p: xid 0x%08x\n", + __func__, rep, req, be32_to_cpu(headerp->rm_xid)); + +repost: + r_xprt->rx_stats.bad_reply_count++; + if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep)) + rpcrdma_recv_buffer_put(rep); }