~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~

TOMOYO Linux Cross Reference
Linux/fs/dlm/lock.c

Version: ~ [ linux-5.5-rc7 ] ~ [ linux-5.4.13 ] ~ [ linux-5.3.18 ] ~ [ linux-5.2.21 ] ~ [ linux-5.1.21 ] ~ [ linux-5.0.21 ] ~ [ linux-4.20.17 ] ~ [ linux-4.19.97 ] ~ [ linux-4.18.20 ] ~ [ linux-4.17.19 ] ~ [ linux-4.16.18 ] ~ [ linux-4.15.18 ] ~ [ linux-4.14.166 ] ~ [ linux-4.13.16 ] ~ [ linux-4.12.14 ] ~ [ linux-4.11.12 ] ~ [ linux-4.10.17 ] ~ [ linux-4.9.210 ] ~ [ linux-4.8.17 ] ~ [ linux-4.7.10 ] ~ [ linux-4.6.7 ] ~ [ linux-4.5.7 ] ~ [ linux-4.4.210 ] ~ [ linux-4.3.6 ] ~ [ linux-4.2.8 ] ~ [ linux-4.1.52 ] ~ [ linux-4.0.9 ] ~ [ linux-3.19.8 ] ~ [ linux-3.18.140 ] ~ [ linux-3.17.8 ] ~ [ linux-3.16.81 ] ~ [ linux-3.15.10 ] ~ [ linux-3.14.79 ] ~ [ linux-3.13.11 ] ~ [ linux-3.12.74 ] ~ [ linux-3.11.10 ] ~ [ linux-3.10.108 ] ~ [ linux-3.9.11 ] ~ [ linux-3.8.13 ] ~ [ linux-3.7.10 ] ~ [ linux-3.6.11 ] ~ [ linux-3.5.7 ] ~ [ linux-3.4.113 ] ~ [ linux-3.3.8 ] ~ [ linux-3.2.102 ] ~ [ linux-3.1.10 ] ~ [ linux-3.0.101 ] ~ [ linux-2.6.32.71 ] ~ [ linux-2.6.0 ] ~ [ linux-2.4.37.11 ] ~ [ unix-v6-master ] ~ [ ccs-tools-1.8.5 ] ~ [ policy-sample ] ~
Architecture: ~ [ i386 ] ~ [ alpha ] ~ [ m68k ] ~ [ mips ] ~ [ ppc ] ~ [ sparc ] ~ [ sparc64 ] ~

  1 /******************************************************************************
  2 *******************************************************************************
  3 **
  4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
  5 **
  6 **  This copyrighted material is made available to anyone wishing to use,
  7 **  modify, copy, or redistribute it subject to the terms and conditions
  8 **  of the GNU General Public License v.2.
  9 **
 10 *******************************************************************************
 11 ******************************************************************************/
 12 
 13 /* Central locking logic has four stages:
 14 
 15    dlm_lock()
 16    dlm_unlock()
 17 
 18    request_lock(ls, lkb)
 19    convert_lock(ls, lkb)
 20    unlock_lock(ls, lkb)
 21    cancel_lock(ls, lkb)
 22 
 23    _request_lock(r, lkb)
 24    _convert_lock(r, lkb)
 25    _unlock_lock(r, lkb)
 26    _cancel_lock(r, lkb)
 27 
 28    do_request(r, lkb)
 29    do_convert(r, lkb)
 30    do_unlock(r, lkb)
 31    do_cancel(r, lkb)
 32 
 33    Stage 1 (lock, unlock) is mainly about checking input args and
 34    splitting into one of the four main operations:
 35 
 36        dlm_lock          = request_lock
 37        dlm_lock+CONVERT  = convert_lock
 38        dlm_unlock        = unlock_lock
 39        dlm_unlock+CANCEL = cancel_lock
 40 
 41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
 42    provided to the next stage.
 43 
 44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
 45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
 46 
 47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
 48    given rsb and lkb and queues callbacks.
 49 
 50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
 51    function being executed on the remote node.  The connecting send/receive
 52    calls on local (L) and remote (R) nodes:
 53 
 54    L: send_xxxx()              ->  R: receive_xxxx()
 55                                    R: do_xxxx()
 56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
 57 */
 58 #include <linux/types.h>
 59 #include <linux/rbtree.h>
 60 #include <linux/slab.h>
 61 #include "dlm_internal.h"
 62 #include <linux/dlm_device.h>
 63 #include "memory.h"
 64 #include "lowcomms.h"
 65 #include "requestqueue.h"
 66 #include "util.h"
 67 #include "dir.h"
 68 #include "member.h"
 69 #include "lockspace.h"
 70 #include "ast.h"
 71 #include "lock.h"
 72 #include "rcom.h"
 73 #include "recover.h"
 74 #include "lvb_table.h"
 75 #include "user.h"
 76 #include "config.h"
 77 
 78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
 79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
 80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
 81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
 82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
 83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
 84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
 85 static int send_remove(struct dlm_rsb *r);
 86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
 87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
 88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
 89                                     struct dlm_message *ms);
 90 static int receive_extralen(struct dlm_message *ms);
 91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
 92 static void del_timeout(struct dlm_lkb *lkb);
 93 
 94 /*
 95  * Lock compatibility matrix - thanks Steve
 96  * UN = Unlocked state. Not really a state, used as a flag
 97  * PD = Padding. Used to make the matrix a nice power of two in size
 98  * Other states are the same as the VMS DLM.
 99  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
100  */
101 
102 static const int __dlm_compat_matrix[8][8] = {
103       /* UN NL CR CW PR PW EX PD */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
106         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
107         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
108         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
109         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
110         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
111         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
112 };
113 
114 /*
115  * This defines the direction of transfer of LVB data.
116  * Granted mode is the row; requested mode is the column.
117  * Usage: matrix[grmode+1][rqmode+1]
118  * 1 = LVB is returned to the caller
119  * 0 = LVB is written to the resource
120  * -1 = nothing happens to the LVB
121  */
122 
123 const int dlm_lvb_operations[8][8] = {
124         /* UN   NL  CR  CW  PR  PW  EX  PD*/
125         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
126         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
127         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
128         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
129         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
130         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
133 };
134 
/* Look up whether gr's granted mode is compatible with rq's requested mode. */
#define modes_compat(gr, rq) \
        (__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1])
137 
138 int dlm_modes_compat(int mode1, int mode2)
139 {
140         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141 }
142 
143 /*
144  * Compatibility matrix for conversions with QUECVT set.
145  * Granted mode is the row; requested mode is the column.
146  * Usage: matrix[grmode+1][rqmode+1]
147  */
148 
149 static const int __quecvt_compat_matrix[8][8] = {
150       /* UN NL CR CW PR PW EX PD */
151         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
152         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
153         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
154         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
155         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
156         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
157         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
158         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
159 };
160 
161 void dlm_print_lkb(struct dlm_lkb *lkb)
162 {
163         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164                "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
167                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168                (unsigned long long)lkb->lkb_recover_seq);
169 }
170 
171 static void dlm_print_rsb(struct dlm_rsb *r)
172 {
173         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
174                r->res_nodeid, r->res_flags, r->res_first_lkid,
175                r->res_recover_locks_count, r->res_name);
176 }
177 
178 void dlm_dump_rsb(struct dlm_rsb *r)
179 {
180         struct dlm_lkb *lkb;
181 
182         dlm_print_rsb(r);
183 
184         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
185                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
186         printk(KERN_ERR "rsb lookup list\n");
187         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
188                 dlm_print_lkb(lkb);
189         printk(KERN_ERR "rsb grant queue:\n");
190         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
191                 dlm_print_lkb(lkb);
192         printk(KERN_ERR "rsb convert queue:\n");
193         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
194                 dlm_print_lkb(lkb);
195         printk(KERN_ERR "rsb wait queue:\n");
196         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
197                 dlm_print_lkb(lkb);
198 }
199 
200 /* Threads cannot use the lockspace while it's being recovered */
201 
202 static inline void dlm_lock_recovery(struct dlm_ls *ls)
203 {
204         down_read(&ls->ls_in_recovery);
205 }
206 
207 void dlm_unlock_recovery(struct dlm_ls *ls)
208 {
209         up_read(&ls->ls_in_recovery);
210 }
211 
212 int dlm_lock_recovery_try(struct dlm_ls *ls)
213 {
214         return down_read_trylock(&ls->ls_in_recovery);
215 }
216 
217 static inline int can_be_queued(struct dlm_lkb *lkb)
218 {
219         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
220 }
221 
222 static inline int force_blocking_asts(struct dlm_lkb *lkb)
223 {
224         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
225 }
226 
227 static inline int is_demoted(struct dlm_lkb *lkb)
228 {
229         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
230 }
231 
232 static inline int is_altmode(struct dlm_lkb *lkb)
233 {
234         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
235 }
236 
237 static inline int is_granted(struct dlm_lkb *lkb)
238 {
239         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
240 }
241 
242 static inline int is_remote(struct dlm_rsb *r)
243 {
244         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
245         return !!r->res_nodeid;
246 }
247 
248 static inline int is_process_copy(struct dlm_lkb *lkb)
249 {
250         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
251 }
252 
253 static inline int is_master_copy(struct dlm_lkb *lkb)
254 {
255         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
256 }
257 
258 static inline int middle_conversion(struct dlm_lkb *lkb)
259 {
260         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
261             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
262                 return 1;
263         return 0;
264 }
265 
266 static inline int down_conversion(struct dlm_lkb *lkb)
267 {
268         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
269 }
270 
271 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
272 {
273         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
274 }
275 
276 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
277 {
278         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
279 }
280 
281 static inline int is_overlap(struct dlm_lkb *lkb)
282 {
283         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
284                                   DLM_IFL_OVERLAP_CANCEL));
285 }
286 
287 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
288 {
289         if (is_master_copy(lkb))
290                 return;
291 
292         del_timeout(lkb);
293 
294         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
295 
296         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
297            timeout caused the cancel then return -ETIMEDOUT */
298         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
299                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
300                 rv = -ETIMEDOUT;
301         }
302 
303         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
304                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
305                 rv = -EDEADLK;
306         }
307 
308         dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
309 }
310 
311 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
312 {
313         queue_cast(r, lkb,
314                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
315 }
316 
317 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
318 {
319         if (is_master_copy(lkb)) {
320                 send_bast(r, lkb, rqmode);
321         } else {
322                 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
323         }
324 }
325 
326 /*
327  * Basic operations on rsb's and lkb's
328  */
329 
330 static int pre_rsb_struct(struct dlm_ls *ls)
331 {
332         struct dlm_rsb *r1, *r2;
333         int count = 0;
334 
335         spin_lock(&ls->ls_new_rsb_spin);
336         if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
337                 spin_unlock(&ls->ls_new_rsb_spin);
338                 return 0;
339         }
340         spin_unlock(&ls->ls_new_rsb_spin);
341 
342         r1 = dlm_allocate_rsb(ls);
343         r2 = dlm_allocate_rsb(ls);
344 
345         spin_lock(&ls->ls_new_rsb_spin);
346         if (r1) {
347                 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
348                 ls->ls_new_rsb_count++;
349         }
350         if (r2) {
351                 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
352                 ls->ls_new_rsb_count++;
353         }
354         count = ls->ls_new_rsb_count;
355         spin_unlock(&ls->ls_new_rsb_spin);
356 
357         if (!count)
358                 return -ENOMEM;
359         return 0;
360 }
361 
362 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
363    unlock any spinlocks, go back and call pre_rsb_struct again.
364    Otherwise, take an rsb off the list and return it. */
365 
366 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
367                           struct dlm_rsb **r_ret)
368 {
369         struct dlm_rsb *r;
370         int count;
371 
372         spin_lock(&ls->ls_new_rsb_spin);
373         if (list_empty(&ls->ls_new_rsb)) {
374                 count = ls->ls_new_rsb_count;
375                 spin_unlock(&ls->ls_new_rsb_spin);
376                 log_debug(ls, "find_rsb retry %d %d %s",
377                           count, dlm_config.ci_new_rsb_count, name);
378                 return -EAGAIN;
379         }
380 
381         r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
382         list_del(&r->res_hashchain);
383         /* Convert the empty list_head to a NULL rb_node for tree usage: */
384         memset(&r->res_hashnode, 0, sizeof(struct rb_node));
385         ls->ls_new_rsb_count--;
386         spin_unlock(&ls->ls_new_rsb_spin);
387 
388         r->res_ls = ls;
389         r->res_length = len;
390         memcpy(r->res_name, name, len);
391         mutex_init(&r->res_mutex);
392 
393         INIT_LIST_HEAD(&r->res_lookup);
394         INIT_LIST_HEAD(&r->res_grantqueue);
395         INIT_LIST_HEAD(&r->res_convertqueue);
396         INIT_LIST_HEAD(&r->res_waitqueue);
397         INIT_LIST_HEAD(&r->res_root_list);
398         INIT_LIST_HEAD(&r->res_recover_list);
399 
400         *r_ret = r;
401         return 0;
402 }
403 
404 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
405 {
406         char maxname[DLM_RESNAME_MAXLEN];
407 
408         memset(maxname, 0, DLM_RESNAME_MAXLEN);
409         memcpy(maxname, name, nlen);
410         return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
411 }
412 
413 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
414                         unsigned int flags, struct dlm_rsb **r_ret)
415 {
416         struct rb_node *node = tree->rb_node;
417         struct dlm_rsb *r;
418         int error = 0;
419         int rc;
420 
421         while (node) {
422                 r = rb_entry(node, struct dlm_rsb, res_hashnode);
423                 rc = rsb_cmp(r, name, len);
424                 if (rc < 0)
425                         node = node->rb_left;
426                 else if (rc > 0)
427                         node = node->rb_right;
428                 else
429                         goto found;
430         }
431         *r_ret = NULL;
432         return -EBADR;
433 
434  found:
435         if (r->res_nodeid && (flags & R_MASTER))
436                 error = -ENOTBLK;
437         *r_ret = r;
438         return error;
439 }
440 
441 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
442 {
443         struct rb_node **newn = &tree->rb_node;
444         struct rb_node *parent = NULL;
445         int rc;
446 
447         while (*newn) {
448                 struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
449                                                res_hashnode);
450 
451                 parent = *newn;
452                 rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
453                 if (rc < 0)
454                         newn = &parent->rb_left;
455                 else if (rc > 0)
456                         newn = &parent->rb_right;
457                 else {
458                         log_print("rsb_insert match");
459                         dlm_dump_rsb(rsb);
460                         dlm_dump_rsb(cur);
461                         return -EEXIST;
462                 }
463         }
464 
465         rb_link_node(&rsb->res_hashnode, parent, newn);
466         rb_insert_color(&rsb->res_hashnode, tree);
467         return 0;
468 }
469 
470 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
471                        unsigned int flags, struct dlm_rsb **r_ret)
472 {
473         struct dlm_rsb *r;
474         int error;
475 
476         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r);
477         if (!error) {
478                 kref_get(&r->res_ref);
479                 goto out;
480         }
481         if (error == -ENOTBLK)
482                 goto out;
483 
484         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
485         if (error)
486                 goto out;
487 
488         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
489         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
490         if (error)
491                 return error;
492 
493         if (dlm_no_directory(ls))
494                 goto out;
495 
496         if (r->res_nodeid == -1) {
497                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
498                 r->res_first_lkid = 0;
499         } else if (r->res_nodeid > 0) {
500                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
501                 r->res_first_lkid = 0;
502         } else {
503                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
504                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
505         }
506  out:
507         *r_ret = r;
508         return error;
509 }
510 
511 /*
512  * Find rsb in rsbtbl and potentially create/add one
513  *
514  * Delaying the release of rsb's has a similar benefit to applications keeping
515  * NL locks on an rsb, but without the guarantee that the cached master value
516  * will still be valid when the rsb is reused.  Apps aren't always smart enough
517  * to keep NL locks on an rsb that they may lock again shortly; this can lead
518  * to excessive master lookups and removals if we don't delay the release.
519  *
520  * Searching for an rsb means looking through both the normal list and toss
521  * list.  When found on the toss list the rsb is moved to the normal list with
522  * ref count of 1; when found on normal list the ref count is incremented.
523  */
524 
525 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
526                     unsigned int flags, struct dlm_rsb **r_ret)
527 {
528         struct dlm_rsb *r = NULL;
529         uint32_t hash, bucket;
530         int error;
531 
532         if (namelen > DLM_RESNAME_MAXLEN) {
533                 error = -EINVAL;
534                 goto out;
535         }
536 
537         if (dlm_no_directory(ls))
538                 flags |= R_CREATE;
539 
540         hash = jhash(name, namelen, 0);
541         bucket = hash & (ls->ls_rsbtbl_size - 1);
542 
543  retry:
544         if (flags & R_CREATE) {
545                 error = pre_rsb_struct(ls);
546                 if (error < 0)
547                         goto out;
548         }
549 
550         spin_lock(&ls->ls_rsbtbl[bucket].lock);
551 
552         error = _search_rsb(ls, name, namelen, bucket, flags, &r);
553         if (!error)
554                 goto out_unlock;
555 
556         if (error == -EBADR && !(flags & R_CREATE))
557                 goto out_unlock;
558 
559         /* the rsb was found but wasn't a master copy */
560         if (error == -ENOTBLK)
561                 goto out_unlock;
562 
563         error = get_rsb_struct(ls, name, namelen, &r);
564         if (error == -EAGAIN) {
565                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
566                 goto retry;
567         }
568         if (error)
569                 goto out_unlock;
570 
571         r->res_hash = hash;
572         r->res_bucket = bucket;
573         r->res_nodeid = -1;
574         kref_init(&r->res_ref);
575 
576         /* With no directory, the master can be set immediately */
577         if (dlm_no_directory(ls)) {
578                 int nodeid = dlm_dir_nodeid(r);
579                 if (nodeid == dlm_our_nodeid())
580                         nodeid = 0;
581                 r->res_nodeid = nodeid;
582         }
583         error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep);
584  out_unlock:
585         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
586  out:
587         *r_ret = r;
588         return error;
589 }
590 
591 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
592 {
593         struct rb_node *n;
594         struct dlm_rsb *r;
595         int i;
596 
597         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
598                 spin_lock(&ls->ls_rsbtbl[i].lock);
599                 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
600                         r = rb_entry(n, struct dlm_rsb, res_hashnode);
601                         if (r->res_hash == hash)
602                                 dlm_dump_rsb(r);
603                 }
604                 spin_unlock(&ls->ls_rsbtbl[i].lock);
605         }
606 }
607 
608 /* This is only called to add a reference when the code already holds
609    a valid reference to the rsb, so there's no need for locking. */
610 
611 static inline void hold_rsb(struct dlm_rsb *r)
612 {
613         kref_get(&r->res_ref);
614 }
615 
/* Non-static wrapper around hold_rsb(). */
void dlm_hold_rsb(struct dlm_rsb *r)
{
        hold_rsb(r);
        return;
}
620 
621 static void toss_rsb(struct kref *kref)
622 {
623         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
624         struct dlm_ls *ls = r->res_ls;
625 
626         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
627         kref_init(&r->res_ref);
628         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
629         rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
630         r->res_toss_time = jiffies;
631         if (r->res_lvbptr) {
632                 dlm_free_lvb(r->res_lvbptr);
633                 r->res_lvbptr = NULL;
634         }
635 }
636 
637 /* When all references to the rsb are gone it's transferred to
638    the tossed list for later disposal. */
639 
640 static void put_rsb(struct dlm_rsb *r)
641 {
642         struct dlm_ls *ls = r->res_ls;
643         uint32_t bucket = r->res_bucket;
644 
645         spin_lock(&ls->ls_rsbtbl[bucket].lock);
646         kref_put(&r->res_ref, toss_rsb);
647         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
648 }
649 
/* Non-static wrapper around put_rsb(). */
void dlm_put_rsb(struct dlm_rsb *r)
{
        put_rsb(r);
        return;
}
654 
655 /* See comment for unhold_lkb */
656 
657 static void unhold_rsb(struct dlm_rsb *r)
658 {
659         int rv;
660         rv = kref_put(&r->res_ref, toss_rsb);
661         DLM_ASSERT(!rv, dlm_dump_rsb(r););
662 }
663 
664 static void kill_rsb(struct kref *kref)
665 {
666         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
667 
668         /* All work is done after the return from kref_put() so we
669            can release the write_lock before the remove and free. */
670 
671         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
672         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
673         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
674         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
675         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
676         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
677 }
678 
679 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
680    The rsb must exist as long as any lkb's for it do. */
681 
682 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
683 {
684         hold_rsb(r);
685         lkb->lkb_resource = r;
686 }
687 
688 static void detach_lkb(struct dlm_lkb *lkb)
689 {
690         if (lkb->lkb_resource) {
691                 put_rsb(lkb->lkb_resource);
692                 lkb->lkb_resource = NULL;
693         }
694 }
695 
696 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
697 {
698         struct dlm_lkb *lkb;
699         int rv, id;
700 
701         lkb = dlm_allocate_lkb(ls);
702         if (!lkb)
703                 return -ENOMEM;
704 
705         lkb->lkb_nodeid = -1;
706         lkb->lkb_grmode = DLM_LOCK_IV;
707         kref_init(&lkb->lkb_ref);
708         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
709         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
710         INIT_LIST_HEAD(&lkb->lkb_time_list);
711         INIT_LIST_HEAD(&lkb->lkb_cb_list);
712         mutex_init(&lkb->lkb_cb_mutex);
713         INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
714 
715  retry:
716         rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
717         if (!rv)
718                 return -ENOMEM;
719 
720         spin_lock(&ls->ls_lkbidr_spin);
721         rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
722         if (!rv)
723                 lkb->lkb_id = id;
724         spin_unlock(&ls->ls_lkbidr_spin);
725 
726         if (rv == -EAGAIN)
727                 goto retry;
728 
729         if (rv < 0) {
730                 log_error(ls, "create_lkb idr error %d", rv);
731                 return rv;
732         }
733 
734         *lkb_ret = lkb;
735         return 0;
736 }
737 
738 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
739 {
740         struct dlm_lkb *lkb;
741 
742         spin_lock(&ls->ls_lkbidr_spin);
743         lkb = idr_find(&ls->ls_lkbidr, lkid);
744         if (lkb)
745                 kref_get(&lkb->lkb_ref);
746         spin_unlock(&ls->ls_lkbidr_spin);
747 
748         *lkb_ret = lkb;
749         return lkb ? 0 : -ENOENT;
750 }
751 
752 static void kill_lkb(struct kref *kref)
753 {
754         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
755 
756         /* All work is done after the return from kref_put() so we
757            can release the write_lock before the detach_lkb */
758 
759         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
760 }
761 
762 /* __put_lkb() is used when an lkb may not have an rsb attached to
763    it so we need to provide the lockspace explicitly */
764 
765 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
766 {
767         uint32_t lkid = lkb->lkb_id;
768 
769         spin_lock(&ls->ls_lkbidr_spin);
770         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
771                 idr_remove(&ls->ls_lkbidr, lkid);
772                 spin_unlock(&ls->ls_lkbidr_spin);
773 
774                 detach_lkb(lkb);
775 
776                 /* for local/process lkbs, lvbptr points to caller's lksb */
777                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
778                         dlm_free_lvb(lkb->lkb_lvbptr);
779                 dlm_free_lkb(lkb);
780                 return 1;
781         } else {
782                 spin_unlock(&ls->ls_lkbidr_spin);
783                 return 0;
784         }
785 }
786 
787 int dlm_put_lkb(struct dlm_lkb *lkb)
788 {
789         struct dlm_ls *ls;
790 
791         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
792         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
793 
794         ls = lkb->lkb_resource->res_ls;
795         return __put_lkb(ls, lkb);
796 }
797 
798 /* This is only called to add a reference when the code already holds
799    a valid reference to the lkb, so there's no need for locking. */
800 
801 static inline void hold_lkb(struct dlm_lkb *lkb)
802 {
803         kref_get(&lkb->lkb_ref);
804 }
805 
806 /* This is called when we need to remove a reference and are certain
807    it's not the last ref.  e.g. del_lkb is always called between a
808    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
809    put_lkb would work fine, but would involve unnecessary locking */
810 
811 static inline void unhold_lkb(struct dlm_lkb *lkb)
812 {
813         int rv;
814         rv = kref_put(&lkb->lkb_ref, kill_lkb);
815         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
816 }
817 
818 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
819                             int mode)
820 {
821         struct dlm_lkb *lkb = NULL;
822 
823         list_for_each_entry(lkb, head, lkb_statequeue)
824                 if (lkb->lkb_rqmode < mode)
825                         break;
826 
827         __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
828 }
829 
830 /* add/remove lkb to rsb's grant/convert/wait queue */
831 
832 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
833 {
834         kref_get(&lkb->lkb_ref);
835 
836         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
837 
838         lkb->lkb_timestamp = ktime_get();
839 
840         lkb->lkb_status = status;
841 
842         switch (status) {
843         case DLM_LKSTS_WAITING:
844                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
845                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
846                 else
847                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
848                 break;
849         case DLM_LKSTS_GRANTED:
850                 /* convention says granted locks kept in order of grmode */
851                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
852                                 lkb->lkb_grmode);
853                 break;
854         case DLM_LKSTS_CONVERT:
855                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
856                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
857                 else
858                         list_add_tail(&lkb->lkb_statequeue,
859                                       &r->res_convertqueue);
860                 break;
861         default:
862                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
863         }
864 }
865 
866 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
867 {
868         lkb->lkb_status = 0;
869         list_del(&lkb->lkb_statequeue);
870         unhold_lkb(lkb);
871 }
872 
/*
 * Move @lkb from its current state queue on @r to the queue selected by
 * @sts.  An extra hold is taken around the del_lkb()/add_lkb() pair and
 * released afterwards -- presumably so the unhold done by del_lkb() cannot
 * release the lkb while it is briefly on no queue.
 */
873 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
874 {
875         hold_lkb(lkb);
876         del_lkb(r, lkb);
877         add_lkb(r, lkb, sts);
878         unhold_lkb(lkb);
879 }
880 
881 static int msg_reply_type(int mstype)
882 {
883         switch (mstype) {
884         case DLM_MSG_REQUEST:
885                 return DLM_MSG_REQUEST_REPLY;
886         case DLM_MSG_CONVERT:
887                 return DLM_MSG_CONVERT_REPLY;
888         case DLM_MSG_UNLOCK:
889                 return DLM_MSG_UNLOCK_REPLY;
890         case DLM_MSG_CANCEL:
891                 return DLM_MSG_CANCEL_REPLY;
892         case DLM_MSG_LOOKUP:
893                 return DLM_MSG_LOOKUP_REPLY;
894         }
895         return -1;
896 }
897 
/*
 * Check whether @nodeid has already been recorded in the @warned table of
 * @num_nodes slots, recording it if it has not.
 *
 * The table is filled front to back and a zero slot marks the first free
 * entry.  Returns 1 if @nodeid was already present, 0 if it was just
 * recorded or if every slot is taken by other node ids.
 */
static int nodeid_warned(int nodeid, int num_nodes, int *warned)
{
        int idx = 0;

        while (idx < num_nodes) {
                int seen = warned[idx];

                if (seen == 0) {
                        /* first free slot: remember this node */
                        warned[idx] = nodeid;
                        return 0;
                }
                if (seen == nodeid)
                        return 1;
                idx++;
        }

        return 0;
}
912 
913 void dlm_scan_waiters(struct dlm_ls *ls)
914 {
915         struct dlm_lkb *lkb;
916         ktime_t zero = ktime_set(0, 0);
917         s64 us;
918         s64 debug_maxus = 0;
919         u32 debug_scanned = 0;
920         u32 debug_expired = 0;
921         int num_nodes = 0;
922         int *warned = NULL;
923 
924         if (!dlm_config.ci_waitwarn_us)
925                 return;
926 
927         mutex_lock(&ls->ls_waiters_mutex);
928 
929         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
930                 if (ktime_equal(lkb->lkb_wait_time, zero))
931                         continue;
932 
933                 debug_scanned++;
934 
935                 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
936 
937                 if (us < dlm_config.ci_waitwarn_us)
938                         continue;
939 
940                 lkb->lkb_wait_time = zero;
941 
942                 debug_expired++;
943                 if (us > debug_maxus)
944                         debug_maxus = us;
945 
946                 if (!num_nodes) {
947                         num_nodes = ls->ls_num_nodes;
948                         warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
949                 }
950                 if (!warned)
951                         continue;
952                 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
953                         continue;
954 
955                 log_error(ls, "waitwarn %x %lld %d us check connection to "
956                           "node %d", lkb->lkb_id, (long long)us,
957                           dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
958         }
959         mutex_unlock(&ls->ls_waiters_mutex);
960         kfree(warned);
961 
962         if (debug_expired)
963                 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
964                           debug_scanned, debug_expired,
965                           dlm_config.ci_waitwarn_us, (long long)debug_maxus);
966 }
967 
968 /* add/remove lkb from global waiters list of lkb's waiting for
969    a reply from a remote node */
970 
/*
 * Record that @lkb is now waiting for a reply to a message of type @mstype
 * sent to @to_nodeid.  The whole update runs under ls_waiters_mutex.
 *
 * Returns 0 on success; -EINVAL if an unlock already overlaps the lkb, or
 * a cancel already overlaps it and @mstype is another cancel; -EBUSY if
 * the lkb is already waiting (or has an overlapping cancel) and @mstype is
 * neither an unlock nor a cancel.  Errors are also logged.
 */
971 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
972 {
973         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
974         int error = 0;
975
976         mutex_lock(&ls->ls_waiters_mutex);
977
978         if (is_overlap_unlock(lkb) ||
979             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
980                 error = -EINVAL;
981                 goto out;
982         }
983
            /* An operation is already outstanding: only an unlock or a cancel
               may be layered on top of it.  The overlap is noted with a flag
               plus an extra wait_count and hold; lkb_wait_type is left
               untouched and the lkb is not added to ls_waiters again. */
984         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
985                 switch (mstype) {
986                 case DLM_MSG_UNLOCK:
987                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
988                         break;
989                 case DLM_MSG_CANCEL:
990                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
991                         break;
992                 default:
993                         error = -EBUSY;
994                         goto out;
995                 }
996                 lkb->lkb_wait_count++;
997                 hold_lkb(lkb);
998
999                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1000                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
1001                           lkb->lkb_wait_count, lkb->lkb_flags);
1002                 goto out;
1003         }
1004
             /* First outstanding operation: wait_count must still be zero. */
1005         DLM_ASSERT(!lkb->lkb_wait_count,
1006                    dlm_print_lkb(lkb);
1007                    printk("wait_count %d\n", lkb->lkb_wait_count););
1008
1009         lkb->lkb_wait_count++;
1010         lkb->lkb_wait_type = mstype;
1011         lkb->lkb_wait_time = ktime_get();
1012         lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1013         hold_lkb(lkb);
1014         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1015  out:
1016         if (error)
1017                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
1018                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
1019                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1020         mutex_unlock(&ls->ls_waiters_mutex);
1021         return error;
1022 }
1023 
1024 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1025    list as part of process_requestqueue (e.g. a lookup that has an optimized
1026    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1027    set RESEND and dlm_recover_waiters_post() */
1028 
/*
 * Account for a reply of type @mstype arriving for @lkb: clear the matching
 * wait state, drop one wait_count together with one hold, and unlink the
 * lkb from the waiters list once the count reaches zero.  DLM_IFL_RESEND is
 * cleared whenever wait state is removed.
 *
 * @ms is the reply message; remove_from_waiters() passes NULL.
 *
 * Returns 0 when wait state was removed, -1 when the reply does not match
 * anything the lkb is waiting for.  The callers in this file take
 * ls_waiters_mutex around the call, except for stub messages.
 */
1029 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1030                                 struct dlm_message *ms)
1031 {
1032         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1033         int overlap_done = 0;
1034
             /* Reply to an overlapping unlock: clear just the overlap flag. */
1035         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
1036                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1037                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
1038                 overlap_done = 1;
1039                 goto out_del;
1040         }
1041
             /* Reply to an overlapping cancel: clear just the overlap flag. */
1042         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
1043                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1044                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1045                 overlap_done = 1;
1046                 goto out_del;
1047         }
1048
1049         /* Cancel state was preemptively cleared by a successful convert,
1050            see next comment, nothing to do. */
1051
1052         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1053             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1054                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1055                           lkb->lkb_id, lkb->lkb_wait_type);
1056                 return -1;
1057         }
1058
1059         /* Remove for the convert reply, and preemptively remove for the
1060            cancel reply.  A convert has been granted while there's still
1061            an outstanding cancel on it (the cancel is moot and the result
1062            in the cancel reply should be 0).  We preempt the cancel reply
1063            because the app gets the convert result and then can follow up
1064            with another op, like convert.  This subsequent op would see the
1065            lingering state of the cancel and fail with -EBUSY. */
1066
1067         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1068             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
1069             is_overlap_cancel(lkb) && ms && !ms->m_result) {
1070                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1071                           lkb->lkb_id);
1072                 lkb->lkb_wait_type = 0;
1073                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
                     /* this decrement is for the cancel; the one at out_del
                        below is for the convert itself */
1074                 lkb->lkb_wait_count--;
1075                 goto out_del;
1076         }
1077
1078         /* N.B. type of reply may not always correspond to type of original
1079            msg due to lookup->request optimization, verify others? */
1080
1081         if (lkb->lkb_wait_type) {
1082                 lkb->lkb_wait_type = 0;
1083                 goto out_del;
1084         }
1085
             /* Nothing outstanding on this lkb matches the reply. */
1086         log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1087                   lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
1088                   mstype, lkb->lkb_flags);
1089         return -1;
1090
1091  out_del:
1092         /* the force-unlock/cancel has completed and we haven't recvd a reply
1093            to the op that was in progress prior to the unlock/cancel; we
1094            give up on any reply to the earlier op.  FIXME: not sure when/how
1095            this would happen */
1096
1097         if (overlap_done && lkb->lkb_wait_type) {
1098                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1099                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
1100                 lkb->lkb_wait_count--;
1101                 lkb->lkb_wait_type = 0;
1102         }
1103
1104         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1105
1106         lkb->lkb_flags &= ~DLM_IFL_RESEND;
1107         lkb->lkb_wait_count--;
             /* unlink only once no operation is outstanding any more */
1108         if (!lkb->lkb_wait_count)
1109                 list_del_init(&lkb->lkb_wait_reply);
1110         unhold_lkb(lkb);
1111         return 0;
1112 }
1113 
/*
 * Locked wrapper around _remove_from_waiters() for callers that have no
 * reply message: takes ls_waiters_mutex, passes NULL as the message and
 * returns the helper's result (0 or -1).
 */
1114 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1115 {
1116         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1117         int error;
1118
1119         mutex_lock(&ls->ls_waiters_mutex);
1120         error = _remove_from_waiters(lkb, mstype, NULL);
1121         mutex_unlock(&ls->ls_waiters_mutex);
1122         return error;
1123 }
1124 
1125 /* Handles situations where we might be processing a "fake" or "stub" reply in
1126    which we can't try to take waiters_mutex again. */
1127 
1128 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1129 {
1130         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1131         int error;
1132 
1133         if (ms->m_flags != DLM_IFL_STUB_MS)
1134                 mutex_lock(&ls->ls_waiters_mutex);
1135         error = _remove_from_waiters(lkb, ms->m_type, ms);
1136         if (ms->m_flags != DLM_IFL_STUB_MS)
1137                 mutex_unlock(&ls->ls_waiters_mutex);
1138         return error;
1139 }
1140 
1141 static void dir_remove(struct dlm_rsb *r)
1142 {
1143         int to_nodeid;
1144 
1145         if (dlm_no_directory(r->res_ls))
1146                 return;
1147 
1148         to_nodeid = dlm_dir_nodeid(r);
1149         if (to_nodeid != dlm_our_nodeid())
1150                 send_remove(r);
1151         else
1152                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
1153                                      r->res_name, r->res_length);
1154 }
1155 
1156 /* FIXME: make this more efficient */
1157 
/*
 * Free the rsbs on the toss tree of hash bucket @b whose res_toss_time lies
 * at least ci_toss_secs seconds in the past.  Returns the number of rsbs
 * freed.
 *
 * The bucket spinlock is released before each rsb is disposed of, so the
 * search restarts from rb_first() after every candidate.
 */
1158 static int shrink_bucket(struct dlm_ls *ls, int b)
1159 {
1160         struct rb_node *n;
1161         struct dlm_rsb *r;
1162         int count = 0, found;
1163
1164         for (;;) {
1165                 found = 0;
1166                 spin_lock(&ls->ls_rsbtbl[b].lock);
                     /* look for the first rsb whose toss time has expired */
1167                 for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
1168                         r = rb_entry(n, struct dlm_rsb, res_hashnode);
1169                         if (!time_after_eq(jiffies, r->res_toss_time +
1170                                            dlm_config.ci_toss_secs * HZ))
1171                                 continue;
1172                         found = 1;
1173                         break;
1174                 }
1175
1176                 if (!found) {
1177                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1178                         break;
1179                 }
1180
                     /* kref_put() returns nonzero only when this was the last
                        reference and the release function (kill_rsb) ran */
1181                 if (kref_put(&r->res_ref, kill_rsb)) {
1182                         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1183                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1184
                             /* directory removal and the free happen outside
                                the bucket lock */
1185                         if (is_master(r))
1186                                 dir_remove(r);
1187                         dlm_free_rsb(r);
1188                         count++;
1189                 } else {
                             /* NOTE(review): the failed kref_put() has still
                                dropped a reference and the same rsb is picked
                                again on the next pass -- confirm this path is
                                not expected to be reached */
1190                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1191                         log_error(ls, "tossed rsb in use %s", r->res_name);
1192                 }
1193         }
1194
1195         return count;
1196 }
1197 
1198 void dlm_scan_rsbs(struct dlm_ls *ls)
1199 {
1200         int i;
1201 
1202         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1203                 shrink_bucket(ls, i);
1204                 if (dlm_locking_stopped(ls))
1205                         break;
1206                 cond_resched();
1207         }
1208 }
1209 
1210 static void add_timeout(struct dlm_lkb *lkb)
1211 {
1212         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1213 
1214         if (is_master_copy(lkb))
1215                 return;
1216 
1217         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1218             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1219                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1220                 goto add_it;
1221         }
1222         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1223                 goto add_it;
1224         return;
1225 
1226  add_it:
1227         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1228         mutex_lock(&ls->ls_timeout_mutex);
1229         hold_lkb(lkb);
1230         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1231         mutex_unlock(&ls->ls_timeout_mutex);
1232 }
1233 
1234 static void del_timeout(struct dlm_lkb *lkb)
1235 {
1236         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1237 
1238         mutex_lock(&ls->ls_timeout_mutex);
1239         if (!list_empty(&lkb->lkb_time_list)) {
1240                 list_del_init(&lkb->lkb_time_list);
1241                 unhold_lkb(lkb);
1242         }
1243         mutex_unlock(&ls->ls_timeout_mutex);
1244 }
1245 
1246 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1247    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1248    and then lock rsb because of lock ordering in add_timeout.  We may need
1249    to specify some special timeout-related bits in the lkb that are just to
1250    be accessed under the timeout_mutex. */
1251 
/*
 * Scan the lockspace's timeout list for lkbs whose age (current time minus
 * lkb_timestamp) has reached a limit, and act on one lkb per pass:
 *  - DLM_LKF_TIMEOUT set and lkb_timeout_cs reached: cancel the lock;
 *  - DLM_IFL_WATCH_TIMEWARN set and ci_timewarn_cs reached: warn once.
 * Both limits are multiplied by 10000 to compare against microseconds.
 * The scan ends when locking is stopped or no lkb on the list qualifies.
 */
1252 void dlm_scan_timeout(struct dlm_ls *ls)
1253 {
1254         struct dlm_rsb *r;
1255         struct dlm_lkb *lkb;
1256         int do_cancel, do_warn;
1257         s64 wait_us;
1258
1259         for (;;) {
1260                 if (dlm_locking_stopped(ls))
1261                         break;
1262
1263                 do_cancel = 0;
1264                 do_warn = 0;
1265                 mutex_lock(&ls->ls_timeout_mutex);
1266                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1267
1268                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1269                                                         lkb->lkb_timestamp));
1270
1271                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1272                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1273                                 do_cancel = 1;
1274
1275                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1276                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1277                                 do_warn = 1;
1278
1279                         if (!do_cancel && !do_warn)
1280                                 continue;
                             /* hold the lkb so it can be used after the
                                timeout mutex is dropped below */
1281                         hold_lkb(lkb);
1282                         break;
1283                 }
1284                 mutex_unlock(&ls->ls_timeout_mutex);
1285
                     /* lkb is only meaningful past this point when one of the
                        flags was set, i.e. the loop ended via break */
1286                 if (!do_cancel && !do_warn)
1287                         break;
1288
                     /* the rsb is locked only after the timeout mutex has been
                        released */
1289                 r = lkb->lkb_resource;
1290                 hold_rsb(r);
1291                 lock_rsb(r);
1292
1293                 if (do_warn) {
1294                         /* clear flag so we only warn once */
1295                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
                             /* keep it on the list if a timeout cancel may
                                still be due later */
1296                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1297                                 del_timeout(lkb);
1298                         dlm_timeout_warn(lkb);
1299                 }
1300
1301                 if (do_cancel) {
1302                         log_debug(ls, "timeout cancel %x node %d %s",
1303                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1304                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1305                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1306                         del_timeout(lkb);
1307                         _cancel_lock(r, lkb);
1308                 }
1309
1310                 unlock_rsb(r);
1311                 unhold_rsb(r);
                     /* presumably balances the hold_lkb() taken in the scan */
1312                 dlm_put_lkb(lkb);
1313         }
1314 }
1315 
1316 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1317    dlm_recoverd before checking/setting ls_recover_begin. */
1318 
1319 void dlm_adjust_timeouts(struct dlm_ls *ls)
1320 {
1321         struct dlm_lkb *lkb;
1322         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1323 
1324         ls->ls_recover_begin = 0;
1325         mutex_lock(&ls->ls_timeout_mutex);
1326         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1327                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1328         mutex_unlock(&ls->ls_timeout_mutex);
1329 
1330         if (!dlm_config.ci_waitwarn_us)
1331                 return;
1332 
1333         mutex_lock(&ls->ls_waiters_mutex);
1334         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1335                 if (ktime_to_us(lkb->lkb_wait_time))
1336                         lkb->lkb_wait_time = ktime_get();
1337         }
1338         mutex_unlock(&ls->ls_waiters_mutex);
1339 }
1340 
1341 /* lkb is master or local copy */
1342 
1343 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1344 {
1345         int b, len = r->res_ls->ls_lvblen;
1346 
1347         /* b=1 lvb returned to caller
1348            b=0 lvb written to rsb or invalidated
1349            b=-1 do nothing */
1350 
1351         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1352 
1353         if (b == 1) {
1354                 if (!lkb->lkb_lvbptr)
1355                         return;
1356 
1357                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1358                         return;
1359 
1360                 if (!r->res_lvbptr)
1361                         return;
1362 
1363                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1364                 lkb->lkb_lvbseq = r->res_lvbseq;
1365 
1366         } else if (b == 0) {
1367                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1368                         rsb_set_flag(r, RSB_VALNOTVALID);
1369                         return;
1370                 }
1371 
1372                 if (!lkb->lkb_lvbptr)
1373                         return;
1374 
1375                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1376                         return;
1377 
1378                 if (!r->res_lvbptr)
1379                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1380 
1381                 if (!r->res_lvbptr)
1382                         return;
1383 
1384                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1385                 r->res_lvbseq++;
1386                 lkb->lkb_lvbseq = r->res_lvbseq;
1387                 rsb_clear_flag(r, RSB_VALNOTVALID);
1388         }
1389 
1390         if (rsb_flag(r, RSB_VALNOTVALID))
1391                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1392 }
1393 
1394 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1395 {
1396         if (lkb->lkb_grmode < DLM_LOCK_PW)
1397                 return;
1398 
1399         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1400                 rsb_set_flag(r, RSB_VALNOTVALID);
1401                 return;
1402         }
1403 
1404         if (!lkb->lkb_lvbptr)
1405                 return;
1406 
1407         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1408                 return;
1409 
1410         if (!r->res_lvbptr)
1411                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1412 
1413         if (!r->res_lvbptr)
1414                 return;
1415 
1416         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1417         r->res_lvbseq++;
1418         rsb_clear_flag(r, RSB_VALNOTVALID);
1419 }
1420 
1421 /* lkb is process copy (pc) */
1422 
1423 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1424                             struct dlm_message *ms)
1425 {
1426         int b;
1427 
1428         if (!lkb->lkb_lvbptr)
1429                 return;
1430 
1431         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1432                 return;
1433 
1434         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1435         if (b == 1) {
1436                 int len = receive_extralen(ms);
1437                 if (len > DLM_RESNAME_MAXLEN)
1438                         len = DLM_RESNAME_MAXLEN;
1439                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1440                 lkb->lkb_lvbseq = ms->m_lvbseq;
1441         }
1442 }
1443 
1444 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1445    remove_lock -- used for unlock, removes lkb from granted
1446    revert_lock -- used for cancel, moves lkb from convert to granted
1447    grant_lock  -- used for request and convert, adds lkb to granted or
1448                   moves lkb from convert or waiting to granted
1449 
1450    Each of these is used for master or local copy lkb's.  There is
1451    also a _pc() variation used to make the corresponding change on
1452    a process copy (pc) lkb. */
1453 
/*
 * Common part of removing a lock: take @lkb off its rsb queue, reset its
 * granted mode to DLM_LOCK_IV and drop one further reference.
 */
1454 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1455 {
1456         del_lkb(r, lkb);
1457         lkb->lkb_grmode = DLM_LOCK_IV;
1458         /* this unhold undoes the original ref from create_lkb()
1459            so this leads to the lkb being freed */
1460         unhold_lkb(lkb);
1461 }
1462 
/*
 * Remove a master or local copy lkb: apply the unlock-time lvb handling
 * (set_lvb_unlock) first, then dequeue and release the lkb.
 */
1463 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1464 {
1465         set_lvb_unlock(r, lkb);
1466         _remove_lock(r, lkb);
1467 }
1468 
/*
 * Process-copy variant of remove_lock(): dequeues and releases the lkb
 * without any lvb handling.
 */
1469 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1470 {
1471         _remove_lock(r, lkb);
1472 }
1473 
1474 /* returns: 0 did nothing
1475             1 moved lock to granted
1476            -1 removed lock */
1477 
1478 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1479 {
1480         int rv = 0;
1481 
1482         lkb->lkb_rqmode = DLM_LOCK_IV;
1483 
1484         switch (lkb->lkb_status) {
1485         case DLM_LKSTS_GRANTED:
1486                 break;
1487         case DLM_LKSTS_CONVERT:
1488                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1489                 rv = 1;
1490                 break;
1491         case DLM_LKSTS_WAITING:
1492                 del_lkb(r, lkb);
1493                 lkb->lkb_grmode = DLM_LOCK_IV;
1494                 /* this unhold undoes the original ref from create_lkb()
1495                    so this leads to the lkb being freed */
1496                 unhold_lkb(lkb);
1497                 rv = -1;
1498                 break;
1499         default:
1500                 log_print("invalid status for revert %d", lkb->lkb_status);
1501         }
1502         return rv;
1503 }
1504 
/*
 * Process-copy variant of revert_lock(); currently identical, it simply
 * forwards to revert_lock() and returns its result.
 */
1505 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1506 {
1507         return revert_lock(r, lkb);
1508 }
1509 
1510 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1511 {
1512         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1513                 lkb->lkb_grmode = lkb->lkb_rqmode;
1514                 if (lkb->lkb_status)
1515                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1516                 else
1517                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1518         }
1519 
1520         lkb->lkb_rqmode = DLM_LOCK_IV;
1521         lkb->lkb_highbast = 0;
1522 }
1523 
/*
 * Grant a master or local copy lkb: apply the grant-time lvb handling
 * (set_lvb_lock) first, then update modes and queue placement.
 */
1524 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1525 {
1526         set_lvb_lock(r, lkb);
1527         _grant_lock(r, lkb);
1528 }
1529 
/*
 * Process-copy variant of grant_lock(): the lvb handling takes its data
 * from message @ms (set_lvb_lock_pc) before the modes and queue placement
 * are updated.
 */
1530 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1531                           struct dlm_message *ms)
1532 {
1533         set_lvb_lock_pc(r, lkb, ms);
1534         _grant_lock(r, lkb);
1535 }
1536 
1537 /* called by grant_pending_locks() which means an async grant message must
1538    be sent to the requesting node in addition to granting the lock if the
1539    lkb belongs to a remote node. */
1540 
/*
 * Grant @lkb and notify its owner: a master-copy lkb gets a grant message
 * sent via send_grant(), any other lkb gets a completion queued via
 * queue_cast() with result 0.
 */
static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        grant_lock(r, lkb);

        if (!is_master_copy(lkb)) {
                queue_cast(r, lkb, 0);
                return;
        }

        send_grant(r, lkb);
}
1549 
1550 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1551    change the granted/requested modes.  We're munging things accordingly in
1552    the process copy.
1553    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1554    conversion deadlock
1555    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1556    compatible with other granted locks */
1557 
1558 static void munge_demoted(struct dlm_lkb *lkb)
1559 {
1560         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1561                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1562                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1563                 return;
1564         }
1565 
1566         lkb->lkb_grmode = DLM_LOCK_NL;
1567 }
1568 
1569 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1570 {
1571         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1572             ms->m_type != DLM_MSG_GRANT) {
1573                 log_print("munge_altmode %x invalid reply type %d",
1574                           lkb->lkb_id, ms->m_type);
1575                 return;
1576         }
1577 
1578         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1579                 lkb->lkb_rqmode = DLM_LOCK_PR;
1580         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1581                 lkb->lkb_rqmode = DLM_LOCK_CW;
1582         else {
1583                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1584                 dlm_print_lkb(lkb);
1585         }
1586 }
1587 
1588 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1589 {
1590         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1591                                            lkb_statequeue);
1592         if (lkb->lkb_id == first->lkb_id)
1593                 return 1;
1594 
1595         return 0;
1596 }
1597 
1598 /* Check if the given lkb conflicts with another lkb on the queue. */
1599 
1600 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1601 {
1602         struct dlm_lkb *this;
1603 
1604         list_for_each_entry(this, head, lkb_statequeue) {
1605                 if (this == lkb)
1606                         continue;
1607                 if (!modes_compat(this, lkb))
1608                         return 1;
1609         }
1610         return 0;
1611 }
1612 
1613 /*
1614  * "A conversion deadlock arises with a pair of lock requests in the converting
1615  * queue for one resource.  The granted mode of each lock blocks the requested
1616  * mode of the other lock."
1617  *
1618  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1619  * convert queue from being granted, then deadlk/demote lkb.
1620  *
1621  * Example:
1622  * Granted Queue: empty
1623  * Convert Queue: NL->EX (first lock)
1624  *                PR->EX (second lock)
1625  *
1626  * The first lock can't be granted because of the granted mode of the second
1627  * lock and the second lock can't be granted because it's not first in the
1628  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1629  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1630  * flag set and return DEMOTED in the lksb flags.
1631  *
1632  * Originally, this function detected conv-deadlk in a more limited scope:
1633  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1634  * - if lkb1 was the first entry in the queue (not just earlier), and was
1635  *   blocked by the granted mode of lkb2, and there was nothing on the
1636  *   granted queue preventing lkb1 from being granted immediately, i.e.
1637  *   lkb2 was the only thing preventing lkb1 from being granted.
1638  *
1639  * That second condition meant we'd only say there was conv-deadlk if
1640  * resolving it (by demotion) would lead to the first lock on the convert
1641  * queue being granted right away.  It allowed conversion deadlocks to exist
1642  * between locks on the convert queue while they couldn't be granted anyway.
1643  *
1644  * Now, we detect and take action on conversion deadlocks immediately when
1645  * they're created, even if they may not be immediately consequential.  If
1646  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1647  * mode that would prevent lkb1's conversion from being granted, we do a
1648  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1649  * I think this means that the lkb_is_ahead condition below should always
1650  * be zero, i.e. there will never be conv-deadlk between two locks that are
1651  * both already on the convert queue.
1652  */
1653 
1654 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1655 {
1656         struct dlm_lkb *lkb1;
1657         int lkb_is_ahead = 0;
1658 
1659         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1660                 if (lkb1 == lkb2) {
1661                         lkb_is_ahead = 1;
1662                         continue;
1663                 }
1664 
1665                 if (!lkb_is_ahead) {
1666                         if (!modes_compat(lkb2, lkb1))
1667                                 return 1;
1668                 } else {
1669                         if (!modes_compat(lkb2, lkb1) &&
1670                             !modes_compat(lkb1, lkb2))
1671                                 return 1;
1672                 }
1673         }
1674         return 0;
1675 }
1676 
1677 /*
1678  * Return 1 if the lock can be granted, 0 otherwise.
1679  * Also detect and resolve conversion deadlocks.
1680  *
1681  * lkb is the lock to be granted
1682  *
1683  * now is 1 if the function is being called in the context of the
1684  * immediate request, it is 0 if called later, after the lock has been
1685  * queued.
1686  *
1687  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1688  */
1689 
1690 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1691 {
1692         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1693 
1694         /*
1695          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1696          * a new request for a NL mode lock being blocked.
1697          *
1698          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1699          * request, then it would be granted.  In essence, the use of this flag
1700          * tells the Lock Manager to expedite theis request by not considering
1701          * what may be in the CONVERTING or WAITING queues...  As of this
1702          * writing, the EXPEDITE flag can be used only with new requests for NL
1703          * mode locks.  This flag is not valid for conversion requests.
1704          *
1705          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1706          * conversion or used with a non-NL requested mode.  We also know an
1707          * EXPEDITE request is always granted immediately, so now must always
1708          * be 1.  The full condition to grant an expedite request: (now &&
1709          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1710          * therefore be shortened to just checking the flag.
1711          */
1712 
1713         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1714                 return 1;
1715 
1716         /*
1717          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1718          * added to the remaining conditions.
1719          */
1720 
1721         if (queue_conflict(&r->res_grantqueue, lkb))
1722                 goto out;
1723 
1724         /*
1725          * 6-3: By default, a conversion request is immediately granted if the
1726          * requested mode is compatible with the modes of all other granted
1727          * locks
1728          */
1729 
1730         if (queue_conflict(&r->res_convertqueue, lkb))
1731                 goto out;
1732 
1733         /*
1734          * 6-5: But the default algorithm for deciding whether to grant or
1735          * queue conversion requests does not by itself guarantee that such
1736          * requests are serviced on a "first come first serve" basis.  This, in
1737          * turn, can lead to a phenomenon known as "indefinate postponement".
1738          *
1739          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1740          * the system service employed to request a lock conversion.  This flag
1741          * forces certain conversion requests to be queued, even if they are
1742          * compatible with the granted modes of other locks on the same
1743          * resource.  Thus, the use of this flag results in conversion requests
1744          * being ordered on a "first come first servce" basis.
1745          *
1746          * DCT: This condition is all about new conversions being able to occur
1747          * "in place" while the lock remains on the granted queue (assuming
1748          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1749          * doesn't _have_ to go onto the convert queue where it's processed in
1750          * order.  The "now" variable is necessary to distinguish converts
1751          * being received and processed for the first time now, because once a
1752          * convert is moved to the conversion queue the condition below applies
1753          * requiring fifo granting.
1754          */
1755 
1756         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1757                 return 1;
1758 
1759         /*
1760          * Even if the convert is compat with all granted locks,
1761          * QUECVT forces it behind other locks on the convert queue.
1762          */
1763 
1764         if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
1765                 if (list_empty(&r->res_convertqueue))
1766                         return 1;
1767                 else
1768                         goto out;
1769         }
1770 
1771         /*
1772          * The NOORDER flag is set to avoid the standard vms rules on grant
1773          * order.
1774          */
1775 
1776         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1777                 return 1;
1778 
1779         /*
1780          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1781          * granted until all other conversion requests ahead of it are granted
1782          * and/or canceled.
1783          */
1784 
1785         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1786                 return 1;
1787 
1788         /*
1789          * 6-4: By default, a new request is immediately granted only if all
1790          * three of the following conditions are satisfied when the request is
1791          * issued:
1792          * - The queue of ungranted conversion requests for the resource is
1793          *   empty.
1794          * - The queue of ungranted new requests for the resource is empty.
1795          * - The mode of the new request is compatible with the most
1796          *   restrictive mode of all granted locks on the resource.
1797          */
1798 
1799         if (now && !conv && list_empty(&r->res_convertqueue) &&
1800             list_empty(&r->res_waitqueue))
1801                 return 1;
1802 
1803         /*
1804          * 6-4: Once a lock request is in the queue of ungranted new requests,
1805          * it cannot be granted until the queue of ungranted conversion
1806          * requests is empty, all ungranted new requests ahead of it are
1807          * granted and/or canceled, and it is compatible with the granted mode
1808          * of the most restrictive lock granted on the resource.
1809          */
1810 
1811         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1812             first_in_list(lkb, &r->res_waitqueue))
1813                 return 1;
1814  out:
1815         return 0;
1816 }
1817 
1818 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1819                           int *err)
1820 {
1821         int rv;
1822         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1823         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1824 
1825         if (err)
1826                 *err = 0;
1827 
1828         rv = _can_be_granted(r, lkb, now);
1829         if (rv)
1830                 goto out;
1831 
1832         /*
1833          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1834          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1835          * cancels one of the locks.
1836          */
1837 
1838         if (is_convert && can_be_queued(lkb) &&
1839             conversion_deadlock_detect(r, lkb)) {
1840                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1841                         lkb->lkb_grmode = DLM_LOCK_NL;
1842                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1843                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1844                         if (err)
1845                                 *err = -EDEADLK;
1846                         else {
1847                                 log_print("can_be_granted deadlock %x now %d",
1848                                           lkb->lkb_id, now);
1849                                 dlm_dump_rsb(r);
1850                         }
1851                 }
1852                 goto out;
1853         }
1854 
1855         /*
1856          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1857          * to grant a request in a mode other than the normal rqmode.  It's a
1858          * simple way to provide a big optimization to applications that can
1859          * use them.
1860          */
1861 
1862         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1863                 alt = DLM_LOCK_PR;
1864         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1865                 alt = DLM_LOCK_CW;
1866 
1867         if (alt) {
1868                 lkb->lkb_rqmode = alt;
1869                 rv = _can_be_granted(r, lkb, now);
1870                 if (rv)
1871                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1872                 else
1873                         lkb->lkb_rqmode = rqmode;
1874         }
1875  out:
1876         return rv;
1877 }
1878 
1879 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1880    for locks pending on the convert list.  Once verified (watch for these
1881    log_prints), we should be able to just call _can_be_granted() and not
1882    bother with the demote/deadlk cases here (and there's no easy way to deal
1883    with a deadlk here, we'd have to generate something like grant_lock with
1884    the deadlk error.) */
1885 
1886 /* Returns the highest requested mode of all blocked conversions; sets
1887    cw if there's a blocked conversion to DLM_LOCK_CW. */
1888 
1889 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
1890                                  unsigned int *count)
1891 {
1892         struct dlm_lkb *lkb, *s;
1893         int hi, demoted, quit, grant_restart, demote_restart;
1894         int deadlk;
1895 
1896         quit = 0;
1897  restart:
1898         grant_restart = 0;
1899         demote_restart = 0;
1900         hi = DLM_LOCK_IV;
1901 
1902         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1903                 demoted = is_demoted(lkb);
1904                 deadlk = 0;
1905 
1906                 if (can_be_granted(r, lkb, 0, &deadlk)) {
1907                         grant_lock_pending(r, lkb);
1908                         grant_restart = 1;
1909                         if (count)
1910                                 (*count)++;
1911                         continue;
1912                 }
1913 
1914                 if (!demoted && is_demoted(lkb)) {
1915                         log_print("WARN: pending demoted %x node %d %s",
1916                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1917                         demote_restart = 1;
1918                         continue;
1919                 }
1920 
1921                 if (deadlk) {
1922                         log_print("WARN: pending deadlock %x node %d %s",
1923                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1924                         dlm_dump_rsb(r);
1925                         continue;
1926                 }
1927 
1928                 hi = max_t(int, lkb->lkb_rqmode, hi);
1929 
1930                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1931                         *cw = 1;
1932         }
1933 
1934         if (grant_restart)
1935                 goto restart;
1936         if (demote_restart && !quit) {
1937                 quit = 1;
1938                 goto restart;
1939         }
1940 
1941         return max_t(int, high, hi);
1942 }
1943 
1944 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
1945                               unsigned int *count)
1946 {
1947         struct dlm_lkb *lkb, *s;
1948 
1949         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1950                 if (can_be_granted(r, lkb, 0, NULL)) {
1951                         grant_lock_pending(r, lkb);
1952                         if (count)
1953                                 (*count)++;
1954                 } else {
1955                         high = max_t(int, lkb->lkb_rqmode, high);
1956                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
1957                                 *cw = 1;
1958                 }
1959         }
1960 
1961         return high;
1962 }
1963 
1964 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1965    on either the convert or waiting queue.
1966    high is the largest rqmode of all locks blocked on the convert or
1967    waiting queue. */
1968 
1969 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1970 {
1971         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1972                 if (gr->lkb_highbast < DLM_LOCK_EX)
1973                         return 1;
1974                 return 0;
1975         }
1976 
1977         if (gr->lkb_highbast < high &&
1978             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1979                 return 1;
1980         return 0;
1981 }
1982 
1983 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
1984 {
1985         struct dlm_lkb *lkb, *s;
1986         int high = DLM_LOCK_IV;
1987         int cw = 0;
1988 
1989         if (!is_master(r)) {
1990                 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
1991                 dlm_dump_rsb(r);
1992                 return;
1993         }
1994 
1995         high = grant_pending_convert(r, high, &cw, count);
1996         high = grant_pending_wait(r, high, &cw, count);
1997 
1998         if (high == DLM_LOCK_IV)
1999                 return;
2000 
2001         /*
2002          * If there are locks left on the wait/convert queue then send blocking
2003          * ASTs to granted locks based on the largest requested mode (high)
2004          * found above.
2005          */
2006 
2007         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2008                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2009                         if (cw && high == DLM_LOCK_PR &&
2010                             lkb->lkb_grmode == DLM_LOCK_PR)
2011                                 queue_bast(r, lkb, DLM_LOCK_CW);
2012                         else
2013                                 queue_bast(r, lkb, high);
2014                         lkb->lkb_highbast = high;
2015                 }
2016         }
2017 }
2018 
2019 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2020 {
2021         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2022             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2023                 if (gr->lkb_highbast < DLM_LOCK_EX)
2024                         return 1;
2025                 return 0;
2026         }
2027 
2028         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2029                 return 1;
2030         return 0;
2031 }
2032 
2033 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2034                             struct dlm_lkb *lkb)
2035 {
2036         struct dlm_lkb *gr;
2037 
2038         list_for_each_entry(gr, head, lkb_statequeue) {
2039                 /* skip self when sending basts to convertqueue */
2040                 if (gr == lkb)
2041                         continue;
2042                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2043                         queue_bast(r, gr, lkb->lkb_rqmode);
2044                         gr->lkb_highbast = lkb->lkb_rqmode;
2045                 }
2046         }
2047 }
2048 
2049 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2050 {
2051         send_bast_queue(r, &r->res_grantqueue, lkb);
2052 }
2053 
2054 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2055 {
2056         send_bast_queue(r, &r->res_grantqueue, lkb);
2057         send_bast_queue(r, &r->res_convertqueue, lkb);
2058 }
2059 
2060 /* set_master(r, lkb) -- set the master nodeid of a resource
2061 
2062    The purpose of this function is to set the nodeid field in the given
2063    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2064    known, it can just be copied to the lkb and the function will return
2065    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2066    before it can be copied to the lkb.
2067 
2068    When the rsb nodeid is being looked up remotely, the initial lkb
2069    causing the lookup is kept on the ls_waiters list waiting for the
2070    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2071    on the rsb's res_lookup list until the master is verified.
2072 
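   Return values:
   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
   1: the rsb master is not available and the lkb has been placed on
      a wait queue
   <0: the local directory lookup failed with an error other than -EEXIST;
       the error is returned and neither nodeid has been set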
2077 */
2078 
2079 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2080 {
2081         struct dlm_ls *ls = r->res_ls;
2082         int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
2083 
2084         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2085                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2086                 r->res_first_lkid = lkb->lkb_id;
2087                 lkb->lkb_nodeid = r->res_nodeid;
2088                 return 0;
2089         }
2090 
2091         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2092                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2093                 return 1;
2094         }
2095 
2096         if (r->res_nodeid == 0) {
2097                 lkb->lkb_nodeid = 0;
2098                 return 0;
2099         }
2100 
2101         if (r->res_nodeid > 0) {
2102                 lkb->lkb_nodeid = r->res_nodeid;
2103                 return 0;
2104         }
2105 
2106         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
2107 
2108         dir_nodeid = dlm_dir_nodeid(r);
2109 
2110         if (dir_nodeid != our_nodeid) {
2111                 r->res_first_lkid = lkb->lkb_id;
2112                 send_lookup(r, lkb);
2113                 return 1;
2114         }
2115 
2116         for (i = 0; i < 2; i++) {
2117                 /* It's possible for dlm_scand to remove an old rsb for
2118                    this same resource from the toss list, us to create
2119                    a new one, look up the master locally, and find it
2120                    already exists just before dlm_scand does the
2121                    dir_remove() on the previous rsb. */
2122 
2123                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
2124                                        r->res_length, &ret_nodeid);
2125                 if (!error)
2126                         break;
2127                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
2128                 schedule();
2129         }
2130         if (error && error != -EEXIST)
2131                 return error;
2132 
2133         if (ret_nodeid == our_nodeid) {
2134                 r->res_first_lkid = 0;
2135                 r->res_nodeid = 0;
2136                 lkb->lkb_nodeid = 0;
2137         } else {
2138                 r->res_first_lkid = lkb->lkb_id;
2139                 r->res_nodeid = ret_nodeid;
2140                 lkb->lkb_nodeid = ret_nodeid;
2141         }
2142         return 0;
2143 }
2144 
2145 static void process_lookup_list(struct dlm_rsb *r)
2146 {
2147         struct dlm_lkb *lkb, *safe;
2148 
2149         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2150                 list_del_init(&lkb->lkb_rsb_lookup);
2151                 _request_lock(r, lkb);
2152                 schedule();
2153         }
2154 }
2155 
2156 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2157 
2158 static void confirm_master(struct dlm_rsb *r, int error)
2159 {
2160         struct dlm_lkb *lkb;
2161 
2162         if (!r->res_first_lkid)
2163                 return;
2164 
2165         switch (error) {
2166         case 0:
2167         case -EINPROGRESS:
2168                 r->res_first_lkid = 0;
2169                 process_lookup_list(r);
2170                 break;
2171 
2172         case -EAGAIN:
2173         case -EBADR:
2174         case -ENOTBLK:
2175                 /* the remote request failed and won't be retried (it was
2176                    a NOQUEUE, or has been canceled/unlocked); make a waiting
2177                    lkb the first_lkid */
2178 
2179                 r->res_first_lkid = 0;
2180 
2181                 if (!list_empty(&r->res_lookup)) {
2182                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2183                                          lkb_rsb_lookup);
2184                         list_del_init(&lkb->lkb_rsb_lookup);
2185                         r->res_first_lkid = lkb->lkb_id;
2186                         _request_lock(r, lkb);
2187                 }
2188                 break;
2189 
2190         default:
2191                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2192         }
2193 }
2194 
2195 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2196                          int namelen, unsigned long timeout_cs,
2197                          void (*ast) (void *astparam),
2198                          void *astparam,
2199                          void (*bast) (void *astparam, int mode),
2200                          struct dlm_args *args)
2201 {
2202         int rv = -EINVAL;
2203 
2204         /* check for invalid arg usage */
2205 
2206         if (mode < 0 || mode > DLM_LOCK_EX)
2207                 goto out;
2208 
2209         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2210                 goto out;
2211 
2212         if (flags & DLM_LKF_CANCEL)
2213                 goto out;
2214 
2215         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2216                 goto out;
2217 
2218         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2219                 goto out;
2220 
2221         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2222                 goto out;
2223 
2224         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2225                 goto out;
2226 
2227         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2228                 goto out;
2229 
2230         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2231                 goto out;
2232 
2233         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2234                 goto out;
2235 
2236         if (!ast || !lksb)
2237                 goto out;
2238 
2239         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2240                 goto out;
2241 
2242         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2243                 goto out;
2244 
2245         /* these args will be copied to the lkb in validate_lock_args,
2246            it cannot be done now because when converting locks, fields in
2247            an active lkb cannot be modified before locking the rsb */
2248 
2249         args->flags = flags;
2250         args->astfn = ast;
2251         args->astparam = astparam;
2252         args->bastfn = bast;
2253         args->timeout = timeout_cs;
2254         args->mode = mode;
2255         args->lksb = lksb;
2256         rv = 0;
2257  out:
2258         return rv;
2259 }
2260 
2261 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2262 {
2263         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2264                       DLM_LKF_FORCEUNLOCK))
2265                 return -EINVAL;
2266 
2267         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2268                 return -EINVAL;
2269 
2270         args->flags = flags;
2271         args->astparam = astarg;
2272         return 0;
2273 }
2274 
2275 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2276                               struct dlm_args *args)
2277 {
2278         int rv = -EINVAL;
2279 
2280         if (args->flags & DLM_LKF_CONVERT) {
2281                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2282                         goto out;
2283 
2284                 if (args->flags & DLM_LKF_QUECVT &&
2285                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2286                         goto out;
2287 
2288                 rv = -EBUSY;
2289                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2290                         goto out;
2291 
2292                 if (lkb->lkb_wait_type)
2293                         goto out;
2294 
2295                 if (is_overlap(lkb))
2296                         goto out;
2297         }
2298 
2299         lkb->lkb_exflags = args->flags;
2300         lkb->lkb_sbflags = 0;
2301         lkb->lkb_astfn = args->astfn;
2302         lkb->lkb_astparam = args->astparam;
2303         lkb->lkb_bastfn = args->bastfn;
2304         lkb->lkb_rqmode = args->mode;
2305         lkb->lkb_lksb = args->lksb;
2306         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2307         lkb->lkb_ownpid = (int) current->pid;
2308         lkb->lkb_timeout_cs = args->timeout;
2309         rv = 0;
2310  out:
2311         if (rv)
2312                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2313                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2314                           lkb->lkb_status, lkb->lkb_wait_type,
2315                           lkb->lkb_resource->res_name);
2316         return rv;
2317 }
2318 
2319 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2320    for success */
2321 
2322 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2323    because there may be a lookup in progress and it's valid to do
2324    cancel/unlockf on it */
2325 
2326 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2327 {
2328         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2329         int rv = -EINVAL;
2330 
2331         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2332                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2333                 dlm_print_lkb(lkb);
2334                 goto out;
2335         }
2336 
2337         /* an lkb may still exist even though the lock is EOL'ed due to a
2338            cancel, unlock or failed noqueue request; an app can't use these
2339            locks; return same error as if the lkid had not been found at all */
2340 
2341         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2342                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2343                 rv = -ENOENT;
2344                 goto out;
2345         }
2346 
2347         /* an lkb may be waiting for an rsb lookup to complete where the
2348            lookup was initiated by another lock */
2349 
2350         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2351                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2352                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2353                         list_del_init(&lkb->lkb_rsb_lookup);
2354                         queue_cast(lkb->lkb_resource, lkb,
2355                                    args->flags & DLM_LKF_CANCEL ?
2356                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2357                         unhold_lkb(lkb); /* undoes create_lkb() */
2358                 }
2359                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2360                 rv = -EBUSY;
2361                 goto out;
2362         }
2363 
2364         /* cancel not allowed with another cancel/unlock in progress */
2365 
2366         if (args->flags & DLM_LKF_CANCEL) {
2367                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2368                         goto out;
2369 
2370                 if (is_overlap(lkb))
2371                         goto out;
2372 
2373                 /* don't let scand try to do a cancel */
2374                 del_timeout(lkb);
2375 
2376                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2377                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2378                         rv = -EBUSY;
2379                         goto out;
2380                 }
2381 
2382                 /* there's nothing to cancel */
2383                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2384                     !lkb->lkb_wait_type) {
2385                         rv = -EBUSY;
2386                         goto out;
2387                 }
2388 
2389                 switch (lkb->lkb_wait_type) {
2390                 case DLM_MSG_LOOKUP:
2391                 case DLM_MSG_REQUEST:
2392                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2393                         rv = -EBUSY;
2394                         goto out;
2395                 case DLM_MSG_UNLOCK:
2396                 case DLM_MSG_CANCEL:
2397                         goto out;
2398                 }
2399                 /* add_to_waiters() will set OVERLAP_CANCEL */
2400                 goto out_ok;
2401         }
2402 
2403         /* do we need to allow a force-unlock if there's a normal unlock
2404            already in progress?  in what conditions could the normal unlock
2405            fail such that we'd want to send a force-unlock to be sure? */
2406 
2407         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2408                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2409                         goto out;
2410 
2411                 if (is_overlap_unlock(lkb))
2412                         goto out;
2413 
2414                 /* don't let scand try to do a cancel */
2415                 del_timeout(lkb);
2416 
2417                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2418                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2419                         rv = -EBUSY;
2420                         goto out;
2421                 }
2422 
2423                 switch (lkb->lkb_wait_type) {
2424                 case DLM_MSG_LOOKUP:
2425                 case DLM_MSG_REQUEST:
2426                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2427                         rv = -EBUSY;
2428                         goto out;
2429                 case DLM_MSG_UNLOCK:
2430                         goto out;
2431                 }
2432                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2433                 goto out_ok;
2434         }
2435 
2436         /* normal unlock not allowed if there's any op in progress */
2437         rv = -EBUSY;
2438         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2439                 goto out;
2440 
2441  out_ok:
2442         /* an overlapping op shouldn't blow away exflags from other op */
2443         lkb->lkb_exflags |= args->flags;
2444         lkb->lkb_sbflags = 0;
2445         lkb->lkb_astparam = args->astparam;
2446         rv = 0;
2447  out:
2448         if (rv)
2449                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2450                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2451                           args->flags, lkb->lkb_wait_type,
2452                           lkb->lkb_resource->res_name);
2453         return rv;
2454 }
2455 
2456 /*
2457  * Four stage 4 varieties:
2458  * do_request(), do_convert(), do_unlock(), do_cancel()
2459  * These are called on the master node for the given lock and
2460  * from the central locking logic.
2461  */
2462 
2463 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2464 {
2465         int error = 0;
2466 
2467         if (can_be_granted(r, lkb, 1, NULL)) {
2468                 grant_lock(r, lkb);
2469                 queue_cast(r, lkb, 0);
2470                 goto out;
2471         }
2472 
2473         if (can_be_queued(lkb)) {
2474                 error = -EINPROGRESS;
2475                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2476                 add_timeout(lkb);
2477                 goto out;
2478         }
2479 
2480         error = -EAGAIN;
2481         queue_cast(r, lkb, -EAGAIN);
2482  out:
2483         return error;
2484 }
2485 
/*
 * Send the blocking ASTs that go with a request result.  error is
 * presumably the value returned by do_request() - confirm against callers.
 *
 *   -EINPROGRESS  basts go to the granted queue
 *   -EAGAIN       basts go to the granted and convert queues, but only if
 *                 force_blocking_asts(lkb) is true
 *   anything else no basts are sent
 */
static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
                               int error)
{
        if (error == -EINPROGRESS)
                send_blocking_asts(r, lkb);
        else if (error == -EAGAIN && force_blocking_asts(lkb))
                send_blocking_asts_all(r, lkb);
}
2499 
2500 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2501 {
2502         int error = 0;
2503         int deadlk = 0;
2504 
2505         /* changing an existing lock may allow others to be granted */
2506 
2507         if (can_be_granted(r, lkb, 1, &deadlk)) {
2508                 grant_lock(r, lkb);
2509                 queue_cast(r, lkb, 0);
2510                 goto out;
2511         }
2512 
2513         /* can_be_granted() detected that this lock would block in a conversion
2514            deadlock, so we leave it on the granted queue and return EDEADLK in
2515            the ast for the convert. */
2516 
2517         if (deadlk) {
2518                 /* it's left on the granted queue */
2519                 revert_lock(r, lkb);
2520                 queue_cast(r, lkb, -EDEADLK);
2521                 error = -EDEADLK;
2522                 goto out;
2523         }
2524 
2525         /* is_demoted() means the can_be_granted() above set the grmode
2526            to NL, and left us on the granted queue.  This auto-demotion
2527            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2528            now grantable.  We have to try to grant other converting locks
2529            before we try again to grant this one. */
2530 
2531         if (is_demoted(lkb)) {
2532                 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
2533                 if (_can_be_granted(r, lkb, 1)) {
2534                         grant_lock(r, lkb);
2535                         queue_cast(r, lkb, 0);
2536                         goto out;
2537                 }
2538                 /* else fall through and move to convert queue */
2539         }
2540 
2541         if (can_be_queued(lkb)) {
2542                 error = -EINPROGRESS;
2543                 del_lkb(r, lkb);
2544                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2545                 add_timeout(lkb);
2546                 goto out;
2547         }
2548 
2549         error = -EAGAIN;
2550         queue_cast(r, lkb, -EAGAIN);
2551  out:
2552         return error;
2553 }
2554 
2555 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2556                                int error)
2557 {
2558         switch (error) {
2559         case 0:
2560                 grant_pending_locks(r, NULL);
2561                 /* grant_pending_locks also sends basts */
2562                 break;
2563         case -EAGAIN:
2564                 if (force_blocking_asts(lkb))
2565                         send_blocking_asts_all(r, lkb);
2566                 break;
2567         case -EINPROGRESS:
2568                 send_blocking_asts(r, lkb);
2569                 break;
2570         }
2571 }
2572 
2573 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2574 {
2575         remove_lock(r, lkb);
2576         queue_cast(r, lkb, -DLM_EUNLOCK);
2577         return -DLM_EUNLOCK;
2578 }
2579 
2580 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2581                               int error)
2582 {
2583         grant_pending_locks(r, NULL);
2584 }
2585 
2586 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2587  
2588 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2589 {
2590         int error;
2591 
2592         error = revert_lock(r, lkb);
2593         if (error) {
2594                 queue_cast(r, lkb, -DLM_ECANCEL);
2595                 return -DLM_ECANCEL;
2596         }
2597         return 0;
2598 }
2599 
2600 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2601                               int error)
2602 {
2603         if (error)
2604                 grant_pending_locks(r, NULL);
2605 }
2606 
2607 /*
2608  * Four stage 3 varieties:
2609  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2610  */
2611 
/*
 * add a new lkb to a possibly new rsb, called by requesting process
 *
 * A negative result from set_master() is returned as is.  A positive
 * result ends the request here with a return of 0 (presumably the request
 * is continued elsewhere -- confirm in set_master()).  Otherwise the
 * request is sent to the remote master or executed locally.
 */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* set_master: sets lkb nodeid from r */
	rv = set_master(r, lkb);
	if (rv < 0)
		return rv;
	if (rv > 0)
		return 0;

	/* receive_request() calls do_request() on remote node */
	if (is_remote(r))
		return send_request(r, lkb);

	rv = do_request(r, lkb);
	/* for remote locks the request_reply is sent
	   between do_request and do_request_effects */
	do_request_effects(r, lkb, rv);

	return rv;
}
2640 
/*
 * change some property of an existing lkb, e.g. mode
 *
 * Sends the conversion to the master when the rsb is remote, otherwise
 * runs do_convert() followed by do_convert_effects() locally.  Returns the
 * result of send_convert() or do_convert().
 */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* receive_convert() calls do_convert() on remote node */
	if (is_remote(r))
		return send_convert(r, lkb);

	rv = do_convert(r, lkb);
	/* for remote locks the convert_reply is sent
	   between do_convert and do_convert_effects */
	do_convert_effects(r, lkb, rv);

	return rv;
}
2659 
/*
 * remove an existing lkb from the granted queue
 *
 * Sends the unlock to the master when the rsb is remote, otherwise runs
 * do_unlock() followed by do_unlock_effects() locally.  Returns the result
 * of send_unlock() or do_unlock().
 */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* receive_unlock() calls do_unlock() on remote node */
	if (is_remote(r))
		return send_unlock(r, lkb);

	rv = do_unlock(r, lkb);
	/* for remote locks the unlock_reply is sent
	   between do_unlock and do_unlock_effects */
	do_unlock_effects(r, lkb, rv);

	return rv;
}
2678 
/*
 * remove an existing lkb from the convert or wait queue
 *
 * Sends the cancel to the master when the rsb is remote, otherwise runs
 * do_cancel() followed by do_cancel_effects() locally.  Returns the result
 * of send_cancel() or do_cancel().
 */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* receive_cancel() calls do_cancel() on remote node */
	if (is_remote(r))
		return send_cancel(r, lkb);

	rv = do_cancel(r, lkb);
	/* for remote locks the cancel_reply is sent
	   between do_cancel and do_cancel_effects */
	do_cancel_effects(r, lkb, rv);

	return rv;
}
2697 
2698 /*
2699  * Four stage 2 varieties:
2700  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2701  */
2702 
2703 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2704                         int len, struct dlm_args *args)
2705 {
2706         struct dlm_rsb *r;
2707         int error;
2708 
2709         error = validate_lock_args(ls, lkb, args);
2710         if (error)
2711                 goto out;
2712 
2713         error = find_rsb(ls, name, len, R_CREATE, &r);
2714         if (error)
2715                 goto out;
2716 
2717         lock_rsb(r);
2718 
2719         attach_lkb(r, lkb);
2720         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2721 
2722         error = _request_lock(r, lkb);
2723 
2724         unlock_rsb(r);
2725         put_rsb(r);
2726 
2727  out:
2728         return error;
2729 }
2730 
2731 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2732                         struct dlm_args *args)
2733 {
2734         struct dlm_rsb *r;
2735         int error;
2736 
2737         r = lkb->lkb_resource;
2738 
2739         hold_rsb(r);
2740         lock_rsb(r);
2741 
2742         error = validate_lock_args(ls, lkb, args);
2743         if (error)
2744                 goto out;
2745 
2746         error = _convert_lock(r, lkb);
2747  out:
2748         unlock_rsb(r);
2749         put_rsb(r);
2750         return error;
2751 }
2752 
2753 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2754                        struct dlm_args *args)
2755 {
2756         struct dlm_rsb *r;
2757         int error;
2758 
2759         r = lkb->lkb_resource;
2760 
2761         hold_rsb(r);
2762         lock_rsb(r);
2763 
2764         error = validate_unlock_args(lkb, args);
2765         if (error)
2766                 goto out;
2767 
2768         error = _unlock_lock(r, lkb);
2769  out:
2770         unlock_rsb(r);
2771         put_rsb(r);
2772         return error;
2773 }
2774 
2775 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2776                        struct dlm_args *args)
2777 {
2778         struct dlm_rsb *r;
2779         int error;
2780 
2781         r = lkb->lkb_resource;
2782 
2783         hold_rsb(r);
2784         lock_rsb(r);
2785 
2786         error = validate_unlock_args(lkb, args);
2787         if (error)
2788                 goto out;
2789 
2790         error = _cancel_lock(r, lkb);
2791  out:
2792         unlock_rsb(r);
2793         put_rsb(r);
2794         return error;
2795 }
2796 
2797 /*
2798  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2799  */
2800 
2801 int dlm_lock(dlm_lockspace_t *lockspace,
2802              int mode,
2803              struct dlm_lksb *lksb,
2804              uint32_t flags,
2805              void *name,
2806              unsigned int namelen,
2807              uint32_t parent_lkid,
2808              void (*ast) (void *astarg),
2809              void *astarg,
2810              void (*bast) (void *astarg, int mode))
2811 {
2812         struct dlm_ls *ls;
2813         struct dlm_lkb *lkb;
2814         struct dlm_args args;
2815         int error, convert = flags & DLM_LKF_CONVERT;
2816 
2817         ls = dlm_find_lockspace_local(lockspace);
2818         if (!ls)
2819                 return -EINVAL;
2820 
2821         dlm_lock_recovery(ls);
2822 
2823         if (convert)
2824                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2825         else
2826                 error = create_lkb(ls, &lkb);
2827 
2828         if (error)
2829                 goto out;
2830 
2831         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2832                               astarg, bast, &args);
2833         if (error)
2834                 goto out_put;
2835 
2836         if (convert)
2837                 error = convert_lock(ls, lkb, &args);
2838         else
2839                 error = request_lock(ls, lkb, name, namelen, &args);
2840 
2841         if (error == -EINPROGRESS)
2842                 error = 0;
2843  out_put:
2844         if (convert || error)
2845                 __put_lkb(ls, lkb);
2846         if (error == -EAGAIN || error == -EDEADLK)
2847                 error = 0;
2848  out:
2849         dlm_unlock_recovery(ls);
2850         dlm_put_lockspace(ls);
2851         return error;
2852 }
2853 
2854 int dlm_unlock(dlm_lockspace_t *lockspace,
2855                uint32_t lkid,
2856                uint32_t flags,
2857                struct dlm_lksb *lksb,
2858                void *astarg)
2859 {
2860         struct dlm_ls *ls;
2861         struct dlm_lkb *lkb;
2862         struct dlm_args args;
2863         int error;
2864 
2865         ls = dlm_find_lockspace_local(lockspace);
2866         if (!ls)
2867                 return -EINVAL;
2868 
2869         dlm_lock_recovery(ls);
2870 
2871         error = find_lkb(ls, lkid, &lkb);
2872         if (error)
2873                 goto out;
2874 
2875         error = set_unlock_args(flags, astarg, &args);
2876         if (error)
2877                 goto out_put;
2878 
2879         if (flags & DLM_LKF_CANCEL)
2880                 error = cancel_lock(ls, lkb, &args);
2881         else
2882                 error = unlock_lock(ls, lkb, &args);
2883 
2884         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2885                 error = 0;
2886         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2887                 error = 0;
2888  out_put:
2889         dlm_put_lkb(lkb);
2890  out:
2891         dlm_unlock_recovery(ls);
2892         dlm_put_lockspace(ls);
2893         return error;
2894 }
2895 
2896 /*
2897  * send/receive routines for remote operations and replies
2898  *
2899  * send_args
2900  * send_common
2901  * send_request                 receive_request
2902  * send_convert                 receive_convert
2903  * send_unlock                  receive_unlock
2904  * send_cancel                  receive_cancel
2905  * send_grant                   receive_grant
2906  * send_bast                    receive_bast
2907  * send_lookup                  receive_lookup
2908  * send_remove                  receive_remove
2909  *
2910  *                              send_common_reply
2911  * receive_request_reply        send_request_reply
2912  * receive_convert_reply        send_convert_reply
2913  * receive_unlock_reply         send_unlock_reply
2914  * receive_cancel_reply         send_cancel_reply
2915  * receive_lookup_reply         send_lookup_reply
2916  */
2917 
2918 static int _create_message(struct dlm_ls *ls, int mb_len,
2919                            int to_nodeid, int mstype,
2920                            struct dlm_message **ms_ret,
2921                            struct dlm_mhandle **mh_ret)
2922 {
2923         struct dlm_message *ms;
2924         struct dlm_mhandle *mh;
2925         char *mb;
2926 
2927         /* get_buffer gives us a message handle (mh) that we need to
2928            pass into lowcomms_commit and a message buffer (mb) that we
2929            write our data into */
2930 
2931         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2932         if (!mh)
2933                 return -ENOBUFS;
2934 
2935         memset(mb, 0, mb_len);
2936 
2937         ms = (struct dlm_message *) mb;
2938 
2939         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2940         ms->m_header.h_lockspace = ls->ls_global_id;
2941         ms->m_header.h_nodeid = dlm_our_nodeid();
2942         ms->m_header.h_length = mb_len;
2943         ms->m_header.h_cmd = DLM_MSG;
2944 
2945         ms->m_type = mstype;
2946 
2947         *mh_ret = mh;
2948         *ms_ret = ms;
2949         return 0;
2950 }
2951 
2952 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2953                           int to_nodeid, int mstype,
2954                           struct dlm_message **ms_ret,
2955                           struct dlm_mhandle **mh_ret)
2956 {
2957         int mb_len = sizeof(struct dlm_message);
2958 
2959         switch (mstype) {
2960         case DLM_MSG_REQUEST:
2961         case DLM_MSG_LOOKUP:
2962         case DLM_MSG_REMOVE:
2963                 mb_len += r->res_length;
2964                 break;
2965         case DLM_MSG_CONVERT:
2966         case DLM_MSG_UNLOCK:
2967         case DLM_MSG_REQUEST_REPLY:
2968         case DLM_MSG_CONVERT_REPLY:
2969         case DLM_MSG_GRANT:
2970                 if (lkb && lkb->lkb_lvbptr)
2971                         mb_len += r->res_ls->ls_lvblen;
2972                 break;
2973         }
2974 
2975         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2976                                ms_ret, mh_ret);
2977 }
2978 
/*
 * Pass the message through dlm_message_out() and commit its buffer to
 * lowcomms.  Always returns 0.
 *
 * further lowcomms enhancements or alternate implementations may make
 * the return value from this function useful at some point
 */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);

	return 0;
}
2988 
2989 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2990                       struct dlm_message *ms)
2991 {
2992         ms->m_nodeid   = lkb->lkb_nodeid;
2993         ms->m_pid      = lkb->lkb_ownpid;
2994         ms->m_lkid     = lkb->lkb_id;
2995         ms->m_remid    = lkb->lkb_remid;
2996         ms->m_exflags  = lkb->lkb_exflags;
2997         ms->m_sbflags  = lkb->lkb_sbflags;
2998         ms->m_flags    = lkb->lkb_flags;
2999         ms->m_lvbseq   = lkb->lkb_lvbseq;
3000         ms->m_status   = lkb->lkb_status;
3001         ms->m_grmode   = lkb->lkb_grmode;
3002         ms->m_rqmode   = lkb->lkb_rqmode;
3003         ms->m_hash     = r->res_hash;
3004 
3005         /* m_result and m_bastmode are set from function args,
3006            not from lkb fields */
3007 
3008         if (lkb->lkb_bastfn)
3009                 ms->m_asts |= DLM_CB_BAST;
3010         if (lkb->lkb_astfn)
3011                 ms->m_asts |= DLM_CB_CAST;
3012 
3013         /* compare with switch in create_message; send_remove() doesn't
3014            use send_args() */
3015 
3016         switch (ms->m_type) {
3017         case DLM_MSG_REQUEST:
3018         case DLM_MSG_LOOKUP:
3019                 memcpy(ms->m_extra, r->res_name, r->res_length);
3020                 break;
3021         case DLM_MSG_CONVERT:
3022         case DLM_MSG_UNLOCK:
3023         case DLM_MSG_REQUEST_REPLY:
3024         case DLM_MSG_CONVERT_REPLY:
3025         case DLM_MSG_GRANT:
3026                 if (!lkb->lkb_lvbptr)
3027                         break;
3028                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3029                 break;
3030         }
3031 }
3032 
3033 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3034 {
3035         struct dlm_message *ms;
3036         struct dlm_mhandle *mh;
3037         int to_nodeid, error;
3038 
3039         to_nodeid = r->res_nodeid;
3040 
3041         error = add_to_waiters(lkb, mstype, to_nodeid);
3042         if (error)
3043                 return error;
3044 
3045         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3046         if (error)
3047                 goto fail;
3048 
3049         send_args(r, lkb, ms);
3050 
3051         error = send_message(mh, ms);
3052         if (error)
3053                 goto fail;
3054         return 0;
3055 
3056  fail:
3057         remove_from_waiters(lkb, msg_reply_type(mstype));
3058         return error;
3059 }
3060 
3061 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3062 {
3063         return send_common(r, lkb, DLM_MSG_REQUEST);
3064 }
3065 
3066 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3067 {
3068         int error;
3069 
3070         error = send_common(r, lkb, DLM_MSG_CONVERT);
3071 
3072         /* down conversions go without a reply from the master */
3073         if (!error && down_conversion(lkb)) {
3074                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3075                 r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
3076                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3077                 r->res_ls->ls_stub_ms.m_result = 0;
3078                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3079         }
3080 
3081         return error;
3082 }
3083 
3084 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3085    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3086    that the master is still correct. */
3087 
3088 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3089 {
3090         return send_common(r, lkb, DLM_MSG_UNLOCK);
3091 }
3092 
3093 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3094 {
3095         return send_common(r, lkb, DLM_MSG_CANCEL);
3096 }
3097 
3098 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3099 {
3100         struct dlm_message *ms;
3101         struct dlm_mhandle *mh;
3102         int to_nodeid, error;
3103 
3104         to_nodeid = lkb->lkb_nodeid;
3105 
3106         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3107         if (error)
3108                 goto out;
3109 
3110         send_args(r, lkb, ms);
3111 
3112         ms->m_result = 0;
3113 
3114         error = send_message(mh, ms);
3115  out:
3116         return error;
3117 }
3118 
3119 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3120 {
3121         struct dlm_message *ms;
3122         struct dlm_mhandle *mh;
3123         int to_nodeid, error;
3124 
3125         to_nodeid = lkb->lkb_nodeid;
3126 
3127         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3128         if (error)
3129                 goto out;
3130 
3131         send_args(r, lkb, ms);
3132 
3133         ms->m_bastmode = mode;
3134 
3135         error = send_message(mh, ms);
3136  out:
3137         return error;
3138 }
3139 
3140 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3141 {
3142         struct dlm_message *ms;
3143         struct dlm_mhandle *mh;
3144         int to_nodeid, error;
3145 
3146         to_nodeid = dlm_dir_nodeid(r);
3147 
3148         error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3149         if (error)
3150                 return error;
3151 
3152         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3153         if (error)
3154                 goto fail;
3155 
3156         send_args(r, lkb, ms);
3157 
3158         error = send_message(mh, ms);
3159         if (error)
3160                 goto fail;
3161         return 0;
3162 
3163  fail:
3164         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3165         return error;
3166 }
3167 
3168 static int send_remove(struct dlm_rsb *r)
3169 {
3170         struct dlm_message *ms;
3171         struct dlm_mhandle *mh;
3172         int to_nodeid, error;
3173 
3174         to_nodeid = dlm_dir_nodeid(r);
3175 
3176         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3177         if (error)
3178                 goto out;
3179 
3180         memcpy(ms->m_extra, r->res_name, r->res_length);
3181         ms->m_hash = r->res_hash;
3182 
3183         error = send_message(mh, ms);
3184  out:
3185         return error;
3186 }
3187 
3188 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3189                              int mstype, int rv)
3190 {
3191         struct dlm_message *ms;
3192         struct dlm_mhandle *mh;
3193         int to_nodeid, error;
3194 
3195         to_nodeid = lkb->lkb_nodeid;
3196 
3197         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3198         if (error)
3199                 goto out;
3200 
3201         send_args(r, lkb, ms);
3202 
3203         ms->m_result = rv;
3204 
3205         error = send_message(mh, ms);
3206  out:
3207         return error;
3208 }
3209 
3210 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3211 {
3212         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3213 }
3214 
3215 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3216 {
3217         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3218 }
3219 
3220 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3221 {
3222         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3223 }
3224 
3225 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3226 {
3227         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3228 }
3229 
3230 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3231                              int ret_nodeid, int rv)
3232 {
3233         struct dlm_rsb *r = &ls->ls_stub_rsb;
3234         struct dlm_message *ms;
3235         struct dlm_mhandle *mh;
3236         int error, nodeid = ms_in->m_header.h_nodeid;
3237 
3238         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3239         if (error)
3240                 goto out;
3241 
3242         ms->m_lkid = ms_in->m_lkid;
3243         ms->m_result = rv;
3244         ms->m_nodeid = ret_nodeid;
3245 
3246         error = send_message(mh, ms);
3247  out:
3248         return error;
3249 }
3250 
3251 /* which args we save from a received message depends heavily on the type
3252    of message, unlike the send side where we can safely send everything about
3253    the lkb for any type of message */
3254 
3255 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3256 {
3257         lkb->lkb_exflags = ms->m_exflags;
3258         lkb->lkb_sbflags = ms->m_sbflags;
3259         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3260                          (ms->m_flags & 0x0000FFFF);
3261 }
3262 
3263 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3264 {
3265         if (ms->m_flags == DLM_IFL_STUB_MS)
3266                 return;
3267 
3268         lkb->lkb_sbflags = ms->m_sbflags;
3269         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3270                          (ms->m_flags & 0x0000FFFF);
3271 }
3272 
3273 static int receive_extralen(struct dlm_message *ms)
3274 {
3275         return (ms->m_header.h_length - sizeof(struct dlm_message));
3276 }
3277 
3278 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3279                        struct dlm_message *ms)
3280 {
3281         int len;
3282 
3283         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3284                 if (!lkb->lkb_lvbptr)
3285                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3286                 if (!lkb->lkb_lvbptr)
3287                         return -ENOMEM;
3288                 len = receive_extralen(ms);
3289                 if (len > DLM_RESNAME_MAXLEN)
3290                         len = DLM_RESNAME_MAXLEN;
3291                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3292         }
3293         return 0;
3294 }
3295 
/*
 * Placeholder blocking-ast callback; receive_request_args() installs it on
 * an lkb when the request message has DLM_CB_BAST set.  It only logs a
 * message if it is ever invoked.
 */
static void fake_bastfn(void *astparam, int mode)
{
	log_print("fake_bastfn should not be called");
}
3300 
/*
 * Placeholder completion-ast callback; receive_request_args() installs it
 * on an lkb when the request message has DLM_CB_CAST set.  It only logs a
 * message if it is ever invoked.
 */
static void fake_astfn(void *astparam)
{
	log_print("fake_astfn should not be called");
}
3305 
3306 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3307                                 struct dlm_message *ms)
3308 {
3309         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3310         lkb->lkb_ownpid = ms->m_pid;
3311         lkb->lkb_remid = ms->m_lkid;
3312         lkb->lkb_grmode = DLM_LOCK_IV;
3313         lkb->lkb_rqmode = ms->m_rqmode;
3314 
3315         lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3316         lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3317 
3318         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3319                 /* lkb was just created so there won't be an lvb yet */
3320                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3321                 if (!lkb->lkb_lvbptr)
3322                         return -ENOMEM;
3323         }
3324 
3325         return 0;
3326 }
3327 
3328 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3329                                 struct dlm_message *ms)
3330 {
3331         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3332                 return -EBUSY;
3333 
3334         if (receive_lvb(ls, lkb, ms))
3335                 return -ENOMEM;
3336 
3337         lkb->lkb_rqmode = ms->m_rqmode;
3338         lkb->lkb_lvbseq = ms->m_lvbseq;
3339 
3340         return 0;
3341 }
3342 
/*
 * Apply the arguments of a received unlock message to the lkb; only the
 * lvb is taken.  Returns 0 on success or -ENOMEM if receive_lvb() fails.
 */
static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			       struct dlm_message *ms)
{
	return receive_lvb(ls, lkb, ms) ? -ENOMEM : 0;
}
3350 
3351 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3352    uses to send a reply and that the remote end uses to process the reply. */
3353 
3354 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3355 {
3356         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3357         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3358         lkb->lkb_remid = ms->m_lkid;
3359 }
3360 
3361 /* This is called after the rsb is locked so that we can safely inspect
3362    fields in the lkb. */
3363 
3364 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3365 {
3366         int from = ms->m_header.h_nodeid;
3367         int error = 0;
3368 
3369         switch (ms->m_type) {
3370         case DLM_MSG_CONVERT:
3371         case DLM_MSG_UNLOCK:
3372         case DLM_MSG_CANCEL:
3373                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3374                         error = -EINVAL;
3375                 break;
3376 
3377         case DLM_MSG_CONVERT_REPLY:
3378         case DLM_MSG_UNLOCK_REPLY:
3379         case DLM_MSG_CANCEL_REPLY:
3380         case DLM_MSG_GRANT:
3381         case DLM_MSG_BAST:
3382                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3383                         error = -EINVAL;
3384                 break;
3385 
3386         case DLM_MSG_REQUEST_REPLY:
3387                 if (!is_process_copy(lkb))
3388                         error = -EINVAL;
3389                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3390                         error = -EINVAL;
3391                 break;
3392 
3393         default:
3394                 error = -EINVAL;
3395         }
3396 
3397         if (error)
3398                 log_error(lkb->lkb_resource->res_ls,
3399                           "ignore invalid message %d from %d %x %x %x %d",
3400                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3401                           lkb->lkb_flags, lkb->lkb_nodeid);
3402         return error;
3403 }
3404 
3405 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3406 {
3407         struct dlm_lkb *lkb;
3408         struct dlm_rsb *r;
3409         int error, namelen;
3410 
3411         error = create_lkb(ls, &lkb);
3412         if (error)
3413                 goto fail;
3414 
3415         receive_flags(lkb, ms);
3416         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3417         error = receive_request_args(ls, lkb, ms);
3418         if (error) {
3419                 __put_lkb(ls, lkb);
3420                 goto fail;
3421         }
3422 
3423         namelen = receive_extralen(ms);
3424 
3425         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3426         if (error) {
3427                 __put_lkb(ls, lkb);
3428                 goto fail;
3429         }
3430 
3431         lock_rsb(r);
3432 
3433         attach_lkb(r, lkb);
3434         error = do_request(r, lkb);
3435         send_request_reply(r, lkb, error);
3436         do_request_effects(r, lkb, error);
3437 
3438         unlock_rsb(r);
3439         put_rsb(r);
3440 
3441         if (error == -EINPROGRESS)
3442                 error = 0;
3443         if (error)
3444                 dlm_put_lkb(lkb);
3445         return 0;
3446 
3447  fail:
3448         setup_stub_lkb(ls, ms);
3449         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3450         return error;
3451 }
3452 
3453 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3454 {
3455         struct dlm_lkb *lkb;
3456         struct dlm_rsb *r;
3457         int error, reply = 1;
3458 
3459         error = find_lkb(ls, ms->m_remid, &lkb);
3460         if (error)
3461                 goto fail;
3462 
3463         if (lkb->lkb_remid != ms->m_lkid) {
3464                 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
3465                           "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
3466                           (unsigned long long)lkb->lkb_recover_seq,
3467                           ms->m_header.h_nodeid, ms->m_lkid);
3468                 error = -ENOENT;
3469                 goto fail;
3470         }
3471 
3472         r = lkb->lkb_resource;
3473 
3474         hold_rsb(r);
3475         lock_rsb(r);
3476 
3477         error = validate_message(lkb, ms);
3478         if (error)
3479                 goto out;
3480 
3481         receive_flags(lkb, ms);
3482 
3483         error = receive_convert_args(ls, lkb, ms);
3484         if (error) {
3485                 send_convert_reply(r, lkb, error);
3486                 goto out;
3487         }
3488 
3489         reply = !down_conversion(lkb);
3490 
3491         error = do_convert(r, lkb);
3492         if (reply)
3493                 send_convert_reply(r, lkb, error);
3494         do_convert_effects(r, lkb, error);
3495  out:
3496         unlock_rsb(r);
3497         put_rsb(r);
3498         dlm_put_lkb(lkb);
3499         return 0;
3500 
3501  fail:
3502         setup_stub_lkb(ls, ms);
3503         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3504         return error;
3505 }
3506 
3507 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3508 {
3509         struct dlm_lkb *lkb;
3510         struct dlm_rsb *r;
3511         int error;
3512 
3513         error = find_lkb(ls, ms->m_remid, &lkb);
3514         if (error)
3515                 goto fail;
3516 
3517         if (lkb->lkb_remid != ms->m_lkid) {
3518                 log_error(ls, "receive_unlock %x remid %x remote %d %x",
3519                           lkb->lkb_id, lkb->lkb_remid,
3520                           ms->m_header.h_nodeid, ms->m_lkid);
3521                 error = -ENOENT;
3522                 goto fail;
3523         }
3524 
3525         r = lkb->lkb_resource;
3526 
3527         hold_rsb(r);
3528         lock_rsb(r);
3529 
3530         error = validate_message(lkb, ms);
3531         if (error)
3532                 goto out;
3533 
3534         receive_flags(lkb, ms);
3535 
3536         error = receive_unlock_args(ls, lkb, ms);
3537         if (error) {
3538                 send_unlock_reply(r, lkb, error);
3539                 goto out;
3540         }
3541 
3542         error = do_unlock(r, lkb);
3543         send_unlock_reply(r, lkb, error);
3544         do_unlock_effects(r, lkb, error);
3545  out:
3546         unlock_rsb(r);
3547         put_rsb(r);
3548         dlm_put_lkb(lkb);
3549         return 0;
3550 
3551  fail:
3552         setup_stub_lkb(ls, ms);
3553         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3554         return error;
3555 }
3556 
3557 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3558 {
3559         struct dlm_lkb *lkb;
3560         struct dlm_rsb *r;
3561         int error;
3562 
3563         error = find_lkb(ls, ms->m_remid, &lkb);
3564         if (error)
3565                 goto fail;
3566 
3567         receive_flags(lkb, ms);
3568 
3569         r = lkb->lkb_resource;
3570 
3571         hold_rsb(r);
3572         lock_rsb(r);
3573 
3574         error = validate_message(lkb, ms);
3575         if (error)
3576                 goto out;
3577 
3578         error = do_cancel(r, lkb);
3579         send_cancel_reply(r, lkb, error);
3580         do_cancel_effects(r, lkb, error);
3581  out:
3582         unlock_rsb(r);
3583         put_rsb(r);
3584         dlm_put_lkb(lkb);
3585         return 0;
3586 
3587  fail:
3588         setup_stub_lkb(ls, ms);
3589         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3590         return error;
3591 }
3592 
3593 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3594 {
3595         struct dlm_lkb *lkb;
3596         struct dlm_rsb *r;
3597         int error;
3598 
3599         error = find_lkb(ls, ms->m_remid, &lkb);
3600         if (error)
3601                 return error;
3602 
3603         r = lkb->lkb_resource;
3604 
3605         hold_rsb(r);
3606         lock_rsb(r);
3607 
3608         error = validate_message(lkb, ms);
3609         if (error)
3610                 goto out;
3611 
3612         receive_flags_reply(lkb, ms);
3613         if (is_altmode(lkb))
3614                 munge_altmode(lkb, ms);
3615         grant_lock_pc(r, lkb, ms);
3616         queue_cast(r, lkb, 0);
3617  out:
3618         unlock_rsb(r);
3619         put_rsb(r);
3620         dlm_put_lkb(lkb);
3621         return 0;
3622 }
3623 
3624 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3625 {
3626         struct dlm_lkb *lkb;
3627         struct dlm_rsb *r;
3628         int error;
3629 
3630         error = find_lkb(ls, ms->m_remid, &lkb);
3631         if (error)
3632                 return error;
3633 
3634         r = lkb->lkb_resource;
3635 
3636         hold_rsb(r);
3637         lock_rsb(r);
3638 
3639         error = validate_message(lkb, ms);
3640         if (error)
3641                 goto out;
3642 
3643         queue_bast(r, lkb, ms->m_bastmode);
3644         lkb->lkb_highbast = ms->m_bastmode;
3645  out:
3646         unlock_rsb(r);
3647         put_rsb(r);
3648         dlm_put_lkb(lkb);
3649         return 0;
3650 }
3651 
3652 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3653 {
3654         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3655 
3656         from_nodeid = ms->m_header.h_nodeid;
3657         our_nodeid = dlm_our_nodeid();
3658 
3659         len = receive_extralen(ms);
3660 
3661         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3662         if (dir_nodeid != our_nodeid) {
3663                 log_error(ls, "lookup dir_nodeid %d from %d",
3664                           dir_nodeid, from_nodeid);
3665                 error = -EINVAL;
3666                 ret_nodeid = -1;
3667                 goto out;
3668         }
3669 
3670         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3671 
3672         /* Optimization: we're master so treat lookup as a request */
3673         if (!error && ret_nodeid == our_nodeid) {
3674                 receive_request(ls, ms);
3675                 return;
3676         }
3677  out:
3678         send_lookup_reply(ls, ms, ret_nodeid, error);
3679 }
3680 
3681 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3682 {
3683         int len, dir_nodeid, from_nodeid;
3684 
3685         from_nodeid = ms->m_header.h_nodeid;
3686 
3687         len = receive_extralen(ms);
3688 
3689         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3690         if (dir_nodeid != dlm_our_nodeid()) {
3691                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3692                           dir_nodeid, from_nodeid);
3693                 return;
3694         }
3695 
3696         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3697 }
3698 
3699 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3700 {
3701         do_purge(ls, ms->m_nodeid, ms->m_pid);
3702 }
3703 
3704 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3705 {
3706         struct dlm_lkb *lkb;
3707         struct dlm_rsb *r;
3708         int error, mstype, result;
3709 
3710         error = find_lkb(ls, ms->m_remid, &lkb);
3711         if (error)
3712                 return error;
3713 
3714         r = lkb->lkb_resource;
3715         hold_rsb(r);
3716         lock_rsb(r);
3717 
3718         error = validate_message(lkb, ms);
3719         if (error)
3720                 goto out;
3721 
3722         mstype = lkb->lkb_wait_type;
3723         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3724         if (error) {
3725                 log_error(ls, "receive_request_reply %x remote %d %x result %d",
3726                           lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
3727                           ms->m_result);
3728                 dlm_dump_rsb(r);
3729                 goto out;
3730         }
3731 
3732         /* Optimization: the dir node was also the master, so it took our
3733            lookup as a request and sent request reply instead of lookup reply */
3734         if (mstype == DLM_MSG_LOOKUP) {
3735                 r->res_nodeid = ms->m_header.h_nodeid;
3736                 lkb->lkb_nodeid = r->res_nodeid;
3737         }
3738 
3739         /* this is the value returned from do_request() on the master */
3740         result = ms->m_result;
3741 
3742         switch (result) {
3743         case -EAGAIN:
3744                 /* request would block (be queued) on remote master */
3745                 queue_cast(r, lkb, -EAGAIN);
3746                 confirm_master(r, -EAGAIN);
3747                 unhold_lkb(lkb); /* undoes create_lkb() */
3748                 break;
3749 
3750         case -EINPROGRESS:
3751         case 0:
3752                 /* request was queued or granted on remote master */
3753                 receive_flags_reply(lkb, ms);
3754                 lkb->lkb_remid = ms->m_lkid;
3755                 if (is_altmode(lkb))
3756                         munge_altmode(lkb, ms);
3757                 if (result) {
3758                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3759                         add_timeout(lkb);
3760                 } else {
3761                         grant_lock_pc(r, lkb, ms);
3762                         queue_cast(r, lkb, 0);
3763                 }
3764                 confirm_master(r, result);
3765                 break;
3766 
3767         case -EBADR:
3768         case -ENOTBLK:
3769                 /* find_rsb failed to find rsb or rsb wasn't master */
3770                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3771                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3772                 r->res_nodeid = -1;
3773                 lkb->lkb_nodeid = -1;
3774 
3775                 if (is_overlap(lkb)) {
3776                         /* we'll ignore error in cancel/unlock reply */
3777                         queue_cast_overlap(r, lkb);
3778                         confirm_master(r, result);
3779                         unhold_lkb(lkb); /* undoes create_lkb() */
3780                 } else
3781                         _request_lock(r, lkb);
3782                 break;
3783 
3784         default:
3785                 log_error(ls, "receive_request_reply %x error %d",
3786                           lkb->lkb_id, result);
3787         }
3788 
3789         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3790                 log_debug(ls, "receive_request_reply %x result %d unlock",
3791                           lkb->lkb_id, result);
3792                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3793                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3794                 send_unlock(r, lkb);
3795         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3796                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3797                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3798                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3799                 send_cancel(r, lkb);
3800         } else {
3801                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3802                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3803         }
3804  out:
3805         unlock_rsb(r);
3806         put_rsb(r);
3807         dlm_put_lkb(lkb);
3808         return 0;
3809 }
3810 
3811 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3812                                     struct dlm_message *ms)
3813 {
3814         /* this is the value returned from do_convert() on the master */
3815         switch (ms->m_result) {
3816         case -EAGAIN:
3817                 /* convert would block (be queued) on remote master */
3818                 queue_cast(r, lkb, -EAGAIN);
3819                 break;
3820 
3821         case -EDEADLK:
3822                 receive_flags_reply(lkb, ms);
3823                 revert_lock_pc(r, lkb);
3824                 queue_cast(r, lkb, -EDEADLK);
3825                 break;
3826 
3827         case -EINPROGRESS:
3828                 /* convert was queued on remote master */
3829                 receive_flags_reply(lkb, ms);
3830                 if (is_demoted(lkb))
3831                         munge_demoted(lkb);
3832                 del_lkb(r, lkb);
3833                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3834                 add_timeout(lkb);
3835                 break;
3836 
3837         case 0:
3838                 /* convert was granted on remote master */
3839                 receive_flags_reply(lkb, ms);
3840                 if (is_demoted(lkb))
3841                         munge_demoted(lkb);
3842                 grant_lock_pc(r, lkb, ms);
3843                 queue_cast(r, lkb, 0);
3844                 break;
3845 
3846         default:
3847                 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
3848                           lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
3849                           ms->m_result);
3850                 dlm_print_rsb(r);
3851                 dlm_print_lkb(lkb);
3852         }
3853 }
3854 
3855 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3856 {
3857         struct dlm_rsb *r = lkb->lkb_resource;
3858         int error;
3859 
3860         hold_rsb(r);
3861         lock_rsb(r);
3862 
3863         error = validate_message(lkb, ms);
3864         if (error)
3865                 goto out;
3866 
3867         /* stub reply can happen with waiters_mutex held */
3868         error = remove_from_waiters_ms(lkb, ms);
3869         if (error)
3870                 goto out;
3871 
3872         __receive_convert_reply(r, lkb, ms);
3873  out:
3874         unlock_rsb(r);
3875         put_rsb(r);
3876 }
3877 
3878 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3879 {
3880         struct dlm_lkb *lkb;
3881         int error;
3882 
3883         error = find_lkb(ls, ms->m_remid, &lkb);
3884         if (error)
3885                 return error;
3886 
3887         _receive_convert_reply(lkb, ms);
3888         dlm_put_lkb(lkb);
3889         return 0;
3890 }
3891 
3892 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3893 {
3894         struct dlm_rsb *r = lkb->lkb_resource;
3895         int error;
3896 
3897         hold_rsb(r);
3898         lock_rsb(r);
3899 
3900         error = validate_message(lkb, ms);
3901         if (error)
3902                 goto out;
3903 
3904         /* stub reply can happen with waiters_mutex held */
3905         error = remove_from_waiters_ms(lkb, ms);
3906         if (error)
3907                 goto out;
3908 
3909         /* this is the value returned from do_unlock() on the master */
3910 
3911         switch (ms->m_result) {
3912         case -DLM_EUNLOCK:
3913                 receive_flags_reply(lkb, ms);
3914                 remove_lock_pc(r, lkb);
3915                 queue_cast(r, lkb, -DLM_EUNLOCK);
3916                 break;
3917         case -ENOENT:
3918                 break;
3919         default:
3920                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3921                           lkb->lkb_id, ms->m_result);
3922         }
3923  out:
3924         unlock_rsb(r);
3925         put_rsb(r);
3926 }
3927 
3928 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3929 {
3930         struct dlm_lkb *lkb;
3931         int error;
3932 
3933         error = find_lkb(ls, ms->m_remid, &lkb);
3934         if (error)
3935                 return error;
3936 
3937         _receive_unlock_reply(lkb, ms);
3938         dlm_put_lkb(lkb);
3939         return 0;
3940 }
3941 
3942 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3943 {
3944         struct dlm_rsb *r = lkb->lkb_resource;
3945         int error;
3946 
3947         hold_rsb(r);
3948         lock_rsb(r);
3949 
3950         error = validate_message(lkb, ms);
3951         if (error)
3952                 goto out;
3953 
3954         /* stub reply can happen with waiters_mutex held */
3955         error = remove_from_waiters_ms(lkb, ms);
3956         if (error)
3957                 goto out;
3958 
3959         /* this is the value returned from do_cancel() on the master */
3960 
3961         switch (ms->m_result) {
3962         case -DLM_ECANCEL:
3963                 receive_flags_reply(lkb, ms);
3964                 revert_lock_pc(r, lkb);
3965                 queue_cast(r, lkb, -DLM_ECANCEL);
3966                 break;
3967         case 0:
3968                 break;
3969         default:
3970                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3971                           lkb->lkb_id, ms->m_result);
3972         }
3973  out:
3974         unlock_rsb(r);
3975         put_rsb(r);
3976 }
3977 
3978 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3979 {
3980         struct dlm_lkb *lkb;
3981         int error;
3982 
3983         error = find_lkb(ls, ms->m_remid, &lkb);
3984         if (error)
3985                 return error;
3986 
3987         _receive_cancel_reply(lkb, ms);
3988         dlm_put_lkb(lkb);
3989         return 0;
3990 }
3991 
3992 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3993 {
3994         struct dlm_lkb *lkb;
3995         struct dlm_rsb *r;
3996         int error, ret_nodeid;
3997 
3998         error = find_lkb(ls, ms->m_lkid, &lkb);
3999         if (error) {
4000                 log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
4001                 return;
4002         }
4003 
4004         /* ms->m_result is the value returned by dlm_dir_lookup on dir node
4005            FIXME: will a non-zero error ever be returned? */
4006 
4007         r = lkb->lkb_resource;
4008         hold_rsb(r);
4009         lock_rsb(r);
4010 
4011         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4012         if (error)
4013                 goto out;
4014 
4015         ret_nodeid = ms->m_nodeid;
4016         if (ret_nodeid == dlm_our_nodeid()) {
4017                 r->res_nodeid = 0;
4018                 ret_nodeid = 0;
4019                 r->res_first_lkid = 0;
4020         } else {
4021                 /* set_master() will copy res_nodeid to lkb_nodeid */
4022                 r->res_nodeid = ret_nodeid;
4023         }
4024 
4025         if (is_overlap(lkb)) {
4026                 log_debug(ls, "receive_lookup_reply %x unlock %x",
4027                           lkb->lkb_id, lkb->lkb_flags);
4028                 queue_cast_overlap(r, lkb);
4029                 unhold_lkb(lkb); /* undoes create_lkb() */
4030                 goto out_list;
4031         }
4032 
4033         _request_lock(r, lkb);
4034 
4035  out_list:
4036         if (!ret_nodeid)
4037                 process_lookup_list(r);
4038  out:
4039         unlock_rsb(r);
4040         put_rsb(r);
4041         dlm_put_lkb(lkb);
4042 }
4043 
4044 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4045                              uint32_t saved_seq)
4046 {
4047         int error = 0, noent = 0;
4048 
4049         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
4050                 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
4051                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
4052                           ms->m_remid, ms->m_result);
4053                 return;
4054         }
4055 
4056         switch (ms->m_type) {
4057 
4058         /* messages sent to a master node */
4059 
4060         case DLM_MSG_REQUEST:
4061                 error = receive_request(ls, ms);
4062                 break;
4063 
4064         case DLM_MSG_CONVERT:
4065                 error = receive_convert(ls, ms);
4066                 break;
4067 
4068         case DLM_MSG_UNLOCK:
4069                 error = receive_unlock(ls, ms);
4070                 break;
4071 
4072         case DLM_MSG_CANCEL:
4073                 noent = 1;
4074                 error = receive_cancel(ls, ms);
4075                 break;
4076 
4077         /* messages sent from a master node (replies to above) */
4078 
4079         case DLM_MSG_REQUEST_REPLY:
4080                 error = receive_request_reply(ls, ms);
4081                 break;
4082 
4083         case DLM_MSG_CONVERT_REPLY:
4084                 error = receive_convert_reply(ls, ms);
4085                 break;
4086 
4087         case DLM_MSG_UNLOCK_REPLY:
4088                 error = receive_unlock_reply(ls, ms);
4089                 break;
4090 
4091         case DLM_MSG_CANCEL_REPLY:
4092                 error = receive_cancel_reply(ls, ms);
4093                 break;
4094 
4095         /* messages sent from a master node (only two types of async msg) */
4096 
4097         case DLM_MSG_GRANT:
4098                 noent = 1;
4099                 error = receive_grant(ls, ms);
4100                 break;
4101 
4102         case DLM_MSG_BAST:
4103                 noent = 1;
4104                 error = receive_bast(ls, ms);
4105                 break;
4106 
4107         /* messages sent to a dir node */
4108 
4109         case DLM_MSG_LOOKUP:
4110                 receive_lookup(ls, ms);
4111                 break;
4112 
4113         case DLM_MSG_REMOVE:
4114                 receive_remove(ls, ms);
4115                 break;
4116 
4117         /* messages sent from a dir node (remove has no reply) */
4118 
4119         case DLM_MSG_LOOKUP_REPLY:
4120                 receive_lookup_reply(ls, ms);
4121                 break;
4122 
4123         /* other messages */
4124 
4125         case DLM_MSG_PURGE:
4126                 receive_purge(ls, ms);
4127                 break;
4128 
4129         default:
4130                 log_error(ls, "unknown message type %d", ms->m_type);
4131         }
4132 
4133         /*
4134          * When checking for ENOENT, we're checking the result of
4135          * find_lkb(m_remid):
4136          *
4137          * The lock id referenced in the message wasn't found.  This may
4138          * happen in normal usage for the async messages and cancel, so
4139          * only use log_debug for them.
4140          *
4141          * Some errors are expected and normal.
4142          */
4143 
4144         if (error == -ENOENT && noent) {
4145                 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4146                           ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4147                           ms->m_lkid, saved_seq);
4148         } else if (error == -ENOENT) {
4149                 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4150                           ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4151                           ms->m_lkid, saved_seq);
4152 
4153                 if (ms->m_type == DLM_MSG_CONVERT)
4154                         dlm_dump_rsb_hash(ls, ms->m_hash);
4155         }
4156 
4157         if (error == -EINVAL) {
4158                 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4159                           "saved_seq %u",
4160                           ms->m_type, ms->m_header.h_nodeid,
4161                           ms->m_lkid, ms->m_remid, saved_seq);
4162         }
4163 }
4164 
/*
 * Entry point for a normal locking message received for a lockspace.
 *
 * While the lockspace is in recovery mode (locking stopped), messages are
 * saved on the requestqueue and processed after recovery is done.  When not
 * in recovery mode, we first wait for dlm_recoverd to drain the saved
 * messages off the requestqueue and only then process the new message.
 * This matters right after recovery completes, when we move from saving all
 * messages, to processing the saved ones, to processing new messages as
 * they arrive.
 */
static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
                                int nodeid)
{
        if (dlm_locking_stopped(ls)) {
                dlm_add_requestqueue(ls, nodeid, ms);
                return;
        }

        dlm_wait_requestqueue(ls);
        _receive_message(ls, ms, 0);
}
4183 
4184 /* This is called by dlm_recoverd to process messages that were saved on
4185    the requestqueue. */
4186 
4187 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
4188                                uint32_t saved_seq)
4189 {
4190         _receive_message(ls, ms, saved_seq);
4191 }
4192 
4193 /* This is called by the midcomms layer when something is received for
4194    the lockspace.  It could be either a MSG (normal message sent as part of
4195    standard locking activity) or an RCOM (recovery message sent as part of
4196    lockspace recovery). */
4197 
4198 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
4199 {
4200         struct dlm_header *hd = &p->header;
4201         struct dlm_ls *ls;
4202         int type = 0;
4203 
4204         switch (hd->h_cmd) {
4205         case DLM_MSG:
4206                 dlm_message_in(&p->message);
4207                 type = p->message.m_type;
4208                 break;
4209         case DLM_RCOM:
4210                 dlm_rcom_in(&p->rcom);
4211                 type = p->rcom.rc_type;
4212                 break;
4213         default:
4214                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4215                 return;
4216         }
4217 
4218         if (hd->h_nodeid != nodeid) {
4219                 log_print("invalid h_nodeid %d from %d lockspace %x",
4220                           hd->h_nodeid, nodeid, hd->h_lockspace);
4221                 return;
4222         }
4223 
4224         ls = dlm_find_lockspace_global(hd->h_lockspace);
4225         if (!ls) {
4226                 if (dlm_config.ci_log_debug) {
4227                         printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4228                                 "%u from %d cmd %d type %d\n",
4229                                 hd->h_lockspace, nodeid, hd->h_cmd, type);
4230                 }
4231 
4232                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4233                         dlm_send_ls_not_ready(nodeid, &p->rcom);
4234                 return;
4235         }
4236 
4237         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4238            be inactive (in this ls) before transitioning to recovery mode */
4239 
4240         down_read(&ls->ls_recv_active);
4241         if (hd->h_cmd == DLM_MSG)
4242                 dlm_receive_message(ls, &p->message, nodeid);
4243         else
4244                 dlm_receive_rcom(ls, &p->rcom, nodeid);
4245         up_read(&ls->ls_recv_active);
4246 
4247         dlm_put_lockspace(ls);
4248 }
4249 
4250 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4251                                    struct dlm_message *ms_stub)
4252 {
4253         if (middle_conversion(lkb)) {
4254                 hold_lkb(lkb);
4255                 memset(ms_stub, 0, sizeof(struct dlm_message));
4256                 ms_stub->m_flags = DLM_IFL_STUB_MS;
4257                 ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
4258                 ms_stub->m_result = -EINPROGRESS;
4259                 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4260                 _receive_convert_reply(lkb, ms_stub);
4261 
4262                 /* Same special case as in receive_rcom_lock_args() */
4263                 lkb->lkb_grmode = DLM_LOCK_IV;
4264                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4265                 unhold_lkb(lkb);
4266 
4267         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4268                 lkb->lkb_flags |= DLM_IFL_RESEND;
4269         }
4270 
4271         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4272            conversions are async; there's no reply from the remote master */
4273 }
4274 
4275 /* A waiting lkb needs recovery if the master node has failed, or
4276    the master node is changing (only when no directory is used) */
4277 
4278 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
4279                                  int dir_nodeid)
4280 {
4281         if (dlm_no_directory(ls))
4282                 return 1;
4283 
4284         if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
4285                 return 1;
4286 
4287         return 0;
4288 }
4289 
4290 /* Recovery for locks that are waiting for replies from nodes that are now
4291    gone.  We can just complete unlocks and cancels by faking a reply from the
4292    dead node.  Requests and up-conversions we flag to be resent after
4293    recovery.  Down-conversions can just be completed with a fake reply like
4294    unlocks.  Conversions between PR and CW need special attention. */
4295 
4296 void dlm_recover_waiters_pre(struct dlm_ls *ls)
4297 {
4298         struct dlm_lkb *lkb, *safe;
4299         struct dlm_message *ms_stub;
4300         int wait_type, stub_unlock_result, stub_cancel_result;
4301         int dir_nodeid;
4302 
4303         ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
4304         if (!ms_stub) {
4305                 log_error(ls, "dlm_recover_waiters_pre no mem");
4306                 return;
4307         }
4308 
4309         mutex_lock(&ls->ls_waiters_mutex);
4310 
4311         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4312 
4313                 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
4314 
4315                 /* exclude debug messages about unlocks because there can be so
4316                    many and they aren't very interesting */
4317 
4318                 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4319                         log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
4320                                   "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
4321                                   lkb->lkb_id,
4322                                   lkb->lkb_remid,
4323                                   lkb->lkb_wait_type,
4324                                   lkb->lkb_resource->res_nodeid,
4325                                   lkb->lkb_nodeid,
4326                                   lkb->lkb_wait_nodeid,
4327                                   dir_nodeid);
4328                 }
4329 
4330                 /* all outstanding lookups, regardless of destination  will be
4331                    resent after recovery is done */
4332 
4333                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4334                         lkb->lkb_flags |= DLM_IFL_RESEND;
4335                         continue;
4336                 }
4337 
4338                 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
4339                         continue;
4340 
4341                 wait_type = lkb->lkb_wait_type;
4342                 stub_unlock_result = -DLM_EUNLOCK;
4343                 stub_cancel_result = -DLM_ECANCEL;
4344 
4345                 /* Main reply may have been received leaving a zero wait_type,
4346                    but a reply for the overlapping op may not have been
4347                    received.  In that case we need to fake the appropriate
4348                    reply for the overlap op. */
4349 
4350                 if (!wait_type) {
4351                         if (is_overlap_cancel(lkb)) {
4352                                 wait_type = DLM_MSG_CANCEL;
4353                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4354                                         stub_cancel_result = 0;
4355                         }
4356                         if (is_overlap_unlock(lkb)) {
4357                                 wait_type = DLM_MSG_UNLOCK;
4358                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4359                                         stub_unlock_result = -ENOENT;
4360                         }
4361 
4362                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
4363                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
4364                                   stub_cancel_result, stub_unlock_result);
4365                 }
4366 
4367                 switch (wait_type) {
4368 
4369                 case DLM_MSG_REQUEST:
4370                         lkb->lkb_flags |= DLM_IFL_RESEND;
4371                         break;
4372 
4373                 case DLM_MSG_CONVERT:
4374                         recover_convert_waiter(ls, lkb, ms_stub);
4375                         break;
4376 
4377                 case DLM_MSG_UNLOCK:
4378                         hold_lkb(lkb);
4379                         memset(ms_stub, 0, sizeof(struct dlm_message));
4380                         ms_stub->m_flags = DLM_IFL_STUB_MS;
4381                         ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
4382                         ms_stub->m_result = stub_unlock_result;
4383                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4384                         _receive_unlock_reply(lkb, ms_stub);
4385                         dlm_put_lkb(lkb);
4386                         break;
4387 
4388                 case DLM_MSG_CANCEL:
4389                         hold_lkb(lkb);
4390                         memset(ms_stub, 0, sizeof(struct dlm_message));
4391                         ms_stub->m_flags = DLM_IFL_STUB_MS;
4392                         ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
4393                         ms_stub->m_result = stub_cancel_result;
4394                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4395                         _receive_cancel_reply(lkb, ms_stub);
4396                         dlm_put_lkb(lkb);
4397                         break;
4398 
4399                 default:
4400                         log_error(ls, "invalid lkb wait_type %d %d",
4401                                   lkb->lkb_wait_type, wait_type);
4402                 }
4403                 schedule();
4404         }
4405         mutex_unlock(&ls->ls_waiters_mutex);
4406         kfree(ms_stub);
4407 }
4408 
4409 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4410 {
4411         struct dlm_lkb *lkb;
4412         int found = 0;
4413 
4414         mutex_lock(&ls->ls_waiters_mutex);
4415         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4416                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
4417                         hold_lkb(lkb);
4418                         found = 1;
4419                         break;
4420                 }
4421         }
4422         mutex_unlock(&ls->ls_waiters_mutex);
4423 
4424         if (!found)
4425                 lkb = NULL;
4426         return lkb;
4427 }
4428 
4429 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4430    master or dir-node for r.  Processing the lkb may result in it being placed
4431    back on waiters. */
4432 
4433 /* We do this after normal locking has been enabled and any saved messages
4434    (in requestqueue) have been processed.  We should be confident that at
4435    this point we won't get or process a reply to any of these waiting
4436    operations.  But, new ops may be coming in on the rsbs/locks here from
4437    userspace or remotely. */
4438 
/* There may have been an overlap unlock/cancel prior to recovery or after
   recovery.  If before, the lkb may still have a positive wait_count; if
   after, the overlap flag would just have been set and nothing new sent.  We
   can be confident here that any replies to either the initial op or overlap
   ops prior to recovery have been received. */
4444 
/* Handle every lkb on ls_waiters that is marked DLM_IFL_RESEND: take it off
   the waiters list, clear its wait state, and then either redo the original
   lookup/request/convert or, when an unlock or cancel overlapped it, complete
   that overlapping op instead.

   Returns 0 once find_resend_waiter() finds nothing more to do, or -EINTR if
   dlm_locking_stopped() is true at the top of an iteration. */

int dlm_recover_waiters_post(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error = 0, mstype, err, oc, ou;

	while (1) {
		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "recover_waiters_post aborted");
			error = -EINTR;
			break;
		}

		/* comes back with a reference held (hold_lkb), which is
		   dropped by the dlm_put_lkb() at the bottom of the loop */
		lkb = find_resend_waiter(ls);
		if (!lkb)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		/* snapshot the wait state; it is cleared further down */
		mstype = lkb->lkb_wait_type;
		oc = is_overlap_cancel(lkb);
		ou = is_overlap_unlock(lkb);
		err = 0;

		log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
			  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
			  "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
			  r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
			  dlm_dir_nodeid(r), oc, ou);

		/* At this point we assume that we won't get a reply to any
		   previous op or overlap op on this lock.  First, do a big
		   remove_from_waiters() for all previous ops. */

		lkb->lkb_flags &= ~DLM_IFL_RESEND;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_type = 0;
		lkb->lkb_wait_count = 0;
		mutex_lock(&ls->ls_waiters_mutex);
		list_del_init(&lkb->lkb_wait_reply);
		mutex_unlock(&ls->ls_waiters_mutex);
		unhold_lkb(lkb); /* for waiters list */

		if (oc || ou) {
			/* do an unlock or cancel instead of resending */
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				/* an overlapping unlock takes precedence over
				   an overlapping cancel for the cast status */
				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
							-DLM_ECANCEL);
				unhold_lkb(lkb); /* undoes create_lkb() */
				break;
			case DLM_MSG_CONVERT:
				if (oc) {
					queue_cast(r, lkb, -DLM_ECANCEL);
				} else {
					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
					_unlock_lock(r, lkb);
				}
				break;
			default:
				/* unexpected wait type; logged below */
				err = 1;
			}
		} else {
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				_request_lock(r, lkb);
				if (is_master(r))
					confirm_master(r, 0);
				break;
			case DLM_MSG_CONVERT:
				_convert_lock(r, lkb);
				break;
			default:
				/* unexpected wait type; logged below */
				err = 1;
			}
		}

		if (err) {
			log_error(ls, "waiter %x msg %d r_nodeid %d "
				  "dir_nodeid %d overlap %d %d",
				  lkb->lkb_id, mstype, r->res_nodeid,
				  dlm_dir_nodeid(r), oc, ou);
		}
		unlock_rsb(r);
		put_rsb(r);
		dlm_put_lkb(lkb);
	}

	return error;
}
4540 
4541 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
4542                               struct list_head *list)
4543 {
4544         struct dlm_lkb *lkb, *safe;
4545 
4546         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
4547                 if (!is_master_copy(lkb))
4548                         continue;
4549 
4550                 /* don't purge lkbs we've added in recover_master_copy for
4551                    the current recovery seq */
4552 
4553                 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
4554                         continue;
4555 
4556                 del_lkb(r, lkb);
4557 
4558                 /* this put should free the lkb */
4559                 if (!dlm_put_lkb(lkb))
4560                         log_error(ls, "purged mstcpy lkb not released");
4561         }
4562 }
4563 
4564 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4565 {
4566         struct dlm_ls *ls = r->res_ls;
4567 
4568         purge_mstcpy_list(ls, r, &r->res_grantqueue);
4569         purge_mstcpy_list(ls, r, &r->res_convertqueue);
4570         purge_mstcpy_list(ls, r, &r->res_waitqueue);
4571 }
4572 
4573 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
4574                             struct list_head *list,
4575                             int nodeid_gone, unsigned int *count)
4576 {
4577         struct dlm_lkb *lkb, *safe;
4578 
4579         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
4580                 if (!is_master_copy(lkb))
4581                         continue;
4582 
4583                 if ((lkb->lkb_nodeid == nodeid_gone) ||
4584                     dlm_is_removed(ls, lkb->lkb_nodeid)) {
4585 
4586                         del_lkb(r, lkb);
4587 
4588                         /* this put should free the lkb */
4589                         if (!dlm_put_lkb(lkb))
4590                                 log_error(ls, "purged dead lkb not released");
4591 
4592                         rsb_set_flag(r, RSB_RECOVER_GRANT);
4593 
4594                         (*count)++;
4595                 }
4596         }
4597 }
4598 
4599 /* Get rid of locks held by nodes that are gone. */
4600 
4601 void dlm_recover_purge(struct dlm_ls *ls)
4602 {
4603         struct dlm_rsb *r;
4604         struct dlm_member *memb;
4605         int nodes_count = 0;
4606         int nodeid_gone = 0;
4607         unsigned int lkb_count = 0;
4608 
4609         /* cache one removed nodeid to optimize the common
4610            case of a single node removed */
4611 
4612         list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
4613                 nodes_count++;
4614                 nodeid_gone = memb->nodeid;
4615         }
4616 
4617         if (!nodes_count)
4618                 return;
4619 
4620         down_write(&ls->ls_root_sem);
4621         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4622                 hold_rsb(r);
4623                 lock_rsb(r);
4624                 if (is_master(r)) {
4625                         purge_dead_list(ls, r, &r->res_grantqueue,
4626                                         nodeid_gone, &lkb_count);
4627                         purge_dead_list(ls, r, &r->res_convertqueue,
4628                                         nodeid_gone, &lkb_count);
4629                         purge_dead_list(ls, r, &r->res_waitqueue,
4630                                         nodeid_gone, &lkb_count);
4631                 }
4632                 unlock_rsb(r);
4633                 unhold_rsb(r);
4634                 cond_resched();
4635         }
4636         up_write(&ls->ls_root_sem);
4637 
4638         if (lkb_count)
4639                 log_debug(ls, "dlm_recover_purge %u locks for %u nodes",
4640                           lkb_count, nodes_count);
4641 }
4642 
4643 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
4644 {
4645         struct rb_node *n;
4646         struct dlm_rsb *r;
4647 
4648         spin_lock(&ls->ls_rsbtbl[bucket].lock);
4649         for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
4650                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
4651 
4652                 if (!rsb_flag(r, RSB_RECOVER_GRANT))
4653                         continue;
4654                 rsb_clear_flag(r, RSB_RECOVER_GRANT);
4655                 if (!is_master(r))
4656                         continue;
4657                 hold_rsb(r);
4658                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4659                 return r;
4660         }
4661         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4662         return NULL;
4663 }
4664 
4665 /*
4666  * Attempt to grant locks on resources that we are the master of.
4667  * Locks may have become grantable during recovery because locks
4668  * from departed nodes have been purged (or not rebuilt), allowing
4669  * previously blocked locks to now be granted.  The subset of rsb's
4670  * we are interested in are those with lkb's on either the convert or
4671  * waiting queues.
4672  *
4673  * Simplest would be to go through each master rsb and check for non-empty
4674  * convert or waiting queues, and attempt to grant on those rsbs.
4675  * Checking the queues requires lock_rsb, though, for which we'd need
4676  * to release the rsbtbl lock.  This would make iterating through all
4677  * rsb's very inefficient.  So, we rely on earlier recovery routines
4678  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
4679  * locks for.
4680  */
4681 
4682 void dlm_recover_grant(struct dlm_ls *ls)
4683 {
4684         struct dlm_rsb *r;
4685         int bucket = 0;
4686         unsigned int count = 0;
4687         unsigned int rsb_count = 0;
4688         unsigned int lkb_count = 0;
4689 
4690         while (1) {
4691                 r = find_grant_rsb(ls, bucket);
4692                 if (!r) {
4693                         if (bucket == ls->ls_rsbtbl_size - 1)
4694                                 break;
4695                         bucket++;
4696                         continue;
4697                 }
4698                 rsb_count++;
4699                 count = 0;
4700                 lock_rsb(r);
4701                 grant_pending_locks(r, &count);
4702                 lkb_count += count;
4703                 confirm_master(r, 0);
4704                 unlock_rsb(r);
4705                 put_rsb(r);
4706                 cond_resched();
4707         }
4708 
4709         if (lkb_count)
4710                 log_debug(ls, "dlm_recover_grant %u locks on %u resources",
4711                           lkb_count, rsb_count);
4712 }
4713 
4714 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4715                                          uint32_t remid)
4716 {
4717         struct dlm_lkb *lkb;
4718 
4719         list_for_each_entry(lkb, head, lkb_statequeue) {
4720                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4721                         return lkb;
4722         }
4723         return NULL;
4724 }
4725 
4726 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4727                                     uint32_t remid)
4728 {
4729         struct dlm_lkb *lkb;
4730 
4731         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4732         if (lkb)
4733                 return lkb;
4734         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4735         if (lkb)
4736                 return lkb;
4737         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4738         if (lkb)
4739                 return lkb;
4740         return NULL;
4741 }
4742 
4743 /* needs at least dlm_rcom + rcom_lock */
/* Fill in a newly created master-copy lkb from the rcom_lock carried in
   rc->rc_buf.

   Returns 0 on success, -EINVAL if the lvb data trailing the message is
   longer than ls->ls_lvblen, or -ENOMEM if dlm_allocate_lvb() fails.  The lkb
   is not released here on failure.

   Side effects beyond lkb: may set RSB_RECOVER_CONVERT on r and may rewrite
   rl->rl_status in the received message (see the end of the function). */
static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				  struct dlm_rsb *r, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;

	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
	/* only the low 16 bits of the sender's flags are kept */
	lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
	lkb->lkb_rqmode = rl->rl_rqmode;
	lkb->lkb_grmode = rl->rl_grmode;
	/* don't set lkb_status because add_lkb wants to itself */

	lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
	lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		/* whatever follows the dlm_rcom header and the rcom_lock in
		   the message is taken as lvb data; only the upper bound is
		   checked here, so this relies on h_length covering both
		   structs (the "needs at least" note above) */
		int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
			 sizeof(struct rcom_lock);
		if (lvblen > ls->ls_lvblen)
			return -EINVAL;
		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
	}

	/* Conversions between PR and CW (middle modes) need special handling.
	   The real granted mode of these converting locks cannot be determined
	   until all locks have been rebuilt on the rsb (recover_conversion) */

	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
	    middle_conversion(lkb)) {
		/* overwrites the status in the received message itself;
		   dlm_recover_master_copy() later passes rl->rl_status to
		   add_lkb() */
		rl->rl_status = DLM_LKSTS_CONVERT;
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(r, RSB_RECOVER_CONVERT);
	}

	return 0;
}
4787 
4788 /* This lkb may have been recovered in a previous aborted recovery so we need
4789    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4790    If so we just send back a standard reply.  If not, we create a new lkb with
4791    the given values and send back our lkid.  We send back our lkid by sending
4792    back the rcom_lock struct we got but with the remid field filled in. */
4793 
4794 /* needs at least dlm_rcom + rcom_lock */
/* Find or create the master-copy lkb described by the rcom_lock in
   rc->rc_buf.

   The received rcom_lock is updated in place: rl_result always gets the
   (little-endian) error code, and rl_remid gets the id of our lkb when the
   lkb was created or already existed (-EEXIST).

   Returns 0 on success or a negative error; -EEXIST is returned when an lkb
   with the same remote nodeid/lkid is already on the rsb. */
int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	uint32_t remid = 0;
	int error;

	/* a non-zero parent lkid is rejected */
	if (rl->rl_parent_lkid) {
		error = -EOPNOTSUPP;
		goto out;
	}

	remid = le32_to_cpu(rl->rl_lkid);

	/* In general we expect the rsb returned to be R_MASTER, but we don't
	   have to require it.  Recovery of masters on one node can overlap
	   recovery of locks on another node, so one node can send us MSTCPY
	   locks before we've made ourselves master of this rsb.  We can still
	   add new MSTCPY locks that we receive here without any harm; when
	   we make ourselves master, dlm_recover_masters() won't touch the
	   MSTCPY locks we've received early. */

	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 0, &r);
	if (error)
		goto out;

	if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
		log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
			  rc->rc_header.h_nodeid, remid);
		error = -EBADR;
		put_rsb(r);
		goto out;
	}

	lock_rsb(r);

	lkb = search_remid(r, rc->rc_header.h_nodeid, remid);
	if (lkb) {
		/* already present: still report its id and refresh its
		   recover_seq at out_remid */
		error = -EEXIST;
		goto out_remid;
	}

	error = create_lkb(ls, &lkb);
	if (error)
		goto out_unlock;

	error = receive_rcom_lock_args(ls, lkb, r, rc);
	if (error) {
		/* release the lkb that was just created */
		__put_lkb(ls, lkb);
		goto out_unlock;
	}

	attach_lkb(r, lkb);
	add_lkb(r, lkb, rl->rl_status);
	error = 0;
	ls->ls_recover_locks_in++;

	if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
		rsb_set_flag(r, RSB_RECOVER_GRANT);

 out_remid:
	/* this is the new value returned to the lock holder for
	   saving in its process-copy lkb */
	rl->rl_remid = cpu_to_le32(lkb->lkb_id);

	/* stamp with the current recovery sequence; purge_mstcpy_list()
	   skips lkbs whose lkb_recover_seq equals ls_recover_seq */
	lkb->lkb_recover_seq = ls->ls_recover_seq;

 out_unlock:
	unlock_rsb(r);
	put_rsb(r);
 out:
	/* -EEXIST is not logged */
	if (error && error != -EEXIST)
		log_debug(ls, "dlm_recover_master_copy remote %d %x error %d",
			  rc->rc_header.h_nodeid, remid, error);
	rl->rl_result = cpu_to_le32(error);
	return error;
}
4873 
4874 /* needs at least dlm_rcom + rcom_lock */
/* Process an rcom_lock that carries a result for one of our own lkbs:
   rl_lkid is looked up as a local lkb id, rl_remid is the id stored into
   lkb_remid on success, and rl_result selects the action below.

   Returns the find_lkb() error if the lkb does not exist, -EINVAL if the lkb
   is not a process copy, and 0 otherwise (including for unknown results). */
int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	uint32_t lkid, remid;
	int error, result;

	lkid = le32_to_cpu(rl->rl_lkid);
	remid = le32_to_cpu(rl->rl_remid);
	result = le32_to_cpu(rl->rl_result);

	error = find_lkb(ls, lkid, &lkb);
	if (error) {
		log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
			  lkid, rc->rc_header.h_nodeid, remid, result);
		return error;
	}

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	if (!is_process_copy(lkb)) {
		log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
			  lkid, rc->rc_header.h_nodeid, remid, result);
		dlm_dump_rsb(r);
		unlock_rsb(r);
		put_rsb(r);
		dlm_put_lkb(lkb);
		return -EINVAL;
	}

	switch (result) {
	case -EBADR:
		/* There's a chance the new master received our lock before
		   dlm_recover_master_reply(), this wouldn't happen if we did
		   a barrier between recover_masters and recover_locks. */

		log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
			  lkid, rc->rc_header.h_nodeid, remid, result);
	
		/* send the lock again; this lock is not acked via
		   dlm_recovered_lock() on this pass */
		dlm_send_rcom_lock(r, lkb);
		goto out;
	case -EEXIST:
	case 0:
		lkb->lkb_remid = remid;
		break;
	default:
		/* unknown result: logged, but still acked below */
		log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
			  lkid, rc->rc_header.h_nodeid, remid, result);
	}

	/* an ack for dlm_recover_locks() which waits for replies from
	   all the locks it sends to new masters */
	dlm_recovered_lock(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);	/* reference from find_lkb() */

	return 0;
}
4938 
4939 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4940                      int mode, uint32_t flags, void *name, unsigned int namelen,
4941                      unsigned long timeout_cs)
4942 {
4943         struct dlm_lkb *lkb;
4944         struct dlm_args args;
4945         int error;
4946 
4947         dlm_lock_recovery(ls);
4948 
4949         error = create_lkb(ls, &lkb);
4950         if (error) {
4951                 kfree(ua);
4952                 goto out;
4953         }
4954 
4955         if (flags & DLM_LKF_VALBLK) {
4956                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4957                 if (!ua->lksb.sb_lvbptr) {
4958                         kfree(ua);
4959                         __put_lkb(ls, lkb);
4960                         error = -ENOMEM;
4961                         goto out;
4962                 }
4963         }
4964 
4965         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
4966            When DLM_IFL_USER is set, the dlm knows that this is a userspace
4967            lock and that lkb_astparam is the dlm_user_args structure. */
4968 
4969         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4970                               fake_astfn, ua, fake_bastfn, &args);
4971         lkb->lkb_flags |= DLM_IFL_USER;
4972 
4973         if (error) {
4974                 __put_lkb(ls, lkb);
4975                 goto out;
4976         }
4977 
4978         error = request_lock(ls, lkb, name, namelen, &args);
4979 
4980         switch (error) {
4981         case 0:
4982                 break;
4983         case -EINPROGRESS:
4984                 error = 0;
4985                 break;
4986         case -EAGAIN:
4987                 error = 0;
4988                 /* fall through */
4989         default:
4990                 __put_lkb(ls, lkb);
4991                 goto out;
4992         }
4993 
4994         /* add this new lkb to the per-process list of locks */
4995         spin_lock(&ua->proc->locks_spin);
4996         hold_lkb(lkb);
4997         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4998         spin_unlock(&ua->proc->locks_spin);
4999  out:
5000         dlm_unlock_recovery(ls);
5001         return error;
5002 }
5003 
5004 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5005                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
5006                      unsigned long timeout_cs)
5007 {
5008         struct dlm_lkb *lkb;
5009         struct dlm_args args;
5010         struct dlm_user_args *ua;
5011         int error;
5012 
5013         dlm_lock_recovery(ls);
5014 
5015         error = find_lkb(ls, lkid, &lkb);
5016         if (error)
5017                 goto out;
5018 
5019         /* user can change the params on its lock when it converts it, or
5020            add an lvb that didn't exist before */
5021 
5022         ua = lkb->lkb_ua;
5023 
5024         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5025                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5026                 if (!ua->lksb.sb_lvbptr) {
5027                         error = -ENOMEM;
5028                         goto out_put;
5029                 }
5030         }
5031         if (lvb_in && ua->lksb.sb_lvbptr)
5032                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5033 
5034         ua->xid = ua_tmp->xid;
5035         ua->castparam = ua_tmp->castparam;
5036         ua->castaddr = ua_tmp->castaddr;
5037         ua->bastparam = ua_tmp->bastparam;
5038         ua->bastaddr = ua_tmp->bastaddr;
5039         ua->user_lksb = ua_tmp->user_lksb;
5040 
5041         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
5042                               fake_astfn, ua, fake_bastfn, &args);
5043         if (error)
5044                 goto out_put;
5045 
5046         error = convert_lock(ls, lkb, &args);
5047 
5048         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5049                 error = 0;
5050  out_put:
5051         dlm_put_lkb(lkb);
5052  out:
5053         dlm_unlock_recovery(ls);
5054         kfree(ua_tmp);
5055         return error;
5056 }
5057 
5058 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5059                     uint32_t flags, uint32_t lkid, char *lvb_in)
5060 {
5061         struct dlm_lkb *lkb;
5062         struct dlm_args args;
5063         struct dlm_user_args *ua;
5064         int error;
5065 
5066         dlm_lock_recovery(ls);
5067 
5068         error = find_lkb(ls, lkid, &lkb);
5069         if (error)
5070                 goto out;
5071 
5072         ua = lkb->lkb_ua;
5073 
5074         if (lvb_in && ua->lksb.sb_lvbptr)
5075                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5076         if (ua_tmp->castparam)
5077                 ua->castparam = ua_tmp->castparam;
5078         ua->user_lksb = ua_tmp->user_lksb;
5079 
5080         error = set_unlock_args(flags, ua, &args);
5081         if (error)
5082                 goto out_put;
5083 
5084         error = unlock_lock(ls, lkb, &args);
5085 
5086         if (error == -DLM_EUNLOCK)
5087                 error = 0;
5088         /* from validate_unlock_args() */
5089         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5090                 error = 0;
5091         if (error)
5092                 goto out_put;
5093 
5094         spin_lock(&ua->proc->locks_spin);
5095         /* dlm_user_add_cb() may have already taken lkb off the proc list */
5096         if (!list_empty(&lkb->lkb_ownqueue))
5097                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5098         spin_unlock(&ua->proc->locks_spin);
5099  out_put:
5100         dlm_put_lkb(lkb);
5101  out:
5102         dlm_unlock_recovery(ls);
5103         kfree(ua_tmp);
5104         return error;
5105 }
5106 
5107 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5108                     uint32_t flags, uint32_t lkid)
5109 {
5110         struct dlm_lkb *lkb;
5111         struct dlm_args args;
5112         struct dlm_user_args *ua;
5113         int error;
5114 
5115         dlm_lock_recovery(ls);
5116 
5117         error = find_lkb(ls, lkid, &lkb);
5118         if (error)
5119                 goto out;
5120 
5121         ua = lkb->lkb_ua;
5122         if (ua_tmp->castparam)
5123                 ua->castparam = ua_tmp->castparam;
5124         ua->user_lksb = ua_tmp->user_lksb;
5125 
5126         error = set_unlock_args(flags, ua, &args);
5127         if (error)
5128                 goto out_put;
5129 
5130         error = cancel_lock(ls, lkb, &args);
5131 
5132         if (error == -DLM_ECANCEL)
5133                 error = 0;
5134         /* from validate_unlock_args() */
5135         if (error == -EBUSY)
5136                 error = 0;
5137  out_put:
5138         dlm_put_lkb(lkb);
5139  out:
5140         dlm_unlock_recovery(ls);
5141         kfree(ua_tmp);
5142         return error;
5143 }
5144 
5145 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
5146 {
5147         struct dlm_lkb *lkb;
5148         struct dlm_args args;
5149         struct dlm_user_args *ua;
5150         struct dlm_rsb *r;
5151         int error;
5152 
5153         dlm_lock_recovery(ls);
5154 
5155         error = find_lkb(ls, lkid, &lkb);
5156         if (error)
5157                 goto out;
5158 
5159         ua = lkb->lkb_ua;
5160 
5161         error = set_unlock_args(flags, ua, &args);
5162         if (error)
5163                 goto out_put;
5164 
5165         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
5166 
5167         r = lkb->lkb_resource;
5168         hold_rsb(r);
5169         lock_rsb(r);
5170 
5171         error = validate_unlock_args(lkb, &args);
5172         if (error)
5173                 goto out_r;
5174         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
5175 
5176         error = _cancel_lock(r, lkb);
5177  out_r:
5178         unlock_rsb(r);
5179         put_rsb(r);
5180 
5181         if (error == -DLM_ECANCEL)
5182                 error = 0;
5183         /* from validate_unlock_args() */
5184         if (error == -EBUSY)
5185                 error = 0;
5186  out_put:
5187         dlm_put_lkb(lkb);
5188  out:
5189         dlm_unlock_recovery(ls);
5190         return error;
5191 }
5192 
5193 /* lkb's that are removed from the waiters list by revert are just left on the
5194    orphans list with the granted orphan locks, to be freed by purge */
5195 
5196 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5197 {
5198         struct dlm_args args;
5199         int error;
5200 
5201         hold_lkb(lkb);
5202         mutex_lock(&ls->ls_orphans_mutex);
5203         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
5204         mutex_unlock(&ls->ls_orphans_mutex);
5205 
5206         set_unlock_args(0, lkb->lkb_ua, &args);
5207 
5208         error = cancel_lock(ls, lkb, &args);
5209         if (error == -DLM_ECANCEL)
5210                 error = 0;
5211         return error;
5212 }
5213 
5214 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
5215    Regardless of what rsb queue the lock is on, it's removed and freed. */
5216 
5217 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5218 {
5219         struct dlm_args args;
5220         int error;
5221 
5222         set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
5223 
5224         error = unlock_lock(ls, lkb, &args);
5225         if (error == -DLM_EUNLOCK)
5226                 error = 0;
5227         return error;
5228 }
5229 
5230 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
5231    (which does lock_rsb) due to deadlock with receiving a message that does
5232    lock_rsb followed by dlm_user_add_cb() */
5233 
5234 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
5235                                      struct dlm_user_proc *proc)
5236 {
5237         struct dlm_lkb *lkb = NULL;
5238 
5239         mutex_lock(&ls->ls_clear_proc_locks);
5240         if (list_empty(&proc->locks))
5241                 goto out;
5242 
5243         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
5244         list_del_init(&lkb->lkb_ownqueue);
5245 
5246         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5247                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
5248         else
5249                 lkb->lkb_flags |= DLM_IFL_DEAD;
5250  out:
5251         mutex_unlock(&ls->ls_clear_proc_locks);
5252         return lkb;
5253 }
5254 
5255 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
5256    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
5257    which we clear here. */
5258 
5259 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
5260    list, and no more device_writes should add lkb's to proc->locks list; so we
5261    shouldn't need to take asts_spin or locks_spin here.  This assumes that
5262    device reads/writes/closes are serialized -- FIXME: we may need to serialize
5263    them ourselves. */
5264 
5265 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5266 {
5267         struct dlm_lkb *lkb, *safe;
5268 
5269         dlm_lock_recovery(ls);
5270 
5271         while (1) {
5272                 lkb = del_proc_lock(ls, proc);
5273                 if (!lkb)
5274                         break;
5275                 del_timeout(lkb);
5276                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5277                         orphan_proc_lock(ls, lkb);
5278                 else
5279                         unlock_proc_lock(ls, lkb);
5280 
5281                 /* this removes the reference for the proc->locks list
5282                    added by dlm_user_request, it may result in the lkb
5283                    being freed */
5284 
5285                 dlm_put_lkb(lkb);
5286         }
5287 
5288         mutex_lock(&ls->ls_clear_proc_locks);
5289 
5290         /* in-progress unlocks */
5291         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5292                 list_del_init(&lkb->lkb_ownqueue);
5293                 lkb->lkb_flags |= DLM_IFL_DEAD;
5294                 dlm_put_lkb(lkb);
5295         }
5296 
5297         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5298                 memset(&lkb->lkb_callbacks, 0,
5299                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5300                 list_del_init(&lkb->lkb_cb_list);
5301                 dlm_put_lkb(lkb);
5302         }
5303 
5304         mutex_unlock(&ls->ls_clear_proc_locks);
5305         dlm_unlock_recovery(ls);
5306 }
5307 
5308 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5309 {
5310         struct dlm_lkb *lkb, *safe;
5311 
5312         while (1) {
5313                 lkb = NULL;
5314                 spin_lock(&proc->locks_spin);
5315                 if (!list_empty(&proc->locks)) {
5316                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
5317                                          lkb_ownqueue);
5318                         list_del_init(&lkb->lkb_ownqueue);
5319                 }
5320                 spin_unlock(&proc->locks_spin);
5321 
5322                 if (!lkb)
5323                         break;
5324 
5325                 lkb->lkb_flags |= DLM_IFL_DEAD;
5326                 unlock_proc_lock(ls, lkb);
5327                 dlm_put_lkb(lkb); /* ref from proc->locks list */
5328         }
5329 
5330         spin_lock(&proc->locks_spin);
5331         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5332                 list_del_init(&lkb->lkb_ownqueue);
5333                 lkb->lkb_flags |= DLM_IFL_DEAD;
5334                 dlm_put_lkb(lkb);
5335         }
5336         spin_unlock(&proc->locks_spin);
5337 
5338         spin_lock(&proc->asts_spin);
5339         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5340                 memset(&lkb->lkb_callbacks, 0,
5341                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5342                 list_del_init(&lkb->lkb_cb_list);
5343                 dlm_put_lkb(lkb);
5344         }
5345         spin_unlock(&proc->asts_spin);
5346 }
5347 
5348 /* pid of 0 means purge all orphans */
5349 
5350 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
5351 {
5352         struct dlm_lkb *lkb, *safe;
5353 
5354         mutex_lock(&ls->ls_orphans_mutex);
5355         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
5356                 if (pid && lkb->lkb_ownpid != pid)
5357                         continue;
5358                 unlock_proc_lock(ls, lkb);
5359                 list_del_init(&lkb->lkb_ownqueue);
5360                 dlm_put_lkb(lkb);
5361         }
5362         mutex_unlock(&ls->ls_orphans_mutex);
5363 }
5364 
5365 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
5366 {
5367         struct dlm_message *ms;
5368         struct dlm_mhandle *mh;
5369         int error;
5370 
5371         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
5372                                 DLM_MSG_PURGE, &ms, &mh);
5373         if (error)
5374                 return error;
5375         ms->m_nodeid = nodeid;
5376         ms->m_pid = pid;
5377 
5378         return send_message(mh, ms);
5379 }
5380 
5381 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
5382                    int nodeid, int pid)
5383 {
5384         int error = 0;
5385 
5386         if (nodeid != dlm_our_nodeid()) {
5387                 error = send_purge(ls, nodeid, pid);
5388         } else {
5389                 dlm_lock_recovery(ls);
5390                 if (pid == current->pid)
5391                         purge_proc_locks(ls, proc);
5392                 else
5393                         do_purge(ls, nodeid, pid);
5394                 dlm_unlock_recovery(ls);
5395         }
5396         return error;
5397 }
5398 
5399 

~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~

kernel.org | git.kernel.org | LWN.net | Project Home | Wiki (Japanese) | Wiki (English) | SVN repository | Mail admin

Linux® is a registered trademark of Linus Torvalds in the United States and other countries.
TOMOYO® is a registered trademark of NTT DATA CORPORATION.

osdn.jp