[RFC] ext4: fix potential race between online resizing and write operations
diff mbox series

Message ID 20200215233817.GA670792@mit.edu
State Superseded
Headers show
Series
  • [RFC] ext4: fix potential race between online resizing and write operations
Related show

Commit Message

Theodore Y. Ts'o Feb. 15, 2020, 11:38 p.m. UTC
This is a revision of a proposed patch[1] to fix a bug[2] to fix a
reported crash caused by the fact that we are growing an array, it's
possible for another process to try to dereference that array, get the
old copy of the array, and then before it fetch an element of the
array and use it, it could get reused for something else.

[1] https://bugzilla.kernel.org/attachment.cgi?id=287189
[2] https://bugzilla.kernel.org/show_bug.cgi?id=206443

So this is a pretty classical case of RCU, and in the original version
of the patch[1], it used synchronize_rcu_expedited() followed by a
call kvfree().  If you read the RCU documentation it states that you
really shouldn't call synchronize_rcu() and kfree() in a loop, and
while synchronize_rcu_expedited() does speed things up, it does so by
impacting the performance of all the other CPU's.

And unfortunately add_new_gdb() get's called in a loop.  If you expand
a file system by say, 1TB, add_new_gdb() and/or add_new_gdb_meta_gb()
will get called 8,192 times.

To fix this, I added ext4_kvfree_array_rcu() which allocates an object
containing a void *ptr and the rcu_head, and then uses call_rcu() to
free the pointer and the stub object.  I'm cc'ing Paul because I'm a
bit surprised no one else has needed something like this before; so
I'm wondering if I'm missing something.  If not, would it make sense
to make something like kvfree_array_rcu as a more general facility?

   		       			   - Ted

From 5ab7e4d38318c125246a4aa899dd614a37c803ef Mon Sep 17 00:00:00 2001
From: Theodore Ts'o <tytso@mit.edu>
Date: Sat, 15 Feb 2020 16:40:37 -0500
Subject: [PATCH] ext4: fix potential race between online resizing and write operations

During an online resize an array of pointers to buffer heads gets
replaced so it can get enlarged.  If there is a racing block
allocation or deallocation which uses the old array, and the old array
has gotten reused this can lead to a GPF or some other random kernel
memory getting modified.

Link: https://bugzilla.kernel.org/show_bug.cgi?id=206443
Reported-by: Suraj Jitindar Singh <surajjs@amazon.com>
Signed-off-by: Theodore Ts'o <tytso@mit.edu>
Cc: stable@kernel.org
---
 fs/ext4/resize.c | 35 +++++++++++++++++++++++++++++++----
 fs/ext4/balloc.c | 15 ++++++++++++---
 fs/ext4/ext4.h   |  1 +
 3 files changed, 44 insertions(+), 7 deletions(-)

2.24.1

Comments

Paul E. McKenney Feb. 16, 2020, 12:12 p.m. UTC | #1
On Sat, Feb 15, 2020 at 06:38:17PM -0500, Theodore Y. Ts'o wrote:
> This is a revision of a proposed patch[1] to fix a bug[2] to fix a
> reported crash caused by the fact that we are growing an array, it's
> possible for another process to try to dereference that array, get the
> old copy of the array, and then before it fetch an element of the
> array and use it, it could get reused for something else.
> 
> [1] https://bugzilla.kernel.org/attachment.cgi?id=287189
> [2] https://bugzilla.kernel.org/show_bug.cgi?id=206443
> 
> So this is a pretty classical case of RCU, and in the original version
> of the patch[1], it used synchronize_rcu_expedited() followed by a
> call kvfree().  If you read the RCU documentation it states that you
> really shouldn't call synchronize_rcu() and kfree() in a loop, and
> while synchronize_rcu_expedited() does speed things up, it does so by
> impacting the performance of all the other CPU's.
> 
> And unfortunately add_new_gdb() get's called in a loop.  If you expand
> a file system by say, 1TB, add_new_gdb() and/or add_new_gdb_meta_gb()
> will get called 8,192 times.
> 
> To fix this, I added ext4_kvfree_array_rcu() which allocates an object
> containing a void *ptr and the rcu_head, and then uses call_rcu() to
> free the pointer and the stub object.  I'm cc'ing Paul because I'm a
> bit surprised no one else has needed something like this before; so
> I'm wondering if I'm missing something.  If not, would it make sense
> to make something like kvfree_array_rcu as a more general facility?

Good point!

Now that kfree_rcu() is on its way to being less of a hack deeply
entangled into the bowels of RCU, this might be fairly easy to implement.
It might well be simply a matter of a function pointer and a kvfree_rcu()
API.  Adding Uladzislau Rezki and Joel Fernandez on CC for their thoughts.

							Thanx, Paul

