From patchwork Tue Sep 27 16:41:54 2016 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Ursula Braun X-Patchwork-Id: 675723 X-Patchwork-Delegate: davem@davemloft.net Return-Path: X-Original-To: patchwork-incoming@ozlabs.org Delivered-To: patchwork-incoming@ozlabs.org Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by ozlabs.org (Postfix) with ESMTP id 3sk6804b7xz9s5w for ; Wed, 28 Sep 2016 02:42:40 +1000 (AEST) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S935178AbcI0Qmi (ORCPT ); Tue, 27 Sep 2016 12:42:38 -0400 Received: from mx0b-001b2d01.pphosted.com ([148.163.158.5]:40402 "EHLO mx0a-001b2d01.pphosted.com" rhost-flags-OK-OK-OK-FAIL) by vger.kernel.org with ESMTP id S935112AbcI0QmP (ORCPT ); Tue, 27 Sep 2016 12:42:15 -0400 Received: from pps.filterd (m0098414.ppops.net [127.0.0.1]) by mx0b-001b2d01.pphosted.com (8.16.0.17/8.16.0.17) with SMTP id u8RGcSqU022666 for ; Tue, 27 Sep 2016 12:42:14 -0400 Received: from e06smtp07.uk.ibm.com (e06smtp07.uk.ibm.com [195.75.94.103]) by mx0b-001b2d01.pphosted.com with ESMTP id 25qt4a7q40-1 (version=TLSv1.2 cipher=AES256-SHA bits=256 verify=NOT) for ; Tue, 27 Sep 2016 12:42:14 -0400 Received: from localhost by e06smtp07.uk.ibm.com with IBM ESMTP SMTP Gateway: Authorized Use Only! Violators will be prosecuted for from ; Tue, 27 Sep 2016 17:42:12 +0100 Received: from d06dlp01.portsmouth.uk.ibm.com (9.149.20.13) by e06smtp07.uk.ibm.com (192.168.101.137) with IBM ESMTP SMTP Gateway: Authorized Use Only! Violators will be prosecuted; Tue, 27 Sep 2016 17:42:10 +0100 Received: from b06cxnps3074.portsmouth.uk.ibm.com (d06relay09.portsmouth.uk.ibm.com [9.149.109.194]) by d06dlp01.portsmouth.uk.ibm.com (Postfix) with ESMTP id E5D7A17D8024; Tue, 27 Sep 2016 17:44:11 +0100 (BST) Received: from d06av03.portsmouth.uk.ibm.com (d06av03.portsmouth.uk.ibm.com [9.149.37.213]) by b06cxnps3074.portsmouth.uk.ibm.com (8.14.9/8.14.9/NCO v10.0) with ESMTP id u8RGgAhr3342612; Tue, 27 Sep 2016 16:42:10 GMT Received: from d06av03.portsmouth.uk.ibm.com (localhost [127.0.0.1]) by d06av03.portsmouth.uk.ibm.com (8.14.4/8.14.4/NCO v10.0 AVout) with ESMTP id u8RGg8nJ010134; Tue, 27 Sep 2016 10:42:09 -0600 Received: from tuxmaker.boeblingen.de.ibm.com (tuxmaker.boeblingen.de.ibm.com [9.152.85.9]) by d06av03.portsmouth.uk.ibm.com (8.14.4/8.14.4/NCO v10.0 AVin) with ESMTP id u8RGfxmO009800 (version=TLSv1/SSLv3 cipher=DHE-RSA-AES256-SHA256 bits=256 verify=NO); Tue, 27 Sep 2016 10:42:08 -0600 From: Ursula Braun To: davem@davemloft.net Cc: netdev@vger.kernel.org, linux-s390@vger.kernel.org, schwidefsky@de.ibm.com, heiko.carstens@de.ibm.com, utz.bacher@de.ibm.com, ubraun@linux.vnet.ibm.com Subject: [PATCH V2 net-next 13/15] smc: receive data from RMBE Date: Tue, 27 Sep 2016 18:41:54 +0200 X-Mailer: git-send-email 2.8.4 In-Reply-To: <20160927164156.26184-1-ubraun@linux.vnet.ibm.com> References: <20160927164156.26184-1-ubraun@linux.vnet.ibm.com> X-TM-AS-MML: disable X-Content-Scanned: Fidelis XPS MAILER x-cbid: 16092716-0028-0000-0000-000002249CF0 X-IBM-AV-DETECTION: SAVI=unused REMOTE=unused XFE=unused x-cbparentid: 16092716-0029-0000-0000-00002099ECC2 Message-Id: <20160927164156.26184-14-ubraun@linux.vnet.ibm.com> X-Proofpoint-Virus-Version: vendor=fsecure engine=2.50.10432:, , definitions=2016-09-27_07:, , signatures=0 X-Proofpoint-Spam-Details: rule=outbound_spam_definite policy=outbound score=100 spamscore=100 suspectscore=3 malwarescore=0 phishscore=0 adultscore=0 bulkscore=0 classifier=spam adjust=0 reason=mlx scancount=1 engine=8.0.1-1609020000 definitions=main-1609270301 Sender: netdev-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: netdev@vger.kernel.org move RMBE data into user space buffer and update managing cursors Signed-off-by: Ursula Braun --- net/smc/Makefile | 2 +- net/smc/af_smc.c | 7 +- net/smc/smc.h | 4 + net/smc/smc_cdc.c | 6 +- net/smc/smc_core.c | 10 +++ net/smc/smc_rx.c | 216 +++++++++++++++++++++++++++++++++++++++++++++++++++++ net/smc/smc_rx.h | 22 ++++++ net/smc/smc_tx.c | 37 +++++++++ net/smc/smc_tx.h | 1 + 9 files changed, 302 insertions(+), 3 deletions(-) create mode 100644 net/smc/smc_rx.c create mode 100644 net/smc/smc_rx.h diff --git a/net/smc/Makefile b/net/smc/Makefile index fc28d79..6255e29 100644 --- a/net/smc/Makefile +++ b/net/smc/Makefile @@ -1,3 +1,3 @@ obj-$(CONFIG_SMC) += smc.o smc-y := af_smc.o smc_pnet.o smc_ib.o smc_clc.o smc_core.o smc_wr.o smc_llc.o -smc-y += smc_cdc.o smc_tx.o +smc-y += smc_cdc.o smc_tx.o smc_rx.o diff --git a/net/smc/af_smc.c b/net/smc/af_smc.c index c96a234..4652759 100644 --- a/net/smc/af_smc.c +++ b/net/smc/af_smc.c @@ -37,6 +37,7 @@ #include "smc_ib.h" #include "smc_pnet.h" #include "smc_tx.h" +#include "smc_rx.h" static DEFINE_MUTEX(smc_create_lgr_pending); /* serialize link group * creation @@ -422,6 +423,7 @@ static int smc_connect_rdma(struct smc_sock *smc) mutex_unlock(&smc_create_lgr_pending); smc_tx_init(smc); + smc_rx_init(smc); out_connected: smc_copy_sock_settings_to_clc(smc); @@ -765,6 +767,7 @@ static void smc_listen_work(struct work_struct *work) } smc_tx_init(new_smc); + smc_rx_init(new_smc); out_connected: sk_refcnt_debug_inc(newsmcsk); @@ -960,7 +963,7 @@ static int smc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len, if (smc->use_fallback) rc = smc->clcsock->ops->recvmsg(smc->clcsock, msg, len, flags); else - rc = sock_no_recvmsg(sock, msg, len, flags); + rc = smc_rx_recvmsg(smc, msg, len, flags); out: release_sock(sk); return rc; @@ -1026,6 +1029,8 @@ static unsigned int smc_poll(struct file *file, struct socket *sock, sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk); set_bit(SOCK_NOSPACE, &sk->sk_socket->flags); } + if (atomic_read(&smc->conn.bytes_to_rcv)) + mask |= POLLIN | POLLRDNORM; /* for now - to be enhanced in follow-on patch */ } diff --git a/net/smc/smc.h b/net/smc/smc.h index 191e913..13ceb9b 100644 --- a/net/smc/smc.h +++ b/net/smc/smc.h @@ -112,6 +112,10 @@ struct smc_connection { struct smc_buf_desc *rmb_desc; /* RMBE descriptor */ int rmbe_size; /* RMBE size <== sock rmem */ int rmbe_size_short;/* compressed notation */ + int rmbe_update_limit; + /* lower limit for consumer + * cursor update + */ struct smc_host_cdc_msg local_tx_ctrl; /* host byte order staging * buffer for CDC msg send diff --git a/net/smc/smc_cdc.c b/net/smc/smc_cdc.c index d5e4a24..0a679c0 100644 --- a/net/smc/smc_cdc.c +++ b/net/smc/smc_cdc.c @@ -15,6 +15,7 @@ #include "smc_wr.h" #include "smc_cdc.h" #include "smc_tx.h" +#include "smc_rx.h" /********************************** send *************************************/ @@ -197,6 +198,7 @@ static void smc_cdc_msg_recv_action(struct smc_sock *smc, atomic_add(diff_prod, &conn->bytes_to_rcv); /* guarantee 0 <= bytes_to_rcv <= rmbe_size */ smp_mb__after_atomic(); + smc->sk.sk_data_ready(&smc->sk); } if (conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) @@ -220,7 +222,9 @@ static void smc_cdc_msg_recv_action(struct smc_sock *smc, return; /* data available */ - /* subsequent patch: send delayed ack, wake receivers */ + if ((conn->local_rx_ctrl.prod_flags.write_blocked) || + (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req)) + smc_tx_consumer_update(conn); } /* called under tasklet context */ diff --git a/net/smc/smc_core.c b/net/smc/smc_core.c index cb42d63..011093f 100644 --- a/net/smc/smc_core.c +++ b/net/smc/smc_core.c @@ -515,6 +515,15 @@ struct smc_buf_desc *smc_rmb_get_slot(struct smc_link_group *lgr, return NULL; } +/* one of the conditions for announcing a receiver's current window size is + * that it "results in a minimum increase in the window size of 10% of the + * receive buffer space" [RFC7609] + */ +static inline int smc_rmb_wnd_update_limit(int rmbe_size) +{ + return min_t(int, rmbe_size / 10, SOCK_MIN_SNDBUF / 2); +} + /* create the tx buffer for an SMC socket */ int smc_sndbuf_create(struct smc_sock *smc) { @@ -646,6 +655,7 @@ int smc_rmb_create(struct smc_sock *smc) conn->rmbe_size_short = tmp_bufsize_short; smc->sk.sk_rcvbuf = tmp_bufsize * 2; atomic_set(&conn->bytes_to_rcv, 0); + conn->rmbe_update_limit = smc_rmb_wnd_update_limit(tmp_bufsize); return 0; } else { return -ENOMEM; diff --git a/net/smc/smc_rx.c b/net/smc/smc_rx.c new file mode 100644 index 0000000..0807d1f --- /dev/null +++ b/net/smc/smc_rx.c @@ -0,0 +1,216 @@ +/* + * Shared Memory Communications over RDMA (SMC-R) and RoCE + * + * Manage RMBE + * copy new RMBE data into user space + * + * Copyright IBM Corp. 2016 + * + * Author(s): Ursula Braun + */ + +#include +#include +#include + +#include "smc.h" +#include "smc_core.h" +#include "smc_cdc.h" +#include "smc_tx.h" /* smc_tx_consumer_update() */ +#include "smc_rx.h" + +/* callback implementation for sk.sk_data_ready() + * to wakeup rcvbuf consumers that blocked with smc_rx_wait_data(). + * indirectly called by smc_cdc_msg_recv_action(). + */ +static void smc_rx_data_ready(struct sock *sk) +{ + struct socket_wq *wq; + + /* derived from sock_def_readable() */ + /* called already in smc_listen_work() */ + rcu_read_lock(); + wq = rcu_dereference(sk->sk_wq); + if (skwq_has_sleeper(wq)) + wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI | + POLLRDNORM | POLLRDBAND); + if ((sk->sk_shutdown == SHUTDOWN_MASK) || + (sk->sk_state == SMC_CLOSED)) + sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_HUP); + else + sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN); + rcu_read_unlock(); +} + +/* blocks rcvbuf consumer until >=len bytes available or timeout or interrupted + * @smc smc socket + * @timeo pointer to max seconds to wait, pointer to value 0 for no timeout + * Returns: + * 1 if at least 1 byte available in rcvbuf or if socket error/shutdown. + * 0 otherwise (nothing in rcvbuf nor timeout, e.g. interrupted). + */ +static int smc_rx_wait_data(struct smc_sock *smc, long *timeo) +{ + struct smc_connection *conn = &smc->conn; + struct sock *sk = &smc->sk; + DEFINE_WAIT(wait); + int rc; + + if (atomic_read(&conn->bytes_to_rcv)) + return 1; + prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); + sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk); + rc = sk_wait_event(sk, timeo, + sk->sk_err || + sk->sk_shutdown & RCV_SHUTDOWN || + sock_flag(sk, SOCK_DONE) || + atomic_read(&conn->bytes_to_rcv) || + smc_cdc_rxed_any_close_or_senddone(conn)); + sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk); + finish_wait(sk_sleep(sk), &wait); + return rc; +} + +/* rcvbuf consumer: main API called by socket layer. + * called under sk lock. + */ +int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len, + int flags) +{ + size_t copylen, read_done = 0, read_remaining = len; + size_t chunk_len, chunk_off, chunk_len_sum; + struct smc_connection *conn = &smc->conn; + union smc_host_cursor cons; + int readable, chunk; + char *rcvbuf_base; + struct sock *sk; + long timeo; + int target; /* Read at least these many bytes */ + int rc; + + if (unlikely(flags & MSG_ERRQUEUE)) + return -EINVAL; /* future work for sk.sk_family == AF_SMC */ + if (flags & MSG_OOB) + return -EINVAL; /* future work */ + + sk = &smc->sk; + if (sk->sk_state == SMC_LISTEN) + return -ENOTCONN; + timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); + target = sock_rcvlowat(sk, flags & MSG_WAITALL, len); + + msg->msg_namelen = 0; + /* we currently use 1 RMBE per RMB, so RMBE == RMB base addr */ + rcvbuf_base = conn->rmb_desc->cpu_addr; + + do { /* while (read_remaining) */ + if (read_done >= target) + break; + + if (atomic_read(&conn->bytes_to_rcv)) + goto copy; + + if (read_done) { + if (sk->sk_err || + sk->sk_state == SMC_CLOSED || + (sk->sk_shutdown & RCV_SHUTDOWN) || + !timeo || + signal_pending(current) || + smc_cdc_rxed_any_close_or_senddone(conn) || + conn->local_tx_ctrl.conn_state_flags. + peer_conn_abort) + break; + } else { + if (sock_flag(sk, SOCK_DONE)) + break; + if (sk->sk_err) { + read_done = sock_error(sk); + break; + } + if (sk->sk_shutdown & RCV_SHUTDOWN || + smc_cdc_rxed_any_close_or_senddone(conn) || + conn->local_tx_ctrl.conn_state_flags. + peer_conn_abort) + break; + if (sk->sk_state == SMC_CLOSED) { + if (!sock_flag(sk, SOCK_DONE)) { + /* This occurs when user tries to read + * from never connected socket. + */ + read_done = -ENOTCONN; + break; + } + break; + } + if (signal_pending(current)) { + read_done = sock_intr_errno(timeo); + break; + } + } + + if (!atomic_read(&conn->bytes_to_rcv)) { + smc_rx_wait_data(smc, &timeo); + continue; + } + +copy: + /* initialize variables for 1st iteration of subsequent loop */ + /* could be just 1 byte, even after smc_rx_wait_data above */ + readable = atomic_read(&conn->bytes_to_rcv); + /* not more than what user space asked for */ + copylen = min_t(size_t, read_remaining, readable); + smc_curs_write(&cons, + smc_curs_read(&conn->local_tx_ctrl.cons, conn), + conn); + /* determine chunks where to read from rcvbuf */ + /* either unwrapped case, or 1st chunk of wrapped case */ + chunk_len = min_t(size_t, + copylen, conn->rmbe_size - cons.count); + chunk_len_sum = chunk_len; + chunk_off = cons.count; + for (chunk = 0; chunk < 2; chunk++) { + if (!(flags & MSG_TRUNC)) { + rc = memcpy_to_msg(msg, rcvbuf_base + chunk_off, + chunk_len); + if (rc) { + if (!read_done) + read_done = -EFAULT; + goto out; + } + } + read_remaining -= chunk_len; + read_done += chunk_len; + + if (chunk_len_sum == copylen) + break; /* either on 1st or 2nd iteration */ + /* prepare next (== 2nd) iteration */ + chunk_len = copylen - chunk_len; /* remainder */ + chunk_len_sum += chunk_len; + chunk_off = 0; /* modulo offset in recv ring buffer */ + } + + /* update cursors */ + if (!(flags & MSG_PEEK)) { + smc_curs_add(conn->rmbe_size, &cons, copylen); + /* increased in recv tasklet smc_cdc_msg_rcv() */ + smp_mb__before_atomic(); + atomic_sub(copylen, &conn->bytes_to_rcv); + /* guarantee 0 <= bytes_to_rcv <= rmbe_size */ + smp_mb__after_atomic(); + smc_curs_write(&conn->local_tx_ctrl.cons, + smc_curs_read(&cons, conn), + conn); + /* send consumer cursor update if required */ + /* similar to advertising new TCP rcv_wnd if required */ + smc_tx_consumer_update(conn); + } + } while (read_remaining); +out: + return read_done; +} + +/* Initialize receive properties on connection establishment. NB: not __init! */ +void smc_rx_init(struct smc_sock *smc) +{ + smc->sk.sk_data_ready = smc_rx_data_ready; +} diff --git a/net/smc/smc_rx.h b/net/smc/smc_rx.h new file mode 100644 index 0000000..08fcf8d --- /dev/null +++ b/net/smc/smc_rx.h @@ -0,0 +1,22 @@ +/* + * Shared Memory Communications over RDMA (SMC-R) and RoCE + * + * Manage RMBE + * + * Copyright IBM Corp. 2016 + * + * Author(s): Ursula Braun + */ + +#ifndef SMC_RX_H +#define SMC_RX_H + +#include +#include + +#include "smc.h" + +void smc_rx_init(struct smc_sock *); +int smc_rx_recvmsg(struct smc_sock *, struct msghdr *, size_t, int); + +#endif /* SMC_RX_H */ diff --git a/net/smc/smc_tx.c b/net/smc/smc_tx.c index 368d27b..b8d779b 100644 --- a/net/smc/smc_tx.c +++ b/net/smc/smc_tx.c @@ -426,6 +426,43 @@ static void smc_tx_work(struct work_struct *work) release_sock(&smc->sk); } +void smc_tx_consumer_update(struct smc_connection *conn) +{ + union smc_host_cursor cfed, cons; + struct smc_cdc_tx_pend *pend; + struct smc_wr_buf *wr_buf; + int to_confirm, rc; + + smc_curs_write(&cons, + smc_curs_read(&conn->local_tx_ctrl.cons, conn), + conn); + smc_curs_write(&cfed, + smc_curs_read(&conn->rx_curs_confirmed, conn), + conn); + to_confirm = smc_curs_diff(conn->rmbe_size, &cfed, &cons); + + if (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req || + ((to_confirm > conn->rmbe_update_limit) && + ((to_confirm > (conn->rmbe_size / 2)) || + conn->local_rx_ctrl.prod_flags.write_blocked))) { + rc = smc_cdc_get_free_slot(&conn->lgr->lnk[SMC_SINGLE_LINK], + &wr_buf, &pend); + if (!rc) + rc = smc_cdc_msg_send(conn, wr_buf, pend); + if (rc < 0) { + schedule_work(&conn->tx_work); + return; + } + smc_curs_write(&conn->rx_curs_confirmed, + smc_curs_read(&conn->local_tx_ctrl.cons, conn), + conn); + conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0; + } + if (conn->local_rx_ctrl.prod_flags.write_blocked && + !atomic_read(&conn->bytes_to_rcv)) + conn->local_rx_ctrl.prod_flags.write_blocked = 0; +} + /***************************** send initialize *******************************/ /* Initialize send properties on connection establishment. NB: not __init! */ diff --git a/net/smc/smc_tx.h b/net/smc/smc_tx.h index 27db7b2..6760891 100644 --- a/net/smc/smc_tx.h +++ b/net/smc/smc_tx.h @@ -30,5 +30,6 @@ void smc_tx_init(struct smc_sock *); int smc_tx_sendmsg(struct smc_sock *, struct msghdr *, size_t); int smc_tx_sndbuf_nonempty(struct smc_connection *); void smc_tx_sndbuf_nonfull(struct smc_sock *); +void smc_tx_consumer_update(struct smc_connection *); #endif /* SMC_TX_H */