[5/9] ipvs: use adaptive pause in master thread

Message ID alpine.LFD.2.00.1204112221250.2000@ja.ssi.bg
State Rejected

Commit Message

Julian Anastasov April 11, 2012, 8:02 p.m. UTC
Hello,

On Tue, 10 Apr 2012, Pablo Neira Ayuso wrote:

> You can still use kthread_should_stop inside a wrapper function
> that calls kthread_stop and up() the semaphore.
> 
> sync_stop:
>         kthread_stop(k)
>         up(s)
> 
> kthread_routine:
>         while(1) {
>                 down(s)
>                 if (kthread_should_stop(k))
>                         break;
> 
>                 get sync msg
>                 send sync msg
>         }
> 
> BTW, each up() does not necessarily mean one wakeup event. up() will
> deliver only one wakeup event for a process that has already been
> woken.

	OK, now I added up(). It will be called when
32 messages have been queued since the last one sent by the thread.
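
The enqueue side (sb_queue_tail in the appended patch) counts queued
buffers and signals the semaphore only once per IPVS_SYNC_WAKEUP_RATE
buffers; condensed, it looks like this:

	spin_lock(&ipvs->sync_lock);
	if (ipvs->sync_state & IP_VS_STATE_MASTER &&
	    ipvs->sync_queue_len < sysctl_sync_qlen_max(ipvs)) {
		list_add_tail(&sb->list, &ipvs->sync_queue);
		ipvs->sync_queue_len++;
		/* one wakeup per IPVS_SYNC_WAKEUP_RATE (32) queued buffers */
		if (++ipvs->sync_queue_delay == IPVS_SYNC_WAKEUP_RATE)
			up(&ipvs->master_sem);
	} else
		ip_vs_sync_buff_release(sb);	/* queue limit hit: drop */
	spin_unlock(&ipvs->sync_lock);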

> > 	I'm still wondering whether the sndbuf value should be exported;
> > currently users have to modify the global default/max value.
> 
> I think it's a good idea.

	Done, used same value both for rcvbuf and sndbuf.

> > But in the version below I'm trying to handle the sndbuf overflow
> > by blocking on the write_space event. This way we should work
> > with any sndbuf configuration.
> 
> You seem to be deferring the overrun problem by using a longer
> intermediate queue than the socket buffer. Then, that queue can be
> tuned by the user via sysctl. It may happen under heavy stress that
> your intermediate queue gets full again, then you'll have to drop
> packets at some point.

	Yes, both values control the same thing; the problem is
that the queue size is in messages while the socket buffer is in bytes.
And as the sndbuf configuration is optional, I'm not trying to derive
sync_qlen_max from sndbuf. Maybe we can do it after the
socket is created, but it would cause problems for systems
that do not configure sync_sock_size: until now they used an
unlimited queue and probably the default socket size, so using
some small default sndbuf as sync_qlen_max could cause message
drops, because they would suddenly run with reduced limits. So, for
now we provide a large sync_qlen_max as the default configuration,
which probably exceeds the default socket buffer.
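
Just to illustrate the unit mismatch (this helper is hypothetical and
not part of the patch), deriving the queue limit from a byte-sized
sndbuf would look something like:

	/* hypothetical: convert a sndbuf limit in bytes into a queue limit
	 * in sync buffers, assuming each queued buffer carries up to
	 * sync_buff_bytes of sync messages
	 */
	static int sync_qlen_from_sndbuf(int sndbuf_bytes, int sync_buff_bytes)
	{
		return max(sndbuf_bytes / sync_buff_bytes, IPVS_SYNC_WAKEUP_RATE);
	}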

	Still, I don't think the down/up idea is an improvement.
We are adding two new variables: master_stopped and
master_sem.

	The problem is that kthread_stop() is a blocking
function: it waits for the thread to terminate. It cannot wake up a
thread blocked in down(), so we add the master_stopped flag
to unblock the down() loop, while kthread_stop() still
unblocks the thread if it is waiting for write_space. I.e. up()+kthread_stop()
is racy without the additional flag, while kthread_stop()+up()
cannot work at all.

	I'm appending an untested version with up+down, but I think
we should use wake_up_process and schedule_timeout instead,
as in the previous version.
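
A minimal sketch of that alternative (reusing next_sync_buff from the
appended patch, so it is only illustrative): the producer would call
wake_up_process(ipvs->master_thread) instead of up() once per
IPVS_SYNC_WAKEUP_RATE queued buffers, and the master loop would be:

	while (!kthread_should_stop()) {
		struct ip_vs_sync_buff *sb = next_sync_buff(ipvs);

		if (sb) {
			ip_vs_send_sync_msg(tinfo->sock, sb->mesg);
			ip_vs_sync_buff_release(sb);
			continue;
		}
		/* nothing queued: sleep until woken (kthread_stop() also
		 * wakes us); the timeout bounds any missed-wakeup window
		 */
		schedule_timeout_interruptible(HZ / 5);
	}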


Comments

Pablo Neira Ayuso April 12, 2012, 12:13 a.m. UTC | #1
Hi Julian,

On Wed, Apr 11, 2012 at 11:02:39PM +0300, Julian Anastasov wrote:
> 
> 	Hello,
> 
> On Tue, 10 Apr 2012, Pablo Neira Ayuso wrote:
> 
> > You can still use kthread_should_stop inside a wrapper function
> > that calls kthread_stop and up() the semaphore.
> > 
> > sync_stop:
> >         kthread_stop(k)
> >         up(s)
> > 
> > kthread_routine:
> >         while(1) {
> >                 down(s)
> >                 if (kthread_should_stop(k))
> >                         break;
> > 
> >                 get sync msg
> >                 send sync msg
> >         }
> > 
> > BTW, each up() does not necessarily mean one wakeup event. up() will
> > deliver only one wakeup event for a process that has already been
> > woken.
> 
> 	OK, now I added up(). It will be called when
> 32 messages have been queued since the last one sent by the thread.

Why 32?

If you do up() once per message, you will still get an arbitrary
number of messages in the queue until the scheduler selects your
thread to enter the running state.

In other words, if you do up() once per 32 messages, your thread will
get N+32 messages in its queue by the time the scheduler makes it
enter the running state, where N is that arbitrary number of messages.
This seems to me like more chances to overrun the socket buffer under
high stress.

> > > 	I'm still wondering whether the sndbuf value should be exported;
> > > currently users have to modify the global default/max value.
> > 
> > I think it's a good idea.
> 
> 	Done, used same value both for rcvbuf and sndbuf.
> 
> > > But in the version below I'm trying to handle the sndbuf overflow
> > > by blocking on the write_space event. This way we should work
> > > with any sndbuf configuration.
> > 
> > You seem to be deferring the overrun problem by using a longer
> > intermediate queue than the socket buffer. Then, that queue can be
> > tuned by the user via sysctl. It may happen under heavy stress that
> > your intermediate queue gets full again, then you'll have to drop
> > packets at some point.
> 
> 	Yes, both values control the same thing; the problem is
> that the queue size is in messages while the socket buffer is in bytes.
> And as the sndbuf configuration is optional, I'm not trying to derive
> sync_qlen_max from sndbuf. Maybe we can do it after the
> socket is created, but it would cause problems for systems
> that do not configure sync_sock_size: until now they used an
> unlimited queue and probably the default socket size, so using
> some small default sndbuf as sync_qlen_max could cause message
> drops, because they would suddenly run with reduced limits. So, for
> now we provide a large sync_qlen_max as the default configuration,
> which probably exceeds the default socket buffer.
>
> 	Still, I don't think the down/up idea is an improvement.
> We are adding two new variables: master_stopped and
> master_sem.

Well, this is not exactly the idea I had in mind.

> 	The problem is that kthread_stop() is a blocking
> function: it waits for the thread to terminate. It cannot wake up a
> thread blocked in down(), so we add the master_stopped flag
> to unblock the down() loop, while kthread_stop() still
> unblocks the thread if it is waiting for write_space. I.e. up()+kthread_stop()
> is racy without the additional flag, while kthread_stop()+up()
> cannot work at all.

I don't see the up+kthread_stop() race you mention.

> 	I'm appending an untested version with up+down, but I think
> we should use wake_up_process and schedule_timeout instead,
> as in the previous version.

OK.

I still think that using an intermediate queue is *not* the way
to achieve reliability and congestion control, sorry.

But you seem to be set on the idea, and I don't want to block your
developments; I just wanted to make my point and provide some ideas.
After all, you maintain that part of the code.

Please, tell me what patch you want me to apply and I'll take it.
Julian Anastasov April 19, 2012, 10:51 p.m. UTC | #2
Hello Pablo,

On Thu, 12 Apr 2012, Pablo Neira Ayuso wrote:

> > 	OK, now I added up(). It will be called when
> > 32 messages have been queued since the last one sent by the thread.
> 
> Why 32?
> 
> If you do up() once per message, you will still get an arbitrary
> number of messages in the queue until the scheduler selects your
> thread to enter the running state.

	I understand this. It saves wakeups while the
master thread is busy with messages, but not if the
messages arrive with such gaps that the master thread
ends up sending them one by one, with a wakeup for each.

> In other words, if you do up() once per 32 messages, your thread will
> get N+32 messages in its queue by the time the scheduler makes it
> enter the running state, where N is that arbitrary number of messages.
> This seems to me like more chances to overrun the socket buffer under
> high stress.

	I have now modified the constant (to 8, which should be
8*8KB of data, below the default sndbuf) and the algorithm. The idea of
IPVS_SYNC_WAKEUP_RATE is to avoid the situation where
we send a wakeup for every message; it should be better
for caching to send messages in short bursts.

> > 	Still, I don't think the down/up idea is an improvement.
> > We are adding two new variables: master_stopped and
> > master_sem.
> 
> Well, this is not exactly the idea I had in mind.

	Currently, we need a _timeout version because a
sync_buff can become ready after 2 seconds and we do not
get a wakeup for such an incomplete buffer, so we have to check it
from time to time. IIRC, using the uninterruptible version
makes the thread bump the CPU load average, so it
is not appropriate: that state contributes to load.
It is really meant for a busy state, not for an idle state
waiting for messages to send.
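
That is why the idle wait in the master loop of the patch below looks
like this (trimmed to the relevant part):

	for (;;) {
		sb = next_sync_buff(ipvs);
		if (!sb) {
			/* recheck at least every HZ/5 so a partially filled
			 * sync_buff older than 2 seconds still gets sent;
			 * down_timeout() sleeps in TASK_UNINTERRUPTIBLE, so
			 * this idle wait counts toward the load average
			 */
			down_timeout(&ipvs->master_sem, HZ / 5);
			if (ipvs->master_stopped)
				break;
			continue;
		}
		/* ... send sb as in the patch below ... */
	}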

> > 	The problem is that kthread_stop() is a blocking
> > function: it waits for the thread to terminate. It cannot wake up a
> > thread blocked in down(), so we add the master_stopped flag
> > to unblock the down() loop, while kthread_stop() still
> > unblocks the thread if it is waiting for write_space. I.e. up()+kthread_stop()
> > is racy without the additional flag, while kthread_stop()+up()
> > cannot work at all.
> 
> I don't see the up+kthread_stop() race you mention.

	The problem is that __down_common exits only
on waiter.up != 0 (set only by __up). Here is the race:

master_thread		stop_sync_thread
----------------------------------------
down*

			STOP MASTER
			up()
next_sync_buff()=NULL
- no buffer to send

kthread_should_stop()?
Not yet

down*()

			- some delay here

			kthread_stop()
wakeup. Is semaphore
up (waiter.up)?
No => block again
			- we are blocked in
			kthread_stop()

The race is that the master thread can block again in
down() before stop_sync_thread reaches kthread_stop().
If the master uses down_timeout() it can escape this block, but
only after the timeout (-ETIME), which is not very good.
down_timeout() also sleeps in TASK_UNINTERRUPTIBLE state :(
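
With the extra flag, the stop path in the patch below avoids this,
because the flag is already visible when the single up() wakes the
thread:

	/* stop_sync_thread() */
	ipvs->master_stopped = 1;	/* checked after every down_timeout() */
	up(&ipvs->master_sem);		/* unblock a master sleeping in down */
	retc = kthread_stop(ipvs->master_thread);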

> > 	I'm appending an untested version with up+down, but I think
> > we should use wake_up_process and schedule_timeout instead,
> > as in the previous version.
> 
> OK.
> 
> I still think that using an intermediate queue is *not* the way
> to achieve reliability and congestion control, sorry.

	The idea here is not to delay packet
processing by sending sync traffic from softirq context, so we use a
thread and an intermediate queue for sending the sync messages,
ideally on an otherwise idle CPU. It is a compromise
for setups with one CPU overloaded with packets and another
CPU idle. During our tests, for some sync parameters the
sync traffic was 100-200 Mbit/s, which is not a small amount to
offload to other CPUs, even using multiple master threads.
In such cases CPU speed is a bigger problem than the
memory used for the intermediate queue.

> But you seem to be set on the idea, and I don't want to block your
> developments; I just wanted to make my point and provide some ideas.
> After all, you maintain that part of the code.
> 
> Please, tell me what patch you want me to apply and I'll take it.

	I'm posting a new patchset that includes a new version
of this patch. I hope it is better: it limits the
delay of queued messages, so that conn state is synced without
big delays (20ms).

Regards

--
Julian Anastasov <ja@ssi.bg>

Patch

diff --git a/include/net/ip_vs.h b/include/net/ip_vs.h
index 2bdee51..8ed41eb 100644
--- a/include/net/ip_vs.h
+++ b/include/net/ip_vs.h
@@ -870,6 +870,8 @@  struct netns_ipvs {
 #endif
 	int			sysctl_snat_reroute;
 	int			sysctl_sync_ver;
+	int			sysctl_sync_qlen_max;
+	int			sysctl_sync_sock_size;
 	int			sysctl_cache_bypass;
 	int			sysctl_expire_nodest_conn;
 	int			sysctl_expire_quiescent_template;
@@ -890,6 +892,10 @@  struct netns_ipvs {
 	struct timer_list	est_timer;	/* Estimation timer */
 	/* ip_vs_sync */
 	struct list_head	sync_queue;
+	int			sync_queue_len;
+	unsigned int		sync_queue_delay;
+	int			master_stopped;
+	struct semaphore	master_sem;
 	spinlock_t		sync_lock;
 	struct ip_vs_sync_buff  *sync_buff;
 	spinlock_t		sync_buff_lock;
@@ -912,6 +918,8 @@  struct netns_ipvs {
 #define DEFAULT_SYNC_THRESHOLD	3
 #define DEFAULT_SYNC_PERIOD	50
 #define DEFAULT_SYNC_VER	1
+#define IPVS_SYNC_WAKEUP_RATE	32
+#define IPVS_SYNC_QLEN_MAX	(IPVS_SYNC_WAKEUP_RATE * 4)
 
 #ifdef CONFIG_SYSCTL
 
@@ -930,6 +938,16 @@  static inline int sysctl_sync_ver(struct netns_ipvs *ipvs)
 	return ipvs->sysctl_sync_ver;
 }
 
+static inline int sysctl_sync_qlen_max(struct netns_ipvs *ipvs)
+{
+	return ipvs->sysctl_sync_qlen_max;
+}
+
+static inline int sysctl_sync_sock_size(struct netns_ipvs *ipvs)
+{
+	return ipvs->sysctl_sync_sock_size;
+}
+
 #else
 
 static inline int sysctl_sync_threshold(struct netns_ipvs *ipvs)
@@ -947,6 +965,16 @@  static inline int sysctl_sync_ver(struct netns_ipvs *ipvs)
 	return DEFAULT_SYNC_VER;
 }
 
+static inline int sysctl_sync_qlen_max(struct netns_ipvs *ipvs)
+{
+	return IPVS_SYNC_QLEN_MAX;
+}
+
+static inline int sysctl_sync_sock_size(struct netns_ipvs *ipvs)
+{
+	return 0;
+}
+
 #endif
 
 /*
diff --git a/net/netfilter/ipvs/ip_vs_ctl.c b/net/netfilter/ipvs/ip_vs_ctl.c
index 964d426..2172fcc 100644
--- a/net/netfilter/ipvs/ip_vs_ctl.c
+++ b/net/netfilter/ipvs/ip_vs_ctl.c
@@ -1718,6 +1718,18 @@  static struct ctl_table vs_vars[] = {
 		.proc_handler	= &proc_do_sync_mode,
 	},
 	{
+		.procname	= "sync_qlen_max",
+		.maxlen		= sizeof(int),
+		.mode		= 0644,
+		.proc_handler	= proc_dointvec,
+	},
+	{
+		.procname	= "sync_sock_size",
+		.maxlen		= sizeof(int),
+		.mode		= 0644,
+		.proc_handler	= proc_dointvec,
+	},
+	{
 		.procname	= "cache_bypass",
 		.maxlen		= sizeof(int),
 		.mode		= 0644,
@@ -3662,6 +3674,10 @@  int __net_init ip_vs_control_net_init_sysctl(struct net *net)
 	tbl[idx++].data = &ipvs->sysctl_snat_reroute;
 	ipvs->sysctl_sync_ver = 1;
 	tbl[idx++].data = &ipvs->sysctl_sync_ver;
+	ipvs->sysctl_sync_qlen_max = nr_free_buffer_pages() / 32;
+	tbl[idx++].data = &ipvs->sysctl_sync_qlen_max;
+	ipvs->sysctl_sync_sock_size = 0;
+	tbl[idx++].data = &ipvs->sysctl_sync_sock_size;
 	tbl[idx++].data = &ipvs->sysctl_cache_bypass;
 	tbl[idx++].data = &ipvs->sysctl_expire_nodest_conn;
 	tbl[idx++].data = &ipvs->sysctl_expire_quiescent_template;
diff --git a/net/netfilter/ipvs/ip_vs_sync.c b/net/netfilter/ipvs/ip_vs_sync.c
index 0e36679..0bd473c 100644
--- a/net/netfilter/ipvs/ip_vs_sync.c
+++ b/net/netfilter/ipvs/ip_vs_sync.c
@@ -312,6 +312,9 @@  static inline struct ip_vs_sync_buff *sb_dequeue(struct netns_ipvs *ipvs)
 				struct ip_vs_sync_buff,
 				list);
 		list_del(&sb->list);
+		ipvs->sync_queue_len--;
+		if (!ipvs->sync_queue_len)
+			ipvs->sync_queue_delay = 0;
 	}
 	spin_unlock_bh(&ipvs->sync_lock);
 
@@ -358,9 +361,13 @@  static inline void sb_queue_tail(struct netns_ipvs *ipvs)
 	struct ip_vs_sync_buff *sb = ipvs->sync_buff;
 
 	spin_lock(&ipvs->sync_lock);
-	if (ipvs->sync_state & IP_VS_STATE_MASTER)
+	if (ipvs->sync_state & IP_VS_STATE_MASTER &&
+	    ipvs->sync_queue_len < sysctl_sync_qlen_max(ipvs)) {
 		list_add_tail(&sb->list, &ipvs->sync_queue);
-	else
+		ipvs->sync_queue_len++;
+		if ((++ipvs->sync_queue_delay) == IPVS_SYNC_WAKEUP_RATE)
+			up(&ipvs->master_sem);
+	} else
 		ip_vs_sync_buff_release(sb);
 	spin_unlock(&ipvs->sync_lock);
 }
@@ -405,10 +412,11 @@  void ip_vs_sync_switch_mode(struct net *net, int mode)
 		ipvs->sync_buff = NULL;
 	} else {
 		spin_lock_bh(&ipvs->sync_lock);
-		if (ipvs->sync_state & IP_VS_STATE_MASTER)
+		if (ipvs->sync_state & IP_VS_STATE_MASTER) {
 			list_add_tail(&ipvs->sync_buff->list,
 				      &ipvs->sync_queue);
-		else
+			ipvs->sync_queue_len++;
+		} else
 			ip_vs_sync_buff_release(ipvs->sync_buff);
 		spin_unlock_bh(&ipvs->sync_lock);
 	}
@@ -1130,6 +1138,28 @@  static void ip_vs_process_message(struct net *net, __u8 *buffer,
 
 
 /*
+ *      Setup sndbuf (mode=1) or rcvbuf (mode=0)
+ */
+static void set_sock_size(struct sock *sk, int mode, int val)
+{
+	/* setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &val, sizeof(val)); */
+	/* setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &val, sizeof(val)); */
+	lock_sock(sk);
+	if (mode) {
+		val = clamp_t(int, val, (SOCK_MIN_SNDBUF + 1) / 2,
+			      sysctl_wmem_max);
+		sk->sk_sndbuf = val * 2;
+		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
+	} else {
+		val = clamp_t(int, val, (SOCK_MIN_RCVBUF + 1) / 2,
+			      sysctl_rmem_max);
+		sk->sk_rcvbuf = val * 2;
+		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
+	}
+	release_sock(sk);
+}
+
+/*
  *      Setup loopback of outgoing multicasts on a sending socket
  */
 static void set_mcast_loop(struct sock *sk, u_char loop)
@@ -1305,6 +1335,9 @@  static struct socket *make_send_sock(struct net *net)
 
 	set_mcast_loop(sock->sk, 0);
 	set_mcast_ttl(sock->sk, 1);
+	result = sysctl_sync_sock_size(ipvs);
+	if (result > 0)
+		set_sock_size(sock->sk, 1, result);
 
 	result = bind_mcastif_addr(sock, ipvs->master_mcast_ifn);
 	if (result < 0) {
@@ -1350,6 +1383,9 @@  static struct socket *make_receive_sock(struct net *net)
 	sk_change_net(sock->sk, net);
 	/* it is equivalent to the REUSEADDR option in user-space */
 	sock->sk->sk_reuse = 1;
+	result = sysctl_sync_sock_size(ipvs);
+	if (result > 0)
+		set_sock_size(sock->sk, 0, result);
 
 	result = sock->ops->bind(sock, (struct sockaddr *) &mcast_addr,
 			sizeof(struct sockaddr));
@@ -1392,18 +1428,22 @@  ip_vs_send_async(struct socket *sock, const char *buffer, const size_t length)
 	return len;
 }
 
-static void
+static int
 ip_vs_send_sync_msg(struct socket *sock, struct ip_vs_sync_mesg *msg)
 {
 	int msize;
+	int ret;
 
 	msize = msg->size;
 
 	/* Put size in network byte order */
 	msg->size = htons(msg->size);
 
-	if (ip_vs_send_async(sock, (char *)msg, msize) != msize)
-		pr_err("ip_vs_send_async error\n");
+	ret = ip_vs_send_async(sock, (char *)msg, msize);
+	if (ret >= 0 || ret == -EAGAIN)
+		return ret;
+	pr_err("ip_vs_send_async error %d\n", ret);
+	return 0;
 }
 
 static int
@@ -1428,33 +1468,57 @@  ip_vs_receive(struct socket *sock, char *buffer, const size_t buflen)
 	return len;
 }
 
+/* Get next buffer to send */
+static inline struct ip_vs_sync_buff *
+next_sync_buff(struct netns_ipvs *ipvs)
+{
+	struct ip_vs_sync_buff *sb;
+
+	sb = sb_dequeue(ipvs);
+	if (sb)
+		return sb;
+	/* Do not delay entries in buffer for more than 2 seconds */
+	return get_curr_sync_buff(ipvs, 2 * HZ);
+}
 
 static int sync_thread_master(void *data)
 {
 	struct ip_vs_sync_thread_data *tinfo = data;
 	struct netns_ipvs *ipvs = net_ipvs(tinfo->net);
+	struct sock *sk = tinfo->sock->sk;
 	struct ip_vs_sync_buff *sb;
 
 	pr_info("sync thread started: state = MASTER, mcast_ifn = %s, "
 		"syncid = %d\n",
 		ipvs->master_mcast_ifn, ipvs->master_syncid);
 
-	while (!kthread_should_stop()) {
-		while ((sb = sb_dequeue(ipvs))) {
-			ip_vs_send_sync_msg(tinfo->sock, sb->mesg);
-			ip_vs_sync_buff_release(sb);
+	for (;;) {
+		sb = next_sync_buff(ipvs);
+		if (!sb) {
+			down_timeout(&ipvs->master_sem, HZ / 5);
+			if (ipvs->master_stopped)
+				break;
+			continue;
 		}
 
-		/* check if entries stay in ipvs->sync_buff for 2 seconds */
-		sb = get_curr_sync_buff(ipvs, 2 * HZ);
-		if (sb) {
-			ip_vs_send_sync_msg(tinfo->sock, sb->mesg);
-			ip_vs_sync_buff_release(sb);
+retry:
+		if (unlikely(ipvs->master_stopped))
+			break;
+		if (ip_vs_send_sync_msg(tinfo->sock, sb->mesg) < 0) {
+			int ret = 0;
+
+			__wait_event_interruptible(*sk_sleep(sk),
+						   sock_writeable(sk) ||
+						   kthread_should_stop(),
+						   ret);
+			goto retry;
 		}
-
-		schedule_timeout_interruptible(HZ);
+		ip_vs_sync_buff_release(sb);
 	}
 
+	if (sb)
+		ip_vs_sync_buff_release(sb);
+
 	/* clean up the sync_buff queue */
 	while ((sb = sb_dequeue(ipvs)))
 		ip_vs_sync_buff_release(sb);
@@ -1538,6 +1602,10 @@  int start_sync_thread(struct net *net, int state, char *mcast_ifn, __u8 syncid)
 		realtask = &ipvs->master_thread;
 		name = "ipvs_master:%d";
 		threadfn = sync_thread_master;
+		ipvs->sync_queue_len = 0;
+		ipvs->sync_queue_delay = 0;
+		ipvs->master_stopped = 0;
+		sema_init(&ipvs->master_sem, 0);
 		sock = make_send_sock(net);
 	} else if (state == IP_VS_STATE_BACKUP) {
 		if (ipvs->backup_thread)
@@ -1623,6 +1691,8 @@  int stop_sync_thread(struct net *net, int state)
 		spin_lock_bh(&ipvs->sync_lock);
 		ipvs->sync_state &= ~IP_VS_STATE_MASTER;
 		spin_unlock_bh(&ipvs->sync_lock);
+		ipvs->master_stopped = 1;
+		up(&ipvs->master_sem);
 		retc = kthread_stop(ipvs->master_thread);
 		ipvs->master_thread = NULL;
 	} else if (state == IP_VS_STATE_BACKUP) {