--- zzzz-none-000/linux-3.10.107/net/sunrpc/xprtrdma/transport.c 2017-06-27 09:49:32.000000000 +0000 +++ scorpion-7490-727/linux-3.10.107/net/sunrpc/xprtrdma/transport.c 2021-02-04 17:41:59.000000000 +0000 @@ -48,22 +48,16 @@ */ #include -#include #include #include #include #include "xprt_rdma.h" -#ifdef RPC_DEBUG +#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) # define RPCDBG_FACILITY RPCDBG_TRANS #endif -MODULE_LICENSE("Dual BSD/GPL"); - -MODULE_DESCRIPTION("RPC/RDMA Transport for Linux kernel NFS"); -MODULE_AUTHOR("Network Appliance, Inc."); - /* * tunables */ @@ -73,9 +67,9 @@ static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE; static unsigned int xprt_rdma_inline_write_padding; static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRMR; - int xprt_rdma_pad_optimize = 0; + int xprt_rdma_pad_optimize = 1; -#ifdef RPC_DEBUG +#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE; static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE; @@ -86,7 +80,7 @@ static struct ctl_table_header *sunrpc_table_header; -static ctl_table xr_tunables_table[] = { +static struct ctl_table xr_tunables_table[] = { { .procname = "rdma_slot_table_entries", .data = &xprt_rdma_slot_table_entries, @@ -138,7 +132,7 @@ { }, }; -static ctl_table sunrpc_table[] = { +static struct ctl_table sunrpc_table[] = { { .procname = "sunrpc", .mode = 0555, @@ -149,15 +143,53 @@ #endif +#define RPCRDMA_BIND_TO (60U * HZ) +#define RPCRDMA_INIT_REEST_TO (5U * HZ) +#define RPCRDMA_MAX_REEST_TO (30U * HZ) +#define RPCRDMA_IDLE_DISC_TO (5U * 60 * HZ) + static struct rpc_xprt_ops xprt_rdma_procs; /* forward reference */ static void -xprt_rdma_format_addresses(struct rpc_xprt *xprt) +xprt_rdma_format_addresses4(struct rpc_xprt *xprt, struct sockaddr *sap) { - struct sockaddr *sap = (struct sockaddr *) - &rpcx_to_rdmad(xprt).addr; struct sockaddr_in *sin = (struct sockaddr_in *)sap; - char buf[64]; + char buf[20]; + + snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr)); + xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL); + + xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA; +} + +static void +xprt_rdma_format_addresses6(struct rpc_xprt *xprt, struct sockaddr *sap) +{ + struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)sap; + char buf[40]; + + snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr); + xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL); + + xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA6; +} + +static void +xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap) +{ + char buf[128]; + + switch (sap->sa_family) { + case AF_INET: + xprt_rdma_format_addresses4(xprt, sap); + break; + case AF_INET6: + xprt_rdma_format_addresses6(xprt, sap); + break; + default: + pr_err("rpcrdma: Unrecognized address family\n"); + return; + } (void)rpc_ntop(sap, buf, sizeof(buf)); xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL); @@ -165,16 +197,10 @@ snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap)); xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL); - xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma"; - - snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr)); - xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL); - snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap)); xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL); - /* netid */ - xprt->address_strings[RPC_DISPLAY_NETID] = "rdma"; + xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma"; } static void @@ -195,12 +221,11 @@ static void xprt_rdma_connect_worker(struct work_struct *work) { - struct rpcrdma_xprt *r_xprt = - container_of(work, struct rpcrdma_xprt, rdma_connect.work); - struct rpc_xprt *xprt = &r_xprt->xprt; + struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt, + rx_connect_worker.work); + struct rpc_xprt *xprt = &r_xprt->rx_xprt; int rc = 0; - current->flags |= PF_FSTRANS; xprt_clear_connected(xprt); dprintk("RPC: %s: %sconnect\n", __func__, @@ -211,7 +236,16 @@ dprintk("RPC: %s: exit\n", __func__); xprt_clear_connecting(xprt); - current->flags &= ~PF_FSTRANS; +} + +static void +xprt_rdma_inject_disconnect(struct rpc_xprt *xprt) +{ + struct rpcrdma_xprt *r_xprt = container_of(xprt, struct rpcrdma_xprt, + rx_xprt); + + pr_info("rpcrdma: injecting transport disconnect on xprt=%p\n", xprt); + rdma_disconnect(r_xprt->rx_ia.ri_id); } /* @@ -229,19 +263,15 @@ xprt_rdma_destroy(struct rpc_xprt *xprt) { struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); - int rc; dprintk("RPC: %s: called\n", __func__); - cancel_delayed_work_sync(&r_xprt->rdma_connect); + cancel_delayed_work_sync(&r_xprt->rx_connect_worker); xprt_clear_connected(xprt); + rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia); rpcrdma_buffer_destroy(&r_xprt->rx_buf); - rc = rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia); - if (rc) - dprintk("RPC: %s: rpcrdma_ep_destroy returned %i\n", - __func__, rc); rpcrdma_ia_close(&r_xprt->rx_ia); xprt_rdma_free_addresses(xprt); @@ -270,7 +300,7 @@ struct rpc_xprt *xprt; struct rpcrdma_xprt *new_xprt; struct rpcrdma_ep *new_ep; - struct sockaddr_in *sin; + struct sockaddr *sap; int rc; if (args->addrlen > sizeof(xprt->addr)) { @@ -289,39 +319,32 @@ /* 60 second timeout, no retries */ xprt->timeout = &xprt_rdma_default_timeout; - xprt->bind_timeout = (60U * HZ); - xprt->reestablish_timeout = (5U * HZ); - xprt->idle_timeout = (5U * 60 * HZ); + xprt->bind_timeout = RPCRDMA_BIND_TO; + xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO; + xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO; xprt->resvport = 0; /* privileged port not needed */ xprt->tsh_size = 0; /* RPC-RDMA handles framing */ - xprt->max_payload = RPCRDMA_MAX_DATA_SEGS * PAGE_SIZE; xprt->ops = &xprt_rdma_procs; /* * Set up RDMA-specific connect data. */ - /* Put server RDMA address in local cdata */ - memcpy(&cdata.addr, args->dstaddr, args->addrlen); + sap = (struct sockaddr *)&cdata.addr; + memcpy(sap, args->dstaddr, args->addrlen); /* Ensure xprt->addr holds valid server TCP (not RDMA) * address, for any side protocols which peek at it */ xprt->prot = IPPROTO_TCP; xprt->addrlen = args->addrlen; - memcpy(&xprt->addr, &cdata.addr, xprt->addrlen); + memcpy(&xprt->addr, sap, xprt->addrlen); - sin = (struct sockaddr_in *)&cdata.addr; - if (ntohs(sin->sin_port) != 0) + if (rpc_get_port(sap)) xprt_set_bound(xprt); - dprintk("RPC: %s: %pI4:%u\n", - __func__, &sin->sin_addr.s_addr, ntohs(sin->sin_port)); - - /* Set max requests */ cdata.max_requests = xprt->max_reqs; - /* Set some length limits */ cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */ cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */ @@ -344,8 +367,7 @@ new_xprt = rpcx_to_rdmax(xprt); - rc = rpcrdma_ia_open(new_xprt, (struct sockaddr *) &cdata.addr, - xprt_rdma_memreg_strategy); + rc = rpcrdma_ia_open(new_xprt, sap, xprt_rdma_memreg_strategy); if (rc) goto out1; @@ -366,8 +388,7 @@ * any inline data. Also specify any padding which will be provided * from a preregistered zero buffer. */ - rc = rpcrdma_buffer_create(&new_xprt->rx_buf, new_ep, &new_xprt->rx_ia, - &new_xprt->rx_data); + rc = rpcrdma_buffer_create(new_xprt); if (rc) goto out3; @@ -376,22 +397,30 @@ * connection loss notification is async. We also catch connection loss * when reaping receives. */ - INIT_DELAYED_WORK(&new_xprt->rdma_connect, xprt_rdma_connect_worker); - new_ep->rep_func = rpcrdma_conn_func; - new_ep->rep_xprt = xprt; + INIT_DELAYED_WORK(&new_xprt->rx_connect_worker, + xprt_rdma_connect_worker); - xprt_rdma_format_addresses(xprt); + xprt_rdma_format_addresses(xprt, sap); + xprt->max_payload = new_xprt->rx_ia.ri_ops->ro_maxpages(new_xprt); + if (xprt->max_payload == 0) + goto out4; + xprt->max_payload <<= PAGE_SHIFT; + dprintk("RPC: %s: transport data payload maximum: %zu bytes\n", + __func__, xprt->max_payload); if (!try_module_get(THIS_MODULE)) goto out4; + dprintk("RPC: %s: %s:%s\n", __func__, + xprt->address_strings[RPC_DISPLAY_ADDR], + xprt->address_strings[RPC_DISPLAY_PORT]); return xprt; out4: xprt_rdma_free_addresses(xprt); rc = -EINVAL; out3: - (void) rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia); + rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia); out2: rpcrdma_ia_close(&new_xprt->rx_ia); out1: @@ -411,7 +440,7 @@ if (r_xprt->rx_ep.rep_connected > 0) xprt->reestablish_timeout = 0; xprt_disconnect_done(xprt); - (void) rpcrdma_ep_disconnect(&r_xprt->rx_ep, &r_xprt->rx_ia); + rpcrdma_ep_disconnect(&r_xprt->rx_ep, &r_xprt->rx_ia); } static void @@ -433,123 +462,101 @@ if (r_xprt->rx_ep.rep_connected != 0) { /* Reconnect */ - schedule_delayed_work(&r_xprt->rdma_connect, - xprt->reestablish_timeout); + schedule_delayed_work(&r_xprt->rx_connect_worker, + xprt->reestablish_timeout); xprt->reestablish_timeout <<= 1; - if (xprt->reestablish_timeout > (30 * HZ)) - xprt->reestablish_timeout = (30 * HZ); - else if (xprt->reestablish_timeout < (5 * HZ)) - xprt->reestablish_timeout = (5 * HZ); + if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO) + xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO; + else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO) + xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO; } else { - schedule_delayed_work(&r_xprt->rdma_connect, 0); + schedule_delayed_work(&r_xprt->rx_connect_worker, 0); if (!RPC_IS_ASYNC(task)) - flush_delayed_work(&r_xprt->rdma_connect); - } -} - -static int -xprt_rdma_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task) -{ - struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); - int credits = atomic_read(&r_xprt->rx_buf.rb_credits); - - /* == RPC_CWNDSCALE @ init, but *after* setup */ - if (r_xprt->rx_buf.rb_cwndscale == 0UL) { - r_xprt->rx_buf.rb_cwndscale = xprt->cwnd; - dprintk("RPC: %s: cwndscale %lu\n", __func__, - r_xprt->rx_buf.rb_cwndscale); - BUG_ON(r_xprt->rx_buf.rb_cwndscale <= 0); + flush_delayed_work(&r_xprt->rx_connect_worker); } - xprt->cwnd = credits * r_xprt->rx_buf.rb_cwndscale; - return xprt_reserve_xprt_cong(xprt, task); } /* * The RDMA allocate/free functions need the task structure as a place * to hide the struct rpcrdma_req, which is necessary for the actual send/recv - * sequence. For this reason, the recv buffers are attached to send - * buffers for portions of the RPC. Note that the RPC layer allocates - * both send and receive buffers in the same call. We may register - * the receive buffer portion when using reply chunks. + * sequence. + * + * The RPC layer allocates both send and receive buffers in the same call + * (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer). + * We may register rq_rcv_buf when using reply chunks. */ static void * xprt_rdma_allocate(struct rpc_task *task, size_t size) { struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; - struct rpcrdma_req *req, *nreq; + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + struct rpcrdma_regbuf *rb; + struct rpcrdma_req *req; + size_t min_size; + gfp_t flags; - req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf); - BUG_ON(NULL == req); + req = rpcrdma_buffer_get(&r_xprt->rx_buf); + if (req == NULL) + return NULL; + + flags = GFP_NOIO | __GFP_NOWARN; + if (RPC_IS_SWAPPER(task)) + flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN; + + if (req->rl_rdmabuf == NULL) + goto out_rdmabuf; + if (req->rl_sendbuf == NULL) + goto out_sendbuf; + if (size > req->rl_sendbuf->rg_size) + goto out_sendbuf; - if (size > req->rl_size) { - dprintk("RPC: %s: size %zd too large for buffer[%zd]: " - "prog %d vers %d proc %d\n", - __func__, size, req->rl_size, - task->tk_client->cl_prog, task->tk_client->cl_vers, - task->tk_msg.rpc_proc->p_proc); - /* - * Outgoing length shortage. Our inline write max must have - * been configured to perform direct i/o. - * - * This is therefore a large metadata operation, and the - * allocate call was made on the maximum possible message, - * e.g. containing long filename(s) or symlink data. In - * fact, while these metadata operations *might* carry - * large outgoing payloads, they rarely *do*. However, we - * have to commit to the request here, so reallocate and - * register it now. The data path will never require this - * reallocation. - * - * If the allocation or registration fails, the RPC framework - * will (doggedly) retry. - */ - if (rpcx_to_rdmax(xprt)->rx_ia.ri_memreg_strategy == - RPCRDMA_BOUNCEBUFFERS) { - /* forced to "pure inline" */ - dprintk("RPC: %s: too much data (%zd) for inline " - "(r/w max %d/%d)\n", __func__, size, - rpcx_to_rdmad(xprt).inline_rsize, - rpcx_to_rdmad(xprt).inline_wsize); - size = req->rl_size; - rpc_exit(task, -EIO); /* fail the operation */ - rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++; - goto out; - } - if (task->tk_flags & RPC_TASK_SWAPPER) - nreq = kmalloc(sizeof *req + size, GFP_ATOMIC); - else - nreq = kmalloc(sizeof *req + size, GFP_NOFS); - if (nreq == NULL) - goto outfail; - - if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia, - nreq->rl_base, size + sizeof(struct rpcrdma_req) - - offsetof(struct rpcrdma_req, rl_base), - &nreq->rl_handle, &nreq->rl_iov)) { - kfree(nreq); - goto outfail; - } - rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size; - nreq->rl_size = size; - nreq->rl_niovs = 0; - nreq->rl_nchunks = 0; - nreq->rl_buffer = (struct rpcrdma_buffer *)req; - nreq->rl_reply = req->rl_reply; - memcpy(nreq->rl_segments, - req->rl_segments, sizeof nreq->rl_segments); - /* flag the swap with an unused field */ - nreq->rl_iov.length = 0; - req->rl_reply = NULL; - req = nreq; - } - dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req); out: + dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req); req->rl_connect_cookie = 0; /* our reserved value */ - return req->rl_xdr_buf; + return req->rl_sendbuf->rg_base; -outfail: +out_rdmabuf: + min_size = RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp); + rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags); + if (IS_ERR(rb)) + goto out_fail; + req->rl_rdmabuf = rb; + +out_sendbuf: + /* XDR encoding and RPC/RDMA marshaling of this request has not + * yet occurred. Thus a lower bound is needed to prevent buffer + * overrun during marshaling. + * + * RPC/RDMA marshaling may choose to send payload bearing ops + * inline, if the result is smaller than the inline threshold. + * The value of the "size" argument accounts for header + * requirements but not for the payload in these cases. + * + * Likewise, allocate enough space to receive a reply up to the + * size of the inline threshold. + * + * It's unlikely that both the send header and the received + * reply will be large, but slush is provided here to allow + * flexibility when marshaling. + */ + min_size = RPCRDMA_INLINE_READ_THRESHOLD(task->tk_rqstp); + min_size += RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp); + if (size < min_size) + size = min_size; + + rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags); + if (IS_ERR(rb)) + goto out_fail; + rb->rg_owner = req; + + r_xprt->rx_stats.hardway_register_count += size; + rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf); + req->rl_sendbuf = rb; + goto out; + +out_fail: rpcrdma_buffer_put(req); - rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++; + r_xprt->rx_stats.failed_marshal_count++; return NULL; } @@ -561,54 +568,24 @@ { struct rpcrdma_req *req; struct rpcrdma_xprt *r_xprt; - struct rpcrdma_rep *rep; + struct rpcrdma_regbuf *rb; int i; if (buffer == NULL) return; - req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]); - if (req->rl_iov.length == 0) { /* see allocate above */ - r_xprt = container_of(((struct rpcrdma_req *) req->rl_buffer)->rl_buffer, - struct rpcrdma_xprt, rx_buf); - } else - r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf); - rep = req->rl_reply; + rb = container_of(buffer, struct rpcrdma_regbuf, rg_base[0]); + req = rb->rg_owner; + r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf); - dprintk("RPC: %s: called on 0x%p%s\n", - __func__, rep, (rep && rep->rr_func) ? " (with waiter)" : ""); + dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply); - /* - * Finish the deregistration. When using mw bind, this was - * begun in rpcrdma_reply_handler(). In all other modes, we - * do it here, in thread context. The process is considered - * complete when the rr_func vector becomes NULL - this - * was put in place during rpcrdma_reply_handler() - the wait - * call below will not block if the dereg is "done". If - * interrupted, our framework will clean up. - */ for (i = 0; req->rl_nchunks;) { --req->rl_nchunks; - i += rpcrdma_deregister_external( - &req->rl_segments[i], r_xprt, NULL); - } - - if (rep && wait_event_interruptible(rep->rr_unbind, !rep->rr_func)) { - rep->rr_func = NULL; /* abandon the callback */ - req->rl_reply = NULL; + i += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt, + &req->rl_segments[i]); } - if (req->rl_iov.length == 0) { /* see allocate above */ - struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer; - oreq->rl_reply = req->rl_reply; - (void) rpcrdma_deregister_internal(&r_xprt->rx_ia, - req->rl_handle, - &req->rl_iov); - kfree(req); - req = oreq; - } - - /* Put back request+reply buffers */ rpcrdma_buffer_put(req); } @@ -630,24 +607,15 @@ struct rpc_xprt *xprt = rqst->rq_xprt; struct rpcrdma_req *req = rpcr_to_rdmar(rqst); struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + int rc = 0; - /* marshal the send itself */ - if (req->rl_niovs == 0 && rpcrdma_marshal_req(rqst) != 0) { - r_xprt->rx_stats.failed_marshal_count++; - dprintk("RPC: %s: rpcrdma_marshal_req failed\n", - __func__); - return -EIO; - } + rc = rpcrdma_marshal_req(rqst); + if (rc < 0) + goto failed_marshal; if (req->rl_reply == NULL) /* e.g. reconnection */ rpcrdma_recv_buffer_get(req); - if (req->rl_reply) { - req->rl_reply->rr_func = rpcrdma_reply_handler; - /* this need only be done once, but... */ - req->rl_reply->rr_xprt = xprt; - } - /* Must suppress retransmit to maintain credits */ if (req->rl_connect_cookie == xprt->connect_cookie) goto drop_connection; @@ -660,6 +628,12 @@ rqst->rq_bytes_sent = 0; return 0; +failed_marshal: + r_xprt->rx_stats.failed_marshal_count++; + dprintk("RPC: %s: rpcrdma_marshal_req failed, status %i\n", + __func__, rc); + if (rc == -EIO) + return -EIO; drop_connection: xprt_disconnect_done(xprt); return -ENOTCONN; /* implies disconnect */ @@ -673,31 +647,41 @@ if (xprt_connected(xprt)) idle_time = (long)(jiffies - xprt->last_used) / HZ; - seq_printf(seq, - "\txprt:\trdma %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu " - "%lu %lu %lu %Lu %Lu %Lu %Lu %lu %lu %lu\n", - - 0, /* need a local port? */ - xprt->stat.bind_count, - xprt->stat.connect_count, - xprt->stat.connect_time, - idle_time, - xprt->stat.sends, - xprt->stat.recvs, - xprt->stat.bad_xids, - xprt->stat.req_u, - xprt->stat.bklog_u, - - r_xprt->rx_stats.read_chunk_count, - r_xprt->rx_stats.write_chunk_count, - r_xprt->rx_stats.reply_chunk_count, - r_xprt->rx_stats.total_rdma_request, - r_xprt->rx_stats.total_rdma_reply, - r_xprt->rx_stats.pullup_copy_count, - r_xprt->rx_stats.fixup_copy_count, - r_xprt->rx_stats.hardway_register_count, - r_xprt->rx_stats.failed_marshal_count, - r_xprt->rx_stats.bad_reply_count); + seq_puts(seq, "\txprt:\trdma "); + seq_printf(seq, "%u %lu %lu %lu %ld %lu %lu %lu %llu %llu ", + 0, /* need a local port? */ + xprt->stat.bind_count, + xprt->stat.connect_count, + xprt->stat.connect_time, + idle_time, + xprt->stat.sends, + xprt->stat.recvs, + xprt->stat.bad_xids, + xprt->stat.req_u, + xprt->stat.bklog_u); + seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu\n", + r_xprt->rx_stats.read_chunk_count, + r_xprt->rx_stats.write_chunk_count, + r_xprt->rx_stats.reply_chunk_count, + r_xprt->rx_stats.total_rdma_request, + r_xprt->rx_stats.total_rdma_reply, + r_xprt->rx_stats.pullup_copy_count, + r_xprt->rx_stats.fixup_copy_count, + r_xprt->rx_stats.hardway_register_count, + r_xprt->rx_stats.failed_marshal_count, + r_xprt->rx_stats.bad_reply_count, + r_xprt->rx_stats.nomsg_call_count); +} + +static int +xprt_rdma_enable_swap(struct rpc_xprt *xprt) +{ + return 0; +} + +static void +xprt_rdma_disable_swap(struct rpc_xprt *xprt) +{ } /* @@ -705,7 +689,7 @@ */ static struct rpc_xprt_ops xprt_rdma_procs = { - .reserve_xprt = xprt_rdma_reserve_xprt, + .reserve_xprt = xprt_reserve_xprt_cong, .release_xprt = xprt_release_xprt_cong, /* sunrpc/xprt.c */ .alloc_slot = xprt_alloc_slot, .release_request = xprt_release_rqst_cong, /* ditto */ @@ -718,7 +702,16 @@ .send_request = xprt_rdma_send_request, .close = xprt_rdma_close, .destroy = xprt_rdma_destroy, - .print_stats = xprt_rdma_print_stats + .print_stats = xprt_rdma_print_stats, + .enable_swap = xprt_rdma_enable_swap, + .disable_swap = xprt_rdma_disable_swap, + .inject_disconnect = xprt_rdma_inject_disconnect, +#if defined(CONFIG_SUNRPC_BACKCHANNEL) + .bc_setup = xprt_rdma_bc_setup, + .bc_up = xprt_rdma_bc_up, + .bc_free_rqst = xprt_rdma_bc_free_rqst, + .bc_destroy = xprt_rdma_bc_destroy, +#endif }; static struct xprt_class xprt_rdma = { @@ -729,12 +722,12 @@ .setup = xprt_setup_rdma, }; -static void __exit xprt_rdma_cleanup(void) +void xprt_rdma_cleanup(void) { int rc; - dprintk(KERN_INFO "RPCRDMA Module Removed, deregister RPC RDMA transport\n"); -#ifdef RPC_DEBUG + dprintk("RPCRDMA Module Removed, deregister RPC RDMA transport\n"); +#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) if (sunrpc_table_header) { unregister_sysctl_table(sunrpc_table_header); sunrpc_table_header = NULL; @@ -744,33 +737,45 @@ if (rc) dprintk("RPC: %s: xprt_unregister returned %i\n", __func__, rc); + + rpcrdma_destroy_wq(); + frwr_destroy_recovery_wq(); } -static int __init xprt_rdma_init(void) +int xprt_rdma_init(void) { int rc; - rc = xprt_register_transport(&xprt_rdma); - + rc = frwr_alloc_recovery_wq(); if (rc) return rc; - dprintk(KERN_INFO "RPCRDMA Module Init, register RPC RDMA transport\n"); + rc = rpcrdma_alloc_wq(); + if (rc) { + frwr_destroy_recovery_wq(); + return rc; + } - dprintk(KERN_INFO "Defaults:\n"); - dprintk(KERN_INFO "\tSlots %d\n" + rc = xprt_register_transport(&xprt_rdma); + if (rc) { + rpcrdma_destroy_wq(); + frwr_destroy_recovery_wq(); + return rc; + } + + dprintk("RPCRDMA Module Init, register RPC RDMA transport\n"); + + dprintk("Defaults:\n"); + dprintk("\tSlots %d\n" "\tMaxInlineRead %d\n\tMaxInlineWrite %d\n", xprt_rdma_slot_table_entries, xprt_rdma_max_inline_read, xprt_rdma_max_inline_write); - dprintk(KERN_INFO "\tPadding %d\n\tMemreg %d\n", + dprintk("\tPadding %d\n\tMemreg %d\n", xprt_rdma_inline_write_padding, xprt_rdma_memreg_strategy); -#ifdef RPC_DEBUG +#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) if (!sunrpc_table_header) sunrpc_table_header = register_sysctl_table(sunrpc_table); #endif return 0; } - -module_init(xprt_rdma_init); -module_exit(xprt_rdma_cleanup);