
[net-next] rhashtable: involve rhashtable_lookup_insert routine

Message ID 1420447578-19320-1-git-send-email-ying.xue@windriver.com
State Superseded, archived
Delegated to: David Miller

Commit Message

Ying Xue Jan. 5, 2015, 8:46 a.m. UTC
Introduce a new function called rhashtable_lookup_insert() which makes
lookup and insertion atomic under bucket lock protection, helping us
avoid introducing an extra lock when we search for and insert an object
into the hash table.

Signed-off-by: Ying Xue <ying.xue@windriver.com>
Cc: Thomas Graf <tgraf@suug.ch>
---
 include/linux/rhashtable.h |    1 +
 lib/rhashtable.c           |   62 +++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 62 insertions(+), 1 deletion(-)
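
For callers, the new API is an atomic insert-if-absent. A minimal
caller-side sketch (only the rhashtable_lookup_insert() signature above
comes from this patch; the structure, field and helper names below are
hypothetical, and 'ht' is assumed to be already initialized via
rhashtable_init() with matching key_offset, key_len and head_offset):

        #include <linux/rhashtable.h>

        /* Hypothetical object embedding a rhash_head, keyed by 'id'. */
        struct test_obj {
                int                     id;     /* key: covered by ht->p.key_offset/key_len */
                struct rhash_head       node;   /* linkage: ht->p.head_offset points here */
        };

        static bool test_obj_add(struct rhashtable *ht, struct test_obj *obj)
        {
                /* Returns true if 'obj' was linked into its bucket chain,
                 * false if it was already present (this version compares
                 * object pointers; see the discussion below about comparing
                 * keys instead).  Safe to call from atomic context; may
                 * schedule a deferred table resize.
                 */
                return rhashtable_lookup_insert(ht, &obj->node);
        }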

Comments

Ying Xue Jan. 5, 2015, 9:07 a.m. UTC | #1
On 01/05/2015 04:46 PM, Ying Xue wrote:
> Introduce a new function called rhashtable_lookup_insert() which makes
> lookup and insertion atomic under bucket lock protection, helping us
> avoid introducing an extra lock when we search for and insert an object
> into the hash table.
> 

Sorry, please ignore this version. We should compare keys instead of
object pointers, as we want to check whether the bucket chain already
contains an entry whose key is identical to that of the object being
inserted.
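
In other words, the pointer check inside the bucket loop should become a
key comparison, roughly along these lines (using rht_obj() together with
the key_offset and key_len from the table parameters):

        rht_for_each(he, tbl, hash) {
                if (!memcmp(rht_obj(ht, he) + ht->p.key_offset,
                            rht_obj(ht, obj) + ht->p.key_offset,
                            ht->p.key_len)) {
                        spin_unlock_bh(lock);
                        rcu_read_unlock();
                        return false;
                }
        }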

Next version will come soon.

Regards,
Ying

> Signed-off-by: Ying Xue <ying.xue@windriver.com>
> Cc: Thomas Graf <tgraf@suug.ch>
> ---
>  include/linux/rhashtable.h |    1 +
>  lib/rhashtable.c           |   62 +++++++++++++++++++++++++++++++++++++++++++-
>  2 files changed, 62 insertions(+), 1 deletion(-)
> 
> diff --git a/include/linux/rhashtable.h b/include/linux/rhashtable.h
> index de1459c7..73c913f 100644
> --- a/include/linux/rhashtable.h
> +++ b/include/linux/rhashtable.h
> @@ -168,6 +168,7 @@ int rhashtable_shrink(struct rhashtable *ht);
>  void *rhashtable_lookup(struct rhashtable *ht, const void *key);
>  void *rhashtable_lookup_compare(struct rhashtable *ht, const void *key,
>  				bool (*compare)(void *, void *), void *arg);
> +bool rhashtable_lookup_insert(struct rhashtable *ht, struct rhash_head *obj);
>  
>  void rhashtable_destroy(struct rhashtable *ht);
>  
> diff --git a/lib/rhashtable.c b/lib/rhashtable.c
> index cbad192..04e0af7 100644
> --- a/lib/rhashtable.c
> +++ b/lib/rhashtable.c
> @@ -493,7 +493,7 @@ static void rht_deferred_worker(struct work_struct *work)
>  }
>  
>  /**
> - * rhashtable_insert - insert object into hash hash table
> + * rhashtable_insert - insert object into hash table
>   * @ht:		hash table
>   * @obj:	pointer to hash head inside object
>   *
> @@ -700,6 +700,66 @@ restart:
>  }
>  EXPORT_SYMBOL_GPL(rhashtable_lookup_compare);
>  
> +/**
> + * rhashtable_lookup_insert - lookup and insert object into hash table
> + * @ht:		hash table
> + * @obj:	pointer to hash head inside object
> + *
> + * Computes the hash value for the given object and traverses the bucket
> + * chain checking whether the object exists in the chain or not. If it's
> + * not found, the object is then inserted into the bucket chain. As the
> + * bucket lock is held during the process of lookup and insertion, this
> + * helps to avoid extra lock involvement.
> + *
> + * It is safe to call this function from atomic context.
> + *
> + * Will trigger an automatic deferred table resizing if the size grows
> + * beyond the watermark indicated by grow_decision() which can be passed
> + * to rhashtable_init().
> + */
> +bool rhashtable_lookup_insert(struct rhashtable *ht, struct rhash_head *obj)
> +{
> +	struct bucket_table *tbl;
> +	struct rhash_head *he, *head;
> +	spinlock_t *lock;
> +	unsigned int hash;
> +
> +	rcu_read_lock();
> +
> +	tbl = rht_dereference_rcu(ht->tbl, ht);
> +	hash = head_hashfn(ht, tbl, obj);
> +	lock = bucket_lock(tbl, hash);
> +
> +	spin_lock_bh(lock);
> +	head = rht_dereference_bucket(tbl->buckets[hash], tbl, hash);
> +	if (rht_is_a_nulls(head)) {
> +		INIT_RHT_NULLS_HEAD(obj->next, ht, hash);
> +	} else {
> +		rht_for_each(he, tbl, hash) {
> +			if (he == obj) {
> +				spin_unlock_bh(lock);
> +				rcu_read_unlock();
> +				return false;
> +			}
> +		}
> +		RCU_INIT_POINTER(obj->next, head);
> +	}
> +	rcu_assign_pointer(tbl->buckets[hash], obj);
> +	spin_unlock_bh(lock);
> +
> +	atomic_inc(&ht->nelems);
> +
> +	/* Only grow the table if no resizing is currently in progress. */
> +	if (ht->tbl != ht->future_tbl &&
> +	    ht->p.grow_decision && ht->p.grow_decision(ht, tbl->size))
> +		schedule_delayed_work(&ht->run_work, 0);
> +
> +	rcu_read_unlock();
> +
> +	return true;
> +}
> +EXPORT_SYMBOL_GPL(rhashtable_lookup_insert);
> +
>  static size_t rounded_hashtable_size(struct rhashtable_params *params)
>  {
>  	return max(roundup_pow_of_two(params->nelem_hint * 4 / 3),
> 

Thomas Graf Jan. 5, 2015, 9:46 a.m. UTC | #2
On 01/05/15 at 05:07pm, Ying Xue wrote:
> On 01/05/2015 04:46 PM, Ying Xue wrote:
> > Introduce a new function called rhashtable_lookup_insert() which makes
> > lookup and insertion atomic under bucket lock protection, helping us
> > avoid introducing an extra lock when we search for and insert an object
> > into the hash table.
> > 
> 
> Sorry, please ignore this version. We should compare keys instead of
> object pointers, as we want to check whether the bucket chain already
> contains an entry whose key is identical to that of the object being
> inserted.

Agreed. Also, this needs to handle the special case where a resize is
taking place and tbl and future_tbl point to separate tables. In that
case a parallel writer might already have added, or be about to add,
'obj' to future_tbl.

We need something like this:

        rcu_read_lock();
        old_tbl = rht_dereference_rcu(ht->tbl, ht);
        old_hash = head_hashfn(ht, old_tbl, obj);
        old_bucket_lock = bucket_lock(old_tbl, old_hash);
        spin_lock_bh(old_bucket_lock);

        new_tbl = rht_dereference_rcu(ht->future_tbl, ht);
        new_hash = head_hashfn(ht, new_tbl, obj);
        new_bucket_lock = bucket_lock(new_tbl, new_hash);
        if (unlikely(old_tbl != new_tbl)) {
                spin_lock_bh_nested(new_bucket_lock, RHT_LOCK_NESTED1);

                /* Resizing is in progress, search for a matching entry in the
                 * old table before attempting to insert to the future table.
                 */
                rht_for_each_rcu(he, old_tbl, rht_bucket_index(old_tbl, old_hash)) {
                        if (!memcmp(rht_obj(ht, he) + ht->p.key_offset,
                                    rht_obj(ht, obj) + ht->p.key_offset,
                                    ht->p.key_len))
                                goto entry_exists;
                }
        }

        head = rht_dereference_bucket(new_tbl->buckets[new_hash], new_tbl, new_hash);
        if (rht_is_a_nulls(head)) {
                INIT_RHT_NULLS_HEAD(obj->next, ht, new_hash);
        } else {
                rht_for_each(he, new_tbl, new_hash) {
                        if (!memcmp(rht_obj(ht, he) + ht->p.key_offset,
                                    rht_obj(ht, obj) + ht->p.key_offset,
                                    ht->p.key_len))
                                goto entry_exists;
                }
                RCU_INIT_POINTER(obj->next, head);

        }

        rcu_assign_pointer(new_tbl->buckets[new_hash], obj);
        spin_unlock_bh(new_bucket_lock);
        if (unlikely(old_tbl != new_tbl))
                spin_unlock_bh(old_bucket_lock);

        atomic_inc(&ht->nelems);

        /* Only grow the table if no resizing is currently in progress. */
        if (ht->tbl != ht->future_tbl &&
            ht->p.grow_decision && ht->p.grow_decision(ht, new_tbl->size))
                schedule_delayed_work(&ht->run_work, 0);

        rcu_read_unlock();

        return true;

entry_exists:
        spin_unlock_bh(new_bucket_lock);
        if (unlikely(old_tbl != new_tbl))
                spin_unlock_bh(old_bucket_lock);
        rcu_read_unlock();

        return false;

What this does in addition is:
 * Locks down the bucket in both the old and new table if a resize is
   in progress to ensure that writers can't remove from the old table
   and can't insert to the new table during the atomic operation.
 * Search for duplicates in the old table if a resize is in progress.
 * Use memcmp() instead of ptr1 != ptr2 to search for duplicates
   assuming we want to avoid key duplicates with this function.

*Please* verify this code very carefully; I wrote it from memory and
have not compiled or tested it in any way yet. I'll double-check it and
think about a better unit test for this so we can validate functionality
like this.
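
As a starting point, such a test could insert two objects that share a
key and check that, once the key comparison above is in place, the
second rhashtable_lookup_insert() call is rejected. A rough sketch
(reusing the hypothetical struct test_obj sketched under the commit
message above; none of this is existing test code):

        static struct test_obj dup_a = { .id = 1 };
        static struct test_obj dup_b = { .id = 1 };     /* same key as dup_a */

        static int test_lookup_insert_dup(struct rhashtable *ht)
        {
                /* The first insert of the key must succeed. */
                if (!rhashtable_lookup_insert(ht, &dup_a.node))
                        return -EEXIST;

                /* A second insert with an identical key must be refused. */
                if (rhashtable_lookup_insert(ht, &dup_b.node))
                        return -EINVAL;

                return 0;
        }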

Ying Xue Jan. 5, 2015, 11:44 a.m. UTC | #3
On 01/05/2015 05:46 PM, Thomas Graf wrote:
> On 01/05/15 at 05:07pm, Ying Xue wrote:
>> On 01/05/2015 04:46 PM, Ying Xue wrote:
>>> Introduce a new function called rhashtable_lookup_insert() which makes
>>> lookup and insertion atomic under bucket lock protection, helping us
>>> avoid introducing an extra lock when we search for and insert an object
>>> into the hash table.
>>>
>>
>> Sorry, please ignore this version. We should compare keys instead of
>> object pointers, as we want to check whether the bucket chain already
>> contains an entry whose key is identical to that of the object being
>> inserted.
> 
> Agreed. Also, this needs to handle the special case where a resize is
> taking place and tbl and future_tbl point to separate tables. In that
> case a parallel writer might already have added, or be about to add,
> 'obj' to future_tbl.
> 
> We need something like this:
> 
>         rcu_read_lock();
>         old_tbl = rht_dereference_rcu(ht->tbl, ht);
>         old_hash = head_hashfn(ht, old_tbl, obj);
>         old_bucket_lock = bucket_lock(old_tbl, old_hash);
>         spin_lock_bh(old_bucket_lock);
> 
>         new_tbl = rht_dereference_rcu(ht->future_tbl, ht);
>         new_hash = head_hashfn(ht, new_tbl, obj);
>         new_bucket_lock = bucket_lock(new_tbl, new_hash);
>         if (unlikely(old_tbl != new_tbl)) {
>                 spin_lock_bh_nested(new_bucket_lock, RHT_LOCK_NESTED1);
> 
>                 /* Resizing is in progress, search for a matching entry in the
>                  * old table before attempting to insert to the future table.
>                  */
>                 rht_for_each_rcu(he, old_tbl, rht_bucket_index(old_tbl, old_hash)) {
>                         if (!memcmp(rht_obj(ht, he) + ht->p.key_offset,
>                                     rht_obj(ht, obj) + ht->p.key_offset,
>                                     ht->p.key_len))
>                                 goto entry_exists;
>                 }
>         }
> 
>         head = rht_dereference_bucket(new_tbl->buckets[new_hash], new_tbl, new_hash);
>         if (rht_is_a_nulls(head)) {
>                 INIT_RHT_NULLS_HEAD(obj->next, ht, new_hash);
>         } else {
>                 rht_for_each(he, new_tbl, new_hash) {
>                         if (!memcmp(rht_obj(ht, he) + ht->p.key_offset,
>                                     rht_obj(ht, obj) + ht->p.key_offset,
>                                     ht->p.key_len))
>                                 goto entry_exists;
>                 }
>                 RCU_INIT_POINTER(obj->next, head);
> 
>         }
> 
>         rcu_assign_pointer(new_tbl->buckets[new_hash], obj);
>         spin_unlock_bh(new_bucket_lock);
>         if (unlikely(old_tbl != new_tbl))
>                 spin_unlock_bh(old_bucket_lock);
> 
>         atomic_inc(&ht->nelems);
> 
>         /* Only grow the table if no resizing is currently in progress. */
>         if (ht->tbl != ht->future_tbl &&
>             ht->p.grow_decision && ht->p.grow_decision(ht, new_tbl->size))
>                 schedule_delayed_work(&ht->run_work, 0);
> 
>         rcu_read_unlock();
> 
>         return true;
> 
> entry_exists:
>         spin_unlock_bh(new_bucket_lock);
>         if (unlikely(old_tbl != new_tbl))
>                 spin_unlock_bh(old_bucket_lock);
>         rcu_read_unlock();
> 
>         return false;
> 
> What this does in addition is:
>  * Locks down the bucket in both the old and new table if a resize is
>    in progress to ensure that writers can't remove from the old table
>    and can't insert to the new table during the atomic operation.
>  * Search for duplicates in the old table if a resize is in progress.
>  * Use memcmp() instead of ptr1 != ptr2 to search for duplicates
>    assuming we want to avoid key duplicates with this function.
> 

Yes, I understand your comments and changes above, and I absolutely
agree with you. So I made some changes on top of your code in v2, and I
also ran a simple test; it worked fine.

But, as you point out below, we must check the code very carefully, and
it would be better to write a corresponding test case to verify the
function. Yes, we should do that. However, as it is quite late for me
now, I will deliver the new version first so that you can review it
earlier if possible. Tomorrow I will figure out how to design the test
case.

Regards,
Ying

> *Please* verify this code very carefully; I wrote it from memory and
> have not compiled or tested it in any way yet. I'll double-check it and
> think about a better unit test for this so we can validate functionality
> like this.
> 
> 
> 


Patch

diff --git a/include/linux/rhashtable.h b/include/linux/rhashtable.h
index de1459c7..73c913f 100644
--- a/include/linux/rhashtable.h
+++ b/include/linux/rhashtable.h
@@ -168,6 +168,7 @@  int rhashtable_shrink(struct rhashtable *ht);
 void *rhashtable_lookup(struct rhashtable *ht, const void *key);
 void *rhashtable_lookup_compare(struct rhashtable *ht, const void *key,
 				bool (*compare)(void *, void *), void *arg);
+bool rhashtable_lookup_insert(struct rhashtable *ht, struct rhash_head *obj);
 
 void rhashtable_destroy(struct rhashtable *ht);
 
diff --git a/lib/rhashtable.c b/lib/rhashtable.c
index cbad192..04e0af7 100644
--- a/lib/rhashtable.c
+++ b/lib/rhashtable.c
@@ -493,7 +493,7 @@  static void rht_deferred_worker(struct work_struct *work)
 }
 
 /**
- * rhashtable_insert - insert object into hash hash table
+ * rhashtable_insert - insert object into hash table
  * @ht:		hash table
  * @obj:	pointer to hash head inside object
  *
@@ -700,6 +700,66 @@  restart:
 }
 EXPORT_SYMBOL_GPL(rhashtable_lookup_compare);
 
+/**
+ * rhashtable_lookup_insert - lookup and insert object into hash table
+ * @ht:		hash table
+ * @obj:	pointer to hash head inside object
+ *
+ * Computes the hash value for the given object and traverses the bucket
+ * chain checking whether the object exists in the chain or not. If it's
+ * not found, the object is then inserted into the bucket chain. As the
+ * bucket lock is held during the process of lookup and insertion, this
+ * helps to avoid extra lock involvement.
+ *
+ * It is safe to call this function from atomic context.
+ *
+ * Will trigger an automatic deferred table resizing if the size grows
+ * beyond the watermark indicated by grow_decision() which can be passed
+ * to rhashtable_init().
+ */
+bool rhashtable_lookup_insert(struct rhashtable *ht, struct rhash_head *obj)
+{
+	struct bucket_table *tbl;
+	struct rhash_head *he, *head;
+	spinlock_t *lock;
+	unsigned int hash;
+
+	rcu_read_lock();
+
+	tbl = rht_dereference_rcu(ht->tbl, ht);
+	hash = head_hashfn(ht, tbl, obj);
+	lock = bucket_lock(tbl, hash);
+
+	spin_lock_bh(lock);
+	head = rht_dereference_bucket(tbl->buckets[hash], tbl, hash);
+	if (rht_is_a_nulls(head)) {
+		INIT_RHT_NULLS_HEAD(obj->next, ht, hash);
+	} else {
+		rht_for_each(he, tbl, hash) {
+			if (he == obj) {
+				spin_unlock_bh(lock);
+				rcu_read_unlock();
+				return false;
+			}
+		}
+		RCU_INIT_POINTER(obj->next, head);
+	}
+	rcu_assign_pointer(tbl->buckets[hash], obj);
+	spin_unlock_bh(lock);
+
+	atomic_inc(&ht->nelems);
+
+	/* Only grow the table if no resizing is currently in progress. */
+	if (ht->tbl != ht->future_tbl &&
+	    ht->p.grow_decision && ht->p.grow_decision(ht, tbl->size))
+		schedule_delayed_work(&ht->run_work, 0);
+
+	rcu_read_unlock();
+
+	return true;
+}
+EXPORT_SYMBOL_GPL(rhashtable_lookup_insert);
+
 static size_t rounded_hashtable_size(struct rhashtable_params *params)
 {
 	return max(roundup_pow_of_two(params->nelem_hint * 4 / 3),