> >From 5ab7e4d38318c125246a4aa899dd614a37c803ef Mon Sep 17 00:00:00 2001
> From: Theodore Ts'o <tytso@mit.edu>
> Date: Sat, 15 Feb 2020 16:40:37 -0500
> Subject: [PATCH] ext4: fix potential race between online resizing and write operations
> 
> During an online resize an array of pointers to buffer heads gets
> replaced so it can get enlarged.  If there is a racing block
> allocation or deallocation which uses the old array, and the old array
> has gotten reused this can lead to a GPF or some other random kernel
> memory getting modified.
> 
> Link: https://bugzilla.kernel.org/show_bug.cgi?id=206443
> Reported-by: Suraj Jitindar Singh <surajjs@amazon.com>
> Signed-off-by: Theodore Ts'o <tytso@mit.edu>
> Cc: stable@kernel.org
> ---
>  fs/ext4/resize.c | 35 +++++++++++++++++++++++++++++++----
>  fs/ext4/balloc.c | 15 ++++++++++++---
>  fs/ext4/ext4.h   |  1 +
>  3 files changed, 44 insertions(+), 7 deletions(-)
> 
> diff --git a/fs/ext4/resize.c b/fs/ext4/resize.c
> index 86a2500ed292..98d3b4ec3422 100644
> --- a/fs/ext4/resize.c
> +++ b/fs/ext4/resize.c
> @@ -17,6 +17,33 @@
>  
>  #include "ext4_jbd2.h"
>  
> +struct ext4_rcu_ptr {
> +	struct rcu_head rcu;
> +	void *ptr;
> +};
> +
> +static void ext4_rcu_ptr_callback(struct rcu_head *head)
> +{
> +	struct ext4_rcu_ptr *ptr;
> +
> +	ptr = container_of(head, struct ext4_rcu_ptr, rcu);
> +	kvfree(ptr->ptr);
> +	kfree(ptr);
> +}
> +
> +void ext4_kvfree_array_rcu(void *to_free)
> +{
> +	struct ext4_rcu_ptr *ptr = kzalloc(sizeof(*ptr), GFP_KERNEL);
> +
> +	if (ptr) {
> +		ptr->ptr = to_free;
> +		call_rcu(&ptr->rcu, ext4_rcu_ptr_callback);
> +		return;
> +	}
> +	synchronize_rcu();
> +	kvfree(ptr);
> +}
> +
>  int ext4_resize_begin(struct super_block *sb)
>  {
>  	struct ext4_sb_info *sbi = EXT4_SB(sb);
> @@ -864,9 +891,9 @@ static int add_new_gdb(handle_t *handle, struct inode *inode,
>  	memcpy(n_group_desc, o_group_desc,
>  	       EXT4_SB(sb)->s_gdb_count * sizeof(struct buffer_head *));
>  	n_group_desc[gdb_num] = gdb_bh;
> -	EXT4_SB(sb)->s_group_desc = n_group_desc;
> +	rcu_assign_pointer(EXT4_SB(sb)->s_group_desc, n_group_desc);
>  	EXT4_SB(sb)->s_gdb_count++;
> -	kvfree(o_group_desc);
> +	ext4_kvfree_array_rcu(o_group_desc);
>  
>  	le16_add_cpu(&es->s_reserved_gdt_blocks, -1);
>  	err = ext4_handle_dirty_super(handle, sb);
> @@ -922,9 +949,9 @@ static int add_new_gdb_meta_bg(struct super_block *sb,
>  		return err;
>  	}
>  
> -	EXT4_SB(sb)->s_group_desc = n_group_desc;
> +	rcu_assign_pointer(EXT4_SB(sb)->s_group_desc, n_group_desc);
>  	EXT4_SB(sb)->s_gdb_count++;
> -	kvfree(o_group_desc);
> +	ext4_kvfree_array_rcu(o_group_desc);
>  	return err;
>  }
>  
> diff --git a/fs/ext4/balloc.c b/fs/ext4/balloc.c
> index 5f993a411251..5368bf67300b 100644
> --- a/fs/ext4/balloc.c
> +++ b/fs/ext4/balloc.c
> @@ -270,6 +270,7 @@ struct ext4_group_desc * ext4_get_group_desc(struct super_block *sb,
>  	ext4_group_t ngroups = ext4_get_groups_count(sb);
>  	struct ext4_group_desc *desc;
>  	struct ext4_sb_info *sbi = EXT4_SB(sb);
> +	struct buffer_head *bh_p;
>  
>  	if (block_group >= ngroups) {
>  		ext4_error(sb, "block_group >= groups_count - block_group = %u,"
> @@ -280,7 +281,15 @@ struct ext4_group_desc * ext4_get_group_desc(struct super_block *sb,
>  
>  	group_desc = block_group >> EXT4_DESC_PER_BLOCK_BITS(sb);
>  	offset = block_group & (EXT4_DESC_PER_BLOCK(sb) - 1);
> -	if (!sbi->s_group_desc[group_desc]) {
> +	rcu_read_lock();
> +	bh_p = rcu_dereference(sbi->s_group_desc)[group_desc];
> +	/*
> +	 * We can unlock here since the pointer being dereferenced won't be
> +	 * dereferenced again. By looking at the usage in add_new_gdb() the
> +	 * value isn't modified, just the pointer, and so it remains valid.
> +	 */
> +	rcu_read_unlock();
> +	if (!bh_p) {
>  		ext4_error(sb, "Group descriptor not loaded - "
>  			   "block_group = %u, group_desc = %u, desc = %u",
>  			   block_group, group_desc, offset);
> @@ -288,10 +297,10 @@ struct ext4_group_desc * ext4_get_group_desc(struct super_block *sb,
>  	}
>  
>  	desc = (struct ext4_group_desc *)(
> -		(__u8 *)sbi->s_group_desc[group_desc]->b_data +
> +		(__u8 *)bh_p->b_data +
>  		offset * EXT4_DESC_SIZE(sb));
>  	if (bh)
> -		*bh = sbi->s_group_desc[group_desc];
> +		*bh = bh_p;
>  	return desc;
>  }
>  
> diff --git a/fs/ext4/ext4.h b/fs/ext4/ext4.h
> index 4441331d06cc..b7824d56b968 100644
> --- a/fs/ext4/ext4.h
> +++ b/fs/ext4/ext4.h
> @@ -2730,8 +2730,8 @@ extern int ext4_generic_delete_entry(handle_t *handle,
>  extern bool ext4_empty_dir(struct inode *inode);
>  
>  /* resize.c */
> +extern void ext4_kvfree_array_rcu(void *to_free);
>  extern int ext4_group_add(struct super_block *sb,
>  				struct ext4_new_group_data *input);
>  extern int ext4_group_extend(struct super_block *sb,
> 
> -- 
> 2.24.1
>
Theodore Y. Ts'o Feb. 16, 2020, 8:32 p.m. UTC | #2
On Sun, Feb 16, 2020 at 04:12:46AM -0800, Paul E. McKenney wrote:
> > +void ext4_kvfree_array_rcu(void *to_free)
> > +{
> > +	struct ext4_rcu_ptr *ptr = kzalloc(sizeof(*ptr), GFP_KERNEL);
> > +
> > +	if (ptr) {
> > +		ptr->ptr = to_free;
> > +		call_rcu(&ptr->rcu, ext4_rcu_ptr_callback);
> > +		return;
> > +	}
> > +	synchronize_rcu();
> > +	kvfree(ptr);
> > +}

Whoops, that last statement should be kvfree(to_free), of course.
I'll fix that up in my tree.

		     	      	       - Ted
Uladzislau Rezki Feb. 17, 2020, 4:08 p.m. UTC | #3
Hello, Joel, Paul, Ted. 

> 
> Good point!
> 
> Now that kfree_rcu() is on its way to being less of a hack deeply
> entangled into the bowels of RCU, this might be fairly easy to implement.
> It might well be simply a matter of a function pointer and a kvfree_rcu()
> API.  Adding Uladzislau Rezki and Joel Fernandez on CC for their thoughts.
> 
I think it makes sense. For example i see there is a similar demand in
the mm/list_lru.c too. As for implementation, it will not be hard, i think. 

The easiest way is to inject kvfree() support directly into existing kfree_call_rcu()
logic(probably will need to rename that function), i.e. to free vmalloc() allocations
only in "emergency path" by just calling kvfree(). So that function in its turn will
figure out if it is _vmalloc_ address or not and trigger corresponding "free" path.

Just an example:

<snip>
diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
index 75a2eded7aa2..0c4af5d0e3f8 100644
--- a/include/linux/rcupdate.h
+++ b/include/linux/rcupdate.h
@@ -806,6 +806,11 @@ static inline notrace void rcu_read_unlock_sched_notrace(void)
                kfree_call_rcu(head, (rcu_callback_t)(unsigned long)(offset)); \
        } while (0)

+#define __kvfree_rcu(head, offset) \
+       do { \
+               kfree_call_rcu(head, (rcu_callback_t)(unsigned long)(offset)); \
+       } while (0)
+
 /**
  * kfree_rcu() - kfree an object after a grace period.
  * @ptr:       pointer to kfree
@@ -840,6 +845,14 @@ do {                                                                       \
                __kfree_rcu(&((___p)->rhf), offsetof(typeof(*(ptr)), rhf)); \
 } while (0)

+#define kvfree_rcu_my(ptr, rhf)                                                \
+do {                                                                   \
+       typeof (ptr) ___p = (ptr);                                      \
+                                                                       \
+       if (___p)                                                       \
+               __kvfree_rcu(&((___p)->rhf), offsetof(typeof(*(ptr)), rhf)); \
+} while (0)
+
 /*
  * Place this after a lock-acquisition primitive to guarantee that
  * an UNLOCK+LOCK pair acts as a full barrier.  This guarantee applies
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 394a83bd7ff4..1a05bb44951b 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -2783,11 +2783,10 @@ static void kfree_rcu_work(struct work_struct *work)
                rcu_lock_acquire(&rcu_callback_map);
                trace_rcu_invoke_kfree_callback(rcu_state.name, head, offset);
 
-               if (!WARN_ON_ONCE(!__is_kfree_rcu_offset(offset)))
-                       kfree((void *)head - offset);
+               kvfree((void *)head - offset);
 
-                rcu_lock_release(&rcu_callback_map);
-                cond_resched_tasks_rcu_qs();
+               rcu_lock_release(&rcu_callback_map);
+               cond_resched_tasks_rcu_qs();
        }
 }
 
@@ -2964,7 +2963,8 @@ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
         * Under high memory pressure GFP_NOWAIT can fail,
         * in that case the emergency path is maintained.
         */
-       if (unlikely(!kfree_call_rcu_add_ptr_to_bulk(krcp, head, func))) {
+       if (is_vmalloc_addr((void *) head - (unsigned long) func) ||
+                       unlikely(!kfree_call_rcu_add_ptr_to_bulk(krcp, head, func))) {
                head->func = func;
                head->next = krcp->head;
                krcp->head = head;
<snip>

How to use it:

<snip>
struct test_kvfree_rcu {
       unsigned char array[PAGE_SIZE * 2];
       struct rcu_head rcu;
};

struct test_kvfree_rcu *p;

p = vmalloc(sizeof(struct test_kvfree_rcu));
kvfree_rcu_my((struct test_kvfree_rcu *) p, rcu);
<snip>

Any thoughts?

Thank you!

--
Vlad Rezki
Theodore Y. Ts'o Feb. 17, 2020, 7:33 p.m. UTC | #4
On Mon, Feb 17, 2020 at 05:08:27PM +0100, Uladzislau Rezki wrote:
> Hello, Joel, Paul, Ted. 
> 
> > 
> > Good point!
> > 
> > Now that kfree_rcu() is on its way to being less of a hack deeply
> > entangled into the bowels of RCU, this might be fairly easy to implement.
> > It might well be simply a matter of a function pointer and a kvfree_rcu()
> > API.  Adding Uladzislau Rezki and Joel Fernandez on CC for their thoughts.
> > 
> I think it makes sense. For example i see there is a similar demand in
> the mm/list_lru.c too. As for implementation, it will not be hard, i think. 
> 
> The easiest way is to inject kvfree() support directly into existing kfree_call_rcu()
> logic(probably will need to rename that function), i.e. to free vmalloc() allocations
> only in "emergency path" by just calling kvfree(). So that function in its turn will
> figure out if it is _vmalloc_ address or not and trigger corresponding "free" path.

The other difference between ext4_kvfree_array_rcu() and kfree_rcu()
is that kfree_rcu() is a magic macro which frees a structure, which
has to contain a struct rcu_head.  In this case, I'm freeing a pointer
to set of structures, or in another case, a set of buffer_heads, which
do *not* have an rcu_head structure.

> struct test_kvfree_rcu {
>        unsigned char array[PAGE_SIZE * 2];
>        struct rcu_head rcu;
> };

I suspect I'd still want to use the ext4_kfree_array_rcu(), for a
couple of reasons.  First of all, the array is variably sized.  So we
don't know how big it is.  That could be fixed via something like 

struct test_kvfree_rcu {
       struct rcu_head rcu;
       struct test_s [];
};

... but the other issue is that we have code where we have arrays of
arrays, e.g.:

	struct ext4_group_info ***s_group_info;

which is an array of array of pointers to ext4_group_info.  Trying to
cram in the rcu_head makes the code more complicated --- and also,
resizing file systems is something that happens often, and I don't
want to optimize it by keeping rcu_head structs around all the time.

This is why at least for *this* use case, it's actually better to
allocate temp array just before callig call_rcu(), and if I can't
allocate it due to memory pressure, we'll it's OK to use
synchronize_rcu() followed by kvfree().

Cheers,

						- Ted
Uladzislau Rezki Feb. 18, 2020, 5:08 p.m. UTC | #5
On Mon, Feb 17, 2020 at 02:33:14PM -0500, Theodore Y. Ts'o wrote:
> On Mon, Feb 17, 2020 at 05:08:27PM +0100, Uladzislau Rezki wrote:
> > Hello, Joel, Paul, Ted. 
> > 
> > > 
> > > Good point!
> > > 
> > > Now that kfree_rcu() is on its way to being less of a hack deeply
> > > entangled into the bowels of RCU, this might be fairly easy to implement.
> > > It might well be simply a matter of a function pointer and a kvfree_rcu()
> > > API.  Adding Uladzislau Rezki and Joel Fernandez on CC for their thoughts.
> > > 
> > I think it makes sense. For example i see there is a similar demand in
> > the mm/list_lru.c too. As for implementation, it will not be hard, i think. 
> > 
> > The easiest way is to inject kvfree() support directly into existing kfree_call_rcu()
> > logic(probably will need to rename that function), i.e. to free vmalloc() allocations
> > only in "emergency path" by just calling kvfree(). So that function in its turn will
> > figure out if it is _vmalloc_ address or not and trigger corresponding "free" path.
> 
> The other difference between ext4_kvfree_array_rcu() and kfree_rcu()
> is that kfree_rcu() is a magic macro which frees a structure, which
> has to contain a struct rcu_head.  In this case, I'm freeing a pointer
> to set of structures, or in another case, a set of buffer_heads, which
> do *not* have an rcu_head structure.
> 
We can implement kvfree_rcu() that will deal with pointer only, it is not
an issue. I mean without embedding rcu_head to the structure or whatever
else.

I tried to implement it with less number of changes to make it more clear
and readable. Please have a look:

<snip>
diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
index 2678a37c3169..9e75c15b53f9 100644
--- a/include/linux/rcupdate.h
+++ b/include/linux/rcupdate.h
@@ -805,7 +805,7 @@ static inline notrace void rcu_read_unlock_sched_notrace(void)
 #define __kfree_rcu(head, offset) \
        do { \  
                BUILD_BUG_ON(!__is_kfree_rcu_offset(offset)); \
-               kfree_call_rcu(head, (rcu_callback_t)(unsigned long)(offset)); \
+               kfree_call_rcu(head, (rcu_callback_t)(unsigned long)(offset), NULL); \
        } while (0)

 /**
@@ -842,6 +842,14 @@ do {                                                                       \
                __kfree_rcu(&((___p)->rhf), offsetof(typeof(*(ptr)), rhf)); \
 } while (0)

+#define kvfree_rcu(ptr)                                                \
+do {                                                                   \
+       typeof (ptr) ___p = (ptr);                                      \
+                                                                       \
+       if (___p)                                                       \
+               kfree_call_rcu(NULL, (rcu_callback_t)(unsigned long)(0), ___p); \
+} while (0)
+
 /*
  * Place this after a lock-acquisition primitive to guarantee that
  * an UNLOCK+LOCK pair acts as a full barrier.  This guarantee applies
diff --git a/include/linux/rcutiny.h b/include/linux/rcutiny.h
index 045c28b71f4f..a12ecc1645fa 100644
--- a/include/linux/rcutiny.h
+++ b/include/linux/rcutiny.h
@@ -34,7 +34,7 @@ static inline void synchronize_rcu_expedited(void)
        synchronize_rcu();
 }

-static inline void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
+static inline void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func, void *ptr)
 {
        call_rcu(head, func);
 }
diff --git a/include/linux/rcutree.h b/include/linux/rcutree.h
index 45f3f66bb04d..1e445b566c01 100644
--- a/include/linux/rcutree.h
+++ b/include/linux/rcutree.h
@@ -33,7 +33,7 @@ static inline void rcu_virt_note_context_switch(int cpu)
 }
 
 void synchronize_rcu_expedited(void);
-void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func);
+void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func, void *ptr);
 
 void rcu_barrier(void);
 bool rcu_eqs_special_set(int cpu);
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 4a885af2ff73..5f22f369cb98 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -2746,6 +2746,7 @@ EXPORT_SYMBOL_GPL(call_rcu);
  * kfree_rcu_bulk_data structure becomes exactly one page.
  */
 #define KFREE_BULK_MAX_ENTR ((PAGE_SIZE / sizeof(void *)) - 3)
+#define KVFREE_BULK_MAX_ENTR ((PAGE_SIZE / sizeof(void *)) - 2)
 
 /**
  * struct kfree_rcu_bulk_data - single block to store kfree_rcu() pointers
@@ -2761,6 +2762,12 @@ struct kfree_rcu_bulk_data {
        struct rcu_head *head_free_debug;
 };
+struct kvfree_rcu_bulk_data {
+       unsigned long nr_records;
+       void *records[KVFREE_BULK_MAX_ENTR];
+       struct kvfree_rcu_bulk_data *next;
+};
+
 /**
  * struct kfree_rcu_cpu_work - single batch of kfree_rcu() requests
  * @rcu_work: Let queue_rcu_work() invoke workqueue handler after grace period
@@ -2773,6 +2780,7 @@ struct kfree_rcu_cpu_work {
        struct rcu_work rcu_work;
        struct rcu_head *head_free;
        struct kfree_rcu_bulk_data *bhead_free;
+       struct kvfree_rcu_bulk_data *bvhead_free;
        struct kfree_rcu_cpu *krcp;
 };
 
@@ -2796,6 +2804,10 @@ struct kfree_rcu_cpu {
        struct rcu_head *head;
        struct kfree_rcu_bulk_data *bhead;
        struct kfree_rcu_bulk_data *bcached;
+       struct kvfree_rcu_bulk_data *bvhead;
+       struct kvfree_rcu_bulk_data *bvcached;
+
+       /* rename to "free_rcu_cpu_work" */
        struct kfree_rcu_cpu_work krw_arr[KFREE_N_BATCHES];
        spinlock_t lock;
        struct delayed_work monitor_work;
@@ -2823,20 +2835,30 @@ static void kfree_rcu_work(struct work_struct *work)
        unsigned long flags;
        struct rcu_head *head, *next;
        struct kfree_rcu_bulk_data *bhead, *bnext;
+       struct kvfree_rcu_bulk_data *bvhead, *bvnext;
        struct kfree_rcu_cpu *krcp;
        struct kfree_rcu_cpu_work *krwp;
+       int i;
        krwp = container_of(to_rcu_work(work),
                            struct kfree_rcu_cpu_work, rcu_work);
+
        krcp = krwp->krcp;
        spin_lock_irqsave(&krcp->lock, flags);
+       /* Channel 1. */
        head = krwp->head_free;
        krwp->head_free = NULL;
+
+       /* Channel 2. */
        bhead = krwp->bhead_free;
        krwp->bhead_free = NULL;
+
+       /* Channel 3. */
+       bvhead = krwp->bvhead_free;
+       krwp->bvhead_free = NULL;
        spin_unlock_irqrestore(&krcp->lock, flags);
 
-       /* "bhead" is now private, so traverse locklessly. */
+       /* kmalloc()/kfree_rcu() channel. */
        for (; bhead; bhead = bnext) {
                bnext = bhead->next;
 
@@ -2855,6 +2877,21 @@ static void kfree_rcu_work(struct work_struct *work)
                cond_resched_tasks_rcu_qs();
        }
 
+       /* kvmalloc()/kvfree_rcu() channel. */
+       for (; bvhead; bvhead = bvnext) {
+               bvnext = bvhead->next;
+
+               rcu_lock_acquire(&rcu_callback_map);
+               for (i = 0; i < bvhead->nr_records; i++)
+                       kvfree(bvhead->records[i]);
+               rcu_lock_release(&rcu_callback_map);
+
+               if (cmpxchg(&krcp->bvcached, NULL, bvhead))
+                       free_page((unsigned long) bvhead);
+
+               cond_resched_tasks_rcu_qs();
+       }
+
        /*
         * Emergency case only. It can happen under low memory
         * condition when an allocation gets failed, so the "bulk"
@@ -2901,7 +2938,8 @@ static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
                 * return false to tell caller to retry.
                 */
                if ((krcp->bhead && !krwp->bhead_free) ||
-                               (krcp->head && !krwp->head_free)) {
+                               (krcp->head && !krwp->head_free) ||
+                               (krcp->bvhead && !krwp->bvhead_free)) {
                        /* Channel 1. */
                        if (!krwp->bhead_free) {
                                krwp->bhead_free = krcp->bhead;
@@ -2914,11 +2952,17 @@ static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
                                krcp->head = NULL;
                        }
 
+                       /* Channel 3. */
+                       if (!krwp->bvhead_free) {
+                               krwp->bvhead_free = krcp->bvhead;
+                               krcp->bvhead = NULL;
+                       }
+
                        /*
-                        * One work is per one batch, so there are two "free channels",
-                        * "bhead_free" and "head_free" the batch can handle. It can be
-                        * that the work is in the pending state when two channels have
-                        * been detached following each other, one by one.
+                        * One work is per one batch, so there are three "free channels",
+                        * "bhead_free", "head_free" and "bvhead_free" the batch can handle.
+                        * It can be that the work is in the pending state when two channels
+                        * have been detached following each other, one by one.
                         */
                        queue_rcu_work(system_wq, &krwp->rcu_work);
                        queued = true;
@@ -3010,6 +3054,42 @@ kfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp,
        return true;
 }
 
+static inline bool
+kvfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp, void *ptr)
+{
+       struct kvfree_rcu_bulk_data *bnode;
+
+       if (unlikely(!krcp->initialized))
+               return false;
+
+       lockdep_assert_held(&krcp->lock);
+
+       if (!krcp->bvhead ||
+                       krcp->bvhead->nr_records == KVFREE_BULK_MAX_ENTR) {
+               bnode = xchg(&krcp->bvcached, NULL);
+               if (!bnode) {
+                       WARN_ON_ONCE(sizeof(struct kvfree_rcu_bulk_data) > PAGE_SIZE);
+
+                       bnode = (struct kvfree_rcu_bulk_data *)
+                               __get_free_page(GFP_NOWAIT | __GFP_NOWARN);
+               }
+
+               if (unlikely(!bnode))
+                       return false;
+
+               /* Initialize the new block. */
+               bnode->nr_records = 0;
+               bnode->next = krcp->bvhead;
+
+               /* Attach it to the bvhead. */
+               krcp->bvhead = bnode;
+       }
+
+       /* Finally insert. */
+       krcp->bvhead->records[krcp->bvhead->nr_records++] = ptr;
+       return true;
+}
+
 /*
  * Queue a request for lazy invocation of kfree_bulk()/kfree() after a grace
  * period. Please note there are two paths are maintained, one is the main one
@@ -3022,32 +3102,39 @@ kfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp,
  * be free'd in workqueue context. This allows us to: batch requests together to
  * reduce the number of grace periods during heavy kfree_rcu() load.
  */
-void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
+void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func, void *ptr_to_free)
 {
        unsigned long flags;
        struct kfree_rcu_cpu *krcp;
+       bool skip_call_rcu = true;
 
        local_irq_save(flags);  // For safely calling this_cpu_ptr().
        krcp = this_cpu_ptr(&krc);
        if (krcp->initialized)
                spin_lock(&krcp->lock);
 
-       // Queue the object but don't yet schedule the batch.
-       if (debug_rcu_head_queue(head)) {
-               // Probable double kfree_rcu(), just leak.
-               WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
-                         __func__, head);
-               goto unlock_return;
-       }
+       if (ptr_to_free) {
+               skip_call_rcu = kvfree_call_rcu_add_ptr_to_bulk(krcp, ptr_to_free);
+               if (!skip_call_rcu)
+                       goto unlock_return;
+       } else {
+               // Queue the object but don't yet schedule the batch.
+               if (debug_rcu_head_queue(head)) {
+                       // Probable double kfree_rcu(), just leak.
+                       WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
+                               __func__, head);
+                       goto unlock_return;
+               }
 
-       /*
-        * Under high memory pressure GFP_NOWAIT can fail,
-        * in that case the emergency path is maintained.
-        */
-       if (unlikely(!kfree_call_rcu_add_ptr_to_bulk(krcp, head, func))) {
-               head->func = func;
-               head->next = krcp->head;
-               krcp->head = head;
+               /*
+                * Under high memory pressure GFP_NOWAIT can fail,
+                * in that case the emergency path is maintained.
+                */
+               if (unlikely(!kfree_call_rcu_add_ptr_to_bulk(krcp, head, func))) {
+                       head->func = func;
+                       head->next = krcp->head;
+                       krcp->head = head;
+               }
        }
 
        // Set timer to drain after KFREE_DRAIN_JIFFIES.
@@ -3061,6 +3148,11 @@ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
        if (krcp->initialized)
                spin_unlock(&krcp->lock);
        local_irq_restore(flags);
+
+       if (!skip_call_rcu) {
+               synchronize_rcu();
+               kvfree(ptr_to_free);
+       }
 }
 EXPORT_SYMBOL_GPL(kfree_call_rcu);

diff --git a/mm/list_lru.c b/mm/list_lru.c
index 0f1f6b06b7f3..0efb849b4f1f 100644
--- a/mm/list_lru.c
+++ b/mm/list_lru.c
@@ -390,14 +390,6 @@ static void memcg_destroy_list_lru_node(struct list_lru_node *nlru)
        kvfree(memcg_lrus);
 }

-static void kvfree_rcu(struct rcu_head *head)
-{
-       struct list_lru_memcg *mlru;
-
-       mlru = container_of(head, struct list_lru_memcg, rcu);
-       kvfree(mlru);
-}
-
 static int memcg_update_list_lru_node(struct list_lru_node *nlru,
                                      int old_size, int new_size)
 {
@@ -429,7 +421,7 @@ static int memcg_update_list_lru_node(struct list_lru_node *nlru,
        rcu_assign_pointer(nlru->memcg_lrus, new);
        spin_unlock_irq(&nlru->lock);

-       call_rcu(&old->rcu, kvfree_rcu);
+       kvfree_rcu(old);
        return 0;
 }
<snip>

now it becomes possible to use it like: 
	...
	void *p = kvmalloc(PAGE_SIZE);
	kvfree_rcu(p);
	...
also have a look at the example in the mm/list_lru.c diff.

I can send out the RFC/PATCH that implements kvfree_rcu() API without need
of "rcu_head" structure. 

Paul, Joel what are your thoughts?

Thank you in advance!

--
Vlad Rezki
Jitindar SIngh, Suraj Feb. 19, 2020, 3:09 a.m. UTC | #6
On Sat, 2020-02-15 at 18:38 -0500, Theodore Y. Ts'o wrote:
> This is a revision of a proposed patch[1] to fix a bug[2] to fix a
> reported crash caused by the fact that we are growing an array, it's
> possible for another process to try to dereference that array, get
> the
> old copy of the array, and then before it fetch an element of the
> array and use it, it could get reused for something else.
> 
> [1] https://bugzilla.kernel.org/attachment.cgi?id=287189
> [2] https://bugzilla.kernel.org/show_bug.cgi?id=206443
> 
> So this is a pretty classical case of RCU, and in the original
> version
> of the patch[1], it used synchronize_rcu_expedited() followed by a
> call kvfree().  If you read the RCU documentation it states that you
> really shouldn't call synchronize_rcu() and kfree() in a loop, and
> while synchronize_rcu_expedited() does speed things up, it does so by
> impacting the performance of all the other CPU's.
> 
> And unfortunately add_new_gdb() get's called in a loop.  If you
> expand
> a file system by say, 1TB, add_new_gdb() and/or add_new_gdb_meta_gb()
> will get called 8,192 times.
> 
> To fix this, I added ext4_kvfree_array_rcu() which allocates an
> object
> containing a void *ptr and the rcu_head, and then uses call_rcu() to
> free the pointer and the stub object.  I'm cc'ing Paul because I'm a
> bit surprised no one else has needed something like this before; so
> I'm wondering if I'm missing something.  If not, would it make sense
> to make something like kvfree_array_rcu as a more general facility?
> 
>    		       			   - Ted
> 
> From 5ab7e4d38318c125246a4aa899dd614a37c803ef Mon Sep 17 00:00:00
> 2001
> From: Theodore Ts'o <tytso@mit.edu>
> Date: Sat, 15 Feb 2020 16:40:37 -0500
> Subject: [PATCH] ext4: fix potential race between online resizing and
> write operations
> 
> During an online resize an array of pointers to buffer heads gets
> replaced so it can get enlarged.  If there is a racing block
> allocation or deallocation which uses the old array, and the old
> array
> has gotten reused this can lead to a GPF or some other random kernel
> memory getting modified.
> 
> Link: https://bugzilla.kernel.org/show_bug.cgi?id=206443
> Reported-by: Suraj Jitindar Singh <surajjs@amazon.com>
> Signed-off-by: Theodore Ts'o <tytso@mit.edu>

One comment below where I think you free the wrong object.

With that fixed up:
Tested-by: Suraj Jitindar Singh <surajjs@amazon.com>

> Cc: stable@kernel.org
> ---
>  fs/ext4/resize.c | 35 +++++++++++++++++++++++++++++++----
>  fs/ext4/balloc.c | 15 ++++++++++++---
>  fs/ext4/ext4.h   |  1 +
>  3 files changed, 44 insertions(+), 7 deletions(-)
> 
> diff --git a/fs/ext4/resize.c b/fs/ext4/resize.c
> index 86a2500ed292..98d3b4ec3422 100644
> --- a/fs/ext4/resize.c
> +++ b/fs/ext4/resize.c
> @@ -17,6 +17,33 @@
>  
>  #include "ext4_jbd2.h"
>  
> +struct ext4_rcu_ptr {
> +	struct rcu_head rcu;
> +	void *ptr;
> +};
> +
> +static void ext4_rcu_ptr_callback(struct rcu_head *head)
> +{
> +	struct ext4_rcu_ptr *ptr;
> +
> +	ptr = container_of(head, struct ext4_rcu_ptr, rcu);
> +	kvfree(ptr->ptr);
> +	kfree(ptr);
> +}
> +
> +void ext4_kvfree_array_rcu(void *to_free)
> +{
> +	struct ext4_rcu_ptr *ptr = kzalloc(sizeof(*ptr), GFP_KERNEL);
> +
> +	if (ptr) {
> +		ptr->ptr = to_free;
> +		call_rcu(&ptr->rcu, ext4_rcu_ptr_callback);
> +		return;
> +	}
> +	synchronize_rcu();

The below needs to be:
kvfree(to_free);

> +	kvfree(ptr);
> +}
> +
>  int ext4_resize_begin(struct super_block *sb)
>  {
>  	struct ext4_sb_info *sbi = EXT4_SB(sb);
> @@ -864,9 +891,9 @@ static int add_new_gdb(handle_t *handle, struct
> inode *inode,
>  	memcpy(n_group_desc, o_group_desc,
>  	       EXT4_SB(sb)->s_gdb_count * sizeof(struct buffer_head
> *));
>  	n_group_desc[gdb_num] = gdb_bh;
> -	EXT4_SB(sb)->s_group_desc = n_group_desc;
> +	rcu_assign_pointer(EXT4_SB(sb)->s_group_desc, n_group_desc);
>  	EXT4_SB(sb)->s_gdb_count++;
> -	kvfree(o_group_desc);
> +	ext4_kvfree_array_rcu(o_group_desc);
>  
>  	le16_add_cpu(&es->s_reserved_gdt_blocks, -1);
>  	err = ext4_handle_dirty_super(handle, sb);
> @@ -922,9 +949,9 @@ static int add_new_gdb_meta_bg(struct super_block
> *sb,
>  		return err;
>  	}
>  
> -	EXT4_SB(sb)->s_group_desc = n_group_desc;
> +	rcu_assign_pointer(EXT4_SB(sb)->s_group_desc, n_group_desc);
>  	EXT4_SB(sb)->s_gdb_count++;
> -	kvfree(o_group_desc);
> +	ext4_kvfree_array_rcu(o_group_desc);
>  	return err;
>  }
>  
> diff --git a/fs/ext4/balloc.c b/fs/ext4/balloc.c
> index 5f993a411251..5368bf67300b 100644
> --- a/fs/ext4/balloc.c
> +++ b/fs/ext4/balloc.c
> @@ -270,6 +270,7 @@ struct ext4_group_desc *
> ext4_get_group_desc(struct super_block *sb,
>  	ext4_group_t ngroups = ext4_get_groups_count(sb);
>  	struct ext4_group_desc *desc;
>  	struct ext4_sb_info *sbi = EXT4_SB(sb);
> +	struct buffer_head *bh_p;
>  
>  	if (block_group >= ngroups) {
>  		ext4_error(sb, "block_group >= groups_count -
> block_group = %u,"
> @@ -280,7 +281,15 @@ struct ext4_group_desc *
> ext4_get_group_desc(struct super_block *sb,
>  
>  	group_desc = block_group >> EXT4_DESC_PER_BLOCK_BITS(sb);
>  	offset = block_group & (EXT4_DESC_PER_BLOCK(sb) - 1);
> -	if (!sbi->s_group_desc[group_desc]) {
> +	rcu_read_lock();
> +	bh_p = rcu_dereference(sbi->s_group_desc)[group_desc];
> +	/*
> +	 * We can unlock here since the pointer being dereferenced
> won't be
> +	 * dereferenced again. By looking at the usage in add_new_gdb()
> the
> +	 * value isn't modified, just the pointer, and so it remains
> valid.
> +	 */
> +	rcu_read_unlock();
> +	if (!bh_p) {
>  		ext4_error(sb, "Group descriptor not loaded - "
>  			   "block_group = %u, group_desc = %u, desc =
> %u",
>  			   block_group, group_desc, offset);
> @@ -288,10 +297,10 @@ struct ext4_group_desc *
> ext4_get_group_desc(struct super_block *sb,
>  	}
>  
>  	desc = (struct ext4_group_desc *)(
> -		(__u8 *)sbi->s_group_desc[group_desc]->b_data +
> +		(__u8 *)bh_p->b_data +
>  		offset * EXT4_DESC_SIZE(sb));
>  	if (bh)
> -		*bh = sbi->s_group_desc[group_desc];
> +		*bh = bh_p;
>  	return desc;
>  }
>  
> diff --git a/fs/ext4/ext4.h b/fs/ext4/ext4.h
> index 4441331d06cc..b7824d56b968 100644
> --- a/fs/ext4/ext4.h
> +++ b/fs/ext4/ext4.h
> @@ -2730,8 +2730,8 @@ extern int ext4_generic_delete_entry(handle_t
> *handle,
>  extern bool ext4_empty_dir(struct inode *inode);
>  
>  /* resize.c */
> +extern void ext4_kvfree_array_rcu(void *to_free);
>  extern int ext4_group_add(struct super_block *sb,
>  				struct ext4_new_group_data *input);
>  extern int ext4_group_extend(struct super_block *sb,
>
Theodore Y. Ts'o Feb. 20, 2020, 4:34 a.m. UTC | #7
On Wed, Feb 19, 2020 at 03:09:07AM +0000, Jitindar SIngh, Suraj wrote:
> 
> One comment below where I think you free the wrong object.

Yes, I had sent a self-correction about that mistake earlier in this
thread.

> With that fixed up:
> Tested-by: Suraj Jitindar Singh <surajjs@amazon.com>

Thanks for testing the patch and confirming that it fixes the problem
you found!

							- Ted
Theodore Y. Ts'o Feb. 20, 2020, 4:52 a.m. UTC | #8
On Tue, Feb 18, 2020 at 06:08:57PM +0100, Uladzislau Rezki wrote:
> now it becomes possible to use it like: 
> 	...
> 	void *p = kvmalloc(PAGE_SIZE);
> 	kvfree_rcu(p);
> 	...
> also have a look at the example in the mm/list_lru.c diff.

I certainly like the interface, thanks!  I'm going to be pushing
patches to fix this using ext4_kvfree_array_rcu() since there are a
number of bugs in ext4's online resizing which appear to be hitting
multiple cloud providers (with reports from both AWS and GCP) and I
want something which can be easily backported to stable kernels.

But once kvfree_rcu() hits mainline, I'll switch ext4 to use it, since
your kvfree_rcu() is definitely more efficient than my expedient
jury-rig.

I don't feel entirely competent to review the implementation, but I do
have one question.  It looks like the rcutiny implementation of
kfree_call_rcu() isn't going to do the right thing with kvfree_rcu(p).
Am I missing something?

> diff --git a/include/linux/rcutiny.h b/include/linux/rcutiny.h
> index 045c28b71f4f..a12ecc1645fa 100644
> --- a/include/linux/rcutiny.h
> +++ b/include/linux/rcutiny.h
> @@ -34,7 +34,7 @@ static inline void synchronize_rcu_expedited(void)
>         synchronize_rcu();
>  }
> 
> -static inline void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> +static inline void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func, void *ptr)
>  {
>         call_rcu(head, func);
>  }

Thanks,

					- Ted
Paul E. McKenney Feb. 21, 2020, 12:30 a.m. UTC | #9
On Wed, Feb 19, 2020 at 11:52:33PM -0500, Theodore Y. Ts'o wrote:
> On Tue, Feb 18, 2020 at 06:08:57PM +0100, Uladzislau Rezki wrote:
> > now it becomes possible to use it like: 
> > 	...
> > 	void *p = kvmalloc(PAGE_SIZE);
> > 	kvfree_rcu(p);
> > 	...
> > also have a look at the example in the mm/list_lru.c diff.
> 
> I certainly like the interface, thanks!  I'm going to be pushing
> patches to fix this using ext4_kvfree_array_rcu() since there are a
> number of bugs in ext4's online resizing which appear to be hitting
> multiple cloud providers (with reports from both AWS and GCP) and I
> want something which can be easily backported to stable kernels.
> 
> But once kvfree_rcu() hits mainline, I'll switch ext4 to use it, since
> your kvfree_rcu() is definitely more efficient than my expedient
> jury-rig.
> 
> I don't feel entirely competent to review the implementation, but I do
> have one question.  It looks like the rcutiny implementation of
> kfree_call_rcu() isn't going to do the right thing with kvfree_rcu(p).
> Am I missing something?

Good catch!  I believe that rcu_reclaim_tiny() would need to do
kvfree() instead of its current kfree().

Vlad, anything I am missing here?

							Thanx, Paul

> > diff --git a/include/linux/rcutiny.h b/include/linux/rcutiny.h
> > index 045c28b71f4f..a12ecc1645fa 100644
> > --- a/include/linux/rcutiny.h
> > +++ b/include/linux/rcutiny.h
> > @@ -34,7 +34,7 @@ static inline void synchronize_rcu_expedited(void)
> >         synchronize_rcu();
> >  }
> > 
> > -static inline void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> > +static inline void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func, void *ptr)
> >  {
> >         call_rcu(head, func);
> >  }
> 
> Thanks,
> 
> 					- Ted
Joel Fernandes Feb. 21, 2020, 12:06 p.m. UTC | #10
On Tue, Feb 18, 2020 at 06:08:57PM +0100, Uladzislau Rezki wrote:
> On Mon, Feb 17, 2020 at 02:33:14PM -0500, Theodore Y. Ts'o wrote:
> > On Mon, Feb 17, 2020 at 05:08:27PM +0100, Uladzislau Rezki wrote:
> > > Hello, Joel, Paul, Ted. 
> > > 
> > > > 
> > > > Good point!
> > > > 
> > > > Now that kfree_rcu() is on its way to being less of a hack deeply
> > > > entangled into the bowels of RCU, this might be fairly easy to implement.
> > > > It might well be simply a matter of a function pointer and a kvfree_rcu()
> > > > API.  Adding Uladzislau Rezki and Joel Fernandez on CC for their thoughts.
> > > > 
> > > I think it makes sense. For example i see there is a similar demand in
> > > the mm/list_lru.c too. As for implementation, it will not be hard, i think. 
> > > 
> > > The easiest way is to inject kvfree() support directly into existing kfree_call_rcu()
> > > logic(probably will need to rename that function), i.e. to free vmalloc() allocations
> > > only in "emergency path" by just calling kvfree(). So that function in its turn will
> > > figure out if it is _vmalloc_ address or not and trigger corresponding "free" path.
> > 
> > The other difference between ext4_kvfree_array_rcu() and kfree_rcu()
> > is that kfree_rcu() is a magic macro which frees a structure, which
> > has to contain a struct rcu_head.  In this case, I'm freeing a pointer
> > to set of structures, or in another case, a set of buffer_heads, which
> > do *not* have an rcu_head structure.
> > 
> We can implement kvfree_rcu() that will deal with pointer only, it is not
> an issue. I mean without embedding rcu_head to the structure or whatever
> else.
> 
> I tried to implement it with less number of changes to make it more clear
> and readable. Please have a look:
> 
> <snip>
> diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h

Overall this implementation is nice. You are basically avoiding allocating
rcu_head like Ted did by using the array-of-pointers technique we used for
the previous kfree_rcu() work.

One thing stands out, the path where we could not allocate a page for the new
block node:

> @@ -3061,6 +3148,11 @@ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
>         if (krcp->initialized)
>                 spin_unlock(&krcp->lock);
>         local_irq_restore(flags);
> +
> +       if (!skip_call_rcu) {
> +               synchronize_rcu();
> +               kvfree(ptr_to_free);

We can't block, it has to be async otherwise everything else blocks, and I
think this can also be used from interrupt handlers which would at least be
an SWA violation. So perhaps it needs to allocate an rcu_head wrapper object
itself for the 'emergeny case' and use the regular techniques.

Another thing that stands out is the code duplication, if we can make this
reuse as much as of the previous code as possible, that'd be great. I'd like
to avoid bvcached and bvhead if possible. Maybe we can store information
about the fact that this is a 'special object' in some of the lower-order
bits of the pointer. Then we can detect that it is 'special' and free it
using kvfree() during the reclaim

Nice to know there would be a few users of it. thanks!

 - Joel



> index 2678a37c3169..9e75c15b53f9 100644
> --- a/include/linux/rcupdate.h
> +++ b/include/linux/rcupdate.h
> @@ -805,7 +805,7 @@ static inline notrace void rcu_read_unlock_sched_notrace(void)
>  #define __kfree_rcu(head, offset) \
>         do { \  
>                 BUILD_BUG_ON(!__is_kfree_rcu_offset(offset)); \
> -               kfree_call_rcu(head, (rcu_callback_t)(unsigned long)(offset)); \
> +               kfree_call_rcu(head, (rcu_callback_t)(unsigned long)(offset), NULL); \
>         } while (0)
> 
>  /**
> @@ -842,6 +842,14 @@ do {                                                                       \
>                 __kfree_rcu(&((___p)->rhf), offsetof(typeof(*(ptr)), rhf)); \
>  } while (0)
> 
> +#define kvfree_rcu(ptr)                                                \
> +do {                                                                   \
> +       typeof (ptr) ___p = (ptr);                                      \
> +                                                                       \
> +       if (___p)                                                       \
> +               kfree_call_rcu(NULL, (rcu_callback_t)(unsigned long)(0), ___p); \
> +} while (0)
> +
>  /*
>   * Place this after a lock-acquisition primitive to guarantee that
>   * an UNLOCK+LOCK pair acts as a full barrier.  This guarantee applies
> diff --git a/include/linux/rcutiny.h b/include/linux/rcutiny.h
> index 045c28b71f4f..a12ecc1645fa 100644
> --- a/include/linux/rcutiny.h
> +++ b/include/linux/rcutiny.h
> @@ -34,7 +34,7 @@ static inline void synchronize_rcu_expedited(void)
>         synchronize_rcu();
>  }
> 
> -static inline void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> +static inline void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func, void *ptr)
>  {
>         call_rcu(head, func);
>  }
> diff --git a/include/linux/rcutree.h b/include/linux/rcutree.h
> index 45f3f66bb04d..1e445b566c01 100644
> --- a/include/linux/rcutree.h
> +++ b/include/linux/rcutree.h
> @@ -33,7 +33,7 @@ static inline void rcu_virt_note_context_switch(int cpu)
>  }
>  
>  void synchronize_rcu_expedited(void);
> -void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func);
> +void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func, void *ptr);
>  
>  void rcu_barrier(void);
>  bool rcu_eqs_special_set(int cpu);
> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> index 4a885af2ff73..5f22f369cb98 100644
> --- a/kernel/rcu/tree.c
> +++ b/kernel/rcu/tree.c
> @@ -2746,6 +2746,7 @@ EXPORT_SYMBOL_GPL(call_rcu);
>   * kfree_rcu_bulk_data structure becomes exactly one page.
>   */
>  #define KFREE_BULK_MAX_ENTR ((PAGE_SIZE / sizeof(void *)) - 3)
> +#define KVFREE_BULK_MAX_ENTR ((PAGE_SIZE / sizeof(void *)) - 2)
>  
>  /**
>   * struct kfree_rcu_bulk_data - single block to store kfree_rcu() pointers
> @@ -2761,6 +2762,12 @@ struct kfree_rcu_bulk_data {
>         struct rcu_head *head_free_debug;
>  };
> +struct kvfree_rcu_bulk_data {
> +       unsigned long nr_records;
> +       void *records[KVFREE_BULK_MAX_ENTR];
> +       struct kvfree_rcu_bulk_data *next;
> +};
> +
>  /**
>   * struct kfree_rcu_cpu_work - single batch of kfree_rcu() requests
>   * @rcu_work: Let queue_rcu_work() invoke workqueue handler after grace period
> @@ -2773,6 +2780,7 @@ struct kfree_rcu_cpu_work {
>         struct rcu_work rcu_work;
>         struct rcu_head *head_free;
>         struct kfree_rcu_bulk_data *bhead_free;
> +       struct kvfree_rcu_bulk_data *bvhead_free;
>         struct kfree_rcu_cpu *krcp;
>  };
>  
> @@ -2796,6 +2804,10 @@ struct kfree_rcu_cpu {
>         struct rcu_head *head;
>         struct kfree_rcu_bulk_data *bhead;
>         struct kfree_rcu_bulk_data *bcached;
> +       struct kvfree_rcu_bulk_data *bvhead;
> +       struct kvfree_rcu_bulk_data *bvcached;
> +
> +       /* rename to "free_rcu_cpu_work" */
>         struct kfree_rcu_cpu_work krw_arr[KFREE_N_BATCHES];
>         spinlock_t lock;
>         struct delayed_work monitor_work;
> @@ -2823,20 +2835,30 @@ static void kfree_rcu_work(struct work_struct *work)
>         unsigned long flags;
>         struct rcu_head *head, *next;
>         struct kfree_rcu_bulk_data *bhead, *bnext;
> +       struct kvfree_rcu_bulk_data *bvhead, *bvnext;
>         struct kfree_rcu_cpu *krcp;
>         struct kfree_rcu_cpu_work *krwp;
> +       int i;
>         krwp = container_of(to_rcu_work(work),
>                             struct kfree_rcu_cpu_work, rcu_work);
> +
>         krcp = krwp->krcp;
>         spin_lock_irqsave(&krcp->lock, flags);
> +       /* Channel 1. */
>         head = krwp->head_free;
>         krwp->head_free = NULL;
> +
> +       /* Channel 2. */
>         bhead = krwp->bhead_free;
>         krwp->bhead_free = NULL;
> +
> +       /* Channel 3. */
> +       bvhead = krwp->bvhead_free;
> +       krwp->bvhead_free = NULL;
>         spin_unlock_irqrestore(&krcp->lock, flags);
>  
> -       /* "bhead" is now private, so traverse locklessly. */
> +       /* kmalloc()/kfree_rcu() channel. */
>         for (; bhead; bhead = bnext) {
>                 bnext = bhead->next;
>  
> @@ -2855,6 +2877,21 @@ static void kfree_rcu_work(struct work_struct *work)
>                 cond_resched_tasks_rcu_qs();
>         }
>  
> +       /* kvmalloc()/kvfree_rcu() channel. */
> +       for (; bvhead; bvhead = bvnext) {
> +               bvnext = bvhead->next;
> +
> +               rcu_lock_acquire(&rcu_callback_map);
> +               for (i = 0; i < bvhead->nr_records; i++)
> +                       kvfree(bvhead->records[i]);
> +               rcu_lock_release(&rcu_callback_map);
> +
> +               if (cmpxchg(&krcp->bvcached, NULL, bvhead))
> +                       free_page((unsigned long) bvhead);
> +
> +               cond_resched_tasks_rcu_qs();
> +       }
> +
>         /*
>          * Emergency case only. It can happen under low memory
>          * condition when an allocation gets failed, so the "bulk"
> @@ -2901,7 +2938,8 @@ static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
>                  * return false to tell caller to retry.
>                  */
>                 if ((krcp->bhead && !krwp->bhead_free) ||
> -                               (krcp->head && !krwp->head_free)) {
> +                               (krcp->head && !krwp->head_free) ||
> +                               (krcp->bvhead && !krwp->bvhead_free)) {
>                         /* Channel 1. */
>                         if (!krwp->bhead_free) {
>                                 krwp->bhead_free = krcp->bhead;
> @@ -2914,11 +2952,17 @@ static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
>                                 krcp->head = NULL;
>                         }
>  
> +                       /* Channel 3. */
> +                       if (!krwp->bvhead_free) {
> +                               krwp->bvhead_free = krcp->bvhead;
> +                               krcp->bvhead = NULL;
> +                       }
> +
>                         /*
> -                        * One work is per one batch, so there are two "free channels",
> -                        * "bhead_free" and "head_free" the batch can handle. It can be
> -                        * that the work is in the pending state when two channels have
> -                        * been detached following each other, one by one.
> +                        * One work is per one batch, so there are three "free channels",
> +                        * "bhead_free", "head_free" and "bvhead_free" the batch can handle.
> +                        * It can be that the work is in the pending state when two channels
> +                        * have been detached following each other, one by one.
>                          */
>                         queue_rcu_work(system_wq, &krwp->rcu_work);
>                         queued = true;
> @@ -3010,6 +3054,42 @@ kfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp,
>         return true;
>  }
>  
> +static inline bool
> +kvfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp, void *ptr)
> +{
> +       struct kvfree_rcu_bulk_data *bnode;
> +
> +       if (unlikely(!krcp->initialized))
> +               return false;
> +
> +       lockdep_assert_held(&krcp->lock);
> +
> +       if (!krcp->bvhead ||
> +                       krcp->bvhead->nr_records == KVFREE_BULK_MAX_ENTR) {
> +               bnode = xchg(&krcp->bvcached, NULL);
> +               if (!bnode) {
> +                       WARN_ON_ONCE(sizeof(struct kvfree_rcu_bulk_data) > PAGE_SIZE);
> +
> +                       bnode = (struct kvfree_rcu_bulk_data *)
> +                               __get_free_page(GFP_NOWAIT | __GFP_NOWARN);
> +               }
> +
> +               if (unlikely(!bnode))
> +                       return false;
> +
> +               /* Initialize the new block. */
> +               bnode->nr_records = 0;
> +               bnode->next = krcp->bvhead;
> +
> +               /* Attach it to the bvhead. */
> +               krcp->bvhead = bnode;
> +       }
> +
> +       /* Finally insert. */
> +       krcp->bvhead->records[krcp->bvhead->nr_records++] = ptr;
> +       return true;
> +}
> +
>  /*
>   * Queue a request for lazy invocation of kfree_bulk()/kfree() after a grace
>   * period. Please note there are two paths are maintained, one is the main one
> @@ -3022,32 +3102,39 @@ kfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp,
>   * be free'd in workqueue context. This allows us to: batch requests together to
>   * reduce the number of grace periods during heavy kfree_rcu() load.
>   */
> -void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> +void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func, void *ptr_to_free)
>  {
>         unsigned long flags;
>         struct kfree_rcu_cpu *krcp;
> +       bool skip_call_rcu = true;
>  
>         local_irq_save(flags);  // For safely calling this_cpu_ptr().
>         krcp = this_cpu_ptr(&krc);
>         if (krcp->initialized)
>                 spin_lock(&krcp->lock);
>  
> -       // Queue the object but don't yet schedule the batch.
> -       if (debug_rcu_head_queue(head)) {
> -               // Probable double kfree_rcu(), just leak.
> -               WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
> -                         __func__, head);
> -               goto unlock_return;
> -       }
> +       if (ptr_to_free) {
> +               skip_call_rcu = kvfree_call_rcu_add_ptr_to_bulk(krcp, ptr_to_free);
> +               if (!skip_call_rcu)
> +                       goto unlock_return;
> +       } else {
> +               // Queue the object but don't yet schedule the batch.
> +               if (debug_rcu_head_queue(head)) {
> +                       // Probable double kfree_rcu(), just leak.
> +                       WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
> +                               __func__, head);
> +                       goto unlock_return;
> +               }
>  
> -       /*
> -        * Under high memory pressure GFP_NOWAIT can fail,
> -        * in that case the emergency path is maintained.
> -        */
> -       if (unlikely(!kfree_call_rcu_add_ptr_to_bulk(krcp, head, func))) {
> -               head->func = func;
> -               head->next = krcp->head;
> -               krcp->head = head;
> +               /*
> +                * Under high memory pressure GFP_NOWAIT can fail,
> +                * in that case the emergency path is maintained.
> +                */
> +               if (unlikely(!kfree_call_rcu_add_ptr_to_bulk(krcp, head, func))) {
> +                       head->func = func;
> +                       head->next = krcp->head;
> +                       krcp->head = head;
> +               }
>         }
>  
>         // Set timer to drain after KFREE_DRAIN_JIFFIES.
> @@ -3061,6 +3148,11 @@ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
>         if (krcp->initialized)
>                 spin_unlock(&krcp->lock);
>         local_irq_restore(flags);
> +
> +       if (!skip_call_rcu) {
> +               synchronize_rcu();
> +               kvfree(ptr_to_free);
> +       }
>  }
>  EXPORT_SYMBOL_GPL(kfree_call_rcu);
> 
> diff --git a/mm/list_lru.c b/mm/list_lru.c
> index 0f1f6b06b7f3..0efb849b4f1f 100644
> --- a/mm/list_lru.c
> +++ b/mm/list_lru.c
> @@ -390,14 +390,6 @@ static void memcg_destroy_list_lru_node(struct list_lru_node *nlru)
>         kvfree(memcg_lrus);
>  }
> 
> -static void kvfree_rcu(struct rcu_head *head)
> -{
> -       struct list_lru_memcg *mlru;
> -
> -       mlru = container_of(head, struct list_lru_memcg, rcu);
> -       kvfree(mlru);
> -}
> -
>  static int memcg_update_list_lru_node(struct list_lru_node *nlru,
>                                       int old_size, int new_size)
>  {
> @@ -429,7 +421,7 @@ static int memcg_update_list_lru_node(struct list_lru_node *nlru,
>         rcu_assign_pointer(nlru->memcg_lrus, new);
>         spin_unlock_irq(&nlru->lock);
> 
> -       call_rcu(&old->rcu, kvfree_rcu);
> +       kvfree_rcu(old);
>         return 0;
>  }
> <snip>
> 
> now it becomes possible to use it like: 
> 	...
> 	void *p = kvmalloc(PAGE_SIZE);
> 	kvfree_rcu(p);
> 	...
> also have a look at the example in the mm/list_lru.c diff.
> 
> I can send out the RFC/PATCH that implements kvfree_rcu() API without need
> of "rcu_head" structure. 
> 
> Paul, Joel what are your thoughts?
> 
> Thank you in advance!
> 
> --
> Vlad Rezki
Uladzislau Rezki Feb. 21, 2020, 1:14 p.m. UTC | #11
On Thu, Feb 20, 2020 at 04:30:35PM -0800, Paul E. McKenney wrote:
> On Wed, Feb 19, 2020 at 11:52:33PM -0500, Theodore Y. Ts'o wrote:
> > On Tue, Feb 18, 2020 at 06:08:57PM +0100, Uladzislau Rezki wrote:
> > > now it becomes possible to use it like: 
> > > 	...
> > > 	void *p = kvmalloc(PAGE_SIZE);
> > > 	kvfree_rcu(p);
> > > 	...
> > > also have a look at the example in the mm/list_lru.c diff.
> > 
> > I certainly like the interface, thanks!  I'm going to be pushing
> > patches to fix this using ext4_kvfree_array_rcu() since there are a
> > number of bugs in ext4's online resizing which appear to be hitting
> > multiple cloud providers (with reports from both AWS and GCP) and I
> > want something which can be easily backported to stable kernels.
> > 
> > But once kvfree_rcu() hits mainline, I'll switch ext4 to use it, since
> > your kvfree_rcu() is definitely more efficient than my expedient
> > jury-rig.
> > 
> > I don't feel entirely competent to review the implementation, but I do
> > have one question.  It looks like the rcutiny implementation of
> > kfree_call_rcu() isn't going to do the right thing with kvfree_rcu(p).
> > Am I missing something?
> 
> Good catch!  I believe that rcu_reclaim_tiny() would need to do
> kvfree() instead of its current kfree().
> 
> Vlad, anything I am missing here?
>
Yes something like that. There are some open questions about
realization, when it comes to tiny RCU. Since we are talking
about "headless" kvfree_rcu() interface, i mean we can not link
freed "objects" between each other, instead we should place a
pointer directly into array that will be drained later on.

It would be much more easier to achieve that if we were talking
about the interface like: kvfree_rcu(p, rcu), but that is not our
case :)

So, for CONFIG_TINY_RCU we should implement very similar what we
have done for CONFIG_TREE_RCU or just simply do like Ted has done
with his

void ext4_kvfree_array_rcu(void *to_free)

i mean:

   local_irq_save(flags);
   struct foo *ptr = kzalloc(sizeof(*ptr), GFP_ATOMIC);

   if (ptr) {
           ptr->ptr = to_free;
           call_rcu(&ptr->rcu, kvfree_callback);
   }
   local_irq_restore(flags);

Also there is one more open question what to do if GFP_ATOMIC
gets failed in case of having low memory condition. Probably
we can make use of "mempool interface" that allows to have
min_nr guaranteed pre-allocated pages. 

Thanks!

--
Vlad Rezki
Joel Fernandes Feb. 21, 2020, 1:28 p.m. UTC | #12
On Fri, Feb 21, 2020 at 07:06:18AM -0500, Joel Fernandes wrote:
> On Tue, Feb 18, 2020 at 06:08:57PM +0100, Uladzislau Rezki wrote:
> > On Mon, Feb 17, 2020 at 02:33:14PM -0500, Theodore Y. Ts'o wrote:
> > > On Mon, Feb 17, 2020 at 05:08:27PM +0100, Uladzislau Rezki wrote:
> > > > Hello, Joel, Paul, Ted. 
> > > > 
> > > > > 
> > > > > Good point!
> > > > > 
> > > > > Now that kfree_rcu() is on its way to being less of a hack deeply
> > > > > entangled into the bowels of RCU, this might be fairly easy to implement.
> > > > > It might well be simply a matter of a function pointer and a kvfree_rcu()
> > > > > API.  Adding Uladzislau Rezki and Joel Fernandez on CC for their thoughts.
> > > > > 
> > > > I think it makes sense. For example i see there is a similar demand in
> > > > the mm/list_lru.c too. As for implementation, it will not be hard, i think. 
> > > > 
> > > > The easiest way is to inject kvfree() support directly into existing kfree_call_rcu()
> > > > logic(probably will need to rename that function), i.e. to free vmalloc() allocations
> > > > only in "emergency path" by just calling kvfree(). So that function in its turn will
> > > > figure out if it is _vmalloc_ address or not and trigger corresponding "free" path.
> > > 
> > > The other difference between ext4_kvfree_array_rcu() and kfree_rcu()
> > > is that kfree_rcu() is a magic macro which frees a structure, which
> > > has to contain a struct rcu_head.  In this case, I'm freeing a pointer
> > > to set of structures, or in another case, a set of buffer_heads, which
> > > do *not* have an rcu_head structure.
> > > 
> > We can implement kvfree_rcu() that will deal with pointer only, it is not
> > an issue. I mean without embedding rcu_head to the structure or whatever
> > else.
> > 
> > I tried to implement it with less number of changes to make it more clear
> > and readable. Please have a look:
> > 
> > <snip>
> > diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
> 
> Overall this implementation is nice. You are basically avoiding allocating
> rcu_head like Ted did by using the array-of-pointers technique we used for
> the previous kfree_rcu() work.
> 
> One thing stands out, the path where we could not allocate a page for the new
> block node:
> 
> > @@ -3061,6 +3148,11 @@ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> >         if (krcp->initialized)
> >                 spin_unlock(&krcp->lock);
> >         local_irq_restore(flags);
> > +
> > +       if (!skip_call_rcu) {
> > +               synchronize_rcu();
> > +               kvfree(ptr_to_free);
> 
> We can't block, it has to be async otherwise everything else blocks, and I
> think this can also be used from interrupt handlers which would at least be
> an SWA violation. So perhaps it needs to allocate an rcu_head wrapper object
> itself for the 'emergeny case' and use the regular techniques.
> 
> Another thing that stands out is the code duplication, if we can make this
> reuse as much as of the previous code as possible, that'd be great. I'd like
> to avoid bvcached and bvhead if possible. Maybe we can store information
> about the fact that this is a 'special object' in some of the lower-order
> bits of the pointer. Then we can detect that it is 'special' and free it
> using kvfree() during the reclaim

I was thinking something like the following, only build-tested -- just to
show the idea. Perhaps the synchronize_rcu() should be done from a workqueue
handler to prevent IRQ crapping out?

Basically what I did different is:
1. Use the existing kfree_rcu_bulk_data::records array to store the
   to-be-freed array.
2. In case of emergency, allocate a new wrapper and tag the pointer.
   Read the tag later to figure its an array wrapper and do additional kvfree.

debug_objects bits wouldn't work obviously for the !emergency kvfree case,
not sure what we can do there.
---8<-----------------------

diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
index 2678a37c31696..19fd7c74ad532 100644
--- a/include/linux/rcupdate.h
+++ b/include/linux/rcupdate.h
@@ -805,7 +805,7 @@ static inline notrace void rcu_read_unlock_sched_notrace(void)
 #define __kfree_rcu(head, offset) \
 	do { \
 		BUILD_BUG_ON(!__is_kfree_rcu_offset(offset)); \
-		kfree_call_rcu(head, (rcu_callback_t)(unsigned long)(offset)); \
+		kfree_call_rcu(head, (rcu_callback_t)(unsigned long)(offset), NULL); \
 	} while (0)
 
 /**
@@ -842,6 +842,14 @@ do {									\
 		__kfree_rcu(&((___p)->rhf), offsetof(typeof(*(ptr)), rhf)); \
 } while (0)
 
+#define kvfree_rcu(ptr)                                                \
+do {                                                                   \
+       typeof (ptr) ___p = (ptr);                                      \
+                                                                       \
+       if (___p)                                                       \
+               kfree_call_rcu(NULL, (rcu_callback_t)(unsigned long)(0), ___p); \
+} while (0)
+
 /*
  * Place this after a lock-acquisition primitive to guarantee that
  * an UNLOCK+LOCK pair acts as a full barrier.  This guarantee applies
diff --git a/include/linux/rcutiny.h b/include/linux/rcutiny.h
index 045c28b71f4f3..a12ecc1645fa9 100644
--- a/include/linux/rcutiny.h
+++ b/include/linux/rcutiny.h
@@ -34,7 +34,7 @@ static inline void synchronize_rcu_expedited(void)
 	synchronize_rcu();
 }
 
-static inline void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
+static inline void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func, void *ptr)
 {
 	call_rcu(head, func);
 }
diff --git a/include/linux/rcutree.h b/include/linux/rcutree.h
index 45f3f66bb04df..1e445b566c019 100644
--- a/include/linux/rcutree.h
+++ b/include/linux/rcutree.h
@@ -33,7 +33,7 @@ static inline void rcu_virt_note_context_switch(int cpu)
 }
 
 void synchronize_rcu_expedited(void);
-void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func);
+void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func, void *ptr);
 
 void rcu_barrier(void);
 bool rcu_eqs_special_set(int cpu);
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index ec81139cc4c6a..7b6ab4160f080 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -2814,6 +2814,15 @@ struct kfree_rcu_cpu {
 	bool initialized;
 };
 
+/*
+ * Used in a situation where array of pointers could
+ * not be put onto a kfree_bulk_data::records array.
+ */
+struct kfree_rcu_wrap_kvarray {
+	struct rcu_head head;
+	void *ptr;
+};
+
 static DEFINE_PER_CPU(struct kfree_rcu_cpu, krc);
 
 static __always_inline void
@@ -2873,12 +2882,25 @@ static void kfree_rcu_work(struct work_struct *work)
 	 */
 	for (; head; head = next) {
 		unsigned long offset = (unsigned long)head->func;
+		bool is_array_ptr = false;
+
+		if (((unsigned long)head - offset) & BIT(0)) {
+			is_array_ptr = true;
+			offset = offset - 1;
+		}
 
 		next = head->next;
-		debug_rcu_head_unqueue(head);
+		if (!is_array_ptr)
+			debug_rcu_head_unqueue(head);
+
 		rcu_lock_acquire(&rcu_callback_map);
 		trace_rcu_invoke_kfree_callback(rcu_state.name, head, offset);
 
+		if (is_array_ptr) {
+			struct kfree_rcu_wrap_kvarray *kv = (void *)head - offset;
+			kvfree((void *)kv->ptr);
+		}
+
 		if (!WARN_ON_ONCE(!__is_kfree_rcu_offset(offset)))
 			kfree((void *)head - offset);
 
@@ -2975,7 +2997,7 @@ static void kfree_rcu_monitor(struct work_struct *work)
 
 static inline bool
 kfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp,
-	struct rcu_head *head, rcu_callback_t func)
+	struct rcu_head *head, rcu_callback_t func, void *ptr)
 {
 	struct kfree_rcu_bulk_data *bnode;
 
@@ -3009,14 +3031,20 @@ kfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp,
 	}
 
 #ifdef CONFIG_DEBUG_OBJECTS_RCU_HEAD
-	head->func = func;
-	head->next = krcp->bhead->head_free_debug;
-	krcp->bhead->head_free_debug = head;
+	/* debug_objects doesn't work for kvfree */
+	if (!ptr) {
+		head->func = func;
+		head->next = krcp->bhead->head_free_debug;
+		krcp->bhead->head_free_debug = head;
+	}
 #endif
 
 	/* Finally insert. */
-	krcp->bhead->records[krcp->bhead->nr_records++] =
-		(void *) head - (unsigned long) func;
+	if (ptr)
+		krcp->bhead->records[krcp->bhead->nr_records++] = ptr;
+	else
+		krcp->bhead->records[krcp->bhead->nr_records++] =
+			(void *) head - (unsigned long) func;
 
 	return true;
 }
@@ -3033,10 +3061,11 @@ kfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp,
  * be free'd in workqueue context. This allows us to: batch requests together to
  * reduce the number of grace periods during heavy kfree_rcu() load.
  */
-void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
+void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func, void *ptr)
 {
 	unsigned long flags;
 	struct kfree_rcu_cpu *krcp;
+	bool ret;
 
 	local_irq_save(flags);	// For safely calling this_cpu_ptr().
 	krcp = this_cpu_ptr(&krc);
@@ -3044,7 +3073,8 @@ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
 		spin_lock(&krcp->lock);
 
 	// Queue the object but don't yet schedule the batch.
-	if (debug_rcu_head_queue(head)) {
+	// NOTE: debug objects doesn't work for kvfree.
+	if (!ptr && debug_rcu_head_queue(head)) {
 		// Probable double kfree_rcu(), just leak.
 		WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
 			  __func__, head);
@@ -3055,7 +3085,29 @@ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
 	 * Under high memory pressure GFP_NOWAIT can fail,
 	 * in that case the emergency path is maintained.
 	 */
-	if (unlikely(!kfree_call_rcu_add_ptr_to_bulk(krcp, head, func))) {
+	ret = !kfree_call_rcu_add_ptr_to_bulk(krcp, head, func, ptr);
+	if (unlikely(!ret)) {
+		if (ptr) {
+			struct kfree_rcu_wrap_kvarray *kvwrap;
+
+			kvwrap = kzalloc(sizeof(*kvwrap), GFP_KERNEL);
+
+			// If memory is really low, just try inline-freeing.
+			if (!kvwrap) {
+				// NOT SURE if permitted due to IRQ. Maybe we
+				// should try doing this from WQ?
+				synchronize_rcu();
+				kvfree(ptr);
+			}
+
+			kvwrap->ptr = ptr;
+			ptr = NULL;
+			head = &(kvwrap->head);
+			func = offsetof(typeof(*kvwrap), head);
+			// Tag the array as wrapper
+			func = (rcu_callback_t)((unsigned long)func + 1);
+		}
+
 		head->func = func;
 		head->next = krcp->head;
 		krcp->head = head;
Uladzislau Rezki Feb. 21, 2020, 7:21 p.m. UTC | #13
> > 
> > Overall this implementation is nice. You are basically avoiding allocating
> > rcu_head like Ted did by using the array-of-pointers technique we used for
> > the previous kfree_rcu() work.
> > 
> > One thing stands out, the path where we could not allocate a page for the new
> > block node:
> > 
> > > @@ -3061,6 +3148,11 @@ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> > >         if (krcp->initialized)
> > >                 spin_unlock(&krcp->lock);
> > >         local_irq_restore(flags);
> > > +
> > > +       if (!skip_call_rcu) {
> > > +               synchronize_rcu();
> > > +               kvfree(ptr_to_free);
> > 
> > We can't block, it has to be async otherwise everything else blocks, and I
> > think this can also be used from interrupt handlers which would at least be
> > an SWA violation. So perhaps it needs to allocate an rcu_head wrapper object
> > itself for the 'emergeny case' and use the regular techniques.
> > 
> > Another thing that stands out is the code duplication, if we can make this
> > reuse as much as of the previous code as possible, that'd be great. I'd like
> > to avoid bvcached and bvhead if possible. Maybe we can store information
> > about the fact that this is a 'special object' in some of the lower-order
> > bits of the pointer. Then we can detect that it is 'special' and free it
> > using kvfree() during the reclaim
> 
> Basically what I did different is:
> 1. Use the existing kfree_rcu_bulk_data::records array to store the
>    to-be-freed array.
> 2. In case of emergency, allocate a new wrapper and tag the pointer.
>    Read the tag later to figure its an array wrapper and do additional kvfree.
>
I see your point and agree that duplication is odd and we should avoid
it as much as possible. Also, i like the idea of using the wrapper as
one more chance to build a "head" for headless object.

I did not mix pointers because then you will need to understand what is what.
It is OK for "emergency" path, because we simply can just serialize it by kvfree()
call, it checks inside what the ptr address belong to:

<snip>
void kvfree(const void *addr)
{
    if (is_vmalloc_addr(addr))
        vfree(addr);
    else
        kfree(addr);
}
<snip>

whereas normal path, i mean "bulk one" where we store pointers into array
would be broken. We can not call kfree_bulk(array, nr_entries) if the passed
array contains "vmalloc" pointers, because it is different allocator. Therefore,
i deliberately have made it as a special case.

> 
> Perhaps the synchronize_rcu() should be done from a workqueue handler
> to prevent IRQ crapping out?
>
I think so. For example one approach would be:

<snip>
struct free_deferred {
 struct llist_head list;
 struct work_struct wq;
};
static DEFINE_PER_CPU(struct free_deferred, free_deferred);

static void free_work(struct work_struct *w)
{
  struct free_deferred *p = container_of(w, struct free_deferred, wq);
  struct llist_node *t, *llnode;

  synchronize_rcu();

  llist_for_each_safe(llnode, t, llist_del_all(&p->list))
     vfree((void *)llnode, 1);
}

static inline void free_deferred_common(void *ptr_to_free)
{
    struct free_deferred *p = raw_cpu_ptr(&free_deferred);

    if (llist_add((struct llist_node *)ptr_to_free, &p->list))
        schedule_work(&p->wq);
}
<snip>

and it seems it should work. Because we know that KMALLOC_MIN_SIZE
can not be less then machine word:

/*
 * Kmalloc subsystem.
 */
 #ifndef KMALLOC_MIN_SIZE
 #define KMALLOC_MIN_SIZE (1 << KMALLOC_SHIFT_LOW)
 #endif

when it comes to vmalloc pointer it can not be less then one PAGE_SIZE :)

Another thing:

we are talking about "headless" variant that is special, therefore it
implies to have some restrictions, since we need a dynamic memory to
drive it. For example "headless" object can be freed from preemptible
context only, because freeing can be inlined:

<snip>
+   // NOT SURE if permitted due to IRQ. Maybe we
+   // should try doing this from WQ?
+   synchronize_rcu();
+   kvfree(ptr);
<snip>

Calling synchronize_rcu() from the IRQ context will screw the system up :)
Because the current CPU will never pass the QS state if i do not miss something.
Also kvfree() itself can be called from the preemptible context only, excluding IRQ,
there is a special path for it, otherwise vfree() can sleep. 

> 
> debug_objects bits wouldn't work obviously for the !emergency kvfree case,
> not sure what we can do there.
>
Agree.

Thank you, Joel, for your comments!

--
Vlad Rezki
Uladzislau Rezki Feb. 21, 2020, 7:25 p.m. UTC | #14
> 
>   llist_for_each_safe(llnode, t, llist_del_all(&p->list))
>      vfree((void *)llnode, 1);
>
Should be kvfree((void *)llnode, 1);

--
Vlad Rezki
Paul E. McKenney Feb. 21, 2020, 8:22 p.m. UTC | #15
On Fri, Feb 21, 2020 at 02:14:55PM +0100, Uladzislau Rezki wrote:
> On Thu, Feb 20, 2020 at 04:30:35PM -0800, Paul E. McKenney wrote:
> > On Wed, Feb 19, 2020 at 11:52:33PM -0500, Theodore Y. Ts'o wrote:
> > > On Tue, Feb 18, 2020 at 06:08:57PM +0100, Uladzislau Rezki wrote:
> > > > now it becomes possible to use it like: 
> > > > 	...
> > > > 	void *p = kvmalloc(PAGE_SIZE);
> > > > 	kvfree_rcu(p);
> > > > 	...
> > > > also have a look at the example in the mm/list_lru.c diff.
> > > 
> > > I certainly like the interface, thanks!  I'm going to be pushing
> > > patches to fix this using ext4_kvfree_array_rcu() since there are a
> > > number of bugs in ext4's online resizing which appear to be hitting
> > > multiple cloud providers (with reports from both AWS and GCP) and I
> > > want something which can be easily backported to stable kernels.
> > > 
> > > But once kvfree_rcu() hits mainline, I'll switch ext4 to use it, since
> > > your kvfree_rcu() is definitely more efficient than my expedient
> > > jury-rig.
> > > 
> > > I don't feel entirely competent to review the implementation, but I do
> > > have one question.  It looks like the rcutiny implementation of
> > > kfree_call_rcu() isn't going to do the right thing with kvfree_rcu(p).
> > > Am I missing something?
> > 
> > Good catch!  I believe that rcu_reclaim_tiny() would need to do
> > kvfree() instead of its current kfree().
> > 
> > Vlad, anything I am missing here?
> >
> Yes something like that. There are some open questions about
> realization, when it comes to tiny RCU. Since we are talking
> about "headless" kvfree_rcu() interface, i mean we can not link
> freed "objects" between each other, instead we should place a
> pointer directly into array that will be drained later on.
> 
> It would be much more easier to achieve that if we were talking
> about the interface like: kvfree_rcu(p, rcu), but that is not our
> case :)
> 
> So, for CONFIG_TINY_RCU we should implement very similar what we
> have done for CONFIG_TREE_RCU or just simply do like Ted has done
> with his
> 
> void ext4_kvfree_array_rcu(void *to_free)
> 
> i mean:
> 
>    local_irq_save(flags);
>    struct foo *ptr = kzalloc(sizeof(*ptr), GFP_ATOMIC);
> 
>    if (ptr) {
>            ptr->ptr = to_free;
>            call_rcu(&ptr->rcu, kvfree_callback);
>    }
>    local_irq_restore(flags);

We really do still need the emergency case, in this case for when
kzalloc() returns NULL.  Which does indeed mean an rcu_head in the thing
being freed.  Otherwise, you end up with an out-of-memory deadlock where
you could free memory only if you had memor to allocate.

> Also there is one more open question what to do if GFP_ATOMIC
> gets failed in case of having low memory condition. Probably
> we can make use of "mempool interface" that allows to have
> min_nr guaranteed pre-allocated pages. 

But we really do still need to handle the case where everything runs out,
even the pre-allocated pages.

							Thanx, Paul
Joel Fernandes Feb. 22, 2020, 10:12 p.m. UTC | #16
On Fri, Feb 21, 2020 at 08:21:52PM +0100, Uladzislau Rezki wrote:
> > > 
> > > Overall this implementation is nice. You are basically avoiding allocating
> > > rcu_head like Ted did by using the array-of-pointers technique we used for
> > > the previous kfree_rcu() work.
> > > 
> > > One thing stands out, the path where we could not allocate a page for the new
> > > block node:
> > > 
> > > > @@ -3061,6 +3148,11 @@ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> > > >         if (krcp->initialized)
> > > >                 spin_unlock(&krcp->lock);
> > > >         local_irq_restore(flags);
> > > > +
> > > > +       if (!skip_call_rcu) {
> > > > +               synchronize_rcu();
> > > > +               kvfree(ptr_to_free);
> > > 
> > > We can't block, it has to be async otherwise everything else blocks, and I
> > > think this can also be used from interrupt handlers which would at least be
> > > an SWA violation. So perhaps it needs to allocate an rcu_head wrapper object
> > > itself for the 'emergeny case' and use the regular techniques.
> > > 
> > > Another thing that stands out is the code duplication, if we can make this
> > > reuse as much as of the previous code as possible, that'd be great. I'd like
> > > to avoid bvcached and bvhead if possible. Maybe we can store information
> > > about the fact that this is a 'special object' in some of the lower-order
> > > bits of the pointer. Then we can detect that it is 'special' and free it
> > > using kvfree() during the reclaim
> > 
> > Basically what I did different is:
> > 1. Use the existing kfree_rcu_bulk_data::records array to store the
> >    to-be-freed array.
> > 2. In case of emergency, allocate a new wrapper and tag the pointer.
> >    Read the tag later to figure its an array wrapper and do additional kvfree.
> >
> I see your point and agree that duplication is odd and we should avoid
> it as much as possible. Also, i like the idea of using the wrapper as
> one more chance to build a "head" for headless object.
> 
> I did not mix pointers because then you will need to understand what is what.

Well that's why I brought up the whole tagging idea. Then you don't need
separate pointers to manage either (edit: but maybe you do as you mentioned
vfree below..).

> It is OK for "emergency" path, because we simply can just serialize it by kvfree()
> call, it checks inside what the ptr address belong to:
> 
> <snip>
> void kvfree(const void *addr)
> {
>     if (is_vmalloc_addr(addr))
>         vfree(addr);
>     else
>         kfree(addr);
> }
> <snip>
> 
> whereas normal path, i mean "bulk one" where we store pointers into array
> would be broken. We can not call kfree_bulk(array, nr_entries) if the passed
> array contains "vmalloc" pointers, because it is different allocator. Therefore,
> i deliberately have made it as a special case.

Ok, it would be nice if you can verify that ptr_to_free passed to
kfree_call_rcu() is infact a vmalloc pointer.

> > Perhaps the synchronize_rcu() should be done from a workqueue handler
> > to prevent IRQ crapping out?
> >
> I think so. For example one approach would be:
> 
> <snip>
> struct free_deferred {
>  struct llist_head list;
>  struct work_struct wq;
> };
> static DEFINE_PER_CPU(struct free_deferred, free_deferred);
> 
> static void free_work(struct work_struct *w)
> {
>   struct free_deferred *p = container_of(w, struct free_deferred, wq);
>   struct llist_node *t, *llnode;
> 
>   synchronize_rcu();
> 
>   llist_for_each_safe(llnode, t, llist_del_all(&p->list))
>      vfree((void *)llnode, 1);
> }
> 
> static inline void free_deferred_common(void *ptr_to_free)
> {
>     struct free_deferred *p = raw_cpu_ptr(&free_deferred);
> 
>     if (llist_add((struct llist_node *)ptr_to_free, &p->list))

Would this not corrupt the ptr_to_free pointer which readers might still be
accessing since grace period has not yet ended?

We cannot touch the ptr_to_free pointer until after the grace period has
ended.

>         schedule_work(&p->wq);
> }
> <snip>
> 
> and it seems it should work. Because we know that KMALLOC_MIN_SIZE
> can not be less then machine word:
> 
> /*
>  * Kmalloc subsystem.
>  */
>  #ifndef KMALLOC_MIN_SIZE
>  #define KMALLOC_MIN_SIZE (1 << KMALLOC_SHIFT_LOW)
>  #endif
> 
> when it comes to vmalloc pointer it can not be less then one PAGE_SIZE :)
> 
> Another thing:
> 
> we are talking about "headless" variant that is special, therefore it
> implies to have some restrictions, since we need a dynamic memory to
> drive it. For example "headless" object can be freed from preemptible
> context only, because freeing can be inlined:
> 
> <snip>
> +   // NOT SURE if permitted due to IRQ. Maybe we
> +   // should try doing this from WQ?
> +   synchronize_rcu();
> +   kvfree(ptr);
> <snip>
> 
> Calling synchronize_rcu() from the IRQ context will screw the system up :)
> Because the current CPU will never pass the QS state if i do not miss something.

Yes are you right, calling synchronize_rcu() from IRQ context is a strict no-no.

I believe we could tap into the GFP_ATOMIC emergency memory pool for this
emergency situation. This pool is used for emergency cases. I think in
emergency we can grow an rcu_head on this pool.

> Also kvfree() itself can be called from the preemptible context only, excluding IRQ,
> there is a special path for it, otherwise vfree() can sleep. 

Ok that's good to know.

> > debug_objects bits wouldn't work obviously for the !emergency kvfree case,
> > not sure what we can do there.
> >
> Agree.
> 
> Thank you, Joel, for your comments!

No problem, I think we have a couple of ideas here.

What I also wanted to do was (may be after all this), see if we can create an
API for head-less kfree based on the same ideas. Not just for arrays for for
any object. Calling it, say, kfree_rcu_headless() and then use the bulk array
as we have been doing. That would save any users from having an rcu_head --
of course with all needed warnings about memory allocation failure. Vlad,
What do you think? Paul, any thoughts on this?

thanks,

 - Joel
Joel Fernandes Feb. 22, 2020, 10:24 p.m. UTC | #17
On Fri, Feb 21, 2020 at 12:22:50PM -0800, Paul E. McKenney wrote:
> On Fri, Feb 21, 2020 at 02:14:55PM +0100, Uladzislau Rezki wrote:
> > On Thu, Feb 20, 2020 at 04:30:35PM -0800, Paul E. McKenney wrote:
> > > On Wed, Feb 19, 2020 at 11:52:33PM -0500, Theodore Y. Ts'o wrote:
> > > > On Tue, Feb 18, 2020 at 06:08:57PM +0100, Uladzislau Rezki wrote:
> > > > > now it becomes possible to use it like: 
> > > > > 	...
> > > > > 	void *p = kvmalloc(PAGE_SIZE);
> > > > > 	kvfree_rcu(p);
> > > > > 	...
> > > > > also have a look at the example in the mm/list_lru.c diff.
> > > > 
> > > > I certainly like the interface, thanks!  I'm going to be pushing
> > > > patches to fix this using ext4_kvfree_array_rcu() since there are a
> > > > number of bugs in ext4's online resizing which appear to be hitting
> > > > multiple cloud providers (with reports from both AWS and GCP) and I
> > > > want something which can be easily backported to stable kernels.
> > > > 
> > > > But once kvfree_rcu() hits mainline, I'll switch ext4 to use it, since
> > > > your kvfree_rcu() is definitely more efficient than my expedient
> > > > jury-rig.
> > > > 
> > > > I don't feel entirely competent to review the implementation, but I do
> > > > have one question.  It looks like the rcutiny implementation of
> > > > kfree_call_rcu() isn't going to do the right thing with kvfree_rcu(p).
> > > > Am I missing something?
> > > 
> > > Good catch!  I believe that rcu_reclaim_tiny() would need to do
> > > kvfree() instead of its current kfree().
> > > 
> > > Vlad, anything I am missing here?
> > >
> > Yes something like that. There are some open questions about
> > realization, when it comes to tiny RCU. Since we are talking
> > about "headless" kvfree_rcu() interface, i mean we can not link
> > freed "objects" between each other, instead we should place a
> > pointer directly into array that will be drained later on.
> > 
> > It would be much more easier to achieve that if we were talking
> > about the interface like: kvfree_rcu(p, rcu), but that is not our
> > case :)
> > 
> > So, for CONFIG_TINY_RCU we should implement very similar what we
> > have done for CONFIG_TREE_RCU or just simply do like Ted has done
> > with his
> > 
> > void ext4_kvfree_array_rcu(void *to_free)
> > 
> > i mean:
> > 
> >    local_irq_save(flags);
> >    struct foo *ptr = kzalloc(sizeof(*ptr), GFP_ATOMIC);
> > 
> >    if (ptr) {
> >            ptr->ptr = to_free;
> >            call_rcu(&ptr->rcu, kvfree_callback);
> >    }
> >    local_irq_restore(flags);
> 
> We really do still need the emergency case, in this case for when
> kzalloc() returns NULL.  Which does indeed mean an rcu_head in the thing
> being freed.  Otherwise, you end up with an out-of-memory deadlock where
> you could free memory only if you had memor to allocate.

Can we rely on GFP_ATOMIC allocations for these? These have emergency memory
pools which are reserved.

I was thinking a 2 fold approach (just thinking out loud..):

If kfree_call_rcu() is called in atomic context or in any rcu reader, then
use GFP_ATOMIC to grow an rcu_head wrapper on the atomic memory pool and
queue that.

Otherwise, grow an rcu_head on the stack of kfree_call_rcu() and call
synchronize_rcu() inline with it.

Use preemptible() andr task_struct's rcu_read_lock_nesting to differentiate
between the 2 cases.

Thoughts?

> > Also there is one more open question what to do if GFP_ATOMIC
> > gets failed in case of having low memory condition. Probably
> > we can make use of "mempool interface" that allows to have
> > min_nr guaranteed pre-allocated pages. 
> 
> But we really do still need to handle the case where everything runs out,
> even the pre-allocated pages.

If *everything* runs out, you are pretty much going to OOM sooner or later
anyway :D. But I see what you mean. But the 'tradeoff' is RCU can free
head-less objects where possible.

thanks,

 - Joel
Paul E. McKenney Feb. 23, 2020, 1:10 a.m. UTC | #18
On Sat, Feb 22, 2020 at 05:24:15PM -0500, Joel Fernandes wrote:
> On Fri, Feb 21, 2020 at 12:22:50PM -0800, Paul E. McKenney wrote:
> > On Fri, Feb 21, 2020 at 02:14:55PM +0100, Uladzislau Rezki wrote:
> > > On Thu, Feb 20, 2020 at 04:30:35PM -0800, Paul E. McKenney wrote:
> > > > On Wed, Feb 19, 2020 at 11:52:33PM -0500, Theodore Y. Ts'o wrote:
> > > > > On Tue, Feb 18, 2020 at 06:08:57PM +0100, Uladzislau Rezki wrote:
> > > > > > now it becomes possible to use it like: 
> > > > > > 	...
> > > > > > 	void *p = kvmalloc(PAGE_SIZE);
> > > > > > 	kvfree_rcu(p);
> > > > > > 	...
> > > > > > also have a look at the example in the mm/list_lru.c diff.
> > > > > 
> > > > > I certainly like the interface, thanks!  I'm going to be pushing
> > > > > patches to fix this using ext4_kvfree_array_rcu() since there are a
> > > > > number of bugs in ext4's online resizing which appear to be hitting
> > > > > multiple cloud providers (with reports from both AWS and GCP) and I
> > > > > want something which can be easily backported to stable kernels.
> > > > > 
> > > > > But once kvfree_rcu() hits mainline, I'll switch ext4 to use it, since
> > > > > your kvfree_rcu() is definitely more efficient than my expedient
> > > > > jury-rig.
> > > > > 
> > > > > I don't feel entirely competent to review the implementation, but I do
> > > > > have one question.  It looks like the rcutiny implementation of
> > > > > kfree_call_rcu() isn't going to do the right thing with kvfree_rcu(p).
> > > > > Am I missing something?
> > > > 
> > > > Good catch!  I believe that rcu_reclaim_tiny() would need to do
> > > > kvfree() instead of its current kfree().
> > > > 
> > > > Vlad, anything I am missing here?
> > > >
> > > Yes something like that. There are some open questions about
> > > realization, when it comes to tiny RCU. Since we are talking
> > > about "headless" kvfree_rcu() interface, i mean we can not link
> > > freed "objects" between each other, instead we should place a
> > > pointer directly into array that will be drained later on.
> > > 
> > > It would be much more easier to achieve that if we were talking
> > > about the interface like: kvfree_rcu(p, rcu), but that is not our
> > > case :)
> > > 
> > > So, for CONFIG_TINY_RCU we should implement very similar what we
> > > have done for CONFIG_TREE_RCU or just simply do like Ted has done
> > > with his
> > > 
> > > void ext4_kvfree_array_rcu(void *to_free)
> > > 
> > > i mean:
> > > 
> > >    local_irq_save(flags);
> > >    struct foo *ptr = kzalloc(sizeof(*ptr), GFP_ATOMIC);
> > > 
> > >    if (ptr) {
> > >            ptr->ptr = to_free;
> > >            call_rcu(&ptr->rcu, kvfree_callback);
> > >    }
> > >    local_irq_restore(flags);
> > 
> > We really do still need the emergency case, in this case for when
> > kzalloc() returns NULL.  Which does indeed mean an rcu_head in the thing
> > being freed.  Otherwise, you end up with an out-of-memory deadlock where
> > you could free memory only if you had memor to allocate.
> 
> Can we rely on GFP_ATOMIC allocations for these? These have emergency memory
> pools which are reserved.

You can, at least until the emergency memory pools are exhausted.

> I was thinking a 2 fold approach (just thinking out loud..):
> 
> If kfree_call_rcu() is called in atomic context or in any rcu reader, then
> use GFP_ATOMIC to grow an rcu_head wrapper on the atomic memory pool and
> queue that.
> 
> Otherwise, grow an rcu_head on the stack of kfree_call_rcu() and call
> synchronize_rcu() inline with it.
> 
> Use preemptible() andr task_struct's rcu_read_lock_nesting to differentiate
> between the 2 cases.
> 
> Thoughts?

How much are we really losing by having an rcu_head in the structure,
either separately or unioned over other fields?

> > > Also there is one more open question what to do if GFP_ATOMIC
> > > gets failed in case of having low memory condition. Probably
> > > we can make use of "mempool interface" that allows to have
> > > min_nr guaranteed pre-allocated pages. 
> > 
> > But we really do still need to handle the case where everything runs out,
> > even the pre-allocated pages.
> 
> If *everything* runs out, you are pretty much going to OOM sooner or later
> anyway :D. But I see what you mean. But the 'tradeoff' is RCU can free
> head-less objects where possible.

Would you rather pay an rcu_head tax (in cases where it cannot share
with other fields), or would you rather have states where you could free
a lot of memory if only there was some memory to allocate to track the
memory to be freed?

But yes, as you suggested above, there could be an API similar to the
userspace RCU library's API that usually just queues the callback but
sometimes sleeps for a full grace period.  If enough people want this,
it should not be hard to set up.

							Thanx, Paul
Uladzislau Rezki Feb. 24, 2020, 5:02 p.m. UTC | #19
> > > > 
> > > > Overall this implementation is nice. You are basically avoiding allocating
> > > > rcu_head like Ted did by using the array-of-pointers technique we used for
> > > > the previous kfree_rcu() work.
> > > > 
> > > > One thing stands out, the path where we could not allocate a page for the new
> > > > block node:
> > > > 
> > > > > @@ -3061,6 +3148,11 @@ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> > > > >         if (krcp->initialized)
> > > > >                 spin_unlock(&krcp->lock);
> > > > >         local_irq_restore(flags);
> > > > > +
> > > > > +       if (!skip_call_rcu) {
> > > > > +               synchronize_rcu();
> > > > > +               kvfree(ptr_to_free);
> > > > 
> > > > We can't block, it has to be async otherwise everything else blocks, and I
> > > > think this can also be used from interrupt handlers which would at least be
> > > > an SWA violation. So perhaps it needs to allocate an rcu_head wrapper object
> > > > itself for the 'emergeny case' and use the regular techniques.
> > > > 
> > > > Another thing that stands out is the code duplication, if we can make this
> > > > reuse as much as of the previous code as possible, that'd be great. I'd like
> > > > to avoid bvcached and bvhead if possible. Maybe we can store information
> > > > about the fact that this is a 'special object' in some of the lower-order
> > > > bits of the pointer. Then we can detect that it is 'special' and free it
> > > > using kvfree() during the reclaim
> > > 
> > > Basically what I did different is:
> > > 1. Use the existing kfree_rcu_bulk_data::records array to store the
> > >    to-be-freed array.
> > > 2. In case of emergency, allocate a new wrapper and tag the pointer.
> > >    Read the tag later to figure its an array wrapper and do additional kvfree.
> > >
> > I see your point and agree that duplication is odd and we should avoid
> > it as much as possible. Also, i like the idea of using the wrapper as
> > one more chance to build a "head" for headless object.
> > 
> > I did not mix pointers because then you will need to understand what is what.
> 
> Well that's why I brought up the whole tagging idea. Then you don't need
> separate pointers to manage either (edit: but maybe you do as you mentioned
> vfree below..).
> 
Right. We can use tagging idea to separate kmalloc/vmalloc pointers to
place them into different arrays. Because kvmalloc() can return either
SLAB pointer or vmalloc one.

> > It is OK for "emergency" path, because we simply can just serialize it by kvfree()
> > call, it checks inside what the ptr address belong to:
> > 
> > <snip>
> > void kvfree(const void *addr)
> > {
> >     if (is_vmalloc_addr(addr))
> >         vfree(addr);
> >     else
> >         kfree(addr);
> > }
> > <snip>
> > 
> > whereas normal path, i mean "bulk one" where we store pointers into array
> > would be broken. We can not call kfree_bulk(array, nr_entries) if the passed
> > array contains "vmalloc" pointers, because it is different allocator. Therefore,
> > i deliberately have made it as a special case.
> 
> Ok, it would be nice if you can verify that ptr_to_free passed to
> kfree_call_rcu() is infact a vmalloc pointer.
> 
We can do that. We can check it by calling is_vmalloc_addr() on ptr. 
So it is possible to differentiate.

> > > Perhaps the synchronize_rcu() should be done from a workqueue handler
> > > to prevent IRQ crapping out?
> > >
> > I think so. For example one approach would be:
> > 
> > <snip>
> > struct free_deferred {
> >  struct llist_head list;
> >  struct work_struct wq;
> > };
> > static DEFINE_PER_CPU(struct free_deferred, free_deferred);
> > 
> > static void free_work(struct work_struct *w)
> > {
> >   struct free_deferred *p = container_of(w, struct free_deferred, wq);
> >   struct llist_node *t, *llnode;
> > 
> >   synchronize_rcu();
> > 
> >   llist_for_each_safe(llnode, t, llist_del_all(&p->list))
> >      vfree((void *)llnode, 1);
> > }
> > 
> > static inline void free_deferred_common(void *ptr_to_free)
> > {
> >     struct free_deferred *p = raw_cpu_ptr(&free_deferred);
> > 
> >     if (llist_add((struct llist_node *)ptr_to_free, &p->list))
> 
> Would this not corrupt the ptr_to_free pointer which readers might still be
> accessing since grace period has not yet ended?
> 
> We cannot touch the ptr_to_free pointer until after the grace period has
> ended.
> 
Right you are. We can do that only after grace period is passed, 
after synchronize_rcu(). Good point :)

> > }
> > <snip>
> > 
> > and it seems it should work. Because we know that KMALLOC_MIN_SIZE
> > can not be less then machine word:
> > 
> > /*
> >  * Kmalloc subsystem.
> >  */
> >  #ifndef KMALLOC_MIN_SIZE
> >  #define KMALLOC_MIN_SIZE (1 << KMALLOC_SHIFT_LOW)
> >  #endif
> > 
> > when it comes to vmalloc pointer it can not be less then one PAGE_SIZE :)
> > 
> > Another thing:
> > 
> > we are talking about "headless" variant that is special, therefore it
> > implies to have some restrictions, since we need a dynamic memory to
> > drive it. For example "headless" object can be freed from preemptible
> > context only, because freeing can be inlined:
> > 
> > <snip>
> > +   // NOT SURE if permitted due to IRQ. Maybe we
> > +   // should try doing this from WQ?
> > +   synchronize_rcu();
> > +   kvfree(ptr);
> > <snip>
> > 
> > Calling synchronize_rcu() from the IRQ context will screw the system up :)
> > Because the current CPU will never pass the QS state if i do not miss something.
> 
> Yes are you right, calling synchronize_rcu() from IRQ context is a strict no-no.
> 
> I believe we could tap into the GFP_ATOMIC emergency memory pool for this
> emergency situation. This pool is used for emergency cases. I think in
> emergency we can grow an rcu_head on this pool.
> 
> > Also kvfree() itself can be called from the preemptible context only, excluding IRQ,
> > there is a special path for it, otherwise vfree() can sleep. 
> 
> Ok that's good to know.
> 
> > > debug_objects bits wouldn't work obviously for the !emergency kvfree case,
> > > not sure what we can do there.
> > >
> > Agree.
> > 
> > Thank you, Joel, for your comments!
> 
> No problem, I think we have a couple of ideas here.
> 
> What I also wanted to do was (may be after all this), see if we can create an
> API for head-less kfree based on the same ideas. Not just for arrays for for
> any object. Calling it, say, kfree_rcu_headless() and then use the bulk array
> as we have been doing. That would save any users from having an rcu_head --
> of course with all needed warnings about memory allocation failure. Vlad,
> What do you think? Paul, any thoughts on this?
> 
I like it. It would be more clean interface. Also there are places where
people do not embed the rcu_head into their stuctures for some reason
and do like:


<snip>
    synchronize_rcu();
    kfree(p);
<snip>

<snip>
urezki@pc636:~/data/ssd/coding/linux-rcu$ find ./ -name "*.c" | xargs grep -C 1 -rn "synchronize_rcu" | grep kfree
./arch/x86/mm/mmio-mod.c-314-           kfree(found_trace);
./kernel/module.c-3910- kfree(mod->args);
./kernel/trace/ftrace.c-5078-                   kfree(direct);
./kernel/trace/ftrace.c-5155-                   kfree(direct);
./kernel/trace/trace_probe.c-1087-      kfree(link);
./fs/nfs/sysfs.c-113-           kfree(old);
./fs/ext4/super.c-1701- kfree(old_qname);
./net/ipv4/gre.mod.c-36-        { 0xfc3fcca2, "kfree_skb" },
./net/core/sysctl_net_core.c-143-                               kfree(cur);
./drivers/crypto/nx/nx-842-pseries.c-1010-      kfree(old_devdata);
./drivers/misc/vmw_vmci/vmci_context.c-692-             kfree(notifier);
./drivers/misc/vmw_vmci/vmci_event.c-213-       kfree(s);
./drivers/infiniband/core/device.c:2162:                         * synchronize_rcu before the netdev is kfreed, so we
./drivers/infiniband/hw/hfi1/sdma.c-1337-       kfree(dd->per_sdma);
./drivers/net/ethernet/myricom/myri10ge/myri10ge.c-3582-        kfree(mgp->ss);
./drivers/net/ethernet/myricom/myri10ge/myri10ge.mod.c-156-     { 0x37a0cba, "kfree" },
./drivers/net/ethernet/mellanox/mlx5/core/fpga/tls.c:286:       synchronize_rcu(); /* before kfree(flow) */
./drivers/net/ethernet/mellanox/mlxsw/core.c-1504-      kfree(rxl_item);
./drivers/net/ethernet/chelsio/cxgb4/cxgb4_main.c-6648- kfree(adapter->mbox_log);
./drivers/net/ethernet/chelsio/cxgb4/cxgb4_main.c-6650- kfree(adapter);
./drivers/block/drbd/drbd_receiver.c-3804-      kfree(old_net_conf);
./drivers/block/drbd/drbd_receiver.c-4176-                      kfree(old_disk_conf);
./drivers/block/drbd/drbd_state.c-2074-         kfree(old_conf);
./drivers/block/drbd/drbd_nl.c-1689-    kfree(old_disk_conf);
./drivers/block/drbd/drbd_nl.c-2522-    kfree(old_net_conf);
./drivers/block/drbd/drbd_nl.c-2935-            kfree(old_disk_conf);
./drivers/mfd/dln2.c-178-               kfree(i);
./drivers/staging/fwserial/fwserial.c-2122-     kfree(peer);
<snip>

--
Vlad Rezki
Uladzislau Rezki Feb. 24, 2020, 5:40 p.m. UTC | #20
On Sat, Feb 22, 2020 at 05:10:18PM -0800, Paul E. McKenney wrote:
> On Sat, Feb 22, 2020 at 05:24:15PM -0500, Joel Fernandes wrote:
> > On Fri, Feb 21, 2020 at 12:22:50PM -0800, Paul E. McKenney wrote:
> > > On Fri, Feb 21, 2020 at 02:14:55PM +0100, Uladzislau Rezki wrote:
> > > > On Thu, Feb 20, 2020 at 04:30:35PM -0800, Paul E. McKenney wrote:
> > > > > On Wed, Feb 19, 2020 at 11:52:33PM -0500, Theodore Y. Ts'o wrote:
> > > > > > On Tue, Feb 18, 2020 at 06:08:57PM +0100, Uladzislau Rezki wrote:
> > > > > > > now it becomes possible to use it like: 
> > > > > > > 	...
> > > > > > > 	void *p = kvmalloc(PAGE_SIZE);
> > > > > > > 	kvfree_rcu(p);
> > > > > > > 	...
> > > > > > > also have a look at the example in the mm/list_lru.c diff.
> > > > > > 
> > > > > > I certainly like the interface, thanks!  I'm going to be pushing
> > > > > > patches to fix this using ext4_kvfree_array_rcu() since there are a
> > > > > > number of bugs in ext4's online resizing which appear to be hitting
> > > > > > multiple cloud providers (with reports from both AWS and GCP) and I
> > > > > > want something which can be easily backported to stable kernels.
> > > > > > 
> > > > > > But once kvfree_rcu() hits mainline, I'll switch ext4 to use it, since
> > > > > > your kvfree_rcu() is definitely more efficient than my expedient
> > > > > > jury-rig.
> > > > > > 
> > > > > > I don't feel entirely competent to review the implementation, but I do
> > > > > > have one question.  It looks like the rcutiny implementation of
> > > > > > kfree_call_rcu() isn't going to do the right thing with kvfree_rcu(p).
> > > > > > Am I missing something?
> > > > > 
> > > > > Good catch!  I believe that rcu_reclaim_tiny() would need to do
> > > > > kvfree() instead of its current kfree().
> > > > > 
> > > > > Vlad, anything I am missing here?
> > > > >
> > > > Yes something like that. There are some open questions about
> > > > realization, when it comes to tiny RCU. Since we are talking
> > > > about "headless" kvfree_rcu() interface, i mean we can not link
> > > > freed "objects" between each other, instead we should place a
> > > > pointer directly into array that will be drained later on.
> > > > 
> > > > It would be much more easier to achieve that if we were talking
> > > > about the interface like: kvfree_rcu(p, rcu), but that is not our
> > > > case :)
> > > > 
> > > > So, for CONFIG_TINY_RCU we should implement very similar what we
> > > > have done for CONFIG_TREE_RCU or just simply do like Ted has done
> > > > with his
> > > > 
> > > > void ext4_kvfree_array_rcu(void *to_free)
> > > > 
> > > > i mean:
> > > > 
> > > >    local_irq_save(flags);
> > > >    struct foo *ptr = kzalloc(sizeof(*ptr), GFP_ATOMIC);
> > > > 
> > > >    if (ptr) {
> > > >            ptr->ptr = to_free;
> > > >            call_rcu(&ptr->rcu, kvfree_callback);
> > > >    }
> > > >    local_irq_restore(flags);
> > > 
> > > We really do still need the emergency case, in this case for when
> > > kzalloc() returns NULL.  Which does indeed mean an rcu_head in the thing
> > > being freed.  Otherwise, you end up with an out-of-memory deadlock where
> > > you could free memory only if you had memor to allocate.
> > 
> > Can we rely on GFP_ATOMIC allocations for these? These have emergency memory
> > pools which are reserved.
> 
> You can, at least until the emergency memory pools are exhausted.
> 
> > I was thinking a 2 fold approach (just thinking out loud..):
> > 
> > If kfree_call_rcu() is called in atomic context or in any rcu reader, then
> > use GFP_ATOMIC to grow an rcu_head wrapper on the atomic memory pool and
> > queue that.
> > 
I am not sure if that is acceptable, i mean what to do when GFP_ATOMIC
gets failed in atomic context? Or we can just consider it as out of
memory and another variant is to say that headless object can be called
from preemptible context only.

> >
> > Otherwise, grow an rcu_head on the stack of kfree_call_rcu() and call
> > synchronize_rcu() inline with it.
> > 
> >
What do you mean here, Joel? "grow an rcu_head on the stack"?

> >
> > Use preemptible() andr task_struct's rcu_read_lock_nesting to differentiate
> > between the 2 cases.
> > 
If the current context is preemptable then we can inline synchronize_rcu()
together with freeing to handle such corner case, i mean when we are run
out of memory.

As for "task_struct's rcu_read_lock_nesting". Will it be enough just
have a look at preempt_count of current process? If we have for example
nested rcu_read_locks:

<snip>
rcu_read_lock()
    rcu_read_lock()
        rcu_read_lock()
<snip>

the counter would be 3.

> 
> How much are we really losing by having an rcu_head in the structure,
> either separately or unioned over other fields?
> 
> > > > Also there is one more open question what to do if GFP_ATOMIC
> > > > gets failed in case of having low memory condition. Probably
> > > > we can make use of "mempool interface" that allows to have
> > > > min_nr guaranteed pre-allocated pages. 
> > > 
> > > But we really do still need to handle the case where everything runs out,
> > > even the pre-allocated pages.
> > 
> > If *everything* runs out, you are pretty much going to OOM sooner or later
> > anyway :D. But I see what you mean. But the 'tradeoff' is RCU can free
> > head-less objects where possible.
> 
> Would you rather pay an rcu_head tax (in cases where it cannot share
> with other fields), or would you rather have states where you could free
> a lot of memory if only there was some memory to allocate to track the
> memory to be freed?
> 
> But yes, as you suggested above, there could be an API similar to the
> userspace RCU library's API that usually just queues the callback but
> sometimes sleeps for a full grace period.  If enough people want this,
> it should not be hard to set up.
> 

--
Vlad Rezki
Paul E. McKenney Feb. 24, 2020, 11:14 p.m. UTC | #21
On Mon, Feb 24, 2020 at 06:02:19PM +0100, Uladzislau Rezki wrote:
> > > > > 
> > > > > Overall this implementation is nice. You are basically avoiding allocating
> > > > > rcu_head like Ted did by using the array-of-pointers technique we used for
> > > > > the previous kfree_rcu() work.
> > > > > 
> > > > > One thing stands out, the path where we could not allocate a page for the new
> > > > > block node:
> > > > > 
> > > > > > @@ -3061,6 +3148,11 @@ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> > > > > >         if (krcp->initialized)
> > > > > >                 spin_unlock(&krcp->lock);
> > > > > >         local_irq_restore(flags);
> > > > > > +
> > > > > > +       if (!skip_call_rcu) {
> > > > > > +               synchronize_rcu();
> > > > > > +               kvfree(ptr_to_free);
> > > > > 
> > > > > We can't block, it has to be async otherwise everything else blocks, and I
> > > > > think this can also be used from interrupt handlers which would at least be
> > > > > an SWA violation. So perhaps it needs to allocate an rcu_head wrapper object
> > > > > itself for the 'emergeny case' and use the regular techniques.
> > > > > 
> > > > > Another thing that stands out is the code duplication, if we can make this
> > > > > reuse as much as of the previous code as possible, that'd be great. I'd like
> > > > > to avoid bvcached and bvhead if possible. Maybe we can store information
> > > > > about the fact that this is a 'special object' in some of the lower-order
> > > > > bits of the pointer. Then we can detect that it is 'special' and free it
> > > > > using kvfree() during the reclaim
> > > > 
> > > > Basically what I did different is:
> > > > 1. Use the existing kfree_rcu_bulk_data::records array to store the
> > > >    to-be-freed array.
> > > > 2. In case of emergency, allocate a new wrapper and tag the pointer.
> > > >    Read the tag later to figure its an array wrapper and do additional kvfree.
> > > >
> > > I see your point and agree that duplication is odd and we should avoid
> > > it as much as possible. Also, i like the idea of using the wrapper as
> > > one more chance to build a "head" for headless object.
> > > 
> > > I did not mix pointers because then you will need to understand what is what.
> > 
> > Well that's why I brought up the whole tagging idea. Then you don't need
> > separate pointers to manage either (edit: but maybe you do as you mentioned
> > vfree below..).
> > 
> Right. We can use tagging idea to separate kmalloc/vmalloc pointers to
> place them into different arrays. Because kvmalloc() can return either
> SLAB pointer or vmalloc one.
> 
> > > It is OK for "emergency" path, because we simply can just serialize it by kvfree()
> > > call, it checks inside what the ptr address belong to:
> > > 
> > > <snip>
> > > void kvfree(const void *addr)
> > > {
> > >     if (is_vmalloc_addr(addr))
> > >         vfree(addr);
> > >     else
> > >         kfree(addr);
> > > }
> > > <snip>
> > > 
> > > whereas normal path, i mean "bulk one" where we store pointers into array
> > > would be broken. We can not call kfree_bulk(array, nr_entries) if the passed
> > > array contains "vmalloc" pointers, because it is different allocator. Therefore,
> > > i deliberately have made it as a special case.
> > 
> > Ok, it would be nice if you can verify that ptr_to_free passed to
> > kfree_call_rcu() is infact a vmalloc pointer.
> > 
> We can do that. We can check it by calling is_vmalloc_addr() on ptr. 
> So it is possible to differentiate.
> 
> > > > Perhaps the synchronize_rcu() should be done from a workqueue handler
> > > > to prevent IRQ crapping out?
> > > >
> > > I think so. For example one approach would be:
> > > 
> > > <snip>
> > > struct free_deferred {
> > >  struct llist_head list;
> > >  struct work_struct wq;
> > > };
> > > static DEFINE_PER_CPU(struct free_deferred, free_deferred);
> > > 
> > > static void free_work(struct work_struct *w)
> > > {
> > >   struct free_deferred *p = container_of(w, struct free_deferred, wq);
> > >   struct llist_node *t, *llnode;
> > > 
> > >   synchronize_rcu();
> > > 
> > >   llist_for_each_safe(llnode, t, llist_del_all(&p->list))
> > >      vfree((void *)llnode, 1);
> > > }
> > > 
> > > static inline void free_deferred_common(void *ptr_to_free)
> > > {
> > >     struct free_deferred *p = raw_cpu_ptr(&free_deferred);
> > > 
> > >     if (llist_add((struct llist_node *)ptr_to_free, &p->list))
> > 
> > Would this not corrupt the ptr_to_free pointer which readers might still be
> > accessing since grace period has not yet ended?
> > 
> > We cannot touch the ptr_to_free pointer until after the grace period has
> > ended.
> > 
> Right you are. We can do that only after grace period is passed, 
> after synchronize_rcu(). Good point :)
> 
> > > }
> > > <snip>
> > > 
> > > and it seems it should work. Because we know that KMALLOC_MIN_SIZE
> > > can not be less then machine word:
> > > 
> > > /*
> > >  * Kmalloc subsystem.
> > >  */
> > >  #ifndef KMALLOC_MIN_SIZE
> > >  #define KMALLOC_MIN_SIZE (1 << KMALLOC_SHIFT_LOW)
> > >  #endif
> > > 
> > > when it comes to vmalloc pointer it can not be less then one PAGE_SIZE :)
> > > 
> > > Another thing:
> > > 
> > > we are talking about "headless" variant that is special, therefore it
> > > implies to have some restrictions, since we need a dynamic memory to
> > > drive it. For example "headless" object can be freed from preemptible
> > > context only, because freeing can be inlined:
> > > 
> > > <snip>
> > > +   // NOT SURE if permitted due to IRQ. Maybe we
> > > +   // should try doing this from WQ?
> > > +   synchronize_rcu();
> > > +   kvfree(ptr);
> > > <snip>
> > > 
> > > Calling synchronize_rcu() from the IRQ context will screw the system up :)
> > > Because the current CPU will never pass the QS state if i do not miss something.
> > 
> > Yes are you right, calling synchronize_rcu() from IRQ context is a strict no-no.
> > 
> > I believe we could tap into the GFP_ATOMIC emergency memory pool for this
> > emergency situation. This pool is used for emergency cases. I think in
> > emergency we can grow an rcu_head on this pool.
> > 
> > > Also kvfree() itself can be called from the preemptible context only, excluding IRQ,
> > > there is a special path for it, otherwise vfree() can sleep. 
> > 
> > Ok that's good to know.
> > 
> > > > debug_objects bits wouldn't work obviously for the !emergency kvfree case,
> > > > not sure what we can do there.
> > > >
> > > Agree.
> > > 
> > > Thank you, Joel, for your comments!
> > 
> > No problem, I think we have a couple of ideas here.
> > 
> > What I also wanted to do was (may be after all this), see if we can create an
> > API for head-less kfree based on the same ideas. Not just for arrays for for
> > any object. Calling it, say, kfree_rcu_headless() and then use the bulk array
> > as we have been doing. That would save any users from having an rcu_head --
> > of course with all needed warnings about memory allocation failure. Vlad,
> > What do you think? Paul, any thoughts on this?
> > 
> I like it. It would be more clean interface. Also there are places where
> people do not embed the rcu_head into their stuctures for some reason
> and do like:
> 
> 
> <snip>
>     synchronize_rcu();
>     kfree(p);
> <snip>
> 
> <snip>
> urezki@pc636:~/data/ssd/coding/linux-rcu$ find ./ -name "*.c" | xargs grep -C 1 -rn "synchronize_rcu" | grep kfree
> ./arch/x86/mm/mmio-mod.c-314-           kfree(found_trace);
> ./kernel/module.c-3910- kfree(mod->args);
> ./kernel/trace/ftrace.c-5078-                   kfree(direct);
> ./kernel/trace/ftrace.c-5155-                   kfree(direct);
> ./kernel/trace/trace_probe.c-1087-      kfree(link);
> ./fs/nfs/sysfs.c-113-           kfree(old);
> ./fs/ext4/super.c-1701- kfree(old_qname);
> ./net/ipv4/gre.mod.c-36-        { 0xfc3fcca2, "kfree_skb" },
> ./net/core/sysctl_net_core.c-143-                               kfree(cur);
> ./drivers/crypto/nx/nx-842-pseries.c-1010-      kfree(old_devdata);
> ./drivers/misc/vmw_vmci/vmci_context.c-692-             kfree(notifier);
> ./drivers/misc/vmw_vmci/vmci_event.c-213-       kfree(s);
> ./drivers/infiniband/core/device.c:2162:                         * synchronize_rcu before the netdev is kfreed, so we
> ./drivers/infiniband/hw/hfi1/sdma.c-1337-       kfree(dd->per_sdma);
> ./drivers/net/ethernet/myricom/myri10ge/myri10ge.c-3582-        kfree(mgp->ss);
> ./drivers/net/ethernet/myricom/myri10ge/myri10ge.mod.c-156-     { 0x37a0cba, "kfree" },
> ./drivers/net/ethernet/mellanox/mlx5/core/fpga/tls.c:286:       synchronize_rcu(); /* before kfree(flow) */
> ./drivers/net/ethernet/mellanox/mlxsw/core.c-1504-      kfree(rxl_item);
> ./drivers/net/ethernet/chelsio/cxgb4/cxgb4_main.c-6648- kfree(adapter->mbox_log);
> ./drivers/net/ethernet/chelsio/cxgb4/cxgb4_main.c-6650- kfree(adapter);
> ./drivers/block/drbd/drbd_receiver.c-3804-      kfree(old_net_conf);
> ./drivers/block/drbd/drbd_receiver.c-4176-                      kfree(old_disk_conf);
> ./drivers/block/drbd/drbd_state.c-2074-         kfree(old_conf);
> ./drivers/block/drbd/drbd_nl.c-1689-    kfree(old_disk_conf);
> ./drivers/block/drbd/drbd_nl.c-2522-    kfree(old_net_conf);
> ./drivers/block/drbd/drbd_nl.c-2935-            kfree(old_disk_conf);
> ./drivers/mfd/dln2.c-178-               kfree(i);
> ./drivers/staging/fwserial/fwserial.c-2122-     kfree(peer);
> <snip>

