--- zzzz-none-000/linux-3.10.107/net/rds/ib_cm.c 2017-06-27 09:49:32.000000000 +0000 +++ scorpion-7490-727/linux-3.10.107/net/rds/ib_cm.c 2021-02-04 17:41:59.000000000 +0000 @@ -39,36 +39,6 @@ #include "rds.h" #include "ib.h" -static char *rds_ib_event_type_strings[] = { -#define RDS_IB_EVENT_STRING(foo) \ - [IB_EVENT_##foo] = __stringify(IB_EVENT_##foo) - RDS_IB_EVENT_STRING(CQ_ERR), - RDS_IB_EVENT_STRING(QP_FATAL), - RDS_IB_EVENT_STRING(QP_REQ_ERR), - RDS_IB_EVENT_STRING(QP_ACCESS_ERR), - RDS_IB_EVENT_STRING(COMM_EST), - RDS_IB_EVENT_STRING(SQ_DRAINED), - RDS_IB_EVENT_STRING(PATH_MIG), - RDS_IB_EVENT_STRING(PATH_MIG_ERR), - RDS_IB_EVENT_STRING(DEVICE_FATAL), - RDS_IB_EVENT_STRING(PORT_ACTIVE), - RDS_IB_EVENT_STRING(PORT_ERR), - RDS_IB_EVENT_STRING(LID_CHANGE), - RDS_IB_EVENT_STRING(PKEY_CHANGE), - RDS_IB_EVENT_STRING(SM_CHANGE), - RDS_IB_EVENT_STRING(SRQ_ERR), - RDS_IB_EVENT_STRING(SRQ_LIMIT_REACHED), - RDS_IB_EVENT_STRING(QP_LAST_WQE_REACHED), - RDS_IB_EVENT_STRING(CLIENT_REREGISTER), -#undef RDS_IB_EVENT_STRING -}; - -static char *rds_ib_event_str(enum ib_event_type type) -{ - return rds_str_array(rds_ib_event_type_strings, - ARRAY_SIZE(rds_ib_event_type_strings), type); -}; - /* * Set the selected protocol version */ @@ -165,7 +135,7 @@ rds_ib_recv_init_ring(ic); /* Post receive buffers - as a side effect, this will update * the posted credit count. */ - rds_ib_recv_refill(conn, 1); + rds_ib_recv_refill(conn, 1, GFP_KERNEL); /* Tune RNR behavior */ rds_ib_tune_rnr(ic, &qp_attr); @@ -183,8 +153,17 @@ /* If the peer gave us the last packet it saw, process this as if * we had received a regular ACK. */ - if (dp && dp->dp_ack_seq) - rds_send_drop_acked(conn, be64_to_cpu(dp->dp_ack_seq), NULL); + if (dp) { + /* dp structure start is not guaranteed to be 8 bytes aligned. + * Since dp_ack_seq is 64-bit extended load operations can be + * used so go through get_unaligned to avoid unaligned errors. + */ + __be64 dp_ack_seq = get_unaligned(&dp->dp_ack_seq); + + if (dp_ack_seq) + rds_send_drop_acked(conn, be64_to_cpu(dp_ack_seq), + NULL); + } rds_connect_complete(conn); } @@ -234,7 +213,97 @@ static void rds_ib_cq_event_handler(struct ib_event *event, void *data) { rdsdebug("event %u (%s) data %p\n", - event->event, rds_ib_event_str(event->event), data); + event->event, ib_event_msg(event->event), data); +} + +/* Plucking the oldest entry from the ring can be done concurrently with + * the thread refilling the ring. Each ring operation is protected by + * spinlocks and the transient state of refilling doesn't change the + * recording of which entry is oldest. + * + * This relies on IB only calling one cq comp_handler for each cq so that + * there will only be one caller of rds_recv_incoming() per RDS connection. + */ +static void rds_ib_cq_comp_handler_recv(struct ib_cq *cq, void *context) +{ + struct rds_connection *conn = context; + struct rds_ib_connection *ic = conn->c_transport_data; + + rdsdebug("conn %p cq %p\n", conn, cq); + + rds_ib_stats_inc(s_ib_evt_handler_call); + + tasklet_schedule(&ic->i_recv_tasklet); +} + +static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq, + struct ib_wc *wcs, + struct rds_ib_ack_state *ack_state) +{ + int nr; + int i; + struct ib_wc *wc; + + while ((nr = ib_poll_cq(cq, RDS_IB_WC_MAX, wcs)) > 0) { + for (i = 0; i < nr; i++) { + wc = wcs + i; + rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n", + (unsigned long long)wc->wr_id, wc->status, + wc->byte_len, be32_to_cpu(wc->ex.imm_data)); + + if (wc->wr_id & RDS_IB_SEND_OP) + rds_ib_send_cqe_handler(ic, wc); + else + rds_ib_recv_cqe_handler(ic, wc, ack_state); + } + } +} + +static void rds_ib_tasklet_fn_send(unsigned long data) +{ + struct rds_ib_connection *ic = (struct rds_ib_connection *)data; + struct rds_connection *conn = ic->conn; + struct rds_ib_ack_state state; + + rds_ib_stats_inc(s_ib_tasklet_call); + + memset(&state, 0, sizeof(state)); + poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state); + ib_req_notify_cq(ic->i_send_cq, IB_CQ_NEXT_COMP); + poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state); + + if (rds_conn_up(conn) && + (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) || + test_bit(0, &conn->c_map_queued))) + rds_send_xmit(ic->conn); +} + +static void rds_ib_tasklet_fn_recv(unsigned long data) +{ + struct rds_ib_connection *ic = (struct rds_ib_connection *)data; + struct rds_connection *conn = ic->conn; + struct rds_ib_device *rds_ibdev = ic->rds_ibdev; + struct rds_ib_ack_state state; + + if (!rds_ibdev) + rds_conn_drop(conn); + + rds_ib_stats_inc(s_ib_tasklet_call); + + memset(&state, 0, sizeof(state)); + poll_cq(ic, ic->i_recv_cq, ic->i_recv_wc, &state); + ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED); + poll_cq(ic, ic->i_recv_cq, ic->i_recv_wc, &state); + + if (state.ack_next_valid) + rds_ib_set_ack(ic, state.ack_next, state.ack_required); + if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) { + rds_send_drop_acked(conn, state.ack_recv, NULL); + ic->i_ack_recv = state.ack_recv; + } + + if (rds_conn_up(conn)) + rds_ib_attempt_ack(ic); } static void rds_ib_qp_event_handler(struct ib_event *event, void *data) @@ -243,7 +312,7 @@ struct rds_ib_connection *ic = conn->c_transport_data; rdsdebug("conn %p ic %p event %u (%s)\n", conn, ic, event->event, - rds_ib_event_str(event->event)); + ib_event_msg(event->event)); switch (event->event) { case IB_EVENT_COMM_EST: @@ -252,13 +321,25 @@ default: rdsdebug("Fatal QP Event %u (%s) " "- connection %pI4->%pI4, reconnecting\n", - event->event, rds_ib_event_str(event->event), + event->event, ib_event_msg(event->event), &conn->c_laddr, &conn->c_faddr); rds_conn_drop(conn); break; } } +static void rds_ib_cq_comp_handler_send(struct ib_cq *cq, void *context) +{ + struct rds_connection *conn = context; + struct rds_ib_connection *ic = conn->c_transport_data; + + rdsdebug("conn %p cq %p\n", conn, cq); + + rds_ib_stats_inc(s_ib_evt_handler_call); + + tasklet_schedule(&ic->i_send_tasklet); +} + /* * This needs to be very careful to not leave IS_ERR pointers around for * cleanup to trip over. @@ -268,6 +349,7 @@ struct rds_ib_connection *ic = conn->c_transport_data; struct ib_device *dev = ic->i_cm_id->device; struct ib_qp_init_attr attr; + struct ib_cq_init_attr cq_attr = {}; struct rds_ib_device *rds_ibdev; int ret; @@ -289,11 +371,12 @@ /* Protection domain and memory range */ ic->i_pd = rds_ibdev->pd; - ic->i_mr = rds_ibdev->mr; - ic->i_send_cq = ib_create_cq(dev, rds_ib_send_cq_comp_handler, + cq_attr.cqe = ic->i_send_ring.w_nr + 1; + + ic->i_send_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_send, rds_ib_cq_event_handler, conn, - ic->i_send_ring.w_nr + 1, 0); + &cq_attr); if (IS_ERR(ic->i_send_cq)) { ret = PTR_ERR(ic->i_send_cq); ic->i_send_cq = NULL; @@ -301,9 +384,10 @@ goto out; } - ic->i_recv_cq = ib_create_cq(dev, rds_ib_recv_cq_comp_handler, + cq_attr.cqe = ic->i_recv_ring.w_nr; + ic->i_recv_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_recv, rds_ib_cq_event_handler, conn, - ic->i_recv_ring.w_nr, 0); + &cq_attr); if (IS_ERR(ic->i_recv_cq)) { ret = PTR_ERR(ic->i_recv_cq); ic->i_recv_cq = NULL; @@ -393,7 +477,7 @@ rds_ib_recv_init_ack(ic); - rdsdebug("conn %p pd %p mr %p cq %p %p\n", conn, ic->i_pd, ic->i_mr, + rdsdebug("conn %p pd %p cq %p %p\n", conn, ic->i_pd, ic->i_send_cq, ic->i_recv_cq); out: @@ -466,8 +550,9 @@ (unsigned long long)be64_to_cpu(lguid), (unsigned long long)be64_to_cpu(fguid)); - conn = rds_conn_create(dp->dp_daddr, dp->dp_saddr, &rds_ib_transport, - GFP_KERNEL); + /* RDS/IB is not currently netns aware, thus init_net */ + conn = rds_conn_create(&init_net, dp->dp_daddr, dp->dp_saddr, + &rds_ib_transport, GFP_KERNEL); if (IS_ERR(conn)) { rdsdebug("rds_conn_create failed (%ld)\n", PTR_ERR(conn)); conn = NULL; @@ -583,7 +668,7 @@ /* XXX I wonder what affect the port space has */ /* delegate cm event handler to rdma_transport */ - ic->i_cm_id = rdma_create_id(rds_rdma_cm_event_handler, conn, + ic->i_cm_id = rdma_create_id(&init_net, rds_rdma_cm_event_handler, conn, RDMA_PS_TCP, IB_QPT_RC); if (IS_ERR(ic->i_cm_id)) { ret = PTR_ERR(ic->i_cm_id); @@ -655,8 +740,18 @@ wait_event(rds_ib_ring_empty_wait, rds_ib_ring_empty(&ic->i_recv_ring) && (atomic_read(&ic->i_signaled_sends) == 0)); + tasklet_kill(&ic->i_send_tasklet); tasklet_kill(&ic->i_recv_tasklet); + /* first destroy the ib state that generates callbacks */ + if (ic->i_cm_id->qp) + rdma_destroy_qp(ic->i_cm_id); + if (ic->i_send_cq) + ib_destroy_cq(ic->i_send_cq); + if (ic->i_recv_cq) + ib_destroy_cq(ic->i_recv_cq); + + /* then free the resources that ib callbacks use */ if (ic->i_send_hdrs) ib_dma_free_coherent(dev, ic->i_send_ring.w_nr * @@ -680,12 +775,6 @@ if (ic->i_recvs) rds_ib_recv_clear_ring(ic); - if (ic->i_cm_id->qp) - rdma_destroy_qp(ic->i_cm_id); - if (ic->i_send_cq) - ib_destroy_cq(ic->i_send_cq); - if (ic->i_recv_cq) - ib_destroy_cq(ic->i_recv_cq); rdma_destroy_id(ic->i_cm_id); /* @@ -696,7 +785,6 @@ ic->i_cm_id = NULL; ic->i_pd = NULL; - ic->i_mr = NULL; ic->i_send_cq = NULL; ic->i_recv_cq = NULL; ic->i_send_hdrs = NULL; @@ -759,8 +847,10 @@ } INIT_LIST_HEAD(&ic->ib_node); - tasklet_init(&ic->i_recv_tasklet, rds_ib_recv_tasklet_fn, - (unsigned long) ic); + tasklet_init(&ic->i_send_tasklet, rds_ib_tasklet_fn_send, + (unsigned long)ic); + tasklet_init(&ic->i_recv_tasklet, rds_ib_tasklet_fn_recv, + (unsigned long)ic); mutex_init(&ic->i_recv_mutex); #ifndef KERNEL_HAS_ATOMIC64 spin_lock_init(&ic->i_ack_lock);