/*
 * Queued spinlock
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * (C) Copyright 2013-2015 Hewlett-Packard Development Company, L.P.
 * (C) Copyright 2013-2014 Red Hat, Inc.
 * (C) Copyright 2015 Intel Corp.
 * (C) Copyright 2015 Hewlett-Packard Enterprise Development LP
 *
 * Authors: Waiman Long <waiman.long@hpe.com>
 *          Peter Zijlstra <peterz@infradead.org>
 */

#ifndef _GEN_PV_LOCK_SLOWPATH

#include <linux/smp.h>
#include <linux/bug.h>
#include <linux/cpumask.h>
#include <linux/percpu.h>
#include <linux/hardirq.h>
#include <linux/mutex.h>
#include <asm/byteorder.h>
#include <asm/qspinlock.h>

/*
 * The basic principle of a queue-based spinlock can best be understood
 * by studying a classic queue-based spinlock implementation called the
 * MCS lock. The paper below provides a good description for this kind
 * of lock.
 *
 * http://www.cise.ufl.edu/tr/DOC/REP-1992-71.pdf
 *
 * This queued spinlock implementation is based on the MCS lock; however, to
 * make it fit the 4 bytes we assume spinlock_t to be, and to preserve its
 * existing API, we must modify it somehow.
 *
 * In particular, where the traditional MCS lock consists of a tail pointer
 * (8 bytes) and needs the next pointer (another 8 bytes) of its own node to
 * unlock the next pending waiter (next->locked), we compress both of these:
 * {tail, next->locked} into a single u32 value.
 *
 * Since a spinlock disables recursion of its own context and there is a limit
 * to the contexts that can nest (task, softirq, hardirq, nmi), there are at
 * most 4 nesting levels, which can be encoded by a 2-bit number. We can
 * therefore encode the tail by combining the 2-bit nesting level with the cpu
 * number. With one byte for the lock value and 3 bytes for the tail, only a
 * 32-bit word is now needed. Even though we only need 1 bit for the lock,
 * we extend it to a full byte to achieve better performance for architectures
 * that support atomic byte writes.
 *
 * We also change the first spinner to spin on the lock bit instead of its
 * node, thereby avoiding the need to carry a node from lock to unlock and
 * preserving the existing lock API. This also makes the unlock code simpler
 * and faster.
 *
 * N.B. The current implementation only supports architectures that allow
 *      atomic operations on smaller 8-bit and 16-bit data types.
 *
 */
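
/*
 * For reference, a sketch of how the 32-bit lock word is laid out in the
 * common NR_CPUS < 16K case (_Q_PENDING_BITS == 8); the authoritative
 * definitions live in include/asm-generic/qspinlock_types.h:
 *
 *      bits  0- 7: locked byte
 *      bits  8-15: pending byte
 *      bits 16-17: tail index (per-CPU node nesting level)
 *      bits 18-31: tail CPU number, stored as cpu + 1 so that 0 means "no tail"
 *
 * With NR_CPUS >= 16K the pending byte shrinks to a single bit and the tail
 * CPU field grows accordingly.
 */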

#include "mcs_spinlock.h"

#ifdef CONFIG_PARAVIRT_SPINLOCKS
#define MAX_NODES       8
#else
#define MAX_NODES       4
#endif

/*
 * The pending bit spinning loop count.
 * This heuristic is used to limit the number of lockword accesses made by
 * smp_cond_load_acquire() when waiting for the lock to transition out of the
 * "== _Q_PENDING_VAL" state. We don't spin indefinitely because there's no
 * guarantee that we'll make forward progress.
 */
#ifndef _Q_PENDING_LOOPS
#define _Q_PENDING_LOOPS        1
#endif

/*
 * Per-CPU queue node structures; we can never have more than 4 nested
 * contexts: task, softirq, hardirq, nmi.
 *
 * Exactly fits one 64-byte cacheline on a 64-bit architecture.
 *
 * PV doubles the storage and uses the second cacheline for PV state.
 */
static DEFINE_PER_CPU_ALIGNED(struct mcs_spinlock, mcs_nodes[MAX_NODES]);

/*
 * We must be able to distinguish between no-tail and the tail at 0:0,
 * therefore increment the cpu number by one.
 */

static inline __pure u32 encode_tail(int cpu, int idx)
{
        u32 tail;

#ifdef CONFIG_DEBUG_SPINLOCK
        BUG_ON(idx > 3);
#endif
        tail  = (cpu + 1) << _Q_TAIL_CPU_OFFSET;
        tail |= idx << _Q_TAIL_IDX_OFFSET; /* assume < 4 */

        return tail;
}

static inline __pure struct mcs_spinlock *decode_tail(u32 tail)
{
        int cpu = (tail >> _Q_TAIL_CPU_OFFSET) - 1;
        int idx = (tail & _Q_TAIL_IDX_MASK) >> _Q_TAIL_IDX_OFFSET;

        return per_cpu_ptr(&mcs_nodes[idx], cpu);
}
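
/*
 * Worked example of the tail encoding (illustrative only; the offsets below
 * assume the common _Q_PENDING_BITS == 8 layout, i.e. _Q_TAIL_IDX_OFFSET == 16
 * and _Q_TAIL_CPU_OFFSET == 18 -- see asm-generic/qspinlock_types.h):
 *
 *      encode_tail(5, 2):
 *              tail  = (5 + 1) << 18;          // 0x00180000
 *              tail |= 2 << 16;                // tail == 0x001a0000
 *
 *      decode_tail(0x001a0000):
 *              cpu = (0x001a0000 >> 18) - 1;                   // 5
 *              idx = (0x001a0000 & _Q_TAIL_IDX_MASK) >> 16;    // 2
 *
 * The "+ 1" on the cpu number is what lets a tail value of 0 mean "no tail".
 */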

#define _Q_LOCKED_PENDING_MASK (_Q_LOCKED_MASK | _Q_PENDING_MASK)

#if _Q_PENDING_BITS == 8
/**
 * clear_pending - clear the pending bit.
 * @lock: Pointer to queued spinlock structure
 *
 * *,1,* -> *,0,*
 */
static __always_inline void clear_pending(struct qspinlock *lock)
{
        WRITE_ONCE(lock->pending, 0);
}

/**
 * clear_pending_set_locked - take ownership and clear the pending bit.
 * @lock: Pointer to queued spinlock structure
 *
 * *,1,0 -> *,0,1
 *
 * Lock stealing is not allowed if this function is used.
 */
static __always_inline void clear_pending_set_locked(struct qspinlock *lock)
{
        WRITE_ONCE(lock->locked_pending, _Q_LOCKED_VAL);
}

/*
 * xchg_tail - Put in the new queue tail code word & retrieve previous one
 * @lock : Pointer to queued spinlock structure
 * @tail : The new queue tail code word
 * Return: The previous queue tail code word
 *
 * xchg(lock, tail), which heads an address dependency
 *
 * p,*,* -> n,*,* ; prev = xchg(lock, node)
 */
static __always_inline u32 xchg_tail(struct qspinlock *lock, u32 tail)
{
        /*
         * Use release semantics to make sure that the MCS node is properly
         * initialized before changing the tail code.
         */
        return (u32)xchg_release(&lock->tail,
                                 tail >> _Q_TAIL_OFFSET) << _Q_TAIL_OFFSET;
}

#else /* _Q_PENDING_BITS == 8 */

/**
 * clear_pending - clear the pending bit.
 * @lock: Pointer to queued spinlock structure
 *
 * *,1,* -> *,0,*
 */
static __always_inline void clear_pending(struct qspinlock *lock)
{
        atomic_andnot(_Q_PENDING_VAL, &lock->val);
}

/**
 * clear_pending_set_locked - take ownership and clear the pending bit.
 * @lock: Pointer to queued spinlock structure
 *
 * *,1,0 -> *,0,1
 */
static __always_inline void clear_pending_set_locked(struct qspinlock *lock)
{
        atomic_add(-_Q_PENDING_VAL + _Q_LOCKED_VAL, &lock->val);
}

/**
 * xchg_tail - Put in the new queue tail code word & retrieve previous one
 * @lock : Pointer to queued spinlock structure
 * @tail : The new queue tail code word
 * Return: The previous queue tail code word
 *
 * xchg(lock, tail)
 *
 * p,*,* -> n,*,* ; prev = xchg(lock, node)
 */
static __always_inline u32 xchg_tail(struct qspinlock *lock, u32 tail)
{
        u32 old, new, val = atomic_read(&lock->val);

        for (;;) {
                new = (val & _Q_LOCKED_PENDING_MASK) | tail;
                /*
                 * Use release semantics to make sure that the MCS node is
                 * properly initialized before changing the tail code.
                 */
                old = atomic_cmpxchg_release(&lock->val, val, new);
                if (old == val)
                        break;

                val = old;
        }
        return old;
}
#endif /* _Q_PENDING_BITS == 8 */
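
/*
 * Illustrative example (values taken from the encode_tail() sketch above):
 * suppose the lock word reads 0x00000001 -- owner running, pending clear, no
 * tail -- and we call xchg_tail(lock, 0x001a0000). Afterwards the word reads
 * 0x001a0001; only the tail bits changed, the locked and pending bits were
 * carried over untouched. The previous tail is 0, so the caller knows it is
 * the first queued waiter. Note that the _Q_PENDING_BITS == 8 variant returns
 * just the old tail bits while the generic variant returns the whole old
 * word; callers only ever look at the _Q_TAIL_MASK portion, so both behave
 * the same.
 */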

/**
 * queued_fetch_set_pending_acquire - fetch the whole lock value and set pending
 * @lock : Pointer to queued spinlock structure
 * Return: The previous lock value
 *
 * *,*,* -> *,1,*
 */
#ifndef queued_fetch_set_pending_acquire
static __always_inline u32 queued_fetch_set_pending_acquire(struct qspinlock *lock)
{
        return atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val);
}
#endif

/**
 * set_locked - Set the lock bit and own the lock
 * @lock: Pointer to queued spinlock structure
 *
 * *,*,0 -> *,0,1
 */
static __always_inline void set_locked(struct qspinlock *lock)
{
        WRITE_ONCE(lock->locked, _Q_LOCKED_VAL);
}


/*
 * Generate the native code for queued_spin_lock_slowpath(); provide NOPs for
 * all the PV callbacks.
 */

static __always_inline void __pv_init_node(struct mcs_spinlock *node) { }
static __always_inline void __pv_wait_node(struct mcs_spinlock *node,
                                           struct mcs_spinlock *prev) { }
static __always_inline void __pv_kick_node(struct qspinlock *lock,
                                           struct mcs_spinlock *node) { }
static __always_inline u32  __pv_wait_head_or_lock(struct qspinlock *lock,
                                                   struct mcs_spinlock *node)
                                                   { return 0; }

#define pv_enabled()            false

#define pv_init_node            __pv_init_node
#define pv_wait_node            __pv_wait_node
#define pv_kick_node            __pv_kick_node
#define pv_wait_head_or_lock    __pv_wait_head_or_lock

#ifdef CONFIG_PARAVIRT_SPINLOCKS
#define queued_spin_lock_slowpath       native_queued_spin_lock_slowpath
#endif

/*
 * Various notes on spin_is_locked() and spin_unlock_wait(), which are
 * 'interesting' functions:
 *
 * PROBLEM: some architectures have an interesting issue with atomic ACQUIRE
 * operations in that the ACQUIRE applies to the LOAD _not_ the STORE (ARM64,
 * PPC). Also qspinlock has a similar issue per construction: the store that
 * sets the locked byte when acquiring the lock proper can be unordered.
 *
 * This gets to be 'interesting' in the following cases, where the /should/s
 * end up false because of this issue.
 *
 *
 * CASE 1:
 *
 * So the spin_is_locked() correctness issue comes from something like:
 *
 *      CPU0                            CPU1
 *
 *      global_lock();                  local_lock(i)
 *        spin_lock(&G)                   spin_lock(&L[i])
 *        for (i)                         if (!spin_is_locked(&G)) {
 *          spin_unlock_wait(&L[i]);        smp_acquire__after_ctrl_dep();
 *                                          return;
 *                                        }
 *                                        // deal with fail
 *
 * Where it is important that CPU1 sees G locked or CPU0 sees L[i] locked such
 * that there is exclusion between the two critical sections.
 *
 * The load from spin_is_locked(&G) /should/ be constrained by the ACQUIRE from
 * spin_lock(&L[i]), and similarly the load(s) from spin_unlock_wait(&L[i])
 * /should/ be constrained by the ACQUIRE from spin_lock(&G).
 *
 * Similarly, later stuff is constrained by the ACQUIRE from CTRL+RMB.
 *
 *
 * CASE 2:
 *
 * For spin_unlock_wait() there is a second correctness issue, namely:
 *
 *      CPU0                            CPU1
 *
 *      flag = set;
 *      smp_mb();                       spin_lock(&l)
 *      spin_unlock_wait(&l);           if (!flag)
 *                                        // add to lockless list
 *                                      spin_unlock(&l);
 *      // iterate lockless list
 *
 * Which wants to ensure that CPU1 will stop adding bits to the list and CPU0
 * will observe the last entry on the list (if spin_unlock_wait() had ACQUIRE
 * semantics etc.).
 *
 * Where flag /should/ be ordered against the locked store of l.
 */

/*
 * queued_spin_lock_slowpath() can (load-)ACQUIRE the lock before
 * issuing an _unordered_ store to set _Q_LOCKED_VAL.
 *
 * This means that the store can be delayed, but no later than the
 * store-release from the unlock. This means that simply observing
 * _Q_LOCKED_VAL is not sufficient to determine if the lock is acquired.
 *
 * There are two paths that can issue the unordered store:
 *
 *  (1) clear_pending_set_locked():     *,1,0 -> *,0,1
 *
 *  (2) set_locked():                   t,0,0 -> t,0,1 ; t != 0
 *      atomic_cmpxchg_relaxed():       t,0,0 -> 0,0,1
 *
 * However, in both cases we have other !0 state we've set before to queue
 * ourselves:
 *
 * For (1) we have the atomic_fetch_or_acquire() (in
 * queued_fetch_set_pending_acquire()) that set _Q_PENDING_VAL; our load is
 * constrained by that ACQUIRE to not pass before that, and thus must
 * observe the store.
 *
 * For (2) we have a more interesting scenario. We enqueue ourselves using
 * xchg_tail(), which ends up being a RELEASE. This in itself is not
 * sufficient, however that is followed by an smp_cond_load_acquire() on the
 * same word, giving a RELEASE->ACQUIRE ordering. This again constrains our
 * load and guarantees we must observe that store.
 *
 * Therefore both cases have other !0 state that is observable before the
 * unordered locked byte store comes through. This means we can use that to
 * wait for the lock store, and then wait for an unlock.
 */
#ifndef queued_spin_unlock_wait
void queued_spin_unlock_wait(struct qspinlock *lock)
{
        u32 val;

        for (;;) {
                val = atomic_read(&lock->val);

                if (!val) /* not locked, we're done */
                        goto done;

                if (val & _Q_LOCKED_MASK) /* locked, go wait for unlock */
                        break;

                /* not locked, but pending, wait until we observe the lock */
                cpu_relax();
        }

        /* any unlock is good */
        while (atomic_read(&lock->val) & _Q_LOCKED_MASK)
                cpu_relax();

done:
        smp_acquire__after_ctrl_dep();
}
EXPORT_SYMBOL(queued_spin_unlock_wait);
#endif

#endif /* _GEN_PV_LOCK_SLOWPATH */
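
/*
 * For context, a rough sketch of the uncontended fastpath that hands off to
 * the slowpath below. This is not part of this file; the real definitions
 * live in include/asm-generic/qspinlock.h (architectures may override them),
 * and the details vary between kernel versions:
 *
 *      static __always_inline void queued_spin_lock(struct qspinlock *lock)
 *      {
 *              u32 val = atomic_cmpxchg_acquire(&lock->val, 0, _Q_LOCKED_VAL);
 *
 *              if (likely(val == 0))           // was unlocked and unclaimed
 *                      return;
 *              queued_spin_lock_slowpath(lock, val);
 *      }
 *
 *      static __always_inline void queued_spin_unlock(struct qspinlock *lock)
 *      {
 *              smp_store_release(&lock->locked, 0);    // release the lock byte
 *      }
 *
 * Everything below only runs once that initial cmpxchg has failed.
 */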

/**
 * queued_spin_lock_slowpath - acquire the queued spinlock
 * @lock: Pointer to queued spinlock structure
 * @val: Current value of the queued spinlock 32-bit word
 *
 * (queue tail, pending bit, lock value)
 *
 *              fast     :    slow                                  :    unlock
 *                       :                                          :
 * uncontended  (0,0,0) -:--> (0,0,1) ------------------------------:--> (*,*,0)
 *                       :       | ^--------.------.             /  :
 *                       :       v           \      \            |  :
 * pending               :    (0,1,1) +--> (0,1,0)   \            |  :
 *                       :       | ^--'              |            |  :
 *                       :       v                   |            |  :
 * uncontended           :    (n,x,y) +--> (n,0,0) --'            |  :
 *   queue               :       | ^--'                           |  :
 *                       :       v                                |  :
 * contended             :    (*,x,y) +--> (*,0,0) ---> (*,0,1) -'  :
 *   queue               :         ^--'                             :
 */
void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{
        struct mcs_spinlock *prev, *next, *node;
        u32 old, tail;
        int idx;

        BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));

        if (pv_enabled())
                goto queue;

        if (virt_spin_lock(lock))
                return;

        /*
         * Wait for in-progress pending->locked hand-overs with a bounded
         * number of spins so that we guarantee forward progress.
         *
         * 0,1,0 -> 0,0,1
         */
        if (val == _Q_PENDING_VAL) {
                int cnt = _Q_PENDING_LOOPS;
                val = smp_cond_load_acquire(&lock->val.counter,
                                            (VAL != _Q_PENDING_VAL) || !cnt--);
        }

        /*
         * If we observe any contention; queue.
         */
        if (val & ~_Q_LOCKED_MASK)
                goto queue;

        /*
         * trylock || pending
         *
         * 0,0,0 -> 0,0,1 ; trylock
         * 0,0,1 -> 0,1,1 ; pending
         */
        val = queued_fetch_set_pending_acquire(lock);

        /*
         * If we observe any contention; undo and queue.
         */
        if (unlikely(val & ~_Q_LOCKED_MASK)) {
                if (!(val & _Q_PENDING_MASK))
                        clear_pending(lock);
                goto queue;
        }

        /*
         * We're pending, wait for the owner to go away.
         *
         * 0,1,1 -> 0,1,0
         *
         * this wait loop must be a load-acquire such that we match the
         * store-release that clears the locked bit and create lock
         * sequentiality; this is because not all
         * clear_pending_set_locked() implementations imply full
         * barriers.
         */
        if (val & _Q_LOCKED_MASK)
                smp_cond_load_acquire(&lock->val.counter, !(VAL & _Q_LOCKED_MASK));

        /*
         * take ownership and clear the pending bit.
         *
         * 0,1,0 -> 0,0,1
         */
        clear_pending_set_locked(lock);
        return;

        /*
         * End of pending bit optimistic spinning and beginning of MCS
         * queuing.
         */
queue:
        node = this_cpu_ptr(&mcs_nodes[0]);
        idx = node->count++;
        tail = encode_tail(smp_processor_id(), idx);

        node += idx;
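
        /*
         * Illustrative example of the indexing above (not a code change): if
         * a task on CPU 3 is already queued on some lock (it holds
         * mcs_nodes[0] of CPU 3, idx == 0) and a hardirq arrives whose
         * handler contends on another qspinlock, the handler sees
         * node->count == 1 and uses mcs_nodes[1] (idx == 1), which is what
         * gets folded into its tail by encode_tail(). At most four such
         * contexts (task, softirq, hardirq, NMI) can nest, hence the 2-bit
         * index.
         */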

        /*
         * Ensure that we increment the head node->count before initialising
         * the actual node. If the compiler is kind enough to reorder these
         * stores, then an IRQ could overwrite our assignments.
         */
        barrier();

        node->locked = 0;
        node->next = NULL;
        pv_init_node(node);

        /*
         * We touched a (possibly) cold cacheline in the per-cpu queue node;
         * attempt the trylock once more in the hope someone let go while we
         * weren't watching.
         */
        if (queued_spin_trylock(lock))
                goto release;

        /*
         * We have already touched the queueing cacheline; don't bother with
         * pending stuff.
         *
         * p,*,* -> n,*,*
         *
         * RELEASE, such that the stores to @node must be complete.
         */
        old = xchg_tail(lock, tail);
        next = NULL;

        /*
         * if there was a previous node; link it and wait until reaching the
         * head of the waitqueue.
         */
        if (old & _Q_TAIL_MASK) {
                prev = decode_tail(old);

                /*
                 * We must ensure that the stores to @node are observed before
                 * the write to prev->next. The address dependency from
                 * xchg_tail is not sufficient to ensure this because the read
                 * component of xchg_tail is unordered with respect to the
                 * initialisation of @node.
                 */
                smp_store_release(&prev->next, node);

                pv_wait_node(node, prev);
                arch_mcs_spin_lock_contended(&node->locked);

                /*
                 * While waiting for the MCS lock, the next pointer may have
                 * been set by another lock waiter. We optimistically load
                 * the next pointer & prefetch the cacheline for writing
                 * to reduce latency in the upcoming MCS unlock operation.
                 */
                next = READ_ONCE(node->next);
                if (next)
                        prefetchw(next);
        }

        /*
         * we're at the head of the waitqueue, wait for the owner & pending to
         * go away.
         *
         * *,x,y -> *,0,0
         *
         * this wait loop must use a load-acquire such that we match the
         * store-release that clears the locked bit and create lock
         * sequentiality; this is because the set_locked() function below
         * does not imply a full barrier.
         *
         * The PV pv_wait_head_or_lock function, if active, will acquire
         * the lock and return a non-zero value. So we have to skip the
         * smp_cond_load_acquire() call. As the next PV queue head hasn't been
         * designated yet, there is no way for the locked value to become
         * _Q_SLOW_VAL. So both the set_locked() and the
         * atomic_cmpxchg_relaxed() calls will be safe.
         *
         * If PV isn't active, 0 will be returned instead.
         *
         */
        if ((val = pv_wait_head_or_lock(lock, node)))
                goto locked;

        val = smp_cond_load_acquire(&lock->val.counter, !(VAL & _Q_LOCKED_PENDING_MASK));

locked:
        /*
         * claim the lock:
         *
         * n,0,0 -> 0,0,1 : lock, uncontended
         * *,*,0 -> *,*,1 : lock, contended
         *
         * If the queue head is the only one in the queue (lock value == tail)
         * and nobody is pending, clear the tail code and grab the lock.
         * Otherwise, we only need to grab the lock.
         */

        /* In the PV case we might already have _Q_LOCKED_VAL set */
        if ((val & _Q_TAIL_MASK) == tail) {
                /*
                 * The smp_cond_load_acquire() call above has provided the
                 * necessary acquire semantics required for locking.
                 */
                old = atomic_cmpxchg_relaxed(&lock->val, val, _Q_LOCKED_VAL);
                if (old == val)
                        goto release; /* No contention */
        }

        /* Either somebody is queued behind us or _Q_PENDING_VAL is set */
        set_locked(lock);
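
        /*
         * For reference, the generic MCS hand-off primitives used above and
         * below are roughly the following (sketch; see mcs_spinlock.h, and
         * note that architectures may provide their own versions):
         *
         *      #define arch_mcs_spin_lock_contended(l)         \
         *              smp_cond_load_acquire(l, VAL)
         *
         *      #define arch_mcs_spin_unlock_contended(l)       \
         *              smp_store_release((l), 1)
         *
         * i.e. a queued waiter spins on its own node->locked, and the current
         * head passes the MCS "baton" by storing 1 into the next node.
         */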

        /*
         * contended path; wait for next if not observed yet, release.
         */
        if (!next) {
                while (!(next = READ_ONCE(node->next)))
                        cpu_relax();
        }

        arch_mcs_spin_unlock_contended(&next->locked);
        pv_kick_node(lock, next);

release:
        /*
         * release the node
         */
        __this_cpu_dec(mcs_nodes[0].count);
}
EXPORT_SYMBOL(queued_spin_lock_slowpath);

/*
 * Generate the paravirt code for queued_spin_lock_slowpath().
 */
#if !defined(_GEN_PV_LOCK_SLOWPATH) && defined(CONFIG_PARAVIRT_SPINLOCKS)
#define _GEN_PV_LOCK_SLOWPATH

#undef pv_enabled
#define pv_enabled()    true

#undef pv_init_node
#undef pv_wait_node
#undef pv_kick_node
#undef pv_wait_head_or_lock

#undef queued_spin_lock_slowpath
#define queued_spin_lock_slowpath       __pv_queued_spin_lock_slowpath

#include "qspinlock_paravirt.h"
#include "qspinlock.c"

#endif
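
/*
 * Note on the re-include above (assuming CONFIG_PARAVIRT_SPINLOCKS=y): the
 * first pass through this file builds native_queued_spin_lock_slowpath()
 * with all the pv_*() callbacks stubbed out to NOPs; the second pass,
 * entered via the #include "qspinlock.c" with _GEN_PV_LOCK_SLOWPATH defined,
 * builds __pv_queued_spin_lock_slowpath() with the callbacks supplied by
 * qspinlock_paravirt.h. Without CONFIG_PARAVIRT_SPINLOCKS there is a single
 * pass that builds queued_spin_lock_slowpath() directly.
 */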