That certainly is enough use cases to justify a new API.  And given that
they are already calling synchronize_rcu(), having a function that only
sometimes calls synchronize_rcu() should be OK from a latency viewpoint.

The trick will be verifying that the existing calls to synchronize_rcu()
aren't providing some other guarantee...  But submitting the patch and
seeing if the maintainers are willing to ack is not without merit.

Plus, the cases where both the synchronize_rcu() and the kfree() are
under some "if" should be easier to verify as safe.

							Thanx, Paul
Joel Fernandes Feb. 25, 2020, 1:48 a.m. UTC | #22
[..]
> > > > debug_objects bits wouldn't work obviously for the !emergency kvfree case,
> > > > not sure what we can do there.
> > > >
> > > Agree.
> > > 
> > > Thank you, Joel, for your comments!
> > 
> > No problem, I think we have a couple of ideas here.
> > 
> > What I also wanted to do was (may be after all this), see if we can create an
> > API for head-less kfree based on the same ideas. Not just for arrays for for
> > any object. Calling it, say, kfree_rcu_headless() and then use the bulk array
> > as we have been doing. That would save any users from having an rcu_head --
> > of course with all needed warnings about memory allocation failure. Vlad,
> > What do you think? Paul, any thoughts on this?
> > 
> I like it. It would be more clean interface. Also there are places where
> people do not embed the rcu_head into their stuctures for some reason
> and do like:
> 
> 
> <snip>
>     synchronize_rcu();
>     kfree(p);
> <snip>
> 
> <snip>
> urezki@pc636:~/data/ssd/coding/linux-rcu$ find ./ -name "*.c" | xargs grep -C 1 -rn "synchronize_rcu" | grep kfree
> ./arch/x86/mm/mmio-mod.c-314-           kfree(found_trace);
> ./kernel/module.c-3910- kfree(mod->args);
> ./kernel/trace/ftrace.c-5078-                   kfree(direct);
> ./kernel/trace/ftrace.c-5155-                   kfree(direct);
> ./kernel/trace/trace_probe.c-1087-      kfree(link);
> ./fs/nfs/sysfs.c-113-           kfree(old);
> ./fs/ext4/super.c-1701- kfree(old_qname);
> ./net/ipv4/gre.mod.c-36-        { 0xfc3fcca2, "kfree_skb" },
> ./net/core/sysctl_net_core.c-143-                               kfree(cur);
> ./drivers/crypto/nx/nx-842-pseries.c-1010-      kfree(old_devdata);
> ./drivers/misc/vmw_vmci/vmci_context.c-692-             kfree(notifier);
> ./drivers/misc/vmw_vmci/vmci_event.c-213-       kfree(s);
> ./drivers/infiniband/core/device.c:2162:                         * synchronize_rcu before the netdev is kfreed, so we
> ./drivers/infiniband/hw/hfi1/sdma.c-1337-       kfree(dd->per_sdma);
> ./drivers/net/ethernet/myricom/myri10ge/myri10ge.c-3582-        kfree(mgp->ss);
> ./drivers/net/ethernet/myricom/myri10ge/myri10ge.mod.c-156-     { 0x37a0cba, "kfree" },
> ./drivers/net/ethernet/mellanox/mlx5/core/fpga/tls.c:286:       synchronize_rcu(); /* before kfree(flow) */
> ./drivers/net/ethernet/mellanox/mlxsw/core.c-1504-      kfree(rxl_item);
> ./drivers/net/ethernet/chelsio/cxgb4/cxgb4_main.c-6648- kfree(adapter->mbox_log);
> ./drivers/net/ethernet/chelsio/cxgb4/cxgb4_main.c-6650- kfree(adapter);
> ./drivers/block/drbd/drbd_receiver.c-3804-      kfree(old_net_conf);
> ./drivers/block/drbd/drbd_receiver.c-4176-                      kfree(old_disk_conf);
> ./drivers/block/drbd/drbd_state.c-2074-         kfree(old_conf);
> ./drivers/block/drbd/drbd_nl.c-1689-    kfree(old_disk_conf);
> ./drivers/block/drbd/drbd_nl.c-2522-    kfree(old_net_conf);
> ./drivers/block/drbd/drbd_nl.c-2935-            kfree(old_disk_conf);
> ./drivers/mfd/dln2.c-178-               kfree(i);
> ./drivers/staging/fwserial/fwserial.c-2122-     kfree(peer);
> <snip>

