Re: [RFC][PATCH 2/2] rcu classic: new algorithm for callbacks-processing(v2)

!MAILaRCHIVE_VOTE_RePLACE
Previous message: [thread] [date] [author]
Next message: [thread] [date] [author]
To: Ingo Molnar <mingo@...>
Cc: Lai Jiangshan <laijs@...>, Dipankar Sarma <dipankar@...>, Gautham Shenoy <ego@...>, Dhaval Giani <dhaval@...>, Peter Zijlstra <a.p.zijlstra@...>, Linux Kernel Mailing List <linux-kernel@...>
Date: Friday, August 1, 2008 - 5:11 pm

On Fri, Jul 25, 2008 at 09:54:54AM -0700, Paul E. McKenney wrote:

And here is a patch to apply on top of Jiangshan's patch that
rationalizes the locking and memory barriers.  Much of this is general
cleanup, not necessarily directly related to Jiangshan's patch.
Given this as a base, it should be fairly easy to make a hierarchical
implementation for machines that see lock contention in the RCU
infrastructure -- split and make a hierarchy on rcu_ctrlblk.

Thoughts?

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
---

 rcuclassic.c |   62 +++++++++++++++++++++++++++++++++++++++++------------------
 1 file changed, 44 insertions(+), 18 deletions(-)

diff -urpNa -X dontdiff linux-2.6.26-ljsimp/kernel/rcuclassic.c linux-2.6.26-ljsimpfix/kernel/rcuclassic.c
--- linux-2.6.26-ljsimp/kernel/rcuclassic.c	2008-07-24 13:46:58.000000000 -0700
+++ linux-2.6.26-ljsimpfix/kernel/rcuclassic.c	2008-07-28 12:47:29.000000000 -0700
@@ -86,6 +86,7 @@ static void force_quiescent_state(struct
 	int cpu;
 	cpumask_t cpumask;
 	set_need_resched();
+	spin_lock(&rcp->lock);
 	if (unlikely(!rcp->signaled)) {
 		rcp->signaled = 1;
 		/*
@@ -111,6 +112,7 @@ static void force_quiescent_state(struct
 		for_each_cpu_mask(cpu, cpumask)
 			smp_send_reschedule(cpu);
 	}
+	spin_unlock(&rcp->lock);
 }
 #else
 static inline void force_quiescent_state(struct rcu_data *rdp,
@@ -124,7 +126,11 @@ static void __call_rcu(struct rcu_head *
 		struct rcu_data *rdp)
 {
 	long batch;
-	smp_mb(); /* reads the most recently updated value of rcu->cur. */
+	unsigned long flags;
+
+	head->next = NULL;
+	local_irq_save(flags);
+	smp_mb(); /* Read of rcu->cur must happen after any change by caller. */
 
 	/*
 	 * Determine the batch number of this callback.
@@ -155,6 +161,7 @@ static void __call_rcu(struct rcu_head *
 		rdp->blimit = INT_MAX;
 		force_quiescent_state(rdp, &rcu_ctrlblk);
 	}
+	local_irq_restore(flags);
 }
 
 /**
@@ -171,13 +178,8 @@ static void __call_rcu(struct rcu_head *
 void call_rcu(struct rcu_head *head,
 				void (*func)(struct rcu_head *rcu))
 {
-	unsigned long flags;
-
 	head->func = func;
-	head->next = NULL;
-	local_irq_save(flags);
 	__call_rcu(head, &rcu_ctrlblk, &__get_cpu_var(rcu_data));
-	local_irq_restore(flags);
 }
 EXPORT_SYMBOL_GPL(call_rcu);
 
@@ -200,13 +202,8 @@ EXPORT_SYMBOL_GPL(call_rcu);
 void call_rcu_bh(struct rcu_head *head,
 				void (*func)(struct rcu_head *rcu))
 {
-	unsigned long flags;
-
 	head->func = func;
-	head->next = NULL;
-	local_irq_save(flags);
 	__call_rcu(head, &rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
-	local_irq_restore(flags);
 }
 EXPORT_SYMBOL_GPL(call_rcu_bh);
 
@@ -389,17 +386,17 @@ static void rcu_move_batch(struct rcu_da
 static void __rcu_offline_cpu(struct rcu_data *this_rdp,
 				struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
 {
-	/* if the cpu going offline owns the grace period
+	/*
+	 * if the cpu going offline owns the grace period
 	 * we can block indefinitely waiting for it, so flush
 	 * it here
 	 */
 	spin_lock_bh(&rcp->lock);
 	if (rcp->cur != rcp->completed)
 		cpu_quiet(rdp->cpu, rcp);
-	spin_unlock_bh(&rcp->lock);
-	/* spin_lock implies smp_mb() */
 	rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail, rcp->cur + 1);
 	rcu_move_batch(this_rdp, rdp->nxtlist, rdp->nxttail[2], rcp->cur + 1);
+	spin_unlock_bh(&rcp->lock);
 }
 
 static void rcu_offline_cpu(int cpu)
