--- zzzz-none-000/linux-3.10.107/net/sunrpc/xprtsock.c 2017-06-27 09:49:32.000000000 +0000 +++ scorpion-7490-727/linux-3.10.107/net/sunrpc/xprtsock.c 2021-02-04 17:41:59.000000000 +0000 @@ -47,6 +47,8 @@ #include #include +#include + #include "sunrpc.h" static void xs_close(struct rpc_xprt *xprt); @@ -61,6 +63,8 @@ static unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT; static unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT; +#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) + #define XS_TCP_LINGER_TO (15U * HZ) static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO; @@ -73,8 +77,6 @@ * someone else's file names! */ -#ifdef RPC_DEBUG - static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE; static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE; static unsigned int max_tcp_slot_table_limit = RPC_MAX_SLOT_TABLE_LIMIT; @@ -87,7 +89,7 @@ * FIXME: changing the UDP slot table size should also resize the UDP * socket buffers for existing UDP transports */ -static ctl_table xs_tunables_table[] = { +static struct ctl_table xs_tunables_table[] = { { .procname = "udp_slot_table_entries", .data = &xprt_udp_slot_table_entries, @@ -143,7 +145,7 @@ { }, }; -static ctl_table sunrpc_table[] = { +static struct ctl_table sunrpc_table[] = { { .procname = "sunrpc", .mode = 0555, @@ -184,7 +186,7 @@ */ #define XS_IDLE_DISC_TO (5U * 60 * HZ) -#ifdef RPC_DEBUG +#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) # undef RPC_DEBUG_DATA # define RPCDBG_FACILITY RPCDBG_TRANS #endif @@ -214,63 +216,10 @@ } #endif -struct sock_xprt { - struct rpc_xprt xprt; - - /* - * Network layer - */ - struct socket * sock; - struct sock * inet; - - /* - * State of TCP reply receive - */ - __be32 tcp_fraghdr, - tcp_xid, - tcp_calldir; - - u32 tcp_offset, - tcp_reclen; - - unsigned long tcp_copied, - tcp_flags; - - /* - * Connection of transports - */ - struct delayed_work connect_worker; - struct sockaddr_storage srcaddr; - unsigned short srcport; - - /* - * UDP socket buffer size parameters - */ - size_t rcvsize, - sndsize; - - /* - * Saved socket callback addresses - */ - void (*old_data_ready)(struct sock *, int); - void (*old_state_change)(struct sock *); - void (*old_write_space)(struct sock *); -}; - -/* - * TCP receive state flags - */ -#define TCP_RCV_LAST_FRAG (1UL << 0) -#define TCP_RCV_COPY_FRAGHDR (1UL << 1) -#define TCP_RCV_COPY_XID (1UL << 2) -#define TCP_RCV_COPY_DATA (1UL << 3) -#define TCP_RCV_READ_CALLDIR (1UL << 4) -#define TCP_RCV_COPY_CALLDIR (1UL << 5) - -/* - * TCP RPC flags - */ -#define TCP_RPC_REPLY (1UL << 6) +static inline struct rpc_xprt *xprt_from_sock(struct sock *sk) +{ + return (struct rpc_xprt *) sk->sk_user_data; +} static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt) { @@ -391,13 +340,13 @@ return kernel_sendmsg(sock, &msg, NULL, 0, 0); } -static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more, bool zerocopy) +static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more, bool zerocopy, int *sent_p) { ssize_t (*do_sendpage)(struct socket *sock, struct page *page, int offset, size_t size, int flags); struct page **ppage; unsigned int remainder; - int err, sent = 0; + int err; remainder = xdr->page_len - base; base += xdr->page_base; @@ -411,20 +360,22 @@ int flags = XS_SENDMSG_FLAGS; remainder -= len; - if (remainder != 0 || more) + if (more) flags |= MSG_MORE; + if (remainder != 0) + flags |= MSG_SENDPAGE_NOTLAST | MSG_MORE; err = do_sendpage(sock, *ppage, base, len, flags); if (remainder == 0 || err != len) break; - sent += err; + *sent_p += err; ppage++; base = 0; } - if (sent == 0) - return err; - if (err > 0) - sent += err; - return sent; + if (err > 0) { + *sent_p += err; + err = 0; + } + return err; } /** @@ -435,17 +386,18 @@ * @xdr: buffer containing this request * @base: starting position in the buffer * @zerocopy: true if it is safe to use sendpage() + * @sent_p: return the total number of bytes successfully queued for sending * */ -static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, bool zerocopy) +static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, bool zerocopy, int *sent_p) { unsigned int remainder = xdr->len - base; - int err, sent = 0; + int err = 0; + int sent = 0; if (unlikely(!sock)) return -ENOTSOCK; - clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags); if (base != 0) { addr = NULL; addrlen = 0; @@ -457,7 +409,7 @@ err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0); if (remainder == 0 || err != len) goto out; - sent += err; + *sent_p += err; base = 0; } else base -= xdr->head[0].iov_len; @@ -465,23 +417,23 @@ if (base < xdr->page_len) { unsigned int len = xdr->page_len - base; remainder -= len; - err = xs_send_pagedata(sock, xdr, base, remainder != 0, zerocopy); - if (remainder == 0 || err != len) + err = xs_send_pagedata(sock, xdr, base, remainder != 0, zerocopy, &sent); + *sent_p += sent; + if (remainder == 0 || sent != len) goto out; - sent += err; base = 0; } else base -= xdr->page_len; if (base >= xdr->tail[0].iov_len) - return sent; + return 0; err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0); out: - if (sent == 0) - return err; - if (err > 0) - sent += err; - return sent; + if (err > 0) { + *sent_p += err; + err = 0; + } + return err; } static void xs_nospace_callback(struct rpc_task *task) @@ -489,7 +441,6 @@ struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt); transport->inet->sk_write_pending--; - clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags); } /** @@ -514,25 +465,25 @@ /* Don't race with disconnect */ if (xprt_connected(xprt)) { - if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) { - /* - * Notify TCP that we're limited by the application - * window size - */ - set_bit(SOCK_NOSPACE, &transport->sock->flags); - sk->sk_write_pending++; - /* ...and wait for more buffer space */ - xprt_wait_for_buffer_space(task, xs_nospace_callback); - } - } else { - clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags); + /* wait for more buffer space */ + sk->sk_write_pending++; + xprt_wait_for_buffer_space(task, xs_nospace_callback); + } else ret = -ENOTCONN; - } spin_unlock_bh(&xprt->transport_lock); /* Race breaker in case memory is freed before above code is called */ - sk->sk_write_space(sk); + if (ret == -EAGAIN) { + struct socket_wq *wq; + + rcu_read_lock(); + wq = rcu_dereference(sk->sk_wq); + set_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags); + rcu_read_unlock(); + + sk->sk_write_space(sk); + } return ret; } @@ -565,19 +516,24 @@ container_of(xprt, struct sock_xprt, xprt); struct xdr_buf *xdr = &req->rq_snd_buf; int status; + int sent = 0; xs_encode_stream_record_marker(&req->rq_snd_buf); xs_pktdump("packet data:", req->rq_svec->iov_base, req->rq_svec->iov_len); - status = xs_sendpages(transport->sock, NULL, 0, - xdr, req->rq_bytes_sent, true); + status = xs_sendpages(transport->sock, NULL, 0, xdr, req->rq_bytes_sent, + true, &sent); dprintk("RPC: %s(%u) = %d\n", __func__, xdr->len - req->rq_bytes_sent, status); - if (likely(status >= 0)) { - req->rq_bytes_sent += status; - req->rq_xmit_bytes_sent += status; + + if (status == -EAGAIN && sock_writeable(transport->inet)) + status = -ENOBUFS; + + if (likely(sent > 0) || status == 0) { + req->rq_bytes_sent += sent; + req->rq_xmit_bytes_sent += sent; if (likely(req->rq_bytes_sent >= req->rq_slen)) { req->rq_bytes_sent = 0; return 0; @@ -586,6 +542,8 @@ } switch (status) { + case -ENOBUFS: + break; case -EAGAIN: status = xs_nospace(task); break; @@ -617,6 +575,7 @@ struct rpc_xprt *xprt = req->rq_xprt; struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); struct xdr_buf *xdr = &req->rq_snd_buf; + int sent = 0; int status; xs_pktdump("packet data:", @@ -625,22 +584,28 @@ if (!xprt_bound(xprt)) return -ENOTCONN; - status = xs_sendpages(transport->sock, - xs_addr(xprt), - xprt->addrlen, xdr, - req->rq_bytes_sent, true); + status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen, + xdr, req->rq_bytes_sent, true, &sent); dprintk("RPC: xs_udp_send_request(%u) = %d\n", xdr->len - req->rq_bytes_sent, status); - if (status >= 0) { - req->rq_xmit_bytes_sent += status; - if (status >= req->rq_slen) + /* firewall is blocking us, don't return -EAGAIN or we end up looping */ + if (status == -EPERM) + goto process_status; + + if (status == -EAGAIN && sock_writeable(transport->inet)) + status = -ENOBUFS; + + if (sent > 0 || status == 0) { + req->rq_xmit_bytes_sent += sent; + if (sent >= req->rq_slen) return 0; /* Still some bytes left; set up for a retry later. */ status = -EAGAIN; } +process_status: switch (status) { case -ENOTSOCK: status = -ENOTCONN; @@ -649,37 +614,23 @@ case -EAGAIN: status = xs_nospace(task); break; - default: - dprintk("RPC: sendmsg returned unrecognized error %d\n", - -status); case -ENETUNREACH: + case -ENOBUFS: case -EPIPE: case -ECONNREFUSED: + case -EPERM: /* When the server has died, an ICMP port unreachable message * prompts ECONNREFUSED. */ - clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags); + break; + default: + dprintk("RPC: sendmsg returned unrecognized error %d\n", + -status); } return status; } /** - * xs_tcp_shutdown - gracefully shut down a TCP socket - * @xprt: transport - * - * Initiates a graceful shutdown of the TCP socket by calling the - * equivalent of shutdown(SHUT_WR); - */ -static void xs_tcp_shutdown(struct rpc_xprt *xprt) -{ - struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - struct socket *sock = transport->sock; - - if (sock != NULL) - kernel_sock_shutdown(sock, SHUT_WR); -} - -/** * xs_tcp_send_request - write an RPC request to a TCP socket * @task: address of RPC task that manages the state of an RPC request * @@ -701,6 +652,7 @@ struct xdr_buf *xdr = &req->rq_snd_buf; bool zerocopy = true; int status; + int sent; xs_encode_stream_record_marker(&req->rq_snd_buf); @@ -718,30 +670,31 @@ * to cope with writespace callbacks arriving _after_ we have * called sendmsg(). */ while (1) { - status = xs_sendpages(transport->sock, - NULL, 0, xdr, req->rq_bytes_sent, - zerocopy); + sent = 0; + status = xs_sendpages(transport->sock, NULL, 0, xdr, + req->rq_bytes_sent, zerocopy, &sent); dprintk("RPC: xs_tcp_send_request(%u) = %d\n", xdr->len - req->rq_bytes_sent, status); - if (unlikely(status < 0)) - break; - /* If we've sent the entire packet, immediately * reset the count of bytes sent. */ - req->rq_bytes_sent += status; - req->rq_xmit_bytes_sent += status; + req->rq_bytes_sent += sent; + req->rq_xmit_bytes_sent += sent; if (likely(req->rq_bytes_sent >= req->rq_slen)) { req->rq_bytes_sent = 0; return 0; } - if (status != 0) - continue; - status = -EAGAIN; - break; + if (status < 0) + break; + if (sent == 0) { + status = -EAGAIN; + break; + } } + if (status == -EAGAIN && sk_stream_is_writeable(transport->inet)) + status = -ENOBUFS; switch (status) { case -ENOTSOCK: @@ -751,15 +704,16 @@ case -EAGAIN: status = xs_nospace(task); break; - default: - dprintk("RPC: sendmsg returned unrecognized error %d\n", - -status); case -ECONNRESET: - xs_tcp_shutdown(xprt); case -ECONNREFUSED: case -ENOTCONN: + case -EADDRINUSE: + case -ENOBUFS: case -EPIPE: - clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags); + break; + default: + dprintk("RPC: sendmsg returned unrecognized error %d\n", + -status); } return status; @@ -799,6 +753,7 @@ transport->old_data_ready = sk->sk_data_ready; transport->old_state_change = sk->sk_state_change; transport->old_write_space = sk->sk_write_space; + transport->old_error_report = sk->sk_error_report; } static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk) @@ -806,18 +761,69 @@ sk->sk_data_ready = transport->old_data_ready; sk->sk_state_change = transport->old_state_change; sk->sk_write_space = transport->old_write_space; + sk->sk_error_report = transport->old_error_report; +} + +static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt) +{ + smp_mb__before_atomic(); + clear_bit(XPRT_CLOSE_WAIT, &xprt->state); + clear_bit(XPRT_CLOSING, &xprt->state); + smp_mb__after_atomic(); +} + +static void xs_sock_mark_closed(struct rpc_xprt *xprt) +{ + xs_sock_reset_connection_flags(xprt); + /* Mark transport as closed and wake up all pending tasks */ + xprt_disconnect_done(xprt); +} + +/** + * xs_error_report - callback to handle TCP socket state errors + * @sk: socket + * + * Note: we don't call sock_error() since there may be a rpc_task + * using the socket, and so we don't want to clear sk->sk_err. + */ +static void xs_error_report(struct sock *sk) +{ + struct rpc_xprt *xprt; + int err; + + read_lock_bh(&sk->sk_callback_lock); + if (!(xprt = xprt_from_sock(sk))) + goto out; + + err = -sk->sk_err; + if (err == 0) + goto out; + /* Is this a reset event? */ + if (sk->sk_state == TCP_CLOSE) + xs_sock_mark_closed(xprt); + dprintk("RPC: xs_error_report client %p, error=%d...\n", + xprt, -err); + trace_rpc_socket_error(xprt, sk->sk_socket, err); + xprt_wake_pending_tasks(xprt, err); + out: + read_unlock_bh(&sk->sk_callback_lock); } static void xs_reset_transport(struct sock_xprt *transport) { struct socket *sock = transport->sock; struct sock *sk = transport->inet; + struct rpc_xprt *xprt = &transport->xprt; if (sk == NULL) return; - transport->srcport = 0; + if (atomic_read(&transport->xprt.swapper)) + sk_clear_memalloc(sk); + + kernel_sock_shutdown(sock, SHUT_RDWR); + mutex_lock(&transport->recv_mutex); write_lock_bh(&sk->sk_callback_lock); transport->inet = NULL; transport->sock = NULL; @@ -825,10 +831,12 @@ sk->sk_user_data = NULL; xs_restore_old_callbacks(transport, sk); + xprt_clear_connected(xprt); write_unlock_bh(&sk->sk_callback_lock); + xs_sock_reset_connection_flags(xprt); + mutex_unlock(&transport->recv_mutex); - sk->sk_no_check = 0; - + trace_rpc_socket_close(xprt, sock); sock_release(sock); } @@ -851,28 +859,20 @@ xs_reset_transport(transport); xprt->reestablish_timeout = 0; - smp_mb__before_clear_bit(); - clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); - clear_bit(XPRT_CLOSE_WAIT, &xprt->state); - clear_bit(XPRT_CLOSING, &xprt->state); - smp_mb__after_clear_bit(); xprt_disconnect_done(xprt); } -static void xs_tcp_close(struct rpc_xprt *xprt) +static void xs_inject_disconnect(struct rpc_xprt *xprt) { - if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state)) - xs_close(xprt); - else - xs_tcp_shutdown(xprt); + dprintk("RPC: injecting transport disconnect on xprt=%p\n", + xprt); + xprt_disconnect_done(xprt); } -static void xs_local_destroy(struct rpc_xprt *xprt) +static void xs_xprt_free(struct rpc_xprt *xprt) { - xs_close(xprt); xs_free_peer_addresses(xprt); xprt_free(xprt); - module_put(THIS_MODULE); } /** @@ -882,18 +882,15 @@ */ static void xs_destroy(struct rpc_xprt *xprt) { - struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - + struct sock_xprt *transport = container_of(xprt, + struct sock_xprt, xprt); dprintk("RPC: xs_destroy xprt %p\n", xprt); cancel_delayed_work_sync(&transport->connect_worker); - - xs_local_destroy(xprt); -} - -static inline struct rpc_xprt *xprt_from_sock(struct sock *sk) -{ - return (struct rpc_xprt *) sk->sk_user_data; + xs_close(xprt); + cancel_work_sync(&transport->recv_worker); + xs_xprt_free(xprt); + module_put(THIS_MODULE); } static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb) @@ -912,45 +909,36 @@ } /** - * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets - * @sk: socket with data to read - * @len: how much data to read + * xs_local_data_read_skb + * @xprt: transport + * @sk: socket + * @skb: skbuff * * Currently this assumes we can read the whole reply in a single gulp. */ -static void xs_local_data_ready(struct sock *sk, int len) +static void xs_local_data_read_skb(struct rpc_xprt *xprt, + struct sock *sk, + struct sk_buff *skb) { struct rpc_task *task; - struct rpc_xprt *xprt; struct rpc_rqst *rovr; - struct sk_buff *skb; - int err, repsize, copied; + int repsize, copied; u32 _xid; __be32 *xp; - read_lock_bh(&sk->sk_callback_lock); - dprintk("RPC: %s...\n", __func__); - xprt = xprt_from_sock(sk); - if (xprt == NULL) - goto out; - - skb = skb_recv_datagram(sk, 0, 1, &err); - if (skb == NULL) - goto out; - repsize = skb->len - sizeof(rpc_fraghdr); if (repsize < 4) { dprintk("RPC: impossible RPC reply size %d\n", repsize); - goto dropit; + return; } /* Copy the XID from the skb... */ xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid); if (xp == NULL) - goto dropit; + return; /* Look up and lock the request corresponding to the given XID */ - spin_lock(&xprt->transport_lock); + spin_lock_bh(&xprt->transport_lock); rovr = xprt_lookup_rqst(xprt, *xp); if (!rovr) goto out_unlock; @@ -968,51 +956,68 @@ xprt_complete_rqst(task, copied); out_unlock: - spin_unlock(&xprt->transport_lock); - dropit: - skb_free_datagram(sk, skb); - out: - read_unlock_bh(&sk->sk_callback_lock); + spin_unlock_bh(&xprt->transport_lock); +} + +static void xs_local_data_receive(struct sock_xprt *transport) +{ + struct sk_buff *skb; + struct sock *sk; + int err; + + mutex_lock(&transport->recv_mutex); + sk = transport->inet; + if (sk == NULL) + goto out; + for (;;) { + skb = skb_recv_datagram(sk, 0, 1, &err); + if (skb == NULL) + break; + xs_local_data_read_skb(&transport->xprt, sk, skb); + skb_free_datagram(sk, skb); + } +out: + mutex_unlock(&transport->recv_mutex); +} + +static void xs_local_data_receive_workfn(struct work_struct *work) +{ + struct sock_xprt *transport = + container_of(work, struct sock_xprt, recv_worker); + xs_local_data_receive(transport); } /** - * xs_udp_data_ready - "data ready" callback for UDP sockets - * @sk: socket with data to read - * @len: how much data to read + * xs_udp_data_read_skb - receive callback for UDP sockets + * @xprt: transport + * @sk: socket + * @skb: skbuff * */ -static void xs_udp_data_ready(struct sock *sk, int len) +static void xs_udp_data_read_skb(struct rpc_xprt *xprt, + struct sock *sk, + struct sk_buff *skb) { struct rpc_task *task; - struct rpc_xprt *xprt; struct rpc_rqst *rovr; - struct sk_buff *skb; - int err, repsize, copied; + int repsize, copied; u32 _xid; __be32 *xp; - read_lock_bh(&sk->sk_callback_lock); - dprintk("RPC: xs_udp_data_ready...\n"); - if (!(xprt = xprt_from_sock(sk))) - goto out; - - if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL) - goto out; - repsize = skb->len - sizeof(struct udphdr); if (repsize < 4) { dprintk("RPC: impossible RPC reply size %d!\n", repsize); - goto dropit; + return; } /* Copy the XID from the skb... */ xp = skb_header_pointer(skb, sizeof(struct udphdr), sizeof(_xid), &_xid); if (xp == NULL) - goto dropit; + return; /* Look up and lock the request corresponding to the given XID */ - spin_lock(&xprt->transport_lock); + spin_lock_bh(&xprt->transport_lock); rovr = xprt_lookup_rqst(xprt, *xp); if (!rovr) goto out_unlock; @@ -1033,10 +1038,54 @@ xprt_complete_rqst(task, copied); out_unlock: - spin_unlock(&xprt->transport_lock); - dropit: - skb_free_datagram(sk, skb); - out: + spin_unlock_bh(&xprt->transport_lock); +} + +static void xs_udp_data_receive(struct sock_xprt *transport) +{ + struct sk_buff *skb; + struct sock *sk; + int err; + + mutex_lock(&transport->recv_mutex); + sk = transport->inet; + if (sk == NULL) + goto out; + for (;;) { + skb = skb_recv_datagram(sk, 0, 1, &err); + if (skb == NULL) + break; + xs_udp_data_read_skb(&transport->xprt, sk, skb); + skb_free_datagram(sk, skb); + } +out: + mutex_unlock(&transport->recv_mutex); +} + +static void xs_udp_data_receive_workfn(struct work_struct *work) +{ + struct sock_xprt *transport = + container_of(work, struct sock_xprt, recv_worker); + xs_udp_data_receive(transport); +} + +/** + * xs_data_ready - "data ready" callback for UDP sockets + * @sk: socket with data to read + * + */ +static void xs_data_ready(struct sock *sk) +{ + struct rpc_xprt *xprt; + + read_lock_bh(&sk->sk_callback_lock); + dprintk("RPC: xs_data_ready...\n"); + xprt = xprt_from_sock(sk); + if (xprt != NULL) { + struct sock_xprt *transport = container_of(xprt, + struct sock_xprt, xprt); + queue_work(rpciod_workqueue, &transport->recv_worker); + } read_unlock_bh(&sk->sk_callback_lock); } @@ -1046,7 +1095,6 @@ */ static void xs_tcp_force_close(struct rpc_xprt *xprt) { - set_bit(XPRT_CONNECTION_CLOSE, &xprt->state); xprt_force_disconnect(xprt); } @@ -1252,12 +1300,12 @@ dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid)); /* Find and lock the request corresponding to this xid */ - spin_lock(&xprt->transport_lock); + spin_lock_bh(&xprt->transport_lock); req = xprt_lookup_rqst(xprt, transport->tcp_xid); if (!req) { dprintk("RPC: XID %08x request not found!\n", ntohl(transport->tcp_xid)); - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); return -1; } @@ -1266,7 +1314,7 @@ if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) xprt_complete_rqst(req->rq_task, transport->tcp_copied); - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); return 0; } @@ -1278,41 +1326,29 @@ * If we're unable to obtain the rpc_rqst we schedule the closing of the * connection and return -1. */ -static inline int xs_tcp_read_callback(struct rpc_xprt *xprt, +static int xs_tcp_read_callback(struct rpc_xprt *xprt, struct xdr_skb_reader *desc) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); struct rpc_rqst *req; - req = xprt_alloc_bc_request(xprt); + /* Look up and lock the request corresponding to the given XID */ + spin_lock_bh(&xprt->transport_lock); + req = xprt_lookup_bc_request(xprt, transport->tcp_xid); if (req == NULL) { + spin_unlock_bh(&xprt->transport_lock); printk(KERN_WARNING "Callback slot table overflowed\n"); xprt_force_disconnect(xprt); return -1; } - req->rq_xid = transport->tcp_xid; dprintk("RPC: read callback XID %08x\n", ntohl(req->rq_xid)); xs_tcp_read_common(xprt, desc, req); - if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) { - struct svc_serv *bc_serv = xprt->bc_serv; - - /* - * Add callback request to callback list. The callback - * service sleeps on the sv_cb_waitq waiting for new - * requests. Wake it up after adding enqueing the - * request. - */ - dprintk("RPC: add callback request to list\n"); - spin_lock(&bc_serv->sv_cb_lock); - list_add(&req->rq_bc_list, &bc_serv->sv_cb_list); - spin_unlock(&bc_serv->sv_cb_lock); - wake_up(&bc_serv->sv_cb_waitq); - } - - req->rq_private_buf.len = transport->tcp_copied; + if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) + xprt_complete_bc_request(req, transport->tcp_copied); + spin_unlock_bh(&xprt->transport_lock); return 0; } @@ -1327,6 +1363,17 @@ xs_tcp_read_reply(xprt, desc) : xs_tcp_read_callback(xprt, desc); } + +static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net) +{ + int ret; + + ret = svc_create_xprt(serv, "tcp-bc", net, PF_INET, 0, + SVC_SOCK_ANONYMOUS); + if (ret < 0) + return ret; + return 0; +} #else static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, struct xdr_skb_reader *desc) @@ -1382,6 +1429,7 @@ dprintk("RPC: xs_tcp_data_recv started\n"); do { + trace_xs_tcp_data_recv(transport); /* Read in a new fragment marker if necessary */ /* Can we ever really expect to get completely empty fragments? */ if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) { @@ -1406,91 +1454,77 @@ /* Skip over any trailing bytes on short reads */ xs_tcp_read_discard(transport, &desc); } while (desc.count); + trace_xs_tcp_data_recv(transport); dprintk("RPC: xs_tcp_data_recv done\n"); return len - desc.count; } +static void xs_tcp_data_receive(struct sock_xprt *transport) +{ + struct rpc_xprt *xprt = &transport->xprt; + struct sock *sk; + read_descriptor_t rd_desc = { + .count = 2*1024*1024, + .arg.data = xprt, + }; + unsigned long total = 0; + int read = 0; + + mutex_lock(&transport->recv_mutex); + sk = transport->inet; + if (sk == NULL) + goto out; + + /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ + for (;;) { + lock_sock(sk); + read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); + release_sock(sk); + if (read <= 0) + break; + total += read; + rd_desc.count = 65536; + } +out: + mutex_unlock(&transport->recv_mutex); + trace_xs_tcp_data_ready(xprt, read, total); +} + +static void xs_tcp_data_receive_workfn(struct work_struct *work) +{ + struct sock_xprt *transport = + container_of(work, struct sock_xprt, recv_worker); + xs_tcp_data_receive(transport); +} + /** * xs_tcp_data_ready - "data ready" callback for TCP sockets * @sk: socket with data to read - * @bytes: how much data to read * */ -static void xs_tcp_data_ready(struct sock *sk, int bytes) +static void xs_tcp_data_ready(struct sock *sk) { + struct sock_xprt *transport; struct rpc_xprt *xprt; - read_descriptor_t rd_desc; - int read; dprintk("RPC: xs_tcp_data_ready...\n"); read_lock_bh(&sk->sk_callback_lock); if (!(xprt = xprt_from_sock(sk))) goto out; + transport = container_of(xprt, struct sock_xprt, xprt); + /* Any data means we had a useful conversation, so * the we don't need to delay the next reconnect */ if (xprt->reestablish_timeout) xprt->reestablish_timeout = 0; + queue_work(rpciod_workqueue, &transport->recv_worker); - /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ - rd_desc.arg.data = xprt; - do { - rd_desc.count = 65536; - read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); - } while (read > 0); out: read_unlock_bh(&sk->sk_callback_lock); } -/* - * Do the equivalent of linger/linger2 handling for dealing with - * broken servers that don't close the socket in a timely - * fashion - */ -static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt, - unsigned long timeout) -{ - struct sock_xprt *transport; - - if (xprt_test_and_set_connecting(xprt)) - return; - set_bit(XPRT_CONNECTION_ABORT, &xprt->state); - transport = container_of(xprt, struct sock_xprt, xprt); - queue_delayed_work(rpciod_workqueue, &transport->connect_worker, - timeout); -} - -static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt) -{ - struct sock_xprt *transport; - - transport = container_of(xprt, struct sock_xprt, xprt); - - if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) || - !cancel_delayed_work(&transport->connect_worker)) - return; - clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); - xprt_clear_connecting(xprt); -} - -static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt) -{ - smp_mb__before_clear_bit(); - clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); - clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state); - clear_bit(XPRT_CLOSE_WAIT, &xprt->state); - clear_bit(XPRT_CLOSING, &xprt->state); - smp_mb__after_clear_bit(); -} - -static void xs_sock_mark_closed(struct rpc_xprt *xprt) -{ - xs_sock_reset_connection_flags(xprt); - /* Mark transport as closed and wake up all pending tasks */ - xprt_disconnect_done(xprt); -} - /** * xs_tcp_state_change - callback to handle TCP socket state changes * @sk: socket whose state has changed @@ -1499,6 +1533,7 @@ static void xs_tcp_state_change(struct sock *sk) { struct rpc_xprt *xprt; + struct sock_xprt *transport; read_lock_bh(&sk->sk_callback_lock); if (!(xprt = xprt_from_sock(sk))) @@ -1510,12 +1545,12 @@ sock_flag(sk, SOCK_ZAPPED), sk->sk_shutdown); + transport = container_of(xprt, struct sock_xprt, xprt); + trace_rpc_socket_state_change(xprt, sk->sk_socket); switch (sk->sk_state) { case TCP_ESTABLISHED: spin_lock(&xprt->transport_lock); if (!xprt_test_and_set_connected(xprt)) { - struct sock_xprt *transport = container_of(xprt, - struct sock_xprt, xprt); /* Reset TCP record info */ transport->tcp_offset = 0; @@ -1523,6 +1558,9 @@ transport->tcp_copied = 0; transport->tcp_flags = TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID; + xprt->connect_cookie++; + clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); + xprt_clear_connecting(xprt); xprt_wake_pending_tasks(xprt, -EAGAIN); } @@ -1533,11 +1571,10 @@ xprt->connect_cookie++; xprt->reestablish_timeout = 0; set_bit(XPRT_CLOSING, &xprt->state); - smp_mb__before_clear_bit(); + smp_mb__before_atomic(); clear_bit(XPRT_CONNECTED, &xprt->state); clear_bit(XPRT_CLOSE_WAIT, &xprt->state); - smp_mb__after_clear_bit(); - xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout); + smp_mb__after_atomic(); break; case TCP_CLOSE_WAIT: /* The server initiated a shutdown of the socket */ @@ -1554,13 +1591,14 @@ break; case TCP_LAST_ACK: set_bit(XPRT_CLOSING, &xprt->state); - xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout); - smp_mb__before_clear_bit(); + smp_mb__before_atomic(); clear_bit(XPRT_CONNECTED, &xprt->state); - smp_mb__after_clear_bit(); + smp_mb__after_atomic(); break; case TCP_CLOSE: - xs_tcp_cancel_linger_timeout(xprt); + if (test_and_clear_bit(XPRT_SOCK_CONNECTING, + &transport->sock_state)) + xprt_clear_connecting(xprt); xs_sock_mark_closed(xprt); } out: @@ -1569,19 +1607,23 @@ static void xs_write_space(struct sock *sk) { - struct socket *sock; + struct socket_wq *wq; struct rpc_xprt *xprt; - if (unlikely(!(sock = sk->sk_socket))) + if (!sk->sk_socket) return; - clear_bit(SOCK_NOSPACE, &sock->flags); + clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); if (unlikely(!(xprt = xprt_from_sock(sk)))) return; - if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0) - return; + rcu_read_lock(); + wq = rcu_dereference(sk->sk_wq); + if (!wq || test_and_clear_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags) == 0) + goto out; xprt_write_space(xprt); +out: + rcu_read_unlock(); } /** @@ -1620,7 +1662,7 @@ read_lock_bh(&sk->sk_callback_lock); /* from net/core/stream.c:sk_stream_write_space */ - if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) + if (sk_stream_is_writeable(sk)) xs_write_space(sk); read_unlock_bh(&sk->sk_callback_lock); @@ -1678,11 +1720,45 @@ static unsigned short xs_get_random_port(void) { unsigned short range = xprt_max_resvport - xprt_min_resvport; - unsigned short rand = (unsigned short) net_random() % range; + unsigned short rand = (unsigned short) prandom_u32() % range; return rand + xprt_min_resvport; } /** + * xs_set_reuseaddr_port - set the socket's port and address reuse options + * @sock: socket + * + * Note that this function has to be called on all sockets that share the + * same port, and it must be called before binding. + */ +static void xs_sock_set_reuseport(struct socket *sock) +{ + int opt = 1; + + kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, + (char *)&opt, sizeof(opt)); +} + +static unsigned short xs_sock_getport(struct socket *sock) +{ + struct sockaddr_storage buf; + int buflen; + unsigned short port = 0; + + if (kernel_getsockname(sock, (struct sockaddr *)&buf, &buflen) < 0) + goto out; + switch (buf.ss_family) { + case AF_INET6: + port = ntohs(((struct sockaddr_in6 *)&buf)->sin6_port); + break; + case AF_INET: + port = ntohs(((struct sockaddr_in *)&buf)->sin_port); + } +out: + return port; +} + +/** * xs_set_port - reset the port number in the remote endpoint address * @xprt: generic transport * @port: new port number @@ -1696,6 +1772,12 @@ xs_update_peer_port(xprt); } +static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock) +{ + if (transport->srcport == 0) + transport->srcport = xs_sock_getport(sock); +} + static unsigned short xs_get_srcport(struct sock_xprt *transport) { unsigned short port = transport->srcport; @@ -1722,13 +1804,29 @@ unsigned short port = xs_get_srcport(transport); unsigned short last; + /* + * If we are asking for any ephemeral port (i.e. port == 0 && + * transport->xprt.resvport == 0), don't bind. Let the local + * port selection happen implicitly when the socket is used + * (for example at connect time). + * + * This ensures that we can continue to establish TCP + * connections even when all local ephemeral ports are already + * a part of some TCP connection. This makes no difference + * for UDP sockets, but also doens't harm them. + * + * If we're asking for any reserved port (i.e. port == 0 && + * transport->xprt.resvport == 1) xs_get_srcport above will + * ensure that port is non-zero and we will bind as needed. + */ + if (port == 0) + return 0; + memcpy(&myaddr, &transport->srcaddr, transport->xprt.addrlen); do { rpc_set_port((struct sockaddr *)&myaddr, port); err = kernel_bind(sock, (struct sockaddr *)&myaddr, transport->xprt.addrlen); - if (port == 0) - break; if (err == 0) { transport->srcport = port; break; @@ -1828,8 +1926,13 @@ } #endif +static void xs_dummy_setup_socket(struct work_struct *work) +{ +} + static struct socket *xs_create_sock(struct rpc_xprt *xprt, - struct sock_xprt *transport, int family, int type, int protocol) + struct sock_xprt *transport, int family, int type, + int protocol, bool reuseport) { struct socket *sock; int err; @@ -1842,6 +1945,9 @@ } xs_reclassify_socket(family, sock); + if (reuseport) + xs_sock_set_reuseport(sock); + err = xs_bind(transport, sock); if (err) { sock_release(sock); @@ -1867,9 +1973,10 @@ xs_save_old_callbacks(transport, sk); sk->sk_user_data = xprt; - sk->sk_data_ready = xs_local_data_ready; + sk->sk_data_ready = xs_data_ready; sk->sk_write_space = xs_udp_write_space; - sk->sk_allocation = GFP_ATOMIC; + sk->sk_error_report = xs_error_report; + sk->sk_allocation = GFP_NOIO; xprt_clear_connected(xprt); @@ -1888,9 +1995,7 @@ /** * xs_local_setup_socket - create AF_LOCAL socket, connect to a local endpoint - * @xprt: RPC transport to connect * @transport: socket transport to connect - * @create_sock: function to create a socket of the correct type */ static int xs_local_setup_socket(struct sock_xprt *transport) { @@ -1898,9 +2003,6 @@ struct socket *sock; int status = -EIO; - current->flags |= PF_FSTRANS; - - clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); status = __sock_create(xprt->xprt_net, AF_LOCAL, SOCK_STREAM, 0, &sock, 1); if (status < 0) { @@ -1914,11 +2016,13 @@ xprt, xprt->address_strings[RPC_DISPLAY_ADDR]); status = xs_local_finish_connecting(xprt, sock); + trace_rpc_socket_connect(xprt, sock, status); switch (status) { case 0: dprintk("RPC: xprt %p connected to %s\n", xprt, xprt->address_strings[RPC_DISPLAY_ADDR]); xprt_set_connected(xprt); + case -ENOBUFS: break; case -ENOENT: dprintk("RPC: xprt %p: socket %s does not exist\n", @@ -1937,7 +2041,6 @@ out: xprt_clear_connecting(xprt); xprt_wake_pending_tasks(xprt, status); - current->flags &= ~PF_FSTRANS; return status; } @@ -1964,43 +2067,84 @@ msleep_interruptible(15000); } -#ifdef CONFIG_SUNRPC_SWAP +#if IS_ENABLED(CONFIG_SUNRPC_SWAP) +/* + * Note that this should be called with XPRT_LOCKED held (or when we otherwise + * know that we have exclusive access to the socket), to guard against + * races with xs_reset_transport. + */ static void xs_set_memalloc(struct rpc_xprt *xprt) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - if (xprt->swapper) + /* + * If there's no sock, then we have nothing to set. The + * reconnecting process will get it for us. + */ + if (!transport->inet) + return; + if (atomic_read(&xprt->swapper)) sk_set_memalloc(transport->inet); } /** - * xs_swapper - Tag this transport as being used for swap. + * xs_enable_swap - Tag this transport as being used for swap. * @xprt: transport to tag - * @enable: enable/disable * + * Take a reference to this transport on behalf of the rpc_clnt, and + * optionally mark it for swapping if it wasn't already. */ -int xs_swapper(struct rpc_xprt *xprt, int enable) +static int +xs_enable_swap(struct rpc_xprt *xprt) { - struct sock_xprt *transport = container_of(xprt, struct sock_xprt, - xprt); - int err = 0; + struct sock_xprt *xs = container_of(xprt, struct sock_xprt, xprt); - if (enable) { - xprt->swapper++; - xs_set_memalloc(xprt); - } else if (xprt->swapper) { - xprt->swapper--; - sk_clear_memalloc(transport->inet); - } + if (atomic_inc_return(&xprt->swapper) != 1) + return 0; + if (wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_KILLABLE)) + return -ERESTARTSYS; + if (xs->inet) + sk_set_memalloc(xs->inet); + xprt_release_xprt(xprt, NULL); + return 0; +} - return err; +/** + * xs_disable_swap - Untag this transport as being used for swap. + * @xprt: transport to tag + * + * Drop a "swapper" reference to this xprt on behalf of the rpc_clnt. If the + * swapper refcount goes to 0, untag the socket as a memalloc socket. + */ +static void +xs_disable_swap(struct rpc_xprt *xprt) +{ + struct sock_xprt *xs = container_of(xprt, struct sock_xprt, xprt); + + if (!atomic_dec_and_test(&xprt->swapper)) + return; + if (wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_KILLABLE)) + return; + if (xs->inet) + sk_clear_memalloc(xs->inet); + xprt_release_xprt(xprt, NULL); } -EXPORT_SYMBOL_GPL(xs_swapper); #else static void xs_set_memalloc(struct rpc_xprt *xprt) { } + +static int +xs_enable_swap(struct rpc_xprt *xprt) +{ + return -EINVAL; +} + +static void +xs_disable_swap(struct rpc_xprt *xprt) +{ +} #endif static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) @@ -2015,10 +2159,9 @@ xs_save_old_callbacks(transport, sk); sk->sk_user_data = xprt; - sk->sk_data_ready = xs_udp_data_ready; + sk->sk_data_ready = xs_data_ready; sk->sk_write_space = xs_udp_write_space; - sk->sk_no_check = UDP_CSUM_NORCV; - sk->sk_allocation = GFP_ATOMIC; + sk->sk_allocation = GFP_NOIO; xprt_set_connected(xprt); @@ -2041,12 +2184,9 @@ struct socket *sock = transport->sock; int status = -EIO; - current->flags |= PF_FSTRANS; - - /* Start by resetting any existing state */ - xs_reset_transport(transport); sock = xs_create_sock(xprt, transport, - xs_addr(xprt)->sa_family, SOCK_DGRAM, IPPROTO_UDP); + xs_addr(xprt)->sa_family, SOCK_DGRAM, + IPPROTO_UDP, false); if (IS_ERR(sock)) goto out; @@ -2057,60 +2197,33 @@ xprt->address_strings[RPC_DISPLAY_PORT]); xs_udp_finish_connecting(xprt, sock); + trace_rpc_socket_connect(xprt, sock, 0); status = 0; out: + xprt_unlock_connect(xprt, transport); xprt_clear_connecting(xprt); xprt_wake_pending_tasks(xprt, status); - current->flags &= ~PF_FSTRANS; } -/* - * We need to preserve the port number so the reply cache on the server can - * find our cached RPC replies when we get around to reconnecting. +/** + * xs_tcp_shutdown - gracefully shut down a TCP socket + * @xprt: transport + * + * Initiates a graceful shutdown of the TCP socket by calling the + * equivalent of shutdown(SHUT_RDWR); */ -static void xs_abort_connection(struct sock_xprt *transport) -{ - int result; - struct sockaddr any; - - dprintk("RPC: disconnecting xprt %p to reuse port\n", transport); - - /* - * Disconnect the transport socket by doing a connect operation - * with AF_UNSPEC. This should return immediately... - */ - memset(&any, 0, sizeof(any)); - any.sa_family = AF_UNSPEC; - result = kernel_connect(transport->sock, &any, sizeof(any), 0); - if (!result) - xs_sock_reset_connection_flags(&transport->xprt); - dprintk("RPC: AF_UNSPEC connect return code %d\n", result); -} - -static void xs_tcp_reuse_connection(struct sock_xprt *transport) +static void xs_tcp_shutdown(struct rpc_xprt *xprt) { - unsigned int state = transport->inet->sk_state; + struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); + struct socket *sock = transport->sock; - if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED) { - /* we don't need to abort the connection if the socket - * hasn't undergone a shutdown - */ - if (transport->inet->sk_shutdown == 0) - return; - dprintk("RPC: %s: TCP_CLOSEd and sk_shutdown set to %d\n", - __func__, transport->inet->sk_shutdown); - } - if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT)) { - /* we don't need to abort the connection if the socket - * hasn't undergone a shutdown - */ - if (transport->inet->sk_shutdown == 0) - return; - dprintk("RPC: %s: ESTABLISHED/SYN_SENT " - "sk_shutdown set to %d\n", - __func__, transport->inet->sk_shutdown); - } - xs_abort_connection(transport); + if (sock == NULL) + return; + if (xprt_connected(xprt)) { + kernel_sock_shutdown(sock, SHUT_RDWR); + trace_rpc_socket_shutdown(xprt, sock); + } else + xs_reset_transport(transport); } static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) @@ -2120,6 +2233,26 @@ if (!transport->inet) { struct sock *sk = sock->sk; + unsigned int keepidle = xprt->timeout->to_initval / HZ; + unsigned int keepcnt = xprt->timeout->to_retries + 1; + unsigned int opt_on = 1; + unsigned int timeo; + + /* TCP Keepalive options */ + kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, + (char *)&opt_on, sizeof(opt_on)); + kernel_setsockopt(sock, SOL_TCP, TCP_KEEPIDLE, + (char *)&keepidle, sizeof(keepidle)); + kernel_setsockopt(sock, SOL_TCP, TCP_KEEPINTVL, + (char *)&keepidle, sizeof(keepidle)); + kernel_setsockopt(sock, SOL_TCP, TCP_KEEPCNT, + (char *)&keepcnt, sizeof(keepcnt)); + + /* TCP user timeout (see RFC5482) */ + timeo = jiffies_to_msecs(xprt->timeout->to_initval) * + (xprt->timeout->to_retries + 1); + kernel_setsockopt(sock, SOL_TCP, TCP_USER_TIMEOUT, + (char *)&timeo, sizeof(timeo)); write_lock_bh(&sk->sk_callback_lock); @@ -2129,12 +2262,11 @@ sk->sk_data_ready = xs_tcp_data_ready; sk->sk_state_change = xs_tcp_state_change; sk->sk_write_space = xs_tcp_write_space; - sk->sk_allocation = GFP_ATOMIC; + sk->sk_error_report = xs_error_report; + sk->sk_allocation = GFP_NOIO; /* socket options */ - sk->sk_userlocks |= SOCK_BINDPORT_LOCK; sock_reset_flag(sk, SOCK_LINGER); - tcp_sk(sk)->linger2 = 0; tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF; xprt_clear_connected(xprt); @@ -2154,14 +2286,19 @@ /* Tell the socket layer to start connecting... */ xprt->stat.connect_count++; xprt->stat.connect_start = jiffies; + set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK); switch (ret) { case 0: + xs_set_srcport(transport, sock); case -EINPROGRESS: /* SYN_SENT! */ - xprt->connect_cookie++; if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO) xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; + break; + case -EADDRNOTAVAIL: + /* Source port number is unavailable. Try a new one! */ + transport->srcport = 0; } out: return ret; @@ -2169,9 +2306,6 @@ /** * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint - * @xprt: RPC transport to connect - * @transport: socket transport to connect - * @create_sock: function to create a socket of the correct type * * Invoked by a work queue tasklet. */ @@ -2183,26 +2317,14 @@ struct rpc_xprt *xprt = &transport->xprt; int status = -EIO; - current->flags |= PF_FSTRANS; - if (!sock) { - clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); sock = xs_create_sock(xprt, transport, - xs_addr(xprt)->sa_family, SOCK_STREAM, IPPROTO_TCP); + xs_addr(xprt)->sa_family, SOCK_STREAM, + IPPROTO_TCP, true); if (IS_ERR(sock)) { status = PTR_ERR(sock); goto out; } - } else { - int abort_and_exit; - - abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT, - &xprt->state); - /* "close" the socket, preserving the local port */ - xs_tcp_reuse_connection(transport); - - if (abort_and_exit) - goto out_eagain; } dprintk("RPC: worker connecting xprt %p via %s to " @@ -2212,6 +2334,7 @@ xprt->address_strings[RPC_DISPLAY_PORT]); status = xs_tcp_finish_connecting(xprt, sock); + trace_rpc_socket_connect(xprt, sock, status); dprintk("RPC: %p connect status %d connected %d sock state %d\n", xprt, -status, xprt_connected(xprt), sock->sk->sk_state); @@ -2228,8 +2351,7 @@ case 0: case -EINPROGRESS: case -EALREADY: - xprt_clear_connecting(xprt); - current->flags &= ~PF_FSTRANS; + xprt_unlock_connect(xprt, transport); return; case -EINVAL: /* Happens, for instance, if the user specified a link @@ -2238,15 +2360,17 @@ case -ECONNREFUSED: case -ECONNRESET: case -ENETUNREACH: + case -EADDRINUSE: + case -ENOBUFS: /* retry with existing socket, after a delay */ + xs_tcp_force_close(xprt); goto out; } -out_eagain: status = -EAGAIN; out: + xprt_unlock_connect(xprt, transport); xprt_clear_connecting(xprt); xprt_wake_pending_tasks(xprt, status); - current->flags &= ~PF_FSTRANS; } /** @@ -2267,10 +2391,16 @@ { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - if (transport->sock != NULL && !RPC_IS_SOFTCONN(task)) { + WARN_ON_ONCE(!xprt_lock_connect(xprt, task, transport)); + + if (transport->sock != NULL) { dprintk("RPC: xs_connect delayed xprt %p for %lu " "seconds\n", xprt, xprt->reestablish_timeout / HZ); + + /* Start by resetting any existing state */ + xs_reset_transport(transport); + queue_delayed_work(rpciod_workqueue, &transport->connect_worker, xprt->reestablish_timeout); @@ -2446,7 +2576,7 @@ { struct rpc_rqst *req = task->tk_rqstp; struct svc_xprt *xprt; - u32 len; + int len; dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid)); /* @@ -2491,6 +2621,10 @@ static void bc_destroy(struct rpc_xprt *xprt) { + dprintk("RPC: bc_destroy xprt %p\n", xprt); + + xs_xprt_free(xprt); + module_put(THIS_MODULE); } static struct rpc_xprt_ops xs_local_ops = { @@ -2505,8 +2639,10 @@ .send_request = xs_local_send_request, .set_retrans_timeout = xprt_set_retrans_timeout_def, .close = xs_close, - .destroy = xs_local_destroy, + .destroy = xs_destroy, .print_stats = xs_local_print_stats, + .enable_swap = xs_enable_swap, + .disable_swap = xs_disable_swap, }; static struct rpc_xprt_ops xs_udp_ops = { @@ -2526,6 +2662,9 @@ .close = xs_close, .destroy = xs_destroy, .print_stats = xs_udp_print_stats, + .enable_swap = xs_enable_swap, + .disable_swap = xs_disable_swap, + .inject_disconnect = xs_inject_disconnect, }; static struct rpc_xprt_ops xs_tcp_ops = { @@ -2539,9 +2678,18 @@ .buf_free = rpc_free, .send_request = xs_tcp_send_request, .set_retrans_timeout = xprt_set_retrans_timeout_def, - .close = xs_tcp_close, + .close = xs_tcp_shutdown, .destroy = xs_destroy, .print_stats = xs_tcp_print_stats, + .enable_swap = xs_enable_swap, + .disable_swap = xs_disable_swap, + .inject_disconnect = xs_inject_disconnect, +#ifdef CONFIG_SUNRPC_BACKCHANNEL + .bc_setup = xprt_setup_bc, + .bc_up = xs_tcp_bc_up, + .bc_free_rqst = xprt_free_bc_rqst, + .bc_destroy = xprt_destroy_bc, +#endif }; /* @@ -2552,7 +2700,6 @@ .reserve_xprt = xprt_reserve_xprt, .release_xprt = xprt_release_xprt, .alloc_slot = xprt_alloc_slot, - .rpcbind = xs_local_rpcbind, .buf_alloc = bc_malloc, .buf_free = bc_free, .send_request = bc_send_request, @@ -2560,6 +2707,9 @@ .close = bc_close, .destroy = bc_destroy, .print_stats = xs_tcp_print_stats, + .enable_swap = xs_enable_swap, + .disable_swap = xs_disable_swap, + .inject_disconnect = xs_inject_disconnect, }; static int xs_init_anyaddr(const int family, struct sockaddr *sap) @@ -2610,6 +2760,7 @@ } new = container_of(xprt, struct sock_xprt, xprt); + mutex_init(&new->recv_mutex); memcpy(&xprt->addr, args->dstaddr, args->addrlen); xprt->addrlen = args->addrlen; if (args->srcaddr) @@ -2663,6 +2814,10 @@ xprt->ops = &xs_local_ops; xprt->timeout = &xs_local_default_timeout; + INIT_WORK(&transport->recv_worker, xs_local_data_receive_workfn); + INIT_DELAYED_WORK(&transport->connect_worker, + xs_dummy_setup_socket); + switch (sun->sun_family) { case AF_LOCAL: if (sun->sun_path[0] != '/') { @@ -2689,7 +2844,7 @@ return xprt; ret = ERR_PTR(-EINVAL); out_err: - xprt_free(xprt); + xs_xprt_free(xprt); return ret; } @@ -2731,21 +2886,20 @@ xprt->timeout = &xs_udp_default_timeout; + INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn); + INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket); + switch (addr->sa_family) { case AF_INET: if (((struct sockaddr_in *)addr)->sin_port != htons(0)) xprt_set_bound(xprt); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_udp_setup_socket); xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP); break; case AF_INET6: if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) xprt_set_bound(xprt); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_udp_setup_socket); xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6); break; default: @@ -2767,7 +2921,7 @@ return xprt; ret = ERR_PTR(-EINVAL); out_err: - xprt_free(xprt); + xs_xprt_free(xprt); return ret; } @@ -2810,21 +2964,20 @@ xprt->ops = &xs_tcp_ops; xprt->timeout = &xs_tcp_default_timeout; + INIT_WORK(&transport->recv_worker, xs_tcp_data_receive_workfn); + INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket); + switch (addr->sa_family) { case AF_INET: if (((struct sockaddr_in *)addr)->sin_port != htons(0)) xprt_set_bound(xprt); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_tcp_setup_socket); xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP); break; case AF_INET6: if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) xprt_set_bound(xprt); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_tcp_setup_socket); xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6); break; default: @@ -2842,12 +2995,11 @@ xprt->address_strings[RPC_DISPLAY_ADDR], xprt->address_strings[RPC_DISPLAY_PROTO]); - if (try_module_get(THIS_MODULE)) return xprt; ret = ERR_PTR(-EINVAL); out_err: - xprt_free(xprt); + xs_xprt_free(xprt); return ret; } @@ -2864,15 +3016,6 @@ struct svc_sock *bc_sock; struct rpc_xprt *ret; - if (args->bc_xprt->xpt_bc_xprt) { - /* - * This server connection already has a backchannel - * export; we can't create a new one, as we wouldn't be - * able to match replies based on xid any more. So, - * reuse the already-existing one: - */ - return args->bc_xprt->xpt_bc_xprt; - } xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries, xprt_tcp_slot_table_entries); if (IS_ERR(xprt)) @@ -2913,10 +3056,9 @@ /* * Once we've associated a backchannel xprt with a connection, - * we want to keep it around as long as long as the connection - * lasts, in case we need to start using it for a backchannel - * again; this reference won't be dropped until bc_xprt is - * destroyed. + * we want to keep it around as long as the connection lasts, + * in case we need to start using it for a backchannel again; + * this reference won't be dropped until bc_xprt is destroyed. */ xprt_get(xprt); args->bc_xprt->xpt_bc_xprt = xprt; @@ -2931,13 +3073,14 @@ */ xprt_set_connected(xprt); - if (try_module_get(THIS_MODULE)) return xprt; + + args->bc_xprt->xpt_bc_xprt = NULL; xprt_put(xprt); ret = ERR_PTR(-EINVAL); out_err: - xprt_free(xprt); + xs_xprt_free(xprt); return ret; } @@ -2979,7 +3122,7 @@ */ int init_socket_xprt(void) { -#ifdef RPC_DEBUG +#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) if (!sunrpc_table_header) sunrpc_table_header = register_sysctl_table(sunrpc_table); #endif @@ -2998,7 +3141,7 @@ */ void cleanup_socket_xprt(void) { -#ifdef RPC_DEBUG +#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) if (sunrpc_table_header) { unregister_sysctl_table(sunrpc_table_header); sunrpc_table_header = NULL; @@ -3015,12 +3158,12 @@ const struct kernel_param *kp, unsigned int min, unsigned int max) { - unsigned long num; + unsigned int num; int ret; if (!val) return -EINVAL; - ret = strict_strtoul(val, 0, &num); + ret = kstrtouint(val, 0, &num); if (ret == -EINVAL || num < min || num > max) return -EINVAL; *((unsigned int *)kp->arg) = num; @@ -3034,7 +3177,7 @@ RPC_MAX_RESVPORT); } -static struct kernel_param_ops param_ops_portnr = { +static const struct kernel_param_ops param_ops_portnr = { .set = param_set_portnr, .get = param_get_uint, }; @@ -3053,7 +3196,7 @@ RPC_MAX_SLOT_TABLE); } -static struct kernel_param_ops param_ops_slot_table_size = { +static const struct kernel_param_ops param_ops_slot_table_size = { .set = param_set_slot_table_size, .get = param_get_uint, }; @@ -3069,7 +3212,7 @@ RPC_MAX_SLOT_TABLE_LIMIT); } -static struct kernel_param_ops param_ops_max_slot_table_size = { +static const struct kernel_param_ops param_ops_max_slot_table_size = { .set = param_set_max_slot_table_size, .get = param_get_uint, };