Wow that's pretty amazing, looks like could be very useful.

Do you want to continue your patch and then post it, and we can discuss it?
Or happy to take it as well.

We could split work into 3 parts:
1. Make changes for 2 separate per-CPU arrays. One for vfree and one for kfree.
2. Add headless support for both as alternative API.
3. Handle the low memory case somehow (I'll reply to the other thread).

May be we can start with 1. where you can clean up your patch and post, and
then I/we can work based on that?

thanks,

 - Joel
Joel Fernandes Feb. 25, 2020, 2:07 a.m. UTC | #23
On Mon, Feb 24, 2020 at 06:40:30PM +0100, Uladzislau Rezki wrote:
> On Sat, Feb 22, 2020 at 05:10:18PM -0800, Paul E. McKenney wrote:
> > On Sat, Feb 22, 2020 at 05:24:15PM -0500, Joel Fernandes wrote:
> > > On Fri, Feb 21, 2020 at 12:22:50PM -0800, Paul E. McKenney wrote:
> > > > On Fri, Feb 21, 2020 at 02:14:55PM +0100, Uladzislau Rezki wrote:
> > > > > On Thu, Feb 20, 2020 at 04:30:35PM -0800, Paul E. McKenney wrote:
> > > > > > On Wed, Feb 19, 2020 at 11:52:33PM -0500, Theodore Y. Ts'o wrote:
> > > > > > > On Tue, Feb 18, 2020 at 06:08:57PM +0100, Uladzislau Rezki wrote:
> > > > > > > > now it becomes possible to use it like: 
> > > > > > > > 	...
> > > > > > > > 	void *p = kvmalloc(PAGE_SIZE);
> > > > > > > > 	kvfree_rcu(p);
> > > > > > > > 	...
> > > > > > > > also have a look at the example in the mm/list_lru.c diff.
> > > > > > > 
> > > > > > > I certainly like the interface, thanks!  I'm going to be pushing
> > > > > > > patches to fix this using ext4_kvfree_array_rcu() since there are a
> > > > > > > number of bugs in ext4's online resizing which appear to be hitting
> > > > > > > multiple cloud providers (with reports from both AWS and GCP) and I
> > > > > > > want something which can be easily backported to stable kernels.
> > > > > > > 
> > > > > > > But once kvfree_rcu() hits mainline, I'll switch ext4 to use it, since
> > > > > > > your kvfree_rcu() is definitely more efficient than my expedient
> > > > > > > jury-rig.
> > > > > > > 
> > > > > > > I don't feel entirely competent to review the implementation, but I do
> > > > > > > have one question.  It looks like the rcutiny implementation of
> > > > > > > kfree_call_rcu() isn't going to do the right thing with kvfree_rcu(p).
> > > > > > > Am I missing something?
> > > > > > 
> > > > > > Good catch!  I believe that rcu_reclaim_tiny() would need to do
> > > > > > kvfree() instead of its current kfree().
> > > > > > 
> > > > > > Vlad, anything I am missing here?
> > > > > >
> > > > > Yes something like that. There are some open questions about
> > > > > realization, when it comes to tiny RCU. Since we are talking
> > > > > about "headless" kvfree_rcu() interface, i mean we can not link
> > > > > freed "objects" between each other, instead we should place a
> > > > > pointer directly into array that will be drained later on.
> > > > > 
> > > > > It would be much more easier to achieve that if we were talking
> > > > > about the interface like: kvfree_rcu(p, rcu), but that is not our
> > > > > case :)
> > > > > 
> > > > > So, for CONFIG_TINY_RCU we should implement very similar what we
> > > > > have done for CONFIG_TREE_RCU or just simply do like Ted has done
> > > > > with his
> > > > > 
> > > > > void ext4_kvfree_array_rcu(void *to_free)
> > > > > 
> > > > > i mean:
> > > > > 
> > > > >    local_irq_save(flags);
> > > > >    struct foo *ptr = kzalloc(sizeof(*ptr), GFP_ATOMIC);
> > > > > 
> > > > >    if (ptr) {
> > > > >            ptr->ptr = to_free;
> > > > >            call_rcu(&ptr->rcu, kvfree_callback);
> > > > >    }
> > > > >    local_irq_restore(flags);
> > > > 
> > > > We really do still need the emergency case, in this case for when
> > > > kzalloc() returns NULL.  Which does indeed mean an rcu_head in the thing
> > > > being freed.  Otherwise, you end up with an out-of-memory deadlock where
> > > > you could free memory only if you had memor to allocate.
> > > 
> > > Can we rely on GFP_ATOMIC allocations for these? These have emergency memory
> > > pools which are reserved.
> > 
> > You can, at least until the emergency memory pools are exhausted.
> > 
> > > I was thinking a 2 fold approach (just thinking out loud..):
> > > 
> > > If kfree_call_rcu() is called in atomic context or in any rcu reader, then
> > > use GFP_ATOMIC to grow an rcu_head wrapper on the atomic memory pool and
> > > queue that.
> > > 
> I am not sure if that is acceptable, i mean what to do when GFP_ATOMIC
> gets failed in atomic context? Or we can just consider it as out of
> memory and another variant is to say that headless object can be called
> from preemptible context only.