@@ -429,16 +426,19 @@ static void rcu_offline_cpu(int cpu)
 static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
 					struct rcu_data *rdp)
 {
+	long completed_snap;
+
 	if (rdp->nxtlist) {
 		local_irq_disable();
+		completed_snap = ACCESS_ONCE(rcp->completed);
 
 		/*
 		 * move the other grace-period-completed entries to
 		 * [rdp->nxtlist, *rdp->nxttail[0]) temporarily
 		 */
-		if (!rcu_batch_before(rcp->completed, rdp->batch))
+		if (!rcu_batch_before(completed_snap, rdp->batch))
 			rdp->nxttail[0] = rdp->nxttail[1] = rdp->nxttail[2];
-		else if (!rcu_batch_before(rcp->completed, rdp->batch - 1))
+		else if (!rcu_batch_before(completed_snap, rdp->batch - 1))
 			rdp->nxttail[0] = rdp->nxttail[1];
 
 		/*
@@ -479,20 +479,38 @@ static void __rcu_process_callbacks(stru
 
 static void rcu_process_callbacks(struct softirq_action *unused)
 {
+	/*
+	 * Memory references from any prior RCU read-side critical sections
+	 * executed by the interrupted code must be see before any RCU
+	 * grace-period manupulations below.
+	 */
+
+	smp_mb(); /* See above block comment. */
+
 	__rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
 	__rcu_process_callbacks(&rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
+
+	/*
+	 * Memory references from any later RCU read-side critical sections
+	 * executed by the interrupted code must be see after any RCU
+	 * grace-period manupulations above.
+	 */
+
+	smp_mb(); /* See above block comment. */
 }
 
 static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
 {
 	if (rdp->nxtlist) {
+		long completed_snap = ACCESS_ONCE(rcp->completed);
+
 		/*
 		 * This cpu has pending rcu entries and the grace period
 		 * for them has completed.
 		 */
-		if (!rcu_batch_before(rcp->completed, rdp->batch))
+		if (!rcu_batch_before(completed_snap, rdp->batch))
 			return 1;
-		if (!rcu_batch_before(rcp->completed, rdp->batch - 1) &&
+		if (!rcu_batch_before(completed_snap, rdp->batch - 1) &&
 				rdp->nxttail[0] != rdp->nxttail[1])
 			return 1;
 		if (rdp->nxttail[0] != &rdp->nxtlist)
@@ -543,6 +561,12 @@ int rcu_needs_cpu(int cpu)
 	return !!rdp->nxtlist || !!rdp_bh->nxtlist || rcu_pending(cpu);
 }
 
+/*
+ * Top-level function driving RCU grace-period detection, normally
+ * invoked from the scheduler-clock interrupt.  This function simply
+ * increments counters that are read only from softirq by this same
+ * CPU, so there are no memory barriers required.
+ */
 void rcu_check_callbacks(int cpu, int user)
 {
 	if (user ||
@@ -558,6 +582,7 @@ void rcu_check_callbacks(int cpu, int us
 static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
 						struct rcu_data *rdp)
 {
+	spin_lock(&rcp->lock);
 	memset(rdp, 0, sizeof(*rdp));
 	rdp->nxttail[0] = rdp->nxttail[1] = rdp->nxttail[2] = &rdp->nxtlist;
 	rdp->donetail = &rdp->donelist;
@@ -565,6 +590,7 @@ static void rcu_init_percpu_data(int cpu
 	rdp->qs_pending = 0;
 	rdp->cpu = cpu;
 	rdp->blimit = blimit;
+	spin_unlock(&rcp->lock);
 }
 
 static void __cpuinit rcu_online_cpu(int cpu)
--
Previous message: [thread] [date] [author]
Next message: [thread] [date] [author]

Messages in current thread:
Re: [RFC][PATCH 2/2] rcu classic: new algorithm for callback..., Paul E. McKenney, (Fri Aug 1, 5:11 pm)