diff mbox series

[net,1/9] rxrpc: Pass the input handler's data skb reference to the Rx ring

Message ID 156647660836.11061.3521782307257461575.stgit@warthog.procyon.org.uk
State Deferred
Delegated to: David Miller
Headers show
Series rxrpc: Fix use of skb_cow_data() | expand

Commit Message

David Howells Aug. 22, 2019, 12:23 p.m. UTC
Pass the reference held on a DATA skb in the rxrpc input handler into the
Rx ring rather than getting an additional ref for this and then dropping
the original ref at the end.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/input.c |   48 ++++++++++++++++++++++++++++++++++--------------
 1 file changed, 34 insertions(+), 14 deletions(-)
diff mbox series

Patch

diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index dd47d465d1d3..5789ec5ad54f 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -416,7 +416,8 @@  static void rxrpc_input_dup_data(struct rxrpc_call *call, rxrpc_seq_t seq,
 }
 
 /*
- * Process a DATA packet, adding the packet to the Rx ring.
+ * Process a DATA packet, adding the packet to the Rx ring.  The caller's
+ * packet ref must be passed on or discarded.
  */
 static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
 {
@@ -425,20 +426,22 @@  static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
 	unsigned int offset = sizeof(struct rxrpc_wire_header);
 	unsigned int ix;
 	rxrpc_serial_t serial = sp->hdr.serial, ack_serial = 0;
-	rxrpc_seq_t seq = sp->hdr.seq, hard_ack;
+	rxrpc_seq_t seq0 = sp->hdr.seq, seq, hard_ack;
 	bool immediate_ack = false, jumbo_bad = false, queued;
 	u16 len;
-	u8 ack = 0, flags, annotation = 0;
+	u8 ack = 0, flags, jumbo_flags, annotation = 0;
 
 	_enter("{%u,%u},{%u,%u}",
 	       call->rx_hard_ack, call->rx_top, skb->len, seq);
 
 	_proto("Rx DATA %%%u { #%u f=%02x }",
-	       sp->hdr.serial, seq, sp->hdr.flags);
+	       sp->hdr.serial, seq0, sp->hdr.flags);
 
 	state = READ_ONCE(call->state);
-	if (state >= RXRPC_CALL_COMPLETE)
+	if (state >= RXRPC_CALL_COMPLETE) {
+		rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
 		return;
+	}
 
 	if (call->state == RXRPC_CALL_SERVER_RECV_REQUEST) {
 		unsigned long timo = READ_ONCE(call->next_req_timo);
@@ -463,7 +466,8 @@  static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
 	    !rxrpc_receiving_reply(call))
 		goto unlock;
 
-	call->ackr_prev_seq = seq;
+	call->ackr_prev_seq = seq0;
+	seq = seq0;
 
 	hard_ack = READ_ONCE(call->rx_hard_ack);
 	if (after(seq, hard_ack + call->rx_winsize)) {
@@ -533,12 +537,25 @@  static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
 	 * Barriers against rxrpc_recvmsg_data() and rxrpc_rotate_rx_window()
 	 * and also rxrpc_fill_out_ack().
 	 */
-	rxrpc_get_skb(skb, rxrpc_skb_rx_got);
+	if (flags & RXRPC_JUMBO_PACKET) {
+		rxrpc_get_skb(skb, rxrpc_skb_rx_got);
+		if (skb_copy_bits(skb, offset, &jumbo_flags, 1) < 0) {
+			rxrpc_proto_abort("XJF", call, seq);
+			goto unlock;
+		}
+	}
 	call->rxtx_annotations[ix] = annotation;
 	smp_wmb();
 	call->rxtx_buffer[ix] = skb;
 	if (after(seq, call->rx_top)) {
 		smp_store_release(&call->rx_top, seq);
+		if (!(flags & RXRPC_JUMBO_PACKET)) {
+			/* From this point on, we're not allowed to touch the
+			 * packet any longer as its ref now belongs to the Rx
+			 * ring.
+			 */
+			skb = NULL;
+		}
 	} else if (before(seq, call->rx_top)) {
 		/* Send an immediate ACK if we fill in a hole */
 		if (!ack) {
@@ -567,10 +584,7 @@  static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
 skip:
 	offset += len;
 	if (flags & RXRPC_JUMBO_PACKET) {
-		if (skb_copy_bits(skb, offset, &flags, 1) < 0) {
-			rxrpc_proto_abort("XJF", call, seq);
-			goto unlock;
-		}
+		flags = jumbo_flags;
 		offset += sizeof(struct rxrpc_jumbo_header);
 		seq++;
 		serial++;
@@ -606,13 +620,14 @@  static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
 				  false, true,
 				  rxrpc_propose_ack_input_data);
 
-	if (sp->hdr.seq == READ_ONCE(call->rx_hard_ack) + 1) {
+	if (seq0 == READ_ONCE(call->rx_hard_ack) + 1) {
 		trace_rxrpc_notify_socket(call->debug_id, serial);
 		rxrpc_notify_socket(call);
 	}
 
 unlock:
 	spin_unlock(&call->input_lock);
+	rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
 	_leave(" [queued]");
 }
 
@@ -1021,7 +1036,7 @@  static void rxrpc_input_call_packet(struct rxrpc_call *call,
 	switch (sp->hdr.type) {
 	case RXRPC_PACKET_TYPE_DATA:
 		rxrpc_input_data(call, skb);
-		break;
+		goto no_free;
 
 	case RXRPC_PACKET_TYPE_ACK:
 		rxrpc_input_ack(call, skb);
@@ -1048,6 +1063,8 @@  static void rxrpc_input_call_packet(struct rxrpc_call *call,
 		break;
 	}
 
+	rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
+no_free:
 	_leave("");
 }
 
@@ -1373,8 +1390,11 @@  int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 		mutex_unlock(&call->user_mutex);
 	}
 
+	/* Process a call packet; this either discards or passes on the ref
+	 * elsewhere.
+	 */
 	rxrpc_input_call_packet(call, skb);
-	goto discard;
+	goto out;
 
 discard:
 	rxrpc_free_skb(skb, rxrpc_skb_rx_freed);