Yes that makes sense, and we can always put disclaimer in the API's comments
saying if this object is expected to be freed a lot, then don't use the
headless-API to be extra safe.

BTW, GFP_ATOMIC the documentation says if GFP_ATOMIC reserves are depleted,
the kernel can even panic some times, so if GFP_ATOMIC allocation fails, then
there seems to be bigger problems in the system any way. I would say let us
write a patch to allocate there and see what the -mm guys think.

> > > Otherwise, grow an rcu_head on the stack of kfree_call_rcu() and call
> > > synchronize_rcu() inline with it.
> > > 
> > >
> What do you mean here, Joel? "grow an rcu_head on the stack"?

By "grow on the stack", use the compiler-allocated rcu_head on the
kfree_rcu() caller's stack.

I meant here to say, if we are not in atomic context, then we use regular
GFP_KERNEL allocation, and if that fails, then we just use the stack's
rcu_head and call synchronize_rcu() or even synchronize_rcu_expedited since
the allocation failure would mean the need for RCU to free some memory is
probably great.

> > > Use preemptible() andr task_struct's rcu_read_lock_nesting to differentiate
> > > between the 2 cases.
> > > 
> If the current context is preemptable then we can inline synchronize_rcu()
> together with freeing to handle such corner case, i mean when we are run
> out of memory.

Ah yes, exactly what I mean.

> As for "task_struct's rcu_read_lock_nesting". Will it be enough just
> have a look at preempt_count of current process? If we have for example
> nested rcu_read_locks:
> 
> <snip>
> rcu_read_lock()
>     rcu_read_lock()
>         rcu_read_lock()
> <snip>
> 
> the counter would be 3.

No, because preempt_count is not incremented during rcu_read_lock(). RCU
reader sections can be preempted, they just cannot goto sleep in a reader
section (unless the kernel is RT).

thanks,

 - Joel
Joel Fernandes Feb. 25, 2020, 2:11 a.m. UTC | #24
On Sat, Feb 22, 2020 at 05:10:18PM -0800, Paul E. McKenney wrote:
[...] 
> > I was thinking a 2 fold approach (just thinking out loud..):
> > 
> > If kfree_call_rcu() is called in atomic context or in any rcu reader, then
> > use GFP_ATOMIC to grow an rcu_head wrapper on the atomic memory pool and
> > queue that.
> > 
> > Otherwise, grow an rcu_head on the stack of kfree_call_rcu() and call
> > synchronize_rcu() inline with it.
> > 
> > Use preemptible() andr task_struct's rcu_read_lock_nesting to differentiate
> > between the 2 cases.
> > 
> > Thoughts?
> 
> How much are we really losing by having an rcu_head in the structure,
> either separately or unioned over other fields?

It does seem a convenient API at first glance. Also seems like there are a
number of usecases now (ext4, vfree that Vlad mentioned, and all the other
users he mentioned, etc).

> > > > Also there is one more open question what to do if GFP_ATOMIC
> > > > gets failed in case of having low memory condition. Probably
> > > > we can make use of "mempool interface" that allows to have
> > > > min_nr guaranteed pre-allocated pages. 
> > > 
> > > But we really do still need to handle the case where everything runs out,
> > > even the pre-allocated pages.
> > 
> > If *everything* runs out, you are pretty much going to OOM sooner or later
> > anyway :D. But I see what you mean. But the 'tradeoff' is RCU can free
> > head-less objects where possible.
> 
> Would you rather pay an rcu_head tax (in cases where it cannot share
> with other fields), or would you rather have states where you could free
> a lot of memory if only there was some memory to allocate to track the
> memory to be freed?

Depends on the usecase we could use the right API.

> But yes, as you suggested above, there could be an API similar to the
> userspace RCU library's API that usually just queues the callback but
> sometimes sleeps for a full grace period.  If enough people want this,
> it should not be hard to set up.

Sounds good!

thanks,

 - Joel
Paul E. McKenney Feb. 25, 2020, 3:55 a.m. UTC | #25
On Mon, Feb 24, 2020 at 09:07:05PM -0500, Joel Fernandes wrote:
> On Mon, Feb 24, 2020 at 06:40:30PM +0100, Uladzislau Rezki wrote:
> > On Sat, Feb 22, 2020 at 05:10:18PM -0800, Paul E. McKenney wrote:
> > > On Sat, Feb 22, 2020 at 05:24:15PM -0500, Joel Fernandes wrote:
> > > > On Fri, Feb 21, 2020 at 12:22:50PM -0800, Paul E. McKenney wrote:
> > > > > On Fri, Feb 21, 2020 at 02:14:55PM +0100, Uladzislau Rezki wrote:
> > > > > > On Thu, Feb 20, 2020 at 04:30:35PM -0800, Paul E. McKenney wrote:
> > > > > > > On Wed, Feb 19, 2020 at 11:52:33PM -0500, Theodore Y. Ts'o wrote:
> > > > > > > > On Tue, Feb 18, 2020 at 06:08:57PM +0100, Uladzislau Rezki wrote:
> > > > > > > > > now it becomes possible to use it like: 
> > > > > > > > > 	...
> > > > > > > > > 	void *p = kvmalloc(PAGE_SIZE);
> > > > > > > > > 	kvfree_rcu(p);
> > > > > > > > > 	...
> > > > > > > > > also have a look at the example in the mm/list_lru.c diff.
> > > > > > > > 
> > > > > > > > I certainly like the interface, thanks!  I'm going to be pushing
> > > > > > > > patches to fix this using ext4_kvfree_array_rcu() since there are a
> > > > > > > > number of bugs in ext4's online resizing which appear to be hitting
> > > > > > > > multiple cloud providers (with reports from both AWS and GCP) and I
> > > > > > > > want something which can be easily backported to stable kernels.
> > > > > > > > 
> > > > > > > > But once kvfree_rcu() hits mainline, I'll switch ext4 to use it, since
> > > > > > > > your kvfree_rcu() is definitely more efficient than my expedient
> > > > > > > > jury-rig.
> > > > > > > > 
> > > > > > > > I don't feel entirely competent to review the implementation, but I do
> > > > > > > > have one question.  It looks like the rcutiny implementation of
> > > > > > > > kfree_call_rcu() isn't going to do the right thing with kvfree_rcu(p).
> > > > > > > > Am I missing something?
> > > > > > > 
> > > > > > > Good catch!  I believe that rcu_reclaim_tiny() would need to do
> > > > > > > kvfree() instead of its current kfree().
> > > > > > > 
> > > > > > > Vlad, anything I am missing here?
> > > > > > >
> > > > > > Yes something like that. There are some open questions about
> > > > > > realization, when it comes to tiny RCU. Since we are talking
> > > > > > about "headless" kvfree_rcu() interface, i mean we can not link
> > > > > > freed "objects" between each other, instead we should place a
> > > > > > pointer directly into array that will be drained later on.
> > > > > > 
> > > > > > It would be much more easier to achieve that if we were talking
> > > > > > about the interface like: kvfree_rcu(p, rcu), but that is not our
> > > > > > case :)
> > > > > > 
> > > > > > So, for CONFIG_TINY_RCU we should implement very similar what we
> > > > > > have done for CONFIG_TREE_RCU or just simply do like Ted has done
> > > > > > with his
> > > > > > 
> > > > > > void ext4_kvfree_array_rcu(void *to_free)
> > > > > > 
> > > > > > i mean:
> > > > > > 
> > > > > >    local_irq_save(flags);
> > > > > >    struct foo *ptr = kzalloc(sizeof(*ptr), GFP_ATOMIC);
> > > > > > 
> > > > > >    if (ptr) {
> > > > > >            ptr->ptr = to_free;
> > > > > >            call_rcu(&ptr->rcu, kvfree_callback);
> > > > > >    }
> > > > > >    local_irq_restore(flags);
> > > > > 
> > > > > We really do still need the emergency case, in this case for when
> > > > > kzalloc() returns NULL.  Which does indeed mean an rcu_head in the thing
> > > > > being freed.  Otherwise, you end up with an out-of-memory deadlock where
> > > > > you could free memory only if you had memor to allocate.
> > > > 
> > > > Can we rely on GFP_ATOMIC allocations for these? These have emergency memory
> > > > pools which are reserved.
> > > 
> > > You can, at least until the emergency memory pools are exhausted.
> > > 
> > > > I was thinking a 2 fold approach (just thinking out loud..):
> > > > 
> > > > If kfree_call_rcu() is called in atomic context or in any rcu reader, then
> > > > use GFP_ATOMIC to grow an rcu_head wrapper on the atomic memory pool and
> > > > queue that.
> > > > 
> > I am not sure if that is acceptable, i mean what to do when GFP_ATOMIC
> > gets failed in atomic context? Or we can just consider it as out of
> > memory and another variant is to say that headless object can be called
> > from preemptible context only.
> 
> Yes that makes sense, and we can always put disclaimer in the API's comments
> saying if this object is expected to be freed a lot, then don't use the
> headless-API to be extra safe.
> 
> BTW, GFP_ATOMIC the documentation says if GFP_ATOMIC reserves are depleted,
> the kernel can even panic some times, so if GFP_ATOMIC allocation fails, then
> there seems to be bigger problems in the system any way. I would say let us
> write a patch to allocate there and see what the -mm guys think.
> 
> > > > Otherwise, grow an rcu_head on the stack of kfree_call_rcu() and call
> > > > synchronize_rcu() inline with it.
> > > > 
> > > >
> > What do you mean here, Joel? "grow an rcu_head on the stack"?
> 
> By "grow on the stack", use the compiler-allocated rcu_head on the
> kfree_rcu() caller's stack.
> 
> I meant here to say, if we are not in atomic context, then we use regular
> GFP_KERNEL allocation, and if that fails, then we just use the stack's
> rcu_head and call synchronize_rcu() or even synchronize_rcu_expedited since
> the allocation failure would mean the need for RCU to free some memory is
> probably great.
> 
> > > > Use preemptible() andr task_struct's rcu_read_lock_nesting to differentiate
> > > > between the 2 cases.
> > > > 
> > If the current context is preemptable then we can inline synchronize_rcu()
> > together with freeing to handle such corner case, i mean when we are run
> > out of memory.
> 
> Ah yes, exactly what I mean.
> 
> > As for "task_struct's rcu_read_lock_nesting". Will it be enough just
> > have a look at preempt_count of current process? If we have for example
> > nested rcu_read_locks:
> > 
> > <snip>
> > rcu_read_lock()
> >     rcu_read_lock()
> >         rcu_read_lock()
> > <snip>
> > 
> > the counter would be 3.
> 
> No, because preempt_count is not incremented during rcu_read_lock(). RCU
> reader sections can be preempted, they just cannot goto sleep in a reader
> section (unless the kernel is RT).

You are both right.

Vlad is correct for CONFIG_PREEMPT=n and CONFIG_DEBUG_ATOMIC_SLEEP=y
and Joel is correct otherwise, give or take the possibility of other
late-breaking corner cases.  ;-)

							Thanx, Paul
Joel Fernandes Feb. 25, 2020, 2:17 p.m. UTC | #26
On Mon, Feb 24, 2020 at 10:55 PM Paul E. McKenney <paulmck@kernel.org> wrote:
[...]
> > > As for "task_struct's rcu_read_lock_nesting". Will it be enough just
> > > have a look at preempt_count of current process? If we have for example
> > > nested rcu_read_locks:
> > >
> > > <snip>
> > > rcu_read_lock()
> > >     rcu_read_lock()
> > >         rcu_read_lock()
> > > <snip>
> > >
> > > the counter would be 3.
> >
> > No, because preempt_count is not incremented during rcu_read_lock(). RCU
> > reader sections can be preempted, they just cannot goto sleep in a reader
> > section (unless the kernel is RT).
>
> You are both right.
>
> Vlad is correct for CONFIG_PREEMPT=n and CONFIG_DEBUG_ATOMIC_SLEEP=y
> and Joel is correct otherwise, give or take the possibility of other
> late-breaking corner cases.  ;-)

Oh yes, but even for PREEMPT=n, rcu_read_lock() is just a NOOP for
that configuration and doesn't really mess around with preempt_count
if I recall :-D. (doesn't need to mess with preempt_count because
being in kernel mode is non-preemptible for PREEMPT=n anyway).

thanks,

- Joel
Paul E. McKenney Feb. 25, 2020, 4:38 p.m. UTC | #27
On Tue, Feb 25, 2020 at 09:17:11AM -0500, Joel Fernandes wrote:
> On Mon, Feb 24, 2020 at 10:55 PM Paul E. McKenney <paulmck@kernel.org> wrote:
> [...]
> > > > As for "task_struct's rcu_read_lock_nesting". Will it be enough just
> > > > have a look at preempt_count of current process? If we have for example
> > > > nested rcu_read_locks:
> > > >
> > > > <snip>
> > > > rcu_read_lock()
> > > >     rcu_read_lock()
> > > >         rcu_read_lock()
> > > > <snip>
> > > >
> > > > the counter would be 3.
> > >
> > > No, because preempt_count is not incremented during rcu_read_lock(). RCU
> > > reader sections can be preempted, they just cannot goto sleep in a reader
> > > section (unless the kernel is RT).
> >
> > You are both right.
> >
> > Vlad is correct for CONFIG_PREEMPT=n and CONFIG_DEBUG_ATOMIC_SLEEP=y
> > and Joel is correct otherwise, give or take the possibility of other
> > late-breaking corner cases.  ;-)
> 
> Oh yes, but even for PREEMPT=n, rcu_read_lock() is just a NOOP for
> that configuration and doesn't really mess around with preempt_count
> if I recall :-D. (doesn't need to mess with preempt_count because
> being in kernel mode is non-preemptible for PREEMPT=n anyway).

For PREEMPT=n, rcu_read_lock() is preempt_disable(), see the code
in include/linux/rcupdate.h.  ;-)

							Thanx, Paul
Joel Fernandes Feb. 25, 2020, 5 p.m. UTC | #28
On Tue, Feb 25, 2020 at 11:42 AM Paul E. McKenney <paulmck@kernel.org> wrote:
>
> On Tue, Feb 25, 2020 at 09:17:11AM -0500, Joel Fernandes wrote:
> > On Mon, Feb 24, 2020 at 10:55 PM Paul E. McKenney <paulmck@kernel.org> wrote:
> > [...]
> > > > > As for "task_struct's rcu_read_lock_nesting". Will it be enough just
> > > > > have a look at preempt_count of current process? If we have for example
> > > > > nested rcu_read_locks:
> > > > >
> > > > > <snip>
> > > > > rcu_read_lock()
> > > > >     rcu_read_lock()
> > > > >         rcu_read_lock()
> > > > > <snip>
> > > > >
> > > > > the counter would be 3.
> > > >
> > > > No, because preempt_count is not incremented during rcu_read_lock(). RCU
> > > > reader sections can be preempted, they just cannot goto sleep in a reader
> > > > section (unless the kernel is RT).
> > >
> > > You are both right.
> > >
> > > Vlad is correct for CONFIG_PREEMPT=n and CONFIG_DEBUG_ATOMIC_SLEEP=y
> > > and Joel is correct otherwise, give or take the possibility of other
> > > late-breaking corner cases.  ;-)
> >
> > Oh yes, but even for PREEMPT=n, rcu_read_lock() is just a NOOP for
> > that configuration and doesn't really mess around with preempt_count
> > if I recall :-D. (doesn't need to mess with preempt_count because
> > being in kernel mode is non-preemptible for PREEMPT=n anyway).
>
> For PREEMPT=n, rcu_read_lock() is preempt_disable(), see the code
> in include/linux/rcupdate.h.  ;-)

Paul....

;-) ;-)

thanks,

 - Joel
Uladzislau Rezki Feb. 25, 2020, 6:54 p.m. UTC | #29
> > > > I was thinking a 2 fold approach (just thinking out loud..):
> > > > 
> > > > If kfree_call_rcu() is called in atomic context or in any rcu reader, then
> > > > use GFP_ATOMIC to grow an rcu_head wrapper on the atomic memory pool and
> > > > queue that.
> > > > 
> > I am not sure if that is acceptable, i mean what to do when GFP_ATOMIC
> > gets failed in atomic context? Or we can just consider it as out of
> > memory and another variant is to say that headless object can be called
> > from preemptible context only.
> 
> Yes that makes sense, and we can always put disclaimer in the API's comments
> saying if this object is expected to be freed a lot, then don't use the
> headless-API to be extra safe.
> 
Agree.

> BTW, GFP_ATOMIC the documentation says if GFP_ATOMIC reserves are depleted,
> the kernel can even panic some times, so if GFP_ATOMIC allocation fails, then
> there seems to be bigger problems in the system any way. I would say let us
> write a patch to allocate there and see what the -mm guys think.
> 
OK. It might be that they can offer something if they do not like our
approach. I will try to compose something and send the patch to see.
The tree.c implementation is almost done, whereas tiny one is on hold.

I think we should support batching as well as bulk interface there.
Another way is to workaround head-less object, just to attach the head
dynamically using kmalloc() and then call_rcu() but then it will not be
a fair headless support :)

What is your view?

> > > > Otherwise, grow an rcu_head on the stack of kfree_call_rcu() and call
> > > > synchronize_rcu() inline with it.
> > > > 
> > > >
> > What do you mean here, Joel? "grow an rcu_head on the stack"?
> 
> By "grow on the stack", use the compiler-allocated rcu_head on the
> kfree_rcu() caller's stack.
> 
> I meant here to say, if we are not in atomic context, then we use regular
> GFP_KERNEL allocation, and if that fails, then we just use the stack's
> rcu_head and call synchronize_rcu() or even synchronize_rcu_expedited since
> the allocation failure would mean the need for RCU to free some memory is
> probably great.
> 
Ah, i got it. I thought you meant something like recursion and then
unwinding the stack back somehow :)

> > > > Use preemptible() andr task_struct's rcu_read_lock_nesting to differentiate
> > > > between the 2 cases.
> > > > 
> > If the current context is preemptable then we can inline synchronize_rcu()
> > together with freeing to handle such corner case, i mean when we are run
> > out of memory.
> 
> Ah yes, exactly what I mean.
> 
OK.

> > As for "task_struct's rcu_read_lock_nesting". Will it be enough just
> > have a look at preempt_count of current process? If we have for example
> > nested rcu_read_locks:
> > 
> > <snip>
> > rcu_read_lock()
> >     rcu_read_lock()
> >         rcu_read_lock()
> > <snip>
> > 
> > the counter would be 3.
> 
> No, because preempt_count is not incremented during rcu_read_lock(). RCU
> reader sections can be preempted, they just cannot goto sleep in a reader
> section (unless the kernel is RT).
> 
So in CONFIG_PREEMPT kernel we can identify if we are in atomic or not by
using rcu_preempt_depth() and in_atomic(). When it comes to !CONFIG_PREEMPT
then we skip it and consider as atomic. Something like:

<snip>
static bool is_current_in_atomic()
{
#ifdef CONFIG_PREEMPT_RCU
    if (!rcu_preempt_depth() && !in_atomic())
        return false;
#endif

    return true;
}
<snip>

Thanks!

--
Vlad Rezki
Paul E. McKenney Feb. 25, 2020, 10:47 p.m. UTC | #30
On Tue, Feb 25, 2020 at 07:54:00PM +0100, Uladzislau Rezki wrote:
> > > > > I was thinking a 2 fold approach (just thinking out loud..):
> > > > > 
> > > > > If kfree_call_rcu() is called in atomic context or in any rcu reader, then
> > > > > use GFP_ATOMIC to grow an rcu_head wrapper on the atomic memory pool and
> > > > > queue that.
> > > > > 
> > > I am not sure if that is acceptable, i mean what to do when GFP_ATOMIC
> > > gets failed in atomic context? Or we can just consider it as out of
> > > memory and another variant is to say that headless object can be called
> > > from preemptible context only.
> > 
> > Yes that makes sense, and we can always put disclaimer in the API's comments
> > saying if this object is expected to be freed a lot, then don't use the
> > headless-API to be extra safe.
> > 
> Agree.
> 
> > BTW, GFP_ATOMIC the documentation says if GFP_ATOMIC reserves are depleted,
> > the kernel can even panic some times, so if GFP_ATOMIC allocation fails, then
> > there seems to be bigger problems in the system any way. I would say let us
> > write a patch to allocate there and see what the -mm guys think.
> > 
> OK. It might be that they can offer something if they do not like our
> approach. I will try to compose something and send the patch to see.
> The tree.c implementation is almost done, whereas tiny one is on hold.
> 
> I think we should support batching as well as bulk interface there.
> Another way is to workaround head-less object, just to attach the head
> dynamically using kmalloc() and then call_rcu() but then it will not be
> a fair headless support :)
> 
> What is your view?
> 
> > > > > Otherwise, grow an rcu_head on the stack of kfree_call_rcu() and call
> > > > > synchronize_rcu() inline with it.
> > > > > 
> > > > >
> > > What do you mean here, Joel? "grow an rcu_head on the stack"?
> > 
> > By "grow on the stack", use the compiler-allocated rcu_head on the
> > kfree_rcu() caller's stack.
> > 
> > I meant here to say, if we are not in atomic context, then we use regular
> > GFP_KERNEL allocation, and if that fails, then we just use the stack's
> > rcu_head and call synchronize_rcu() or even synchronize_rcu_expedited since
> > the allocation failure would mean the need for RCU to free some memory is
> > probably great.
> > 
> Ah, i got it. I thought you meant something like recursion and then
> unwinding the stack back somehow :)
> 
> > > > > Use preemptible() andr task_struct's rcu_read_lock_nesting to differentiate
> > > > > between the 2 cases.
> > > > > 
> > > If the current context is preemptable then we can inline synchronize_rcu()
> > > together with freeing to handle such corner case, i mean when we are run
> > > out of memory.
> > 
> > Ah yes, exactly what I mean.
> > 
> OK.
> 
> > > As for "task_struct's rcu_read_lock_nesting". Will it be enough just
> > > have a look at preempt_count of current process? If we have for example
> > > nested rcu_read_locks:
> > > 
> > > <snip>
> > > rcu_read_lock()
> > >     rcu_read_lock()
> > >         rcu_read_lock()
> > > <snip>
> > > 
> > > the counter would be 3.
> > 
> > No, because preempt_count is not incremented during rcu_read_lock(). RCU
> > reader sections can be preempted, they just cannot goto sleep in a reader
> > section (unless the kernel is RT).
> > 
> So in CONFIG_PREEMPT kernel we can identify if we are in atomic or not by
> using rcu_preempt_depth() and in_atomic(). When it comes to !CONFIG_PREEMPT
> then we skip it and consider as atomic. Something like:
> 
> <snip>
> static bool is_current_in_atomic()
> {
> #ifdef CONFIG_PREEMPT_RCU

If possible: if (IS_ENABLED(CONFIG_PREEMPT_RCU))

Much nicer than #ifdef, and I -think- it should work in this case.

							Thanx, Paul

>     if (!rcu_preempt_depth() && !in_atomic())
>         return false;
> #endif
> 
>     return true;
> }
> <snip>
> 
> Thanks!
> 
> --
> Vlad Rezki
Uladzislau Rezki Feb. 26, 2020, 1:04 p.m. UTC | #31
On Tue, Feb 25, 2020 at 02:47:45PM -0800, Paul E. McKenney wrote:
> On Tue, Feb 25, 2020 at 07:54:00PM +0100, Uladzislau Rezki wrote:
> > > > > > I was thinking a 2 fold approach (just thinking out loud..):
> > > > > > 
> > > > > > If kfree_call_rcu() is called in atomic context or in any rcu reader, then
> > > > > > use GFP_ATOMIC to grow an rcu_head wrapper on the atomic memory pool and
> > > > > > queue that.
> > > > > > 
> > > > I am not sure if that is acceptable, i mean what to do when GFP_ATOMIC
> > > > gets failed in atomic context? Or we can just consider it as out of
> > > > memory and another variant is to say that headless object can be called
> > > > from preemptible context only.
> > > 
> > > Yes that makes sense, and we can always put disclaimer in the API's comments
> > > saying if this object is expected to be freed a lot, then don't use the
> > > headless-API to be extra safe.
> > > 
> > Agree.
> > 
> > > BTW, GFP_ATOMIC the documentation says if GFP_ATOMIC reserves are depleted,
> > > the kernel can even panic some times, so if GFP_ATOMIC allocation fails, then
> > > there seems to be bigger problems in the system any way. I would say let us
> > > write a patch to allocate there and see what the -mm guys think.
> > > 
> > OK. It might be that they can offer something if they do not like our
> > approach. I will try to compose something and send the patch to see.
> > The tree.c implementation is almost done, whereas tiny one is on hold.
> > 
> > I think we should support batching as well as bulk interface there.
> > Another way is to workaround head-less object, just to attach the head
> > dynamically using kmalloc() and then call_rcu() but then it will not be
> > a fair headless support :)
> > 
> > What is your view?
> > 
> > > > > > Otherwise, grow an rcu_head on the stack of kfree_call_rcu() and call
> > > > > > synchronize_rcu() inline with it.
> > > > > > 
> > > > > >
> > > > What do you mean here, Joel? "grow an rcu_head on the stack"?
> > > 
> > > By "grow on the stack", use the compiler-allocated rcu_head on the
> > > kfree_rcu() caller's stack.
> > > 
> > > I meant here to say, if we are not in atomic context, then we use regular
> > > GFP_KERNEL allocation, and if that fails, then we just use the stack's
> > > rcu_head and call synchronize_rcu() or even synchronize_rcu_expedited since
> > > the allocation failure would mean the need for RCU to free some memory is
> > > probably great.
> > > 
> > Ah, i got it. I thought you meant something like recursion and then
> > unwinding the stack back somehow :)
> > 
> > > > > > Use preemptible() andr task_struct's rcu_read_lock_nesting to differentiate
> > > > > > between the 2 cases.
> > > > > > 
> > > > If the current context is preemptable then we can inline synchronize_rcu()
> > > > together with freeing to handle such corner case, i mean when we are run
> > > > out of memory.
> > > 
> > > Ah yes, exactly what I mean.
> > > 
> > OK.
> > 
> > > > As for "task_struct's rcu_read_lock_nesting". Will it be enough just
> > > > have a look at preempt_count of current process? If we have for example
> > > > nested rcu_read_locks:
> > > > 
> > > > <snip>
> > > > rcu_read_lock()
> > > >     rcu_read_lock()
> > > >         rcu_read_lock()
> > > > <snip>
> > > > 
> > > > the counter would be 3.
> > > 
> > > No, because preempt_count is not incremented during rcu_read_lock(). RCU
> > > reader sections can be preempted, they just cannot goto sleep in a reader
> > > section (unless the kernel is RT).
> > > 
> > So in CONFIG_PREEMPT kernel we can identify if we are in atomic or not by
> > using rcu_preempt_depth() and in_atomic(). When it comes to !CONFIG_PREEMPT
> > then we skip it and consider as atomic. Something like:
> > 
> > <snip>
> > static bool is_current_in_atomic()
> > {
> > #ifdef CONFIG_PREEMPT_RCU
> 
> If possible: if (IS_ENABLED(CONFIG_PREEMPT_RCU))
> 
> Much nicer than #ifdef, and I -think- it should work in this case.
> 
OK. Thank you, Paul!

There is one point i would like to highlight it is about making caller
instead to be responsible for atomic or not decision. Like how kmalloc()
works, it does not really know the context it runs on, so it is up to
caller to inform.

The same way:

kvfree_rcu(p, atomic = true/false);

in this case we could cover !CONFIG_PREEMPT case also.

--
Vlad Rezki
Paul E. McKenney Feb. 26, 2020, 3:06 p.m. UTC | #32
On Wed, Feb 26, 2020 at 02:04:40PM +0100, Uladzislau Rezki wrote:
> On Tue, Feb 25, 2020 at 02:47:45PM -0800, Paul E. McKenney wrote:
> > On Tue, Feb 25, 2020 at 07:54:00PM +0100, Uladzislau Rezki wrote:
> > > > > > > I was thinking a 2 fold approach (just thinking out loud..):
> > > > > > > 
> > > > > > > If kfree_call_rcu() is called in atomic context or in any rcu reader, then
> > > > > > > use GFP_ATOMIC to grow an rcu_head wrapper on the atomic memory pool and
> > > > > > > queue that.
> > > > > > > 
> > > > > I am not sure if that is acceptable, i mean what to do when GFP_ATOMIC
> > > > > gets failed in atomic context? Or we can just consider it as out of
> > > > > memory and another variant is to say that headless object can be called
> > > > > from preemptible context only.
> > > > 
> > > > Yes that makes sense, and we can always put disclaimer in the API's comments
> > > > saying if this object is expected to be freed a lot, then don't use the
> > > > headless-API to be extra safe.
> > > > 
> > > Agree.
> > > 
> > > > BTW, GFP_ATOMIC the documentation says if GFP_ATOMIC reserves are depleted,
> > > > the kernel can even panic some times, so if GFP_ATOMIC allocation fails, then
> > > > there seems to be bigger problems in the system any way. I would say let us
> > > > write a patch to allocate there and see what the -mm guys think.
> > > > 
> > > OK. It might be that they can offer something if they do not like our
> > > approach. I will try to compose something and send the patch to see.
> > > The tree.c implementation is almost done, whereas tiny one is on hold.
> > > 
> > > I think we should support batching as well as bulk interface there.
> > > Another way is to workaround head-less object, just to attach the head
> > > dynamically using kmalloc() and then call_rcu() but then it will not be
> > > a fair headless support :)
> > > 
> > > What is your view?
> > > 
> > > > > > > Otherwise, grow an rcu_head on the stack of kfree_call_rcu() and call
> > > > > > > synchronize_rcu() inline with it.
> > > > > > > 
> > > > > > >
> > > > > What do you mean here, Joel? "grow an rcu_head on the stack"?
> > > > 
> > > > By "grow on the stack", use the compiler-allocated rcu_head on the
> > > > kfree_rcu() caller's stack.
> > > > 
> > > > I meant here to say, if we are not in atomic context, then we use regular
> > > > GFP_KERNEL allocation, and if that fails, then we just use the stack's
> > > > rcu_head and call synchronize_rcu() or even synchronize_rcu_expedited since
> > > > the allocation failure would mean the need for RCU to free some memory is
> > > > probably great.
> > > > 
> > > Ah, i got it. I thought you meant something like recursion and then
> > > unwinding the stack back somehow :)
> > > 
> > > > > > > Use preemptible() andr task_struct's rcu_read_lock_nesting to differentiate
> > > > > > > between the 2 cases.
> > > > > > > 
> > > > > If the current context is preemptable then we can inline synchronize_rcu()
> > > > > together with freeing to handle such corner case, i mean when we are run
> > > > > out of memory.
> > > > 
> > > > Ah yes, exactly what I mean.
> > > > 
> > > OK.
> > > 
> > > > > As for "task_struct's rcu_read_lock_nesting". Will it be enough just
> > > > > have a look at preempt_count of current process? If we have for example
> > > > > nested rcu_read_locks:
> > > > > 
> > > > > <snip>
> > > > > rcu_read_lock()
> > > > >     rcu_read_lock()
> > > > >         rcu_read_lock()
> > > > > <snip>
> > > > > 
> > > > > the counter would be 3.
> > > > 
> > > > No, because preempt_count is not incremented during rcu_read_lock(). RCU
> > > > reader sections can be preempted, they just cannot goto sleep in a reader
> > > > section (unless the kernel is RT).
> > > > 
> > > So in CONFIG_PREEMPT kernel we can identify if we are in atomic or not by
> > > using rcu_preempt_depth() and in_atomic(). When it comes to !CONFIG_PREEMPT
> > > then we skip it and consider as atomic. Something like:
> > > 
> > > <snip>
> > > static bool is_current_in_atomic()
> > > {
> > > #ifdef CONFIG_PREEMPT_RCU
> > 
> > If possible: if (IS_ENABLED(CONFIG_PREEMPT_RCU))
> > 
> > Much nicer than #ifdef, and I -think- it should work in this case.
> > 
> OK. Thank you, Paul!
> 
> There is one point i would like to highlight it is about making caller
> instead to be responsible for atomic or not decision. Like how kmalloc()
> works, it does not really know the context it runs on, so it is up to
> caller to inform.
> 
> The same way:
> 
> kvfree_rcu(p, atomic = true/false);
> 
> in this case we could cover !CONFIG_PREEMPT case also.

Understood, but couldn't we instead use IS_ENABLED() to work out the
actual situation at runtime and relieve the caller of this burden?
Or am I missing a corner case?

							Thanx, Paul
Uladzislau Rezki Feb. 26, 2020, 3:53 p.m. UTC | #33
On Wed, Feb 26, 2020 at 07:06:56AM -0800, Paul E. McKenney wrote:
> On Wed, Feb 26, 2020 at 02:04:40PM +0100, Uladzislau Rezki wrote:
> > On Tue, Feb 25, 2020 at 02:47:45PM -0800, Paul E. McKenney wrote:
> > > On Tue, Feb 25, 2020 at 07:54:00PM +0100, Uladzislau Rezki wrote:
> > > > > > > > I was thinking a 2 fold approach (just thinking out loud..):
> > > > > > > > 
> > > > > > > > If kfree_call_rcu() is called in atomic context or in any rcu reader, then
> > > > > > > > use GFP_ATOMIC to grow an rcu_head wrapper on the atomic memory pool and
> > > > > > > > queue that.
> > > > > > > > 
> > > > > > I am not sure if that is acceptable, i mean what to do when GFP_ATOMIC
> > > > > > gets failed in atomic context? Or we can just consider it as out of
> > > > > > memory and another variant is to say that headless object can be called
> > > > > > from preemptible context only.
> > > > > 
> > > > > Yes that makes sense, and we can always put disclaimer in the API's comments
> > > > > saying if this object is expected to be freed a lot, then don't use the
> > > > > headless-API to be extra safe.
> > > > > 
> > > > Agree.
> > > > 
> > > > > BTW, GFP_ATOMIC the documentation says if GFP_ATOMIC reserves are depleted,
> > > > > the kernel can even panic some times, so if GFP_ATOMIC allocation fails, then
> > > > > there seems to be bigger problems in the system any way. I would say let us
> > > > > write a patch to allocate there and see what the -mm guys think.
> > > > > 
> > > > OK. It might be that they can offer something if they do not like our
> > > > approach. I will try to compose something and send the patch to see.
> > > > The tree.c implementation is almost done, whereas tiny one is on hold.
> > > > 
> > > > I think we should support batching as well as bulk interface there.
> > > > Another way is to workaround head-less object, just to attach the head
> > > > dynamically using kmalloc() and then call_rcu() but then it will not be
> > > > a fair headless support :)
> > > > 
> > > > What is your view?
> > > > 
> > > > > > > > Otherwise, grow an rcu_head on the stack of kfree_call_rcu() and call
> > > > > > > > synchronize_rcu() inline with it.
> > > > > > > > 
> > > > > > > >
> > > > > > What do you mean here, Joel? "grow an rcu_head on the stack"?
> > > > > 
> > > > > By "grow on the stack", use the compiler-allocated rcu_head on the
> > > > > kfree_rcu() caller's stack.
> > > > > 
> > > > > I meant here to say, if we are not in atomic context, then we use regular
> > > > > GFP_KERNEL allocation, and if that fails, then we just use the stack's
> > > > > rcu_head and call synchronize_rcu() or even synchronize_rcu_expedited since
> > > > > the allocation failure would mean the need for RCU to free some memory is
> > > > > probably great.
> > > > > 
> > > > Ah, i got it. I thought you meant something like recursion and then
> > > > unwinding the stack back somehow :)
> > > > 
> > > > > > > > Use preemptible() andr task_struct's rcu_read_lock_nesting to differentiate
> > > > > > > > between the 2 cases.
> > > > > > > > 
> > > > > > If the current context is preemptable then we can inline synchronize_rcu()
> > > > > > together with freeing to handle such corner case, i mean when we are run
> > > > > > out of memory.
> > > > > 
> > > > > Ah yes, exactly what I mean.
> > > > > 
> > > > OK.
> > > > 
> > > > > > As for "task_struct's rcu_read_lock_nesting". Will it be enough just
> > > > > > have a look at preempt_count of current process? If we have for example
> > > > > > nested rcu_read_locks:
> > > > > > 
> > > > > > <snip>
> > > > > > rcu_read_lock()
> > > > > >     rcu_read_lock()
> > > > > >         rcu_read_lock()
> > > > > > <snip>
> > > > > > 
> > > > > > the counter would be 3.
> > > > > 
> > > > > No, because preempt_count is not incremented during rcu_read_lock(). RCU
> > > > > reader sections can be preempted, they just cannot goto sleep in a reader
> > > > > section (unless the kernel is RT).
> > > > > 
> > > > So in CONFIG_PREEMPT kernel we can identify if we are in atomic or not by
> > > > using rcu_preempt_depth() and in_atomic(). When it comes to !CONFIG_PREEMPT
> > > > then we skip it and consider as atomic. Something like:
> > > > 
> > > > <snip>
> > > > static bool is_current_in_atomic()
> > > > {
> > > > #ifdef CONFIG_PREEMPT_RCU
> > > 
> > > If possible: if (IS_ENABLED(CONFIG_PREEMPT_RCU))
> > > 
> > > Much nicer than #ifdef, and I -think- it should work in this case.
> > > 
> > OK. Thank you, Paul!
> > 
> > There is one point i would like to highlight it is about making caller
> > instead to be responsible for atomic or not decision. Like how kmalloc()
> > works, it does not really know the context it runs on, so it is up to
> > caller to inform.
> > 
> > The same way:
> > 
> > kvfree_rcu(p, atomic = true/false);
> > 
> > in this case we could cover !CONFIG_PREEMPT case also.
> 
> Understood, but couldn't we instead use IS_ENABLED() to work out the
> actual situation at runtime and relieve the caller of this burden?
> Or am I missing a corner case?
> 
Yes we can do it in run-time, i mean to detect context type, atomic or not.
But only for CONFIG_PREEMPT kernel. In case of !CONFIG_PREEMPT configuration 
i do not see a straight forward way how to detect it. For example when caller 
holds "spinlock". Therefore for such configuration we can just consider it
as atomic. But in reality it could be not in atomic.

We need it for emergency/corner case and head-less objects. When we are run
of memory. So in this case we should attach the rcu_head dynamically and
queue the freed object to be processed later on, after GP.

If atomic context use GFP_ATOMIC flag if not use GFP_KERNEL. It is better 
to allocate with GFP_KERNEL flag(if possible) because it has much less
restrictions then GFP_ATOMIC one, i.e. GFP_KERNEL can sleep and wait until
the memory is reclaimed.

But that is a corner case and i agree that it would be good to avoid of
such passing of extra info by the caller.

Anyway i just share some extra info :)

Thanks.

--
Vlad Rezki
Joel Fernandes Feb. 27, 2020, 1:37 p.m. UTC | #34
Sorry for slightly late reply.

On Tue, Feb 25, 2020 at 07:54:00PM +0100, Uladzislau Rezki wrote:
> > > > > I was thinking a 2 fold approach (just thinking out loud..):
> > > > > 
> > > > > If kfree_call_rcu() is called in atomic context or in any rcu reader, then
> > > > > use GFP_ATOMIC to grow an rcu_head wrapper on the atomic memory pool and
> > > > > queue that.
> > > > > 
> > > I am not sure if that is acceptable, i mean what to do when GFP_ATOMIC
> > > gets failed in atomic context? Or we can just consider it as out of
> > > memory and another variant is to say that headless object can be called
> > > from preemptible context only.
> > 
> > Yes that makes sense, and we can always put disclaimer in the API's comments
> > saying if this object is expected to be freed a lot, then don't use the
> > headless-API to be extra safe.
> > 
> Agree.
> 
> > BTW, GFP_ATOMIC the documentation says if GFP_ATOMIC reserves are depleted,
> > the kernel can even panic some times, so if GFP_ATOMIC allocation fails, then
> > there seems to be bigger problems in the system any way. I would say let us
> > write a patch to allocate there and see what the -mm guys think.
> > 
> OK. It might be that they can offer something if they do not like our
> approach. I will try to compose something and send the patch to see.
> The tree.c implementation is almost done, whereas tiny one is on hold.
> 
> I think we should support batching as well as bulk interface there.
> Another way is to workaround head-less object, just to attach the head
> dynamically using kmalloc() and then call_rcu() but then it will not be
> a fair headless support :)
> 
> What is your view?

This kind of "head" will require backpointers to the original object as well
right? And still wouldn't solve the "what if we run out of GFP_ATOMIC
reserves". But let me know in a code snippet if possible about what you mean.

> > > > > Otherwise, grow an rcu_head on the stack of kfree_call_rcu() and call
> > > > > synchronize_rcu() inline with it.
> > > > > 
> > > > >
> > > What do you mean here, Joel? "grow an rcu_head on the stack"?
> > 
> > By "grow on the stack", use the compiler-allocated rcu_head on the
> > kfree_rcu() caller's stack.
> > 
> > I meant here to say, if we are not in atomic context, then we use regular
> > GFP_KERNEL allocation, and if that fails, then we just use the stack's
> > rcu_head and call synchronize_rcu() or even synchronize_rcu_expedited since
> > the allocation failure would mean the need for RCU to free some memory is
> > probably great.
> > 
> Ah, i got it. I thought you meant something like recursion and then
> unwinding the stack back somehow :)

Yeah something like that :) Use the compiler allocated space which you
wouldn't run out of unless stack overflows.

> > > As for "task_struct's rcu_read_lock_nesting". Will it be enough just
> > > have a look at preempt_count of current process? If we have for example
> > > nested rcu_read_locks:
> > > 
> > > <snip>
> > > rcu_read_lock()
> > >     rcu_read_lock()
> > >         rcu_read_lock()
> > > <snip>
> > > 
> > > the counter would be 3.
> > 
> > No, because preempt_count is not incremented during rcu_read_lock(). RCU
> > reader sections can be preempted, they just cannot goto sleep in a reader
> > section (unless the kernel is RT).
> > 
> So in CONFIG_PREEMPT kernel we can identify if we are in atomic or not by
> using rcu_preempt_depth() and in_atomic(). When it comes to !CONFIG_PREEMPT
> then we skip it and consider as atomic. Something like:
> 
> <snip>
> static bool is_current_in_atomic()

Would be good to change this to is_current_in_rcu_reader() since
rcu_preempt_depth() does not imply atomicity.

> {
> #ifdef CONFIG_PREEMPT_RCU
>     if (!rcu_preempt_depth() && !in_atomic())
>         return false;

I think use if (!rcu_preempt_depth() && preemptible()) here.

preemptible() checks for IRQ disabled section as well.

> #endif
> 
>     return true;

Otherwise LGTM.

thanks!

 - Joel

> }
> <snip>
Joel Fernandes Feb. 27, 2020, 2:08 p.m. UTC | #35
On Wed, Feb 26, 2020 at 04:53:47PM +0100, Uladzislau Rezki wrote:
> On Wed, Feb 26, 2020 at 07:06:56AM -0800, Paul E. McKenney wrote:
> > On Wed, Feb 26, 2020 at 02:04:40PM +0100, Uladzislau Rezki wrote:
> > > On Tue, Feb 25, 2020 at 02:47:45PM -0800, Paul E. McKenney wrote:
> > > > On Tue, Feb 25, 2020 at 07:54:00PM +0100, Uladzislau Rezki wrote:
> > > > > > > > > I was thinking a 2 fold approach (just thinking out loud..):
> > > > > > > > > 
> > > > > > > > > If kfree_call_rcu() is called in atomic context or in any rcu reader, then
> > > > > > > > > use GFP_ATOMIC to grow an rcu_head wrapper on the atomic memory pool and
> > > > > > > > > queue that.
> > > > > > > > > 
> > > > > > > I am not sure if that is acceptable, i mean what to do when GFP_ATOMIC
> > > > > > > gets failed in atomic context? Or we can just consider it as out of
> > > > > > > memory and another variant is to say that headless object can be called
> > > > > > > from preemptible context only.
> > > > > > 
> > > > > > Yes that makes sense, and we can always put disclaimer in the API's comments
> > > > > > saying if this object is expected to be freed a lot, then don't use the
> > > > > > headless-API to be extra safe.
> > > > > > 
> > > > > Agree.
> > > > > 
> > > > > > BTW, GFP_ATOMIC the documentation says if GFP_ATOMIC reserves are depleted,
> > > > > > the kernel can even panic some times, so if GFP_ATOMIC allocation fails, then
> > > > > > there seems to be bigger problems in the system any way. I would say let us
> > > > > > write a patch to allocate there and see what the -mm guys think.
> > > > > > 
> > > > > OK. It might be that they can offer something if they do not like our
> > > > > approach. I will try to compose something and send the patch to see.
> > > > > The tree.c implementation is almost done, whereas tiny one is on hold.
> > > > > 
> > > > > I think we should support batching as well as bulk interface there.
> > > > > Another way is to workaround head-less object, just to attach the head
> > > > > dynamically using kmalloc() and then call_rcu() but then it will not be
> > > > > a fair headless support :)
> > > > > 
> > > > > What is your view?
> > > > > 
> > > > > > > > > Otherwise, grow an rcu_head on the stack of kfree_call_rcu() and call
> > > > > > > > > synchronize_rcu() inline with it.
> > > > > > > > > 
> > > > > > > > >
> > > > > > > What do you mean here, Joel? "grow an rcu_head on the stack"?
> > > > > > 
> > > > > > By "grow on the stack", use the compiler-allocated rcu_head on the
> > > > > > kfree_rcu() caller's stack.
> > > > > > 
> > > > > > I meant here to say, if we are not in atomic context, then we use regular
> > > > > > GFP_KERNEL allocation, and if that fails, then we just use the stack's
> > > > > > rcu_head and call synchronize_rcu() or even synchronize_rcu_expedited since
> > > > > > the allocation failure would mean the need for RCU to free some memory is
> > > > > > probably great.
> > > > > > 
> > > > > Ah, i got it. I thought you meant something like recursion and then
> > > > > unwinding the stack back somehow :)
> > > > > 
> > > > > > > > > Use preemptible() andr task_struct's rcu_read_lock_nesting to differentiate
> > > > > > > > > between the 2 cases.
> > > > > > > > > 
> > > > > > > If the current context is preemptable then we can inline synchronize_rcu()
> > > > > > > together with freeing to handle such corner case, i mean when we are run
> > > > > > > out of memory.
> > > > > > 
> > > > > > Ah yes, exactly what I mean.
> > > > > > 
> > > > > OK.
> > > > > 
> > > > > > > As for "task_struct's rcu_read_lock_nesting". Will it be enough just
> > > > > > > have a look at preempt_count of current process? If we have for example
> > > > > > > nested rcu_read_locks:
> > > > > > > 
> > > > > > > <snip>
> > > > > > > rcu_read_lock()
> > > > > > >     rcu_read_lock()
> > > > > > >         rcu_read_lock()
> > > > > > > <snip>
> > > > > > > 
> > > > > > > the counter would be 3.
> > > > > > 
> > > > > > No, because preempt_count is not incremented during rcu_read_lock(). RCU
> > > > > > reader sections can be preempted, they just cannot goto sleep in a reader
> > > > > > section (unless the kernel is RT).
> > > > > > 
> > > > > So in CONFIG_PREEMPT kernel we can identify if we are in atomic or not by
> > > > > using rcu_preempt_depth() and in_atomic(). When it comes to !CONFIG_PREEMPT
> > > > > then we skip it and consider as atomic. Something like:
> > > > > 
> > > > > <snip>
> > > > > static bool is_current_in_atomic()
> > > > > {
> > > > > #ifdef CONFIG_PREEMPT_RCU
> > > > 
> > > > If possible: if (IS_ENABLED(CONFIG_PREEMPT_RCU))
> > > > 
> > > > Much nicer than #ifdef, and I -think- it should work in this case.
> > > > 
> > > OK. Thank you, Paul!
> > > 
> > > There is one point i would like to highlight it is about making caller
> > > instead to be responsible for atomic or not decision. Like how kmalloc()
> > > works, it does not really know the context it runs on, so it is up to
> > > caller to inform.
> > > 
> > > The same way:
> > > 
> > > kvfree_rcu(p, atomic = true/false);
> > > 
> > > in this case we could cover !CONFIG_PREEMPT case also.
> > 
> > Understood, but couldn't we instead use IS_ENABLED() to work out the
> > actual situation at runtime and relieve the caller of this burden?
> > Or am I missing a corner case?
> > 
> Yes we can do it in run-time, i mean to detect context type, atomic or not.
> But only for CONFIG_PREEMPT kernel. In case of !CONFIG_PREEMPT configuration 
> i do not see a straight forward way how to detect it. For example when caller 
> holds "spinlock". Therefore for such configuration we can just consider it
> as atomic. But in reality it could be not in atomic.
> 
> We need it for emergency/corner case and head-less objects. When we are run
> of memory. So in this case we should attach the rcu_head dynamically and
> queue the freed object to be processed later on, after GP.
> 
> If atomic context use GFP_ATOMIC flag if not use GFP_KERNEL. It is better 
> to allocate with GFP_KERNEL flag(if possible) because it has much less
> restrictions then GFP_ATOMIC one, i.e. GFP_KERNEL can sleep and wait until
> the memory is reclaimed.
> 
> But that is a corner case and i agree that it would be good to avoid of
> such passing of extra info by the caller.
> 
> Anyway i just share some extra info :)

Hmm, I can't see at the moment how you can use GFP_KERNEL here for
!CONFIG_PREEMPT kernels since that sleeps and you can't detect easily if you
are in an RCU reader on !CONFIG_PREEMPT unless lockdep is turned on (in which
case you could have checked lockdep's map).

How about for !PREEMPT using first: GFP_NOWAIT and second GFP_ATOMIC if
(NOWAIT fails)?  And for PREEMPT, use GFP_KERNEL, then GFP_ATOMIC (if
GFP_KERNEL fails).  Thoughts?

thanks,

 - Joel
Uladzislau Rezki March 1, 2020, 11:08 a.m. UTC | #36
>
> Sorry for slightly late reply.
> 
The same, i am on the vacation since last Thursday and the whole
next week. Therefore will be delays due to restricted access
to my emails :)

> On Tue, Feb 25, 2020 at 07:54:00PM +0100, Uladzislau Rezki wrote:
> > > > > > I was thinking a 2 fold approach (just thinking out loud..):
> > > > > > 
> > > > > > If kfree_call_rcu() is called in atomic context or in any rcu reader, then
> > > > > > use GFP_ATOMIC to grow an rcu_head wrapper on the atomic memory pool and
> > > > > > queue that.
> > > > > > 
> > > > I am not sure if that is acceptable, i mean what to do when GFP_ATOMIC
> > > > gets failed in atomic context? Or we can just consider it as out of
> > > > memory and another variant is to say that headless object can be called
> > > > from preemptible context only.
> > > 
> > > Yes that makes sense, and we can always put disclaimer in the API's comments
> > > saying if this object is expected to be freed a lot, then don't use the
> > > headless-API to be extra safe.
> > > 
> > Agree.
> > 
> > > BTW, GFP_ATOMIC the documentation says if GFP_ATOMIC reserves are depleted,
> > > the kernel can even panic some times, so if GFP_ATOMIC allocation fails, then
> > > there seems to be bigger problems in the system any way. I would say let us
> > > write a patch to allocate there and see what the -mm guys think.
> > > 
> > OK. It might be that they can offer something if they do not like our
> > approach. I will try to compose something and send the patch to see.
> > The tree.c implementation is almost done, whereas tiny one is on hold.
> > 
> > I think we should support batching as well as bulk interface there.
> > Another way is to workaround head-less object, just to attach the head
> > dynamically using kmalloc() and then call_rcu() but then it will not be
> > a fair headless support :)
> > 
> > What is your view?
> 
> This kind of "head" will require backpointers to the original object as well
> right? And still wouldn't solve the "what if we run out of GFP_ATOMIC
> reserves". But let me know in a code snippet if possible about what you mean.
> 
Just to summarize. We would like to support head-less kvfree_rcu() interface.
It implies that we have only pure pointer that is passed and that is it. Therefore
we should maintain the dynamic arrays and place it there. Like we do for "bulk"
logic, building arrays chains. Or just attach the head for tiny version. I prefer
first variant because that is fair and will be aligned with tree RCU version.

If we can not maintain the array path, i mean under low memory condition, it makes
sense to try to attach a head(for array we allocate one page), i.e. to make an object
with rcu_head the same as ww would free regular object that contains rcu_head filed
in it: 

<snip>
static inline struct rcu_head *
attach_rcu_head_to_object(void *obj, gfp_t gfp)
{
    unsigned long *ptr;

    ptr = kmalloc(sizeof(unsigned long *) +
        sizeof(struct rcu_head), gfp);
    if (!ptr)
        return NULL;

    ptr[0] = (unsigned long) obj;
    return ((struct rcu_head *) ++ptr);
}
...
void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
...
    if (head == NULL && is_vmalloc_addr((void *) func)) {
        if (!vfree_call_rcu_add_ptr_to_bulk(krcp, (void *) func)) {
            head = attach_rcu_head_to_object((void *) func, GFP_ATOMIC);
            if (head) {
                /* Set the offset and tag the headless object. */
                func = (rcu_callback_t) (sizeof(unsigned long *) + 1);

                head->func = func;
                head->next = krcp->head;
                krcp->head = head;
   }

later on when freeing the headless object to witch we attached the head:

for (; head; head = next) {
...
  /* We tag the headless object, so check it. */
  if (!(((unsigned long) head - offset) & BIT(0))) {
   debug_rcu_head_unqueue(head);
  } else {
   offset -= 1;
   headless_ptr = (void *) head - offset;
  }
...
if (!WARN_ON_ONCE(!__is_kfree_rcu_offset(offset))) {
   /*
    * here we kvfree() head-less object. The head was attached
    * due to low memory condition.
    */
   if (headless_ptr)
    kvfree((void *) *headless_ptr);

   kfree((void *)head - offset);
  }
<snip>

>
> And still wouldn't solve the "what if we run out of GFP_ATOMIC reserves".
>
It will not solve corner case. But it makes sense anyway to do because the
page allocator can say: no page, sorry, whereas slab can still serve our
request because we need only sizeof(rcu_head) + sizeof(unsigned long *)
bytes and not whole page.

Also when we detect low memory condition we should add force flag to schedule
the "monitor work" right away:

<snip>
 if (force)
     schedule_delayed_work(&krcp->monitor_work, 0);
<snip>

> > > > > > Otherwise, grow an rcu_head on the stack of kfree_call_rcu() and call
> > > > > > synchronize_rcu() inline with it.
> > > > > > 
> > > > > >
> > > > What do you mean here, Joel? "grow an rcu_head on the stack"?
> > > 
> > > By "grow on the stack", use the compiler-allocated rcu_head on the
> > > kfree_rcu() caller's stack.
> > > 
> > > I meant here to say, if we are not in atomic context, then we use regular
> > > GFP_KERNEL allocation, and if that fails, then we just use the stack's
> > > rcu_head and call synchronize_rcu() or even synchronize_rcu_expedited since
> > > the allocation failure would mean the need for RCU to free some memory is
> > > probably great.
> > > 
> > Ah, i got it. I thought you meant something like recursion and then
> > unwinding the stack back somehow :)
> 
> Yeah something like that :) Use the compiler allocated space which you
> wouldn't run out of unless stack overflows.
> 
Hmm... Please show it here, because i am a bit confused how to do that :)

> > > > As for "task_struct's rcu_read_lock_nesting". Will it be enough just
> > > > have a look at preempt_count of current process? If we have for example
> > > > nested rcu_read_locks:
> > > > 
> > > > <snip>
> > > > rcu_read_lock()
> > > >     rcu_read_lock()
> > > >         rcu_read_lock()
> > > > <snip>
> > > > 
> > > > the counter would be 3.
> > > 
> > > No, because preempt_count is not incremented during rcu_read_lock(). RCU
> > > reader sections can be preempted, they just cannot goto sleep in a reader
> > > section (unless the kernel is RT).
> > > 
> > So in CONFIG_PREEMPT kernel we can identify if we are in atomic or not by
> > using rcu_preempt_depth() and in_atomic(). When it comes to !CONFIG_PREEMPT
> > then we skip it and consider as atomic. Something like:
> > 
> > <snip>
> > static bool is_current_in_atomic()
> 
> Would be good to change this to is_current_in_rcu_reader() since
> rcu_preempt_depth() does not imply atomicity.
>
can_current_synchronize_rcu()? If can we just call:

<snip>
    synchronize_rcu() or synchronize_rcu_expedited();
    kvfree();
<snip>

> > {
> > #ifdef CONFIG_PREEMPT_RCU
> >     if (!rcu_preempt_depth() && !in_atomic())
> >         return false;
> 
> I think use if (!rcu_preempt_depth() && preemptible()) here.
> 
> preemptible() checks for IRQ disabled section as well.
> 
Yes but in_atomic() does it as well, it also checks other atomic
contexts like softirq handlers and NMI ones. So calling there
synchronize_rcu() is not allowed.

Thank you, Joel :)

--
Vlad Rezki
Uladzislau Rezki March 1, 2020, 11:13 a.m. UTC | #37
On Thu, Feb 27, 2020 at 09:08:51AM -0500, Joel Fernandes wrote:
> On Wed, Feb 26, 2020 at 04:53:47PM +0100, Uladzislau Rezki wrote:
> > On Wed, Feb 26, 2020 at 07:06:56AM -0800, Paul E. McKenney wrote:
> > > On Wed, Feb 26, 2020 at 02:04:40PM +0100, Uladzislau Rezki wrote:
> > > > On Tue, Feb 25, 2020 at 02:47:45PM -0800, Paul E. McKenney wrote:
> > > > > On Tue, Feb 25, 2020 at 07:54:00PM +0100, Uladzislau Rezki wrote:
> > > > > > > > > > I was thinking a 2 fold approach (just thinking out loud..):
> > > > > > > > > > 
> > > > > > > > > > If kfree_call_rcu() is called in atomic context or in any rcu reader, then
> > > > > > > > > > use GFP_ATOMIC to grow an rcu_head wrapper on the atomic memory pool and
> > > > > > > > > > queue that.
> > > > > > > > > > 
> > > > > > > > I am not sure if that is acceptable, i mean what to do when GFP_ATOMIC
> > > > > > > > gets failed in atomic context? Or we can just consider it as out of
> > > > > > > > memory and another variant is to say that headless object can be called
> > > > > > > > from preemptible context only.
> > > > > > > 
> > > > > > > Yes that makes sense, and we can always put disclaimer in the API's comments
> > > > > > > saying if this object is expected to be freed a lot, then don't use the
> > > > > > > headless-API to be extra safe.
> > > > > > > 
> > > > > > Agree.
> > > > > > 
> > > > > > > BTW, GFP_ATOMIC the documentation says if GFP_ATOMIC reserves are depleted,
> > > > > > > the kernel can even panic some times, so if GFP_ATOMIC allocation fails, then
> > > > > > > there seems to be bigger problems in the system any way. I would say let us
> > > > > > > write a patch to allocate there and see what the -mm guys think.
> > > > > > > 
> > > > > > OK. It might be that they can offer something if they do not like our
> > > > > > approach. I will try to compose something and send the patch to see.
> > > > > > The tree.c implementation is almost done, whereas tiny one is on hold.
> > > > > > 
> > > > > > I think we should support batching as well as bulk interface there.
> > > > > > Another way is to workaround head-less object, just to attach the head
> > > > > > dynamically using kmalloc() and then call_rcu() but then it will not be
> > > > > > a fair headless support :)
> > > > > > 
> > > > > > What is your view?
> > > > > > 
> > > > > > > > > > Otherwise, grow an rcu_head on the stack of kfree_call_rcu() and call
> > > > > > > > > > synchronize_rcu() inline with it.
> > > > > > > > > > 
> > > > > > > > > >
> > > > > > > > What do you mean here, Joel? "grow an rcu_head on the stack"?
> > > > > > > 
> > > > > > > By "grow on the stack", use the compiler-allocated rcu_head on the
> > > > > > > kfree_rcu() caller's stack.
> > > > > > > 
> > > > > > > I meant here to say, if we are not in atomic context, then we use regular
> > > > > > > GFP_KERNEL allocation, and if that fails, then we just use the stack's
> > > > > > > rcu_head and call synchronize_rcu() or even synchronize_rcu_expedited since
> > > > > > > the allocation failure would mean the need for RCU to free some memory is
> > > > > > > probably great.
> > > > > > > 
> > > > > > Ah, i got it. I thought you meant something like recursion and then
> > > > > > unwinding the stack back somehow :)
> > > > > > 
> > > > > > > > > > Use preemptible() andr task_struct's rcu_read_lock_nesting to differentiate
> > > > > > > > > > between the 2 cases.
> > > > > > > > > > 
> > > > > > > > If the current context is preemptable then we can inline synchronize_rcu()
> > > > > > > > together with freeing to handle such corner case, i mean when we are run
> > > > > > > > out of memory.
> > > > > > > 
> > > > > > > Ah yes, exactly what I mean.
> > > > > > > 
> > > > > > OK.
> > > > > > 
> > > > > > > > As for "task_struct's rcu_read_lock_nesting". Will it be enough just
> > > > > > > > have a look at preempt_count of current process? If we have for example
> > > > > > > > nested rcu_read_locks:
> > > > > > > > 
> > > > > > > > <snip>
> > > > > > > > rcu_read_lock()
> > > > > > > >     rcu_read_lock()
> > > > > > > >         rcu_read_lock()
> > > > > > > > <snip>
> > > > > > > > 
> > > > > > > > the counter would be 3.
> > > > > > > 
> > > > > > > No, because preempt_count is not incremented during rcu_read_lock(). RCU
> > > > > > > reader sections can be preempted, they just cannot goto sleep in a reader
> > > > > > > section (unless the kernel is RT).
> > > > > > > 
> > > > > > So in CONFIG_PREEMPT kernel we can identify if we are in atomic or not by
> > > > > > using rcu_preempt_depth() and in_atomic(). When it comes to !CONFIG_PREEMPT
> > > > > > then we skip it and consider as atomic. Something like:
> > > > > > 
> > > > > > <snip>
> > > > > > static bool is_current_in_atomic()
> > > > > > {
> > > > > > #ifdef CONFIG_PREEMPT_RCU
> > > > > 
> > > > > If possible: if (IS_ENABLED(CONFIG_PREEMPT_RCU))
> > > > > 
> > > > > Much nicer than #ifdef, and I -think- it should work in this case.
> > > > > 
> > > > OK. Thank you, Paul!
> > > > 
> > > > There is one point i would like to highlight it is about making caller
> > > > instead to be responsible for atomic or not decision. Like how kmalloc()
> > > > works, it does not really know the context it runs on, so it is up to
> > > > caller to inform.
> > > > 
> > > > The same way:
> > > > 
> > > > kvfree_rcu(p, atomic = true/false);
> > > > 
> > > > in this case we could cover !CONFIG_PREEMPT case also.
> > > 
> > > Understood, but couldn't we instead use IS_ENABLED() to work out the
> > > actual situation at runtime and relieve the caller of this burden?
> > > Or am I missing a corner case?
> > > 
> > Yes we can do it in run-time, i mean to detect context type, atomic or not.
> > But only for CONFIG_PREEMPT kernel. In case of !CONFIG_PREEMPT configuration 
> > i do not see a straight forward way how to detect it. For example when caller 
> > holds "spinlock". Therefore for such configuration we can just consider it
> > as atomic. But in reality it could be not in atomic.
> > 
> > We need it for emergency/corner case and head-less objects. When we are run
> > of memory. So in this case we should attach the rcu_head dynamically and
> > queue the freed object to be processed later on, after GP.
> > 
> > If atomic context use GFP_ATOMIC flag if not use GFP_KERNEL. It is better 
> > to allocate with GFP_KERNEL flag(if possible) because it has much less
> > restrictions then GFP_ATOMIC one, i.e. GFP_KERNEL can sleep and wait until
> > the memory is reclaimed.
> > 
> > But that is a corner case and i agree that it would be good to avoid of
> > such passing of extra info by the caller.
> > 
> > Anyway i just share some extra info :)
> 
> Hmm, I can't see at the moment how you can use GFP_KERNEL here for
> !CONFIG_PREEMPT kernels since that sleeps and you can't detect easily if you
> are in an RCU reader on !CONFIG_PREEMPT unless lockdep is turned on (in which
> case you could have checked lockdep's map).
> 
Right. Therefore i proposed to pass bolean variable indicating atomic or not.
So a caller is responsible to say where it is. It would be much more easier  
+ we would cover CONFIG_PREEMPT=n case. Otherwise we have to consider it as
atomic or in an RCU reader section, i.e. can not use synchronize_rcu() or 
GFP_KERNEL flags.

>
> How about for !PREEMPT using first: GFP_NOWAIT and second GFP_ATOMIC if
> (NOWAIT fails)?  And for PREEMPT, use GFP_KERNEL, then GFP_ATOMIC (if
> GFP_KERNEL fails).  Thoughts?
> 
Yes, it makes sense to me :) 

--
Vlad Rezki
Uladzislau Rezki March 1, 2020, 12:07 p.m. UTC | #38
> > > So in CONFIG_PREEMPT kernel we can identify if we are in atomic or not by
> > > using rcu_preempt_depth() and in_atomic(). When it comes to !CONFIG_PREEMPT
> > > then we skip it and consider as atomic. Something like:
> > > 
> > > <snip>
> > > static bool is_current_in_atomic()
> > 
> > Would be good to change this to is_current_in_rcu_reader() since
> > rcu_preempt_depth() does not imply atomicity.
> >
> can_current_synchronize_rcu()? If can we just call:
> 
> <snip>
>     synchronize_rcu() or synchronize_rcu_expedited();
>     kvfree();
> <snip>
> 
> > > {
> > > #ifdef CONFIG_PREEMPT_RCU
> > >     if (!rcu_preempt_depth() && !in_atomic())
> > >         return false;
> > 
> > I think use if (!rcu_preempt_depth() && preemptible()) here.
> > 
> > preemptible() checks for IRQ disabled section as well.
> > 
> Yes but in_atomic() does it as well, it also checks other atomic
> contexts like softirq handlers and NMI ones. So calling there
> synchronize_rcu() is not allowed.
> 
Ahh. Right you are. We also have to check if irqs are disabled
or not. preemptible() has to be added as well.

<snip>
can_current_synchronize_rcu()
{
    if (IS_ENABLED(CONFIG_PREEMPT_RCU)) {
        if (!rcu_preempt_depth() && !in_atomic() && preemptible()) {
            might_sleep();
            return true;
	}
    }

    return false;
}
<snip>

if we can synchronize:
    - we can directly inline kvfree() to current context;
    - we can attached the head using GFP_KERNEL | __GFP_RETRY_MAYFAIL.
    
Otherwise attached the rcu_head under atomic or as we are in RCU reader section.

Thoughts?

--
Vlad Rezki

Patch
diff mbox series

diff --git a/fs/ext4/resize.c b/fs/ext4/resize.c
index 86a2500ed292..98d3b4ec3422 100644
--- a/fs/ext4/resize.c
+++ b/fs/ext4/resize.c
@@ -17,6 +17,33 @@ 
 
 #include "ext4_jbd2.h"
 
+struct ext4_rcu_ptr {
+	struct rcu_head rcu;
+	void *ptr;
+};
+
+static void ext4_rcu_ptr_callback(struct rcu_head *head)
+{
+	struct ext4_rcu_ptr *ptr;
+
+	ptr = container_of(head, struct ext4_rcu_ptr, rcu);
+	kvfree(ptr->ptr);
+	kfree(ptr);
+}
+
+void ext4_kvfree_array_rcu(void *to_free)
+{
+	struct ext4_rcu_ptr *ptr = kzalloc(sizeof(*ptr), GFP_KERNEL);
+
+	if (ptr) {
+		ptr->ptr = to_free;
+		call_rcu(&ptr->rcu, ext4_rcu_ptr_callback);
+		return;
+	}
+	synchronize_rcu();
+	kvfree(ptr);
+}
+
 int ext4_resize_begin(struct super_block *sb)
 {
 	struct ext4_sb_info *sbi = EXT4_SB(sb);
@@ -864,9 +891,9 @@  static int add_new_gdb(handle_t *handle, struct inode *inode,
 	memcpy(n_group_desc, o_group_desc,
 	       EXT4_SB(sb)->s_gdb_count * sizeof(struct buffer_head *));
 	n_group_desc[gdb_num] = gdb_bh;
-	EXT4_SB(sb)->s_group_desc = n_group_desc;
+	rcu_assign_pointer(EXT4_SB(sb)->s_group_desc, n_group_desc);
 	EXT4_SB(sb)->s_gdb_count++;
-	kvfree(o_group_desc);
+	ext4_kvfree_array_rcu(o_group_desc);
 
 	le16_add_cpu(&es->s_reserved_gdt_blocks, -1);
 	err = ext4_handle_dirty_super(handle, sb);
@@ -922,9 +949,9 @@  static int add_new_gdb_meta_bg(struct super_block *sb,
 		return err;
 	}
 
-	EXT4_SB(sb)->s_group_desc = n_group_desc;
+	rcu_assign_pointer(EXT4_SB(sb)->s_group_desc, n_group_desc);
 	EXT4_SB(sb)->s_gdb_count++;
-	kvfree(o_group_desc);
+	ext4_kvfree_array_rcu(o_group_desc);
 	return err;
 }
 
diff --git a/fs/ext4/balloc.c b/fs/ext4/balloc.c
index 5f993a411251..5368bf67300b 100644
--- a/fs/ext4/balloc.c
+++ b/fs/ext4/balloc.c
@@ -270,6 +270,7 @@  struct ext4_group_desc * ext4_get_group_desc(struct super_block *sb,
 	ext4_group_t ngroups = ext4_get_groups_count(sb);
 	struct ext4_group_desc *desc;
 	struct ext4_sb_info *sbi = EXT4_SB(sb);
+	struct buffer_head *bh_p;
 
 	if (block_group >= ngroups) {
 		ext4_error(sb, "block_group >= groups_count - block_group = %u,"
@@ -280,7 +281,15 @@  struct ext4_group_desc * ext4_get_group_desc(struct super_block *sb,
 
 	group_desc = block_group >> EXT4_DESC_PER_BLOCK_BITS(sb);
 	offset = block_group & (EXT4_DESC_PER_BLOCK(sb) - 1);
-	if (!sbi->s_group_desc[group_desc]) {
+	rcu_read_lock();
+	bh_p = rcu_dereference(sbi->s_group_desc)[group_desc];
+	/*
+	 * We can unlock here since the pointer being dereferenced won't be
+	 * dereferenced again. By looking at the usage in add_new_gdb() the
+	 * value isn't modified, just the pointer, and so it remains valid.
+	 */
+	rcu_read_unlock();
+	if (!bh_p) {
 		ext4_error(sb, "Group descriptor not loaded - "
 			   "block_group = %u, group_desc = %u, desc = %u",
 			   block_group, group_desc, offset);
@@ -288,10 +297,10 @@  struct ext4_group_desc * ext4_get_group_desc(struct super_block *sb,
 	}
 
 	desc = (struct ext4_group_desc *)(
-		(__u8 *)sbi->s_group_desc[group_desc]->b_data +
+		(__u8 *)bh_p->b_data +
 		offset * EXT4_DESC_SIZE(sb));
 	if (bh)
-		*bh = sbi->s_group_desc[group_desc];
+		*bh = bh_p;
 	return desc;
 }
 
diff --git a/fs/ext4/ext4.h b/fs/ext4/ext4.h
index 4441331d06cc..b7824d56b968 100644
--- a/fs/ext4/ext4.h
+++ b/fs/ext4/ext4.h
@@ -2730,8 +2730,8 @@  extern int ext4_generic_delete_entry(handle_t *handle,
 extern bool ext4_empty_dir(struct inode *inode);
 
 /* resize.c */
+extern void ext4_kvfree_array_rcu(void *to_free);
 extern int ext4_group_add(struct super_block *sb,
 				struct ext4_new_group_data *input);
 extern int ext4_group_extend(struct super_block *sb,

--