~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~

TOMOYO Linux Cross Reference
Linux/fs/ocfs2/dlm/dlmmaster.c

Version: ~ [ linux-5.13-rc5 ] ~ [ linux-5.12.9 ] ~ [ linux-5.11.22 ] ~ [ linux-5.10.42 ] ~ [ linux-5.9.16 ] ~ [ linux-5.8.18 ] ~ [ linux-5.7.19 ] ~ [ linux-5.6.19 ] ~ [ linux-5.5.19 ] ~ [ linux-5.4.124 ] ~ [ linux-5.3.18 ] ~ [ linux-5.2.21 ] ~ [ linux-5.1.21 ] ~ [ linux-5.0.21 ] ~ [ linux-4.20.17 ] ~ [ linux-4.19.193 ] ~ [ linux-4.18.20 ] ~ [ linux-4.17.19 ] ~ [ linux-4.16.18 ] ~ [ linux-4.15.18 ] ~ [ linux-4.14.235 ] ~ [ linux-4.13.16 ] ~ [ linux-4.12.14 ] ~ [ linux-4.11.12 ] ~ [ linux-4.10.17 ] ~ [ linux-4.9.271 ] ~ [ linux-4.8.17 ] ~ [ linux-4.7.10 ] ~ [ linux-4.6.7 ] ~ [ linux-4.5.7 ] ~ [ linux-4.4.271 ] ~ [ linux-4.3.6 ] ~ [ linux-4.2.8 ] ~ [ linux-4.1.52 ] ~ [ linux-4.0.9 ] ~ [ linux-3.18.140 ] ~ [ linux-3.16.85 ] ~ [ linux-3.14.79 ] ~ [ linux-3.12.74 ] ~ [ linux-3.10.108 ] ~ [ linux-2.6.32.71 ] ~ [ linux-2.6.0 ] ~ [ linux-2.4.37.11 ] ~ [ unix-v6-master ] ~ [ ccs-tools-1.8.5 ] ~ [ policy-sample ] ~
Architecture: ~ [ i386 ] ~ [ alpha ] ~ [ m68k ] ~ [ mips ] ~ [ ppc ] ~ [ sparc ] ~ [ sparc64 ] ~

  1 /* -*- mode: c; c-basic-offset: 8; -*-
  2  * vim: noexpandtab sw=8 ts=8 sts=0:
  3  *
  4  * dlmmaster.c
  5  *
  6  * standalone DLM module
  7  *
  8  * Copyright (C) 2004 Oracle.  All rights reserved.
  9  *
 10  * This program is free software; you can redistribute it and/or
 11  * modify it under the terms of the GNU General Public
 12  * License as published by the Free Software Foundation; either
 13  * version 2 of the License, or (at your option) any later version.
 14  *
 15  * This program is distributed in the hope that it will be useful,
 16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
 17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 18  * General Public License for more details.
 19  *
 20  * You should have received a copy of the GNU General Public
 21  * License along with this program; if not, write to the
 22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 23  * Boston, MA 02111-1307, USA.
 24  *
 25  */
 26 
 27 
 28 #include <linux/module.h>
 29 #include <linux/fs.h>
 30 #include <linux/types.h>
 31 #include <linux/slab.h>
 32 #include <linux/highmem.h>
 33 #include <linux/init.h>
 34 #include <linux/sysctl.h>
 35 #include <linux/random.h>
 36 #include <linux/blkdev.h>
 37 #include <linux/socket.h>
 38 #include <linux/inet.h>
 39 #include <linux/spinlock.h>
 40 #include <linux/delay.h>
 41 
 42 
 43 #include "cluster/heartbeat.h"
 44 #include "cluster/nodemanager.h"
 45 #include "cluster/tcp.h"
 46 
 47 #include "dlmapi.h"
 48 #include "dlmcommon.h"
 49 #include "dlmdomain.h"
 50 #include "dlmdebug.h"
 51 
 52 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
 53 #include "cluster/masklog.h"
 54 
 55 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
 56                               struct dlm_master_list_entry *mle,
 57                               struct o2nm_node *node,
 58                               int idx);
 59 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
 60                             struct dlm_master_list_entry *mle,
 61                             struct o2nm_node *node,
 62                             int idx);
 63 
 64 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
 65 static int dlm_do_assert_master(struct dlm_ctxt *dlm,
 66                                 struct dlm_lock_resource *res,
 67                                 void *nodemap, u32 flags);
 68 static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);
 69 
 70 static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
 71                                 struct dlm_master_list_entry *mle,
 72                                 const char *name,
 73                                 unsigned int namelen)
 74 {
 75         if (dlm != mle->dlm)
 76                 return 0;
 77 
 78         if (namelen != mle->mnamelen ||
 79             memcmp(name, mle->mname, namelen) != 0)
 80                 return 0;
 81 
 82         return 1;
 83 }
 84 
 85 static struct kmem_cache *dlm_lockres_cache;
 86 static struct kmem_cache *dlm_lockname_cache;
 87 static struct kmem_cache *dlm_mle_cache;
 88 
 89 static void dlm_mle_release(struct kref *kref);
 90 static void dlm_init_mle(struct dlm_master_list_entry *mle,
 91                         enum dlm_mle_type type,
 92                         struct dlm_ctxt *dlm,
 93                         struct dlm_lock_resource *res,
 94                         const char *name,
 95                         unsigned int namelen);
 96 static void dlm_put_mle(struct dlm_master_list_entry *mle);
 97 static void __dlm_put_mle(struct dlm_master_list_entry *mle);
 98 static int dlm_find_mle(struct dlm_ctxt *dlm,
 99                         struct dlm_master_list_entry **mle,
100                         char *name, unsigned int namelen);
101 
102 static int dlm_do_master_request(struct dlm_lock_resource *res,
103                                  struct dlm_master_list_entry *mle, int to);
104 
105 
106 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
107                                      struct dlm_lock_resource *res,
108                                      struct dlm_master_list_entry *mle,
109                                      int *blocked);
110 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
111                                     struct dlm_lock_resource *res,
112                                     struct dlm_master_list_entry *mle,
113                                     int blocked);
114 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
115                                  struct dlm_lock_resource *res,
116                                  struct dlm_master_list_entry *mle,
117                                  struct dlm_master_list_entry **oldmle,
118                                  const char *name, unsigned int namelen,
119                                  u8 new_master, u8 master);
120 
121 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
122                                     struct dlm_lock_resource *res);
123 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
124                                       struct dlm_lock_resource *res);
125 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
126                                        struct dlm_lock_resource *res,
127                                        u8 target);
128 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
129                                        struct dlm_lock_resource *res);
130 
131 
/*
 * Classify a negative error code as a "peer is unreachable" condition.
 *
 * @err: negative errno-style value to classify.
 *
 * Returns 1 if @err is one of the connection/network failure codes
 * listed below, 0 for every other value (including 0 and positive
 * numbers).
 *
 * The parameter is deliberately not called "errno": that identifier is
 * reserved for the C library's errno macro and cannot safely be used
 * as an ordinary name wherever <errno.h> is visible.
 */
int dlm_is_host_down(int err)
{
        switch (err) {
        case -EBADF:
        case -ECONNREFUSED:
        case -ENOTCONN:
        case -ECONNRESET:
        case -EPIPE:
        case -EHOSTDOWN:
        case -EHOSTUNREACH:
        case -ETIMEDOUT:
        case -ECONNABORTED:
        case -ENETDOWN:
        case -ENETUNREACH:
        case -ENETRESET:
        case -ESHUTDOWN:
        case -ENOPROTOOPT:
        case -EINVAL:   /* if returned from our tcp code,
                           this means there is no socket */
                return 1;
        default:
                return 0;
        }
}
155 
156 
157 /*
158  * MASTER LIST FUNCTIONS
159  */
160 
161 
162 /*
163  * regarding master list entries and heartbeat callbacks:
164  *
165  * in order to avoid sleeping and allocation that occurs in
166  * heartbeat, master list entries are simply attached to the
167  * dlm's established heartbeat callbacks.  the mle is attached
168  * when it is created, and since the dlm->spinlock is held at
169  * that time, any heartbeat event will be properly discovered
170  * by the mle.  the mle needs to be detached from the
171  * dlm->mle_hb_events list as soon as heartbeat events are no
172  * longer useful to the mle, and before the mle is freed.
173  *
174  * as a general rule, heartbeat events are no longer needed by
175  * the mle once an "answer" regarding the lock master has been
176  * received.
177  */
178 static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
179                                               struct dlm_master_list_entry *mle)
180 {
181         assert_spin_locked(&dlm->spinlock);
182 
183         list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
184 }
185 
186 
187 static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
188                                               struct dlm_master_list_entry *mle)
189 {
190         if (!list_empty(&mle->hb_events))
191                 list_del_init(&mle->hb_events);
192 }
193 
194 
195 static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
196                                             struct dlm_master_list_entry *mle)
197 {
198         spin_lock(&dlm->spinlock);
199         __dlm_mle_detach_hb_events(dlm, mle);
200         spin_unlock(&dlm->spinlock);
201 }
202 
203 static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
204 {
205         struct dlm_ctxt *dlm;
206         dlm = mle->dlm;
207 
208         assert_spin_locked(&dlm->spinlock);
209         assert_spin_locked(&dlm->master_lock);
210         mle->inuse++;
211         kref_get(&mle->mle_refs);
212 }
213 
214 static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
215 {
216         struct dlm_ctxt *dlm;
217         dlm = mle->dlm;
218 
219         spin_lock(&dlm->spinlock);
220         spin_lock(&dlm->master_lock);
221         mle->inuse--;
222         __dlm_put_mle(mle);
223         spin_unlock(&dlm->master_lock);
224         spin_unlock(&dlm->spinlock);
225 
226 }
227 
228 /* remove from list and free */
229 static void __dlm_put_mle(struct dlm_master_list_entry *mle)
230 {
231         struct dlm_ctxt *dlm;
232         dlm = mle->dlm;
233 
234         assert_spin_locked(&dlm->spinlock);
235         assert_spin_locked(&dlm->master_lock);
236         if (!kref_read(&mle->mle_refs)) {
237                 /* this may or may not crash, but who cares.
238                  * it's a BUG. */
239                 mlog(ML_ERROR, "bad mle: %p\n", mle);
240                 dlm_print_one_mle(mle);
241                 BUG();
242         } else
243                 kref_put(&mle->mle_refs, dlm_mle_release);
244 }
245 
246 
247 /* must not have any spinlocks coming in */
248 static void dlm_put_mle(struct dlm_master_list_entry *mle)
249 {
250         struct dlm_ctxt *dlm;
251         dlm = mle->dlm;
252 
253         spin_lock(&dlm->spinlock);
254         spin_lock(&dlm->master_lock);
255         __dlm_put_mle(mle);
256         spin_unlock(&dlm->master_lock);
257         spin_unlock(&dlm->spinlock);
258 }
259 
260 static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
261 {
262         kref_get(&mle->mle_refs);
263 }
264 
265 static void dlm_init_mle(struct dlm_master_list_entry *mle,
266                         enum dlm_mle_type type,
267                         struct dlm_ctxt *dlm,
268                         struct dlm_lock_resource *res,
269                         const char *name,
270                         unsigned int namelen)
271 {
272         assert_spin_locked(&dlm->spinlock);
273 
274         mle->dlm = dlm;
275         mle->type = type;
276         INIT_HLIST_NODE(&mle->master_hash_node);
277         INIT_LIST_HEAD(&mle->hb_events);
278         memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
279         spin_lock_init(&mle->spinlock);
280         init_waitqueue_head(&mle->wq);
281         atomic_set(&mle->woken, 0);
282         kref_init(&mle->mle_refs);
283         memset(mle->response_map, 0, sizeof(mle->response_map));
284         mle->master = O2NM_MAX_NODES;
285         mle->new_master = O2NM_MAX_NODES;
286         mle->inuse = 0;
287 
288         BUG_ON(mle->type != DLM_MLE_BLOCK &&
289                mle->type != DLM_MLE_MASTER &&
290                mle->type != DLM_MLE_MIGRATION);
291 
292         if (mle->type == DLM_MLE_MASTER) {
293                 BUG_ON(!res);
294                 mle->mleres = res;
295                 memcpy(mle->mname, res->lockname.name, res->lockname.len);
296                 mle->mnamelen = res->lockname.len;
297                 mle->mnamehash = res->lockname.hash;
298         } else {
299                 BUG_ON(!name);
300                 mle->mleres = NULL;
301                 memcpy(mle->mname, name, namelen);
302                 mle->mnamelen = namelen;
303                 mle->mnamehash = dlm_lockid_hash(name, namelen);
304         }
305 
306         atomic_inc(&dlm->mle_tot_count[mle->type]);
307         atomic_inc(&dlm->mle_cur_count[mle->type]);
308 
309         /* copy off the node_map and register hb callbacks on our copy */
310         memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
311         memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
312         clear_bit(dlm->node_num, mle->vote_map);
313         clear_bit(dlm->node_num, mle->node_map);
314 
315         /* attach the mle to the domain node up/down events */
316         __dlm_mle_attach_hb_events(dlm, mle);
317 }
318 
319 void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
320 {
321         assert_spin_locked(&dlm->spinlock);
322         assert_spin_locked(&dlm->master_lock);
323 
324         if (!hlist_unhashed(&mle->master_hash_node))
325                 hlist_del_init(&mle->master_hash_node);
326 }
327 
328 void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
329 {
330         struct hlist_head *bucket;
331 
332         assert_spin_locked(&dlm->master_lock);
333 
334         bucket = dlm_master_hash(dlm, mle->mnamehash);
335         hlist_add_head(&mle->master_hash_node, bucket);
336 }
337 
338 /* returns 1 if found, 0 if not */
339 static int dlm_find_mle(struct dlm_ctxt *dlm,
340                         struct dlm_master_list_entry **mle,
341                         char *name, unsigned int namelen)
342 {
343         struct dlm_master_list_entry *tmpmle;
344         struct hlist_head *bucket;
345         unsigned int hash;
346 
347         assert_spin_locked(&dlm->master_lock);
348 
349         hash = dlm_lockid_hash(name, namelen);
350         bucket = dlm_master_hash(dlm, hash);
351         hlist_for_each_entry(tmpmle, bucket, master_hash_node) {
352                 if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
353                         continue;
354                 dlm_get_mle(tmpmle);
355                 *mle = tmpmle;
356                 return 1;
357         }
358         return 0;
359 }
360 
361 void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
362 {
363         struct dlm_master_list_entry *mle;
364 
365         assert_spin_locked(&dlm->spinlock);
366 
367         list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
368                 if (node_up)
369                         dlm_mle_node_up(dlm, mle, NULL, idx);
370                 else
371                         dlm_mle_node_down(dlm, mle, NULL, idx);
372         }
373 }
374 
375 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
376                               struct dlm_master_list_entry *mle,
377                               struct o2nm_node *node, int idx)
378 {
379         spin_lock(&mle->spinlock);
380 
381         if (!test_bit(idx, mle->node_map))
382                 mlog(0, "node %u already removed from nodemap!\n", idx);
383         else
384                 clear_bit(idx, mle->node_map);
385 
386         spin_unlock(&mle->spinlock);
387 }
388 
389 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
390                             struct dlm_master_list_entry *mle,
391                             struct o2nm_node *node, int idx)
392 {
393         spin_lock(&mle->spinlock);
394 
395         if (test_bit(idx, mle->node_map))
396                 mlog(0, "node %u already in node map!\n", idx);
397         else
398                 set_bit(idx, mle->node_map);
399 
400         spin_unlock(&mle->spinlock);
401 }
402 
403 
404 int dlm_init_mle_cache(void)
405 {
406         dlm_mle_cache = kmem_cache_create("o2dlm_mle",
407                                           sizeof(struct dlm_master_list_entry),
408                                           0, SLAB_HWCACHE_ALIGN,
409                                           NULL);
410         if (dlm_mle_cache == NULL)
411                 return -ENOMEM;
412         return 0;
413 }
414 
415 void dlm_destroy_mle_cache(void)
416 {
417         kmem_cache_destroy(dlm_mle_cache);
418 }
419 
420 static void dlm_mle_release(struct kref *kref)
421 {
422         struct dlm_master_list_entry *mle;
423         struct dlm_ctxt *dlm;
424 
425         mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
426         dlm = mle->dlm;
427 
428         assert_spin_locked(&dlm->spinlock);
429         assert_spin_locked(&dlm->master_lock);
430 
431         mlog(0, "Releasing mle for %.*s, type %d\n", mle->mnamelen, mle->mname,
432              mle->type);
433 
434         /* remove from list if not already */
435         __dlm_unlink_mle(dlm, mle);
436 
437         /* detach the mle from the domain node up/down events */
438         __dlm_mle_detach_hb_events(dlm, mle);
439 
440         atomic_dec(&dlm->mle_cur_count[mle->type]);
441 
442         /* NOTE: kfree under spinlock here.
443          * if this is bad, we can move this to a freelist. */
444         kmem_cache_free(dlm_mle_cache, mle);
445 }
446 
447 
448 /*
449  * LOCK RESOURCE FUNCTIONS
450  */
451 
452 int dlm_init_master_caches(void)
453 {
454         dlm_lockres_cache = kmem_cache_create("o2dlm_lockres",
455                                               sizeof(struct dlm_lock_resource),
456                                               0, SLAB_HWCACHE_ALIGN, NULL);
457         if (!dlm_lockres_cache)
458                 goto bail;
459 
460         dlm_lockname_cache = kmem_cache_create("o2dlm_lockname",
461                                                DLM_LOCKID_NAME_MAX, 0,
462                                                SLAB_HWCACHE_ALIGN, NULL);
463         if (!dlm_lockname_cache)
464                 goto bail;
465 
466         return 0;
467 bail:
468         dlm_destroy_master_caches();
469         return -ENOMEM;
470 }
471 
472 void dlm_destroy_master_caches(void)
473 {
474         kmem_cache_destroy(dlm_lockname_cache);
475         dlm_lockname_cache = NULL;
476 
477         kmem_cache_destroy(dlm_lockres_cache);
478         dlm_lockres_cache = NULL;
479 }
480 
481 static void dlm_lockres_release(struct kref *kref)
482 {
483         struct dlm_lock_resource *res;
484         struct dlm_ctxt *dlm;
485 
486         res = container_of(kref, struct dlm_lock_resource, refs);
487         dlm = res->dlm;
488 
489         /* This should not happen -- all lockres' have a name
490          * associated with them at init time. */
491         BUG_ON(!res->lockname.name);
492 
493         mlog(0, "destroying lockres %.*s\n", res->lockname.len,
494              res->lockname.name);
495 
496         atomic_dec(&dlm->res_cur_count);
497 
498         if (!hlist_unhashed(&res->hash_node) ||
499             !list_empty(&res->granted) ||
500             !list_empty(&res->converting) ||
501             !list_empty(&res->blocked) ||
502             !list_empty(&res->dirty) ||
503             !list_empty(&res->recovering) ||
504             !list_empty(&res->purge)) {
505                 mlog(ML_ERROR,
506                      "Going to BUG for resource %.*s."
507                      "  We're on a list! [%c%c%c%c%c%c%c]\n",
508                      res->lockname.len, res->lockname.name,
509                      !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
510                      !list_empty(&res->granted) ? 'G' : ' ',
511                      !list_empty(&res->converting) ? 'C' : ' ',
512                      !list_empty(&res->blocked) ? 'B' : ' ',
513                      !list_empty(&res->dirty) ? 'D' : ' ',
514                      !list_empty(&res->recovering) ? 'R' : ' ',
515                      !list_empty(&res->purge) ? 'P' : ' ');
516 
517                 dlm_print_one_lock_resource(res);
518         }
519 
520         /* By the time we're ready to blow this guy away, we shouldn't
521          * be on any lists. */
522         BUG_ON(!hlist_unhashed(&res->hash_node));
523         BUG_ON(!list_empty(&res->granted));
524         BUG_ON(!list_empty(&res->converting));
525         BUG_ON(!list_empty(&res->blocked));
526         BUG_ON(!list_empty(&res->dirty));
527         BUG_ON(!list_empty(&res->recovering));
528         BUG_ON(!list_empty(&res->purge));
529 
530         kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
531 
532         kmem_cache_free(dlm_lockres_cache, res);
533 }
534 
535 void dlm_lockres_put(struct dlm_lock_resource *res)
536 {
537         kref_put(&res->refs, dlm_lockres_release);
538 }
539 
540 static void dlm_init_lockres(struct dlm_ctxt *dlm,
541                              struct dlm_lock_resource *res,
542                              const char *name, unsigned int namelen)
543 {
544         char *qname;
545 
546         /* If we memset here, we lose our reference to the kmalloc'd
547          * res->lockname.name, so be sure to init every field
548          * correctly! */
549 
550         qname = (char *) res->lockname.name;
551         memcpy(qname, name, namelen);
552 
553         res->lockname.len = namelen;
554         res->lockname.hash = dlm_lockid_hash(name, namelen);
555 
556         init_waitqueue_head(&res->wq);
557         spin_lock_init(&res->spinlock);
558         INIT_HLIST_NODE(&res->hash_node);
559         INIT_LIST_HEAD(&res->granted);
560         INIT_LIST_HEAD(&res->converting);
561         INIT_LIST_HEAD(&res->blocked);
562         INIT_LIST_HEAD(&res->dirty);
563         INIT_LIST_HEAD(&res->recovering);
564         INIT_LIST_HEAD(&res->purge);
565         INIT_LIST_HEAD(&res->tracking);
566         atomic_set(&res->asts_reserved, 0);
567         res->migration_pending = 0;
568         res->inflight_locks = 0;
569         res->inflight_assert_workers = 0;
570 
571         res->dlm = dlm;
572 
573         kref_init(&res->refs);
574 
575         atomic_inc(&dlm->res_tot_count);
576         atomic_inc(&dlm->res_cur_count);
577 
578         /* just for consistency */
579         spin_lock(&res->spinlock);
580         dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
581         spin_unlock(&res->spinlock);
582 
583         res->state = DLM_LOCK_RES_IN_PROGRESS;
584 
585         res->last_used = 0;
586 
587         spin_lock(&dlm->spinlock);
588         list_add_tail(&res->tracking, &dlm->tracking_list);
589         spin_unlock(&dlm->spinlock);
590 
591         memset(res->lvb, 0, DLM_LVB_LEN);
592         memset(res->refmap, 0, sizeof(res->refmap));
593 }
594 
595 struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
596                                    const char *name,
597                                    unsigned int namelen)
598 {
599         struct dlm_lock_resource *res = NULL;
600 
601         res = kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS);
602         if (!res)
603                 goto error;
604 
605         res->lockname.name = kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS);
606         if (!res->lockname.name)
607                 goto error;
608 
609         dlm_init_lockres(dlm, res, name, namelen);
610         return res;
611 
612 error:
613         if (res)
614                 kmem_cache_free(dlm_lockres_cache, res);
615         return NULL;
616 }
617 
618 void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm,
619                                 struct dlm_lock_resource *res, int bit)
620 {
621         assert_spin_locked(&res->spinlock);
622 
623         mlog(0, "res %.*s, set node %u, %ps()\n", res->lockname.len,
624              res->lockname.name, bit, __builtin_return_address(0));
625 
626         set_bit(bit, res->refmap);
627 }
628 
629 void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm,
630                                   struct dlm_lock_resource *res, int bit)
631 {
632         assert_spin_locked(&res->spinlock);
633 
634         mlog(0, "res %.*s, clr node %u, %ps()\n", res->lockname.len,
635              res->lockname.name, bit, __builtin_return_address(0));
636 
637         clear_bit(bit, res->refmap);
638 }
639 
640 static void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
641                                    struct dlm_lock_resource *res)
642 {
643         res->inflight_locks++;
644 
645         mlog(0, "%s: res %.*s, inflight++: now %u, %ps()\n", dlm->name,
646              res->lockname.len, res->lockname.name, res->inflight_locks,
647              __builtin_return_address(0));
648 }
649 
650 void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
651                                    struct dlm_lock_resource *res)
652 {
653         assert_spin_locked(&res->spinlock);
654         __dlm_lockres_grab_inflight_ref(dlm, res);
655 }
656 
657 void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
658                                    struct dlm_lock_resource *res)
659 {
660         assert_spin_locked(&res->spinlock);
661 
662         BUG_ON(res->inflight_locks == 0);
663 
664         res->inflight_locks--;
665 
666         mlog(0, "%s: res %.*s, inflight--: now %u, %ps()\n", dlm->name,
667              res->lockname.len, res->lockname.name, res->inflight_locks,
668              __builtin_return_address(0));
669 
670         wake_up(&res->wq);
671 }
672 
673 void __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
674                 struct dlm_lock_resource *res)
675 {
676         assert_spin_locked(&res->spinlock);
677         res->inflight_assert_workers++;
678         mlog(0, "%s:%.*s: inflight assert worker++: now %u\n",
679                         dlm->name, res->lockname.len, res->lockname.name,
680                         res->inflight_assert_workers);
681 }
682 
683 static void __dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
684                 struct dlm_lock_resource *res)
685 {
686         assert_spin_locked(&res->spinlock);
687         BUG_ON(res->inflight_assert_workers == 0);
688         res->inflight_assert_workers--;
689         mlog(0, "%s:%.*s: inflight assert worker--: now %u\n",
690                         dlm->name, res->lockname.len, res->lockname.name,
691                         res->inflight_assert_workers);
692 }
693 
694 static void dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
695                 struct dlm_lock_resource *res)
696 {
697         spin_lock(&res->spinlock);
698         __dlm_lockres_drop_inflight_worker(dlm, res);
699         spin_unlock(&res->spinlock);
700 }
701 
702 /*
703  * lookup a lock resource by name.
704  * may already exist in the hashtable.
705  * lockid is null terminated
706  *
707  * if not, allocate enough for the lockres and for
708  * the temporary structure used in doing the mastering.
709  *
710  * also, do a lookup in the dlm->master_list to see
711  * if another node has begun mastering the same lock.
712  * if so, there should be a block entry in there
713  * for this name, and we should *not* attempt to master
714  * the lock here.   need to wait around for that node
715  * to assert_master (or die).
716  *
717  */
718 struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
719                                           const char *lockid,
720                                           int namelen,
721                                           int flags)
722 {
723         struct dlm_lock_resource *tmpres=NULL, *res=NULL;
724         struct dlm_master_list_entry *mle = NULL;
725         struct dlm_master_list_entry *alloc_mle = NULL;
726         int blocked = 0;
727         int ret, nodenum;
728         struct dlm_node_iter iter;
729         unsigned int hash;
730         int tries = 0;
731         int bit, wait_on_recovery = 0;
732 
733         BUG_ON(!lockid);
734 
735         hash = dlm_lockid_hash(lockid, namelen);
736 
737         mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
738 
739 lookup:
740         spin_lock(&dlm->spinlock);
741         tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
742         if (tmpres) {
743                 spin_unlock(&dlm->spinlock);
744                 spin_lock(&tmpres->spinlock);
745 
746                 /*
747                  * Right after dlm spinlock was released, dlm_thread could have
748                  * purged the lockres. Check if lockres got unhashed. If so
749                  * start over.
750                  */
751                 if (hlist_unhashed(&tmpres->hash_node)) {
752                         spin_unlock(&tmpres->spinlock);
753                         dlm_lockres_put(tmpres);
754                         tmpres = NULL;
755                         goto lookup;
756                 }
757 
758                 /* Wait on the thread that is mastering the resource */
759                 if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
760                         __dlm_wait_on_lockres(tmpres);
761                         BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
762                         spin_unlock(&tmpres->spinlock);
763                         dlm_lockres_put(tmpres);
764                         tmpres = NULL;
765                         goto lookup;
766                 }
767 
768                 /* Wait on the resource purge to complete before continuing */
769                 if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) {
770                         BUG_ON(tmpres->owner == dlm->node_num);
771                         __dlm_wait_on_lockres_flags(tmpres,
772                                                     DLM_LOCK_RES_DROPPING_REF);
773                         spin_unlock(&tmpres->spinlock);
774                         dlm_lockres_put(tmpres);
775                         tmpres = NULL;
776                         goto lookup;
777                 }
778 
779                 /* Grab inflight ref to pin the resource */
780                 dlm_lockres_grab_inflight_ref(dlm, tmpres);
781 
782                 spin_unlock(&tmpres->spinlock);
783                 if (res) {
784                         spin_lock(&dlm->track_lock);
785                         if (!list_empty(&res->tracking))
786                                 list_del_init(&res->tracking);
787                         else
788                                 mlog(ML_ERROR, "Resource %.*s not "
789                                                 "on the Tracking list\n",
790                                                 res->lockname.len,
791                                                 res->lockname.name);
792                         spin_unlock(&dlm->track_lock);
793                         dlm_lockres_put(res);
794                 }
795                 res = tmpres;
796                 goto leave;
797         }
798 
799         if (!res) {
800                 spin_unlock(&dlm->spinlock);
801                 mlog(0, "allocating a new resource\n");
802                 /* nothing found and we need to allocate one. */
803                 alloc_mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
804                 if (!alloc_mle)
805                         goto leave;
806                 res = dlm_new_lockres(dlm, lockid, namelen);
807                 if (!res)
808                         goto leave;
809                 goto lookup;
810         }
811 
812         mlog(0, "no lockres found, allocated our own: %p\n", res);
813 
814         if (flags & LKM_LOCAL) {
815                 /* caller knows it's safe to assume it's not mastered elsewhere
816                  * DONE!  return right away */
817                 spin_lock(&res->spinlock);
818                 dlm_change_lockres_owner(dlm, res, dlm->node_num);
819                 __dlm_insert_lockres(dlm, res);
820                 dlm_lockres_grab_inflight_ref(dlm, res);
821                 spin_unlock(&res->spinlock);
822                 spin_unlock(&dlm->spinlock);
823                 /* lockres still marked IN_PROGRESS */
824                 goto wake_waiters;
825         }
826 
827         /* check master list to see if another node has started mastering it */
828         spin_lock(&dlm->master_lock);
829 
830         /* if we found a block, wait for lock to be mastered by another node */
831         blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
832         if (blocked) {
833                 int mig;
834                 if (mle->type == DLM_MLE_MASTER) {
835                         mlog(ML_ERROR, "master entry for nonexistent lock!\n");
836                         BUG();
837                 }
838                 mig = (mle->type == DLM_MLE_MIGRATION);
839                 /* if there is a migration in progress, let the migration
840                  * finish before continuing.  we can wait for the absence
841                  * of the MIGRATION mle: either the migrate finished or
842                  * one of the nodes died and the mle was cleaned up.
843                  * if there is a BLOCK here, but it already has a master
844                  * set, we are too late.  the master does not have a ref
845                  * for us in the refmap.  detach the mle and drop it.
846                  * either way, go back to the top and start over. */
847                 if (mig || mle->master != O2NM_MAX_NODES) {
848                         BUG_ON(mig && mle->master == dlm->node_num);
849                         /* we arrived too late.  the master does not
850                          * have a ref for us. retry. */
851                         mlog(0, "%s:%.*s: late on %s\n",
852                              dlm->name, namelen, lockid,
853                              mig ?  "MIGRATION" : "BLOCK");
854                         spin_unlock(&dlm->master_lock);
855                         spin_unlock(&dlm->spinlock);
856 
857                         /* master is known, detach */
858                         if (!mig)
859                                 dlm_mle_detach_hb_events(dlm, mle);
860                         dlm_put_mle(mle);
861                         mle = NULL;
862                         /* this is lame, but we can't wait on either
863                          * the mle or lockres waitqueue here */
864                         if (mig)
865                                 msleep(100);
866                         goto lookup;
867                 }
868         } else {
869                 /* go ahead and try to master lock on this node */
870                 mle = alloc_mle;
871                 /* make sure this does not get freed below */
872                 alloc_mle = NULL;
873                 dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
874                 set_bit(dlm->node_num, mle->maybe_map);
875                 __dlm_insert_mle(dlm, mle);
876 
877                 /* still holding the dlm spinlock, check the recovery map
878                  * to see if there are any nodes that still need to be
879                  * considered.  these will not appear in the mle nodemap
880                  * but they might own this lockres.  wait on them. */
881                 bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
882                 if (bit < O2NM_MAX_NODES) {
883                         mlog(0, "%s: res %.*s, At least one node (%d) "
884                              "to recover before lock mastery can begin\n",
885                              dlm->name, namelen, (char *)lockid, bit);
886                         wait_on_recovery = 1;
887                 }
888         }
889 
890         /* at this point there is either a DLM_MLE_BLOCK or a
891          * DLM_MLE_MASTER on the master list, so it's safe to add the
892          * lockres to the hashtable.  anyone who finds the lock will
893          * still have to wait on the IN_PROGRESS. */
894 
895         /* finally add the lockres to its hash bucket */
896         __dlm_insert_lockres(dlm, res);
897 
898         /* since this lockres is new it does not require the spinlock */
899         __dlm_lockres_grab_inflight_ref(dlm, res);
900 
901         /* get an extra ref on the mle in case this is a BLOCK
902          * if so, the creator of the BLOCK may try to put the last
903          * ref at this time in the assert master handler, so we
904          * need an extra one to keep from a bad ptr deref. */
905         dlm_get_mle_inuse(mle);
906         spin_unlock(&dlm->master_lock);
907         spin_unlock(&dlm->spinlock);
908 
909 redo_request:
910         while (wait_on_recovery) {
911                 /* any cluster changes that occurred after dropping the
912                  * dlm spinlock would be detectable by a change on the mle,
913                  * so we only need to clear out the recovery map once. */
914                 if (dlm_is_recovery_lock(lockid, namelen)) {
915                         mlog(0, "%s: Recovery map is not empty, but must "
916                              "master $RECOVERY lock now\n", dlm->name);
917                         if (!dlm_pre_master_reco_lockres(dlm, res))
918                                 wait_on_recovery = 0;
919                         else {
920                                 mlog(0, "%s: waiting 500ms for heartbeat state "
921                                     "change\n", dlm->name);
922                                 msleep(500);
923                         }
924                         continue;
925                 }
926 
927                 dlm_kick_recovery_thread(dlm);
928                 msleep(1000);
929                 dlm_wait_for_recovery(dlm);
930 
931                 spin_lock(&dlm->spinlock);
932                 bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
933                 if (bit < O2NM_MAX_NODES) {
934                         mlog(0, "%s: res %.*s, At least one node (%d) "
935                              "to recover before lock mastery can begin\n",
936                              dlm->name, namelen, (char *)lockid, bit);
937                         wait_on_recovery = 1;
938                 } else
939                         wait_on_recovery = 0;
940                 spin_unlock(&dlm->spinlock);
941 
942                 if (wait_on_recovery)
943                         dlm_wait_for_node_recovery(dlm, bit, 10000);
944         }
945 
946         /* must wait for lock to be mastered elsewhere */
947         if (blocked)
948                 goto wait;
949 
950         ret = -EINVAL;
951         dlm_node_iter_init(mle->vote_map, &iter);
952         while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
953                 ret = dlm_do_master_request(res, mle, nodenum);
954                 if (ret < 0)
955                         mlog_errno(ret);
956                 if (mle->master != O2NM_MAX_NODES) {
957                         /* found a master ! */
958                         if (mle->master <= nodenum)
959                                 break;
960                         /* if our master request has not reached the master
961                          * yet, keep going until it does.  this is how the
962                          * master will know that asserts are needed back to
963                          * the lower nodes. */
964                         mlog(0, "%s: res %.*s, Requests only up to %u but "
965                              "master is %u, keep going\n", dlm->name, namelen,
966                              lockid, nodenum, mle->master);
967                 }
968         }
969 
970 wait:
971         /* keep going until the response map includes all nodes */
972         ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
973         if (ret < 0) {
974                 wait_on_recovery = 1;
975                 mlog(0, "%s: res %.*s, Node map changed, redo the master "
976                      "request now, blocked=%d\n", dlm->name, res->lockname.len,
977                      res->lockname.name, blocked);
978                 if (++tries > 20) {
979                         mlog(ML_ERROR, "%s: res %.*s, Spinning on "
980                              "dlm_wait_for_lock_mastery, blocked = %d\n",
981                              dlm->name, res->lockname.len,
982                              res->lockname.name, blocked);
983                         dlm_print_one_lock_resource(res);
984                         dlm_print_one_mle(mle);
985                         tries = 0;
986                 }
987                 goto redo_request;
988         }
989 
990         mlog(0, "%s: res %.*s, Mastered by %u\n", dlm->name, res->lockname.len,
991              res->lockname.name, res->owner);
992         /* make sure we never continue without this */
993         BUG_ON(res->owner == O2NM_MAX_NODES);
994 
995         /* master is known, detach if not already detached */
996         dlm_mle_detach_hb_events(dlm, mle);
997         dlm_put_mle(mle);
998         /* put the extra ref */
999         dlm_put_mle_inuse(mle);
1000 
1001 wake_waiters:
1002         spin_lock(&res->spinlock);
1003         res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
1004         spin_unlock(&res->spinlock);
1005         wake_up(&res->wq);
1006 
1007 leave:
1008         /* need to free the unused mle */
1009         if (alloc_mle)
1010                 kmem_cache_free(dlm_mle_cache, alloc_mle);
1011 
1012         return res;
1013 }
1014 
1015 
/* upper bound, in milliseconds, on each sleep on the mle waitqueue in
 * dlm_wait_for_lock_mastery() before the mastery state is rechecked */
1016 #define DLM_MASTERY_TIMEOUT_MS   5000
1017 
/*
 * dlm_wait_for_lock_mastery
 *
 * Loop until the owner of @res is known, or until a change in the node
 * map forces the caller to redo its master requests.
 *
 * @blocked is in/out: it is handed to dlm_restart_lock_mastery() and is
 * rewritten here so that it matches whether the mle is a DLM_MLE_BLOCK
 * after a restart.
 *
 * returns: 0 once the owner is known (res->owner was already set, the
 *          mle already names a master, or this node elected itself; in
 *          the last two cases res->owner is set here),
 *          <0 (the value from dlm_restart_lock_mastery()) if the node
 *          map changed and the caller has to restart mastery
 */
1018 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
1019                                      struct dlm_lock_resource *res,
1020                                      struct dlm_master_list_entry *mle,
1021                                      int *blocked)
1022 {
1023         u8 m;
1024         int ret, bit;
1025         int map_changed, voting_done;
1026         int assert, sleep;
1027 
1028 recheck:
1029         ret = 0;
1030         assert = 0;
1031 
1032         /* check if another node has already become the owner */
1033         spin_lock(&res->spinlock);
1034         if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1035                 mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
1036                      res->lockname.len, res->lockname.name, res->owner);
1037                 spin_unlock(&res->spinlock);
1038                 /* this will cause the master to re-assert across
1039                  * the whole cluster, freeing up mles */
                     /* NOTE(review): res->owner is read below after
                      * res->spinlock has been dropped -- confirm that an
                      * unlocked read is acceptable here */
1040                 if (res->owner != dlm->node_num) {
1041                         ret = dlm_do_master_request(res, mle, res->owner);
1042                         if (ret < 0) {
1043                                 /* give recovery a chance to run */
1044                                 mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
1045                                 msleep(500);
1046                                 goto recheck;
1047                         }
1048                 }
1049                 ret = 0;
1050                 goto leave;
1051         }
1052         spin_unlock(&res->spinlock);
1053 
             /* snapshot the mle under its lock.  vote_map != node_map means
              * the node map no longer matches the set of nodes being polled;
              * vote_map == response_map means every polled node has answered */
1054         spin_lock(&mle->spinlock);
1055         m = mle->master;
1056         map_changed = (memcmp(mle->vote_map, mle->node_map,
1057                               sizeof(mle->vote_map)) != 0);
1058         voting_done = (memcmp(mle->vote_map, mle->response_map,
1059                              sizeof(mle->vote_map)) == 0);
1060 
1061         /* restart if we hit any errors */
1062         if (map_changed) {
1063                 int b;
1064                 mlog(0, "%s: %.*s: node map changed, restarting\n",
1065                      dlm->name, res->lockname.len, res->lockname.name);
1066                 ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
                     /* the restart can change mle->type, so bring the
                      * caller's blocked flag back in step with it */
1067                 b = (mle->type == DLM_MLE_BLOCK);
1068                 if ((*blocked && !b) || (!*blocked && b)) {
1069                         mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
1070                              dlm->name, res->lockname.len, res->lockname.name,
1071                              *blocked, b);
1072                         *blocked = b;
1073                 }
1074                 spin_unlock(&mle->spinlock);
1075                 if (ret < 0) {
1076                         mlog_errno(ret);
1077                         goto leave;
1078                 }
1079                 mlog(0, "%s:%.*s: restart lock mastery succeeded, "
1080                      "rechecking now\n", dlm->name, res->lockname.len,
1081                      res->lockname.name);
1082                 goto recheck;
1083         } else {
1084                 if (!voting_done) {
1085                         mlog(0, "map not changed and voting not done "
1086                              "for %s:%.*s\n", dlm->name, res->lockname.len,
1087                              res->lockname.name);
1088                 }
1089         }
1090 
1091         if (m != O2NM_MAX_NODES) {
1092                 /* another node has done an assert!
1093                  * all done! */
1094                 sleep = 0;
1095         } else {
1096                 sleep = 1;
1097                 /* have all nodes responded? */
1098                 if (voting_done && !*blocked) {
                             /* lowest node number set in maybe_map */
1099                         bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
1100                         if (dlm->node_num <= bit) {
1101                                 /* my node number is lowest.
1102                                  * now tell other nodes that I am
1103                                  * mastering this. */
1104                                 mle->master = dlm->node_num;
1105                                 /* ref was grabbed in get_lock_resource
1106                                  * will be dropped in dlmlock_master */
1107                                 assert = 1;
1108                                 sleep = 0;
1109                         }
1110                         /* if voting is done, but we have not received
1111                          * an assert master yet, we must sleep */
1112                 }
1113         }
1114 
1115         spin_unlock(&mle->spinlock);
1116 
1117         /* sleep if we haven't finished voting yet */
1118         if (sleep) {
1119                 unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);
                     /* clear the wake flag, then sleep until it reads 1 or
                      * the timeout expires; the wait result is ignored
                      * because the state is re-examined either way */
1120                 atomic_set(&mle->woken, 0);
1121                 (void)wait_event_timeout(mle->wq,
1122                                          (atomic_read(&mle->woken) == 1),
1123                                          timeo);
                     /* NOTE(review): this compares against O2NM_MAX_NODES
                      * while the check at the top uses
                      * DLM_LOCK_RES_OWNER_UNKNOWN -- presumably the same
                      * value; confirm */
1124                 if (res->owner == O2NM_MAX_NODES) {
1125                         mlog(0, "%s:%.*s: waiting again\n", dlm->name,
1126                              res->lockname.len, res->lockname.name);
1127                         goto recheck;
1128                 }
1129                 mlog(0, "done waiting, master is %u\n", res->owner);
1130                 ret = 0;
1131                 goto leave;
1132         }
1133 
1134         ret = 0;   /* done */
1135         if (assert) {
1136                 m = dlm->node_num;
1137                 mlog(0, "about to master %.*s here, this=%u\n",
1138                      res->lockname.len, res->lockname.name, m);
1139                 ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
1140                 if (ret) {
1141                         /* This is a failure in the network path,
1142                          * not in the response to the assert_master
1143                          * (any nonzero response is a BUG on this node).
1144                          * Most likely a socket just got disconnected
1145                          * due to node death. */
1146                         mlog_errno(ret);
1147                 }
1148                 /* no longer need to restart lock mastery.
1149                  * all living nodes have been contacted. */
1150                 ret = 0;
1151         }
1152 
1153         /* set the lockres owner */
             /* m is either the mle->master value sampled above or, when this
              * node just elected itself, dlm->node_num */
1154         spin_lock(&res->spinlock);
1155         /* mastery reference obtained either during
1156          * assert_master_handler or in get_lock_resource */
1157         dlm_change_lockres_owner(dlm, res, m);
1158         spin_unlock(&res->spinlock);
1159 
1160 leave:
1161         return ret;
1162 }
1163 
/* cursor over the bits that differ between two node bitmaps; set up by
 * dlm_bitmap_diff_iter_init() and advanced by dlm_bitmap_diff_iter_next() */
1164 struct dlm_bitmap_diff_iter
1165 {
             /* last bit handed out; -1 before the first call to
              * dlm_bitmap_diff_iter_next(), O2NM_MAX_NODES once exhausted */
1166         int curnode;
             /* the two bitmaps being compared; only the pointers are
              * stored, the bitmaps themselves are not copied */
1167         unsigned long *orig_bm;
1168         unsigned long *cur_bm;
             /* computed once at init: a bit is set wherever orig_bm and
              * cur_bm disagreed */
1169         unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
1170 };
1171 
/* direction of a per-node difference between an original and a current
 * node bitmap, as reported by dlm_bitmap_diff_iter_next() */
1172 enum dlm_node_state_change
1173 {
             /* node is in the original bitmap but not in the current one */
1174         NODE_DOWN = -1,
             /* not produced by dlm_bitmap_diff_iter_next(), which only
              * visits bits that differ */
1175         NODE_NO_CHANGE = 0,
             /* node is in the current bitmap but not in the original one */
1176         NODE_UP
1177 };
1178 
/*
 * dlm_bitmap_diff_iter_init
 *
 * Point @iter at the two bitmaps and precompute, word by word, which
 * bits differ between them.  Only the pointers to @orig_bm and @cur_bm
 * are kept; the difference map is a snapshot taken here.
 */
1179 static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
1180                                       unsigned long *orig_bm,
1181                                       unsigned long *cur_bm)
1182 {
1183         int word;
1184 
1185         iter->orig_bm = orig_bm;
1186         iter->cur_bm = cur_bm;
1187 
1188         /* -1: the first dlm_bitmap_diff_iter_next() searches from bit 0 */
1189         iter->curnode = -1;
1190 
1191         /* XOR leaves a bit set wherever the two maps disagree */
1192         for (word = 0; word < BITS_TO_LONGS(O2NM_MAX_NODES); word++)
1193                 iter->diff_bm[word] = iter->orig_bm[word] ^
1194                                       iter->cur_bm[word];
1195 }
1196 
/*
 * dlm_bitmap_diff_iter_next
 *
 * Advance @iter to the next bit that differed between the two bitmaps
 * when the iterator was initialised.
 *
 * returns: the bit (node) number, with *state set to NODE_UP or
 *          NODE_DOWN; -ENOENT once no differing bits are left, in
 *          which case *state is not written
 */
1197 static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
1198                                      enum dlm_node_state_change *state)
1199 {
1200         int bit;
1201 
             /* already ran off the end on an earlier call */
1202         if (iter->curnode >= O2NM_MAX_NODES)
1203                 return -ENOENT;
1204 
1205         bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
1206                             iter->curnode+1);
1207         if (bit >= O2NM_MAX_NODES) {
1208                 iter->curnode = O2NM_MAX_NODES;
1209                 return -ENOENT;
1210         }
1211 
             /* diff_bm guarantees the two maps disagreed on this bit at init
              * time, so its value in either map gives the direction.  read
              * it from cur_bm: dlm_restart_lock_mastery() rewrites orig_bm
              * (the mle vote_map) while it is still iterating, and testing
              * orig_bm inverted the state reported for every node visited
              * after the first NODE_DOWN.  this relies on cur_bm being left
              * untouched during the walk, which holds for that caller. */
1212         /* if it is there in the current map then this node came up */
1213         if (test_bit(bit, iter->cur_bm))
1214                 *state = NODE_UP;
1215         else
1216                 *state = NODE_DOWN;
1217 
1218         iter->curnode = bit;
1219         return bit;
1220 }
1221 
1222 
/*
 * dlm_restart_lock_mastery
 *
 * Bring the mle back in line with the current node map after
 * dlm_wait_for_lock_mastery() saw that vote_map and node_map differ.
 * Must be called with mle->spinlock held.
 *
 * For every node that differs between the two maps:
 *   - node came up: its bit is cleared in response_map and set in
 *     vote_map.
 *   - node went down: maybe_map and response_map are wiped and vote_map
 *     is reset to node_map, as if nobody had been contacted.  If
 *     @blocked is set, the dead node was the lowest one in maybe_map and
 *     no other node is left in it, the mle is switched to a
 *     DLM_MLE_MASTER for @res.
 *
 * returns: 0 if the maps did not differ, -EAGAIN if at least one node
 *          changed state
 */
1223 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1224                                     struct dlm_lock_resource *res,
1225                                     struct dlm_master_list_entry *mle,
1226                                     int blocked)
1227 {
1228         struct dlm_bitmap_diff_iter bdi;
1229         enum dlm_node_state_change sc;
1230         int node;
1231         int ret = 0;
1232 
1233         mlog(0, "something happened such that the "
1234              "master process may need to be restarted!\n");
1235 
1236         assert_spin_locked(&mle->spinlock);
1237 
             /* vote_map plays the "original" role, node_map the "current" */
1238         dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
1239         node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1240         while (node >= 0) {
1241                 if (sc == NODE_UP) {
1242                         /* a node came up.  clear any old vote from
1243                          * the response map and set it in the vote map
1244                          * then restart the mastery. */
1245                         mlog(ML_NOTICE, "node %d up while restarting\n", node);
1246 
1247                         /* redo the master request, but only for the new node */
1248                         mlog(0, "sending request to new node\n");
1249                         clear_bit(node, mle->response_map);
1250                         set_bit(node, mle->vote_map);
1251                 } else {
1252                         mlog(ML_ERROR, "node down! %d\n", node);
1253                         if (blocked) {
                                     /* sample the lowest candidate before
                                      * the dead node is removed from the map */
1254                                 int lowest = find_next_bit(mle->maybe_map,
1255                                                        O2NM_MAX_NODES, 0);
1256 
1257                                 /* act like it was never there */
1258                                 clear_bit(node, mle->maybe_map);
1259 
1260                                 if (node == lowest) {
1261                                         mlog(0, "expected master %u died"
1262                                             " while this node was blocked "
1263                                             "waiting on it!\n", node);
                                             /* look for the next candidate
                                              * above the one that died */
1264                                         lowest = find_next_bit(mle->maybe_map,
1265                                                         O2NM_MAX_NODES,
1266                                                         lowest+1);
1267                                         if (lowest < O2NM_MAX_NODES) {
1268                                                 mlog(0, "%s:%.*s:still "
1269                                                      "blocked. waiting on %u "
1270                                                      "now\n", dlm->name,
1271                                                      res->lockname.len,
1272                                                      res->lockname.name,
1273                                                      lowest);
1274                                         } else {
1275                                                 /* mle is an MLE_BLOCK, but
1276                                                  * there is now nothing left to
1277                                                  * block on.  we need to return
1278                                                  * all the way back out and try
1279                                                  * again with an MLE_MASTER.
1280                                                  * dlm_do_local_recovery_cleanup
1281                                                  * has already run, so the mle
1282                                                  * refcount is ok */
1283                                                 mlog(0, "%s:%.*s: no "
1284                                                      "longer blocking. try to "
1285                                                      "master this here\n",
1286                                                      dlm->name,
1287                                                      res->lockname.len,
1288                                                      res->lockname.name);
1289                                                 mle->type = DLM_MLE_MASTER;
1290                                                 mle->mleres = res;
1291                                         }
1292                                 }
1293                         }
1294 
1295                         /* now blank out everything, as if we had never
1296                          * contacted anyone */
1297                         memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
1298                         memset(mle->response_map, 0, sizeof(mle->response_map));
1299                         /* reset the vote_map to the current node_map */
                             /* NOTE(review): vote_map is also the diff
                              * iterator's orig_bm (see the init call above),
                              * so it changes while the walk is in progress */
1300                         memcpy(mle->vote_map, mle->node_map,
1301                                sizeof(mle->node_map));
1302                         /* put myself into the maybe map */
1303                         if (mle->type != DLM_MLE_BLOCK)
1304                                 set_bit(dlm->node_num, mle->maybe_map);
1305                 }
                     /* any difference at all means the caller must restart */
1306                 ret = -EAGAIN;
1307                 node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1308         }
1309         return ret;
1310 }
1311 
1312 
1313 /*
1314  * DLM_MASTER_REQUEST_MSG
1315  *
      * Send a master request for the lock named in @mle to node @to and
      * record the answer in the mle (under mle->spinlock):
      *   YES   - response_map bit set, mle->master = @to
      *   NO    - response_map bit set
      *   MAYBE - response_map and maybe_map bits set
      *   ERROR - nothing recorded, the request is resent after 50ms
      * -ENOMEM from o2net is also retried after 50ms.  -ESRCH, -EINVAL,
      * any other error for which dlm_is_host_down() is false, an unknown
      * response value, and an mle of type DLM_MLE_MIGRATION all BUG().
      *
      * @res is only used for log output here.
      *
1316  * returns: 0 on success,
1317  *          -errno on a network error
1318  *
1319  * on error, the caller should assume the target node is "dead"
1320  *
1321  */
1322 
1323 static int dlm_do_master_request(struct dlm_lock_resource *res,
1324                                  struct dlm_master_list_entry *mle, int to)
1325 {
1326         struct dlm_ctxt *dlm = mle->dlm;
1327         struct dlm_master_request request;
1328         int ret, response=0, resend;
1329 
1330         memset(&request, 0, sizeof(request));
1331         request.node_idx = dlm->node_num;
1332 
1333         BUG_ON(mle->type == DLM_MLE_MIGRATION);
1334 
             /* the lock name is taken from the mle, not from res */
1335         request.namelen = (u8)mle->mnamelen;
1336         memcpy(request.name, mle->mname, request.namelen);
1337 
1338 again:
1339         ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
1340                                  sizeof(request), to, &response);
1341         if (ret < 0)  {
1342                 if (ret == -ESRCH) {
1343                         /* should never happen */
1344                         mlog(ML_ERROR, "TCP stack not ready!\n");
1345                         BUG();
1346                 } else if (ret == -EINVAL) {
1347                         mlog(ML_ERROR, "bad args passed to o2net!\n");
1348                         BUG();
1349                 } else if (ret == -ENOMEM) {
1350                         mlog(ML_ERROR, "out of memory while trying to send "
1351                              "network message!  retrying\n");
1352                         /* this is totally crude */
1353                         msleep(50);
1354                         goto again;
1355                 } else if (!dlm_is_host_down(ret)) {
1356                         /* not a network error. bad. */
1357                         mlog_errno(ret);
1358                         mlog(ML_ERROR, "unhandled error!");
1359                         BUG();
1360                 }
1361                 /* all other errors should be network errors,
1362                  * and likely indicate node death */
1363                 mlog(ML_ERROR, "link to %d went down!\n", to);
1364                 goto out;
1365         }
1366 
             /* the message went through; fold the remote answer into the mle */
1367         ret = 0;
1368         resend = 0;
1369         spin_lock(&mle->spinlock);
1370         switch (response) {
1371                 case DLM_MASTER_RESP_YES:
1372                         set_bit(to, mle->response_map);
1373                         mlog(0, "node %u is the master, response=YES\n", to);
1374                         mlog(0, "%s:%.*s: master node %u now knows I have a "
1375                              "reference\n", dlm->name, res->lockname.len,
1376                              res->lockname.name, to);
1377                         mle->master = to;
1378                         break;
1379                 case DLM_MASTER_RESP_NO:
1380                         mlog(0, "node %u not master, response=NO\n", to);
1381                         set_bit(to, mle->response_map);
1382                         break;
1383                 case DLM_MASTER_RESP_MAYBE:
1384                         mlog(0, "node %u not master, response=MAYBE\n", to);
1385                         set_bit(to, mle->response_map);
1386                         set_bit(to, mle->maybe_map);
1387                         break;
1388                 case DLM_MASTER_RESP_ERROR:
1389                         mlog(0, "node %u hit an error, resending\n", to);
1390                         resend = 1;
                             /* start the retry with a clean response value */
1391                         response = 0;
1392                         break;
1393                 default:
1394                         mlog(ML_ERROR, "bad response! %u\n", response);
1395                         BUG();
1396         }
1397         spin_unlock(&mle->spinlock);
             /* the sleep and resend happen only after the lock is dropped */
1398         if (resend) {
1399                 /* this is also totally crude */
1400                 msleep(50);
1401                 goto again;
1402         }
1403 
1404 out:
1405         return ret;
1406 }
1407 
1408 /*
1409  * locks that can be taken here:
1410  * dlm->spinlock
1411  * res->spinlock
1412  * mle->spinlock
1413  * dlm->master_lock
1414  *
1415  * if possible, TRIM THIS DOWN!!!
1416  */
1417 int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
1418                                void **ret_data)
1419 {
1420         u8 response = DLM_MASTER_RESP_MAYBE;
1421         struct dlm_ctxt *dlm = data;
1422         struct dlm_lock_resource *res = NULL;
1423         struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
1424         struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
1425         char *name;
1426         unsigned int namelen, hash;
1427         int found, ret;
1428         int set_maybe;
1429         int dispatch_assert = 0;
1430         int dispatched = 0;
1431 
1432         if (!dlm_grab(dlm))
1433                 return DLM_MASTER_RESP_NO;
1434 
1435         if (!dlm_domain_fully_joined(dlm)) {
1436                 response = DLM_MASTER_RESP_NO;
1437                 goto send_response;
1438         }
1439 
1440         name = request->name;
1441         namelen = request->namelen;
1442         hash = dlm_lockid_hash(name, namelen);
1443 
1444         if (namelen > DLM_LOCKID_NAME_MAX) {
1445                 response = DLM_IVBUFLEN;
1446                 goto send_response;
1447         }
1448 
1449 way_up_top:
1450         spin_lock(&dlm->spinlock);
1451         res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1452         if (res) {
1453                 spin_unlock(&dlm->spinlock);
1454 
1455                 /* take care of the easy cases up front */
1456                 spin_lock(&res->spinlock);
1457 
1458                 /*
1459                  * Right after dlm spinlock was released, dlm_thread could have
1460                  * purged the lockres. Check if lockres got unhashed. If so
1461                  * start over.
1462                  */
1463                 if (hlist_unhashed(&res->hash_node)) {
1464                         spin_unlock(&res->spinlock);
1465                         dlm_lockres_put(res);
1466                         goto way_up_top;
1467                 }
1468 
1469                 if (res->state & (DLM_LOCK_RES_RECOVERING|
1470                                   DLM_LOCK_RES_MIGRATING)) {
1471                         spin_unlock(&res->spinlock);
1472                         mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
1473                              "being recovered/migrated\n");
1474                         response = DLM_MASTER_RESP_ERROR;
1475                         if (mle)
1476                                 kmem_cache_free(dlm_mle_cache, mle);
1477                         goto send_response;
1478                 }
1479 
1480                 if (res->owner == dlm->node_num) {
1481                         dlm_lockres_set_refmap_bit(dlm, res, request->node_idx);
1482                         spin_unlock(&res->spinlock);
1483                         response = DLM_MASTER_RESP_YES;
1484                         if (mle)
1485                                 kmem_cache_free(dlm_mle_cache, mle);
1486 
1487                         /* this node is the owner.
1488                          * there is some extra work that needs to
1489                          * happen now.  the requesting node has
1490                          * caused all nodes up to this one to
1491                          * create mles.  this node now needs to
1492                          * go back and clean those up. */
1493                         dispatch_assert = 1;
1494                         goto send_response;
1495                 } else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1496                         spin_unlock(&res->spinlock);
1497                         // mlog(0, "node %u is the master\n", res->owner);
1498                         response = DLM_MASTER_RESP_NO;
1499                         if (mle)
1500                                 kmem_cache_free(dlm_mle_cache, mle);
1501                         goto send_response;
1502                 }
1503 
1504                 /* ok, there is no owner.  either this node is
1505                  * being blocked, or it is actively trying to
1506                  * master this lock. */
1507                 if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1508                         mlog(ML_ERROR, "lock with no owner should be "
1509                              "in-progress!\n");
1510                         BUG();
1511                 }
1512 
1513                 // mlog(0, "lockres is in progress...\n");
1514                 spin_lock(&dlm->master_lock);
1515                 found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1516                 if (!found) {
1517                         mlog(ML_ERROR, "no mle found for this lock!\n");
1518                         BUG();
1519                 }
1520                 set_maybe = 1;
1521                 spin_lock(&tmpmle->spinlock);
1522                 if (tmpmle->type == DLM_MLE_BLOCK) {
1523                         // mlog(0, "this node is waiting for "
1524                         // "lockres to be mastered\n");
1525                         response = DLM_MASTER_RESP_NO;
1526                 } else if (tmpmle->type == DLM_MLE_MIGRATION) {
1527                         mlog(0, "node %u is master, but trying to migrate to "
1528                              "node %u.\n", tmpmle->master, tmpmle->new_master);
1529                         if (tmpmle->master == dlm->node_num) {
1530                                 mlog(ML_ERROR, "no owner on lockres, but this "
1531                                      "node is trying to migrate it to %u?!\n",
1532                                      tmpmle->new_master);
1533                                 BUG();
1534                         } else {
1535                                 /* the real master can respond on its own */
1536                                 response = DLM_MASTER_RESP_NO;
1537                         }
1538                 } else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1539                         set_maybe = 0;
1540                         if (tmpmle->master == dlm->node_num) {
1541                                 response = DLM_MASTER_RESP_YES;
1542                                 /* this node will be the owner.
1543                                  * go back and clean the mles on any
1544                                  * other nodes */
1545                                 dispatch_assert = 1;
1546                                 dlm_lockres_set_refmap_bit(dlm, res,
1547                                                            request->node_idx);
1548                         } else
1549                                 response = DLM_MASTER_RESP_NO;
1550                 } else {
1551                         // mlog(0, "this node is attempting to "
1552                         // "master lockres\n");
1553                         response = DLM_MASTER_RESP_MAYBE;
1554                 }
1555                 if (set_maybe)
1556                         set_bit(request->node_idx, tmpmle->maybe_map);
1557                 spin_unlock(&tmpmle->spinlock);
1558 
1559                 spin_unlock(&dlm->master_lock);
1560                 spin_unlock(&res->spinlock);
1561 
1562                 /* keep the mle attached to heartbeat events */
1563                 dlm_put_mle(tmpmle);
1564                 if (mle)
1565                         kmem_cache_free(dlm_mle_cache, mle);
1566                 goto send_response;
1567         }
1568 
1569         /*
1570          * lockres doesn't exist on this node
1571          * if there is an MLE_BLOCK, return NO
1572          * if there is an MLE_MASTER, return MAYBE
1573          * otherwise, add an MLE_BLOCK, return NO
1574          */
1575         spin_lock(&dlm->master_lock);
1576         found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1577         if (!found) {
1578                 /* this lockid has never been seen on this node yet */
1579                 // mlog(0, "no mle found\n");
1580                 if (!mle) {
1581                         spin_unlock(&dlm->master_lock);
1582                         spin_unlock(&dlm->spinlock);
1583 
1584                         mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
1585                         if (!mle) {
1586                                 response = DLM_MASTER_RESP_ERROR;
1587                                 mlog_errno(-ENOMEM);
1588                                 goto send_response;
1589                         }
1590                         goto way_up_top;
1591                 }
1592 
1593                 // mlog(0, "this is second time thru, already allocated, "
1594                 // "add the block.\n");
1595                 dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
1596                 set_bit(request->node_idx, mle->maybe_map);
1597                 __dlm_insert_mle(dlm, mle);
1598                 response = DLM_MASTER_RESP_NO;
1599         } else {
1600                 spin_lock(&tmpmle->spinlock);
1601                 if (tmpmle->master == dlm->node_num) {
1602                         mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
1603                         BUG();
1604                 }
1605                 if (tmpmle->type == DLM_MLE_BLOCK)
1606                         response = DLM_MASTER_RESP_NO;
1607                 else if (tmpmle->type == DLM_MLE_MIGRATION) {
1608                         mlog(0, "migration mle was found (%u->%u)\n",
1609                              tmpmle->master, tmpmle->new_master);
1610                         /* real master can respond on its own */
1611                         response = DLM_MASTER_RESP_NO;
1612                 } else
1613                         response = DLM_MASTER_RESP_MAYBE;
1614                 set_bit(request->node_idx, tmpmle->maybe_map);
1615                 spin_unlock(&tmpmle->spinlock);
1616         }
1617         spin_unlock(&dlm->master_lock);
1618         spin_unlock(&dlm->spinlock);
1619 
1620         if (found) {
1621                 /* keep the mle attached to heartbeat events */
1622                 dlm_put_mle(tmpmle);
1623         }
1624 send_response:
1625         /*
1626          * __dlm_lookup_lockres() grabbed a reference to this lockres.
1627          * The reference is released by dlm_assert_master_worker() under
1628          * the call to dlm_dispatch_assert_master().  If
1629          * dlm_assert_master_worker() isn't called, we drop it here.
1630          */
1631         if (dispatch_assert) {
1632                 mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
1633                              dlm->node_num, res->lockname.len, res->lockname.name);
1634                 spin_lock(&res->spinlock);
1635                 ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
1636                                                  DLM_ASSERT_MASTER_MLE_CLEANUP);
1637                 if (ret < 0) {
1638                         mlog(ML_ERROR, "failed to dispatch assert master work\n");
1639                         response = DLM_MASTER_RESP_ERROR;
1640                         spin_unlock(&res->spinlock);
1641                         dlm_lockres_put(res);
1642                 } else {
1643                         dispatched = 1;
1644                         __dlm_lockres_grab_inflight_worker(dlm, res);
1645                         spin_unlock(&res->spinlock);
1646                 }
1647         } else {
1648                 if (res)
1649                         dlm_lockres_put(res);
1650         }
1651 
1652         if (!dispatched)
1653                 dlm_put(dlm);
1654         return response;
1655 }
1656 
1657 /*
1658  * DLM_ASSERT_MASTER_MSG
1659  */
1660 
1661 
1662 /*
1663  * NOTE: this can be used for debugging
1664  * can periodically run all locks owned by this node
1665  * and re-assert across the cluster...
1666  */
1667 static int dlm_do_assert_master(struct dlm_ctxt *dlm,
1668                                 struct dlm_lock_resource *res,
1669                                 void *nodemap, u32 flags)
1670 {
1671         struct dlm_assert_master assert;
1672         int to, tmpret;
1673         struct dlm_node_iter iter;
1674         int ret = 0;
1675         int reassert;
1676         const char *lockname = res->lockname.name;
1677         unsigned int namelen = res->lockname.len;
1678 
1679         BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1680 
1681         spin_lock(&res->spinlock);
1682         res->state |= DLM_LOCK_RES_SETREF_INPROG;
1683         spin_unlock(&res->spinlock);
1684 
1685 again:
1686         reassert = 0;
1687 
1688         /* note that if this nodemap is empty, it returns 0 */
1689         dlm_node_iter_init(nodemap, &iter);
1690         while ((to = dlm_node_iter_next(&iter)) >= 0) {
1691                 int r = 0;
1692                 struct dlm_master_list_entry *mle = NULL;
1693 
1694                 mlog(0, "sending assert master to %d (%.*s)\n", to,
1695                      namelen, lockname);
1696                 memset(&assert, 0, sizeof(assert));
1697                 assert.node_idx = dlm->node_num;
1698                 assert.namelen = namelen;
1699                 memcpy(assert.name, lockname, namelen);
1700                 assert.flags = cpu_to_be32(flags);
1701 
1702                 tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1703                                             &assert, sizeof(assert), to, &r);
1704                 if (tmpret < 0) {
1705                         mlog(ML_ERROR, "Error %d when sending message %u (key "
1706                              "0x%x) to node %u\n", tmpret,
1707                              DLM_ASSERT_MASTER_MSG, dlm->key, to);
1708                         if (!dlm_is_host_down(tmpret)) {
1709                                 mlog(ML_ERROR, "unhandled error=%d!\n", tmpret);
1710                                 BUG();
1711                         }
1712                         /* a node died.  finish out the rest of the nodes. */
1713                         mlog(0, "link to %d went down!\n", to);
1714                         /* any nonzero status return will do */
1715                         ret = tmpret;
1716                         r = 0;
1717                 } else if (r < 0) {
1718                         /* ok, something horribly messed.  kill thyself. */
1719                         mlog(ML_ERROR,"during assert master of %.*s to %u, "
1720                              "got %d.\n", namelen, lockname, to, r);
1721                         spin_lock(&dlm->spinlock);
1722                         spin_lock(&dlm->master_lock);
1723                         if (dlm_find_mle(dlm, &mle, (char *)lockname,
1724                                          namelen)) {
1725                                 dlm_print_one_mle(mle);
1726                                 __dlm_put_mle(mle);
1727                         }
1728                         spin_unlock(&dlm->master_lock);
1729                         spin_unlock(&dlm->spinlock);
1730                         BUG();
1731                 }
1732 
1733                 if (r & DLM_ASSERT_RESPONSE_REASSERT &&
1734                     !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
1735                                 mlog(ML_ERROR, "%.*s: very strange, "
1736                                      "master MLE but no lockres on %u\n",
1737                                      namelen, lockname, to);
1738                 }
1739 
1740                 if (r & DLM_ASSERT_RESPONSE_REASSERT) {
1741                         mlog(0, "%.*s: node %u create mles on other "
1742                              "nodes and requests a re-assert\n",
1743                              namelen, lockname, to);
1744                         reassert = 1;
1745                 }
1746                 if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
1747                         mlog(0, "%.*s: node %u has a reference to this "
1748                              "lockres, set the bit in the refmap\n",
1749                              namelen, lockname, to);
1750                         spin_lock(&res->spinlock);
1751                         dlm_lockres_set_refmap_bit(dlm, res, to);
1752                         spin_unlock(&res->spinlock);
1753                 }
1754         }
1755 
1756         if (reassert)
1757                 goto again;
1758 
1759         spin_lock(&res->spinlock);
1760         res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
1761         spin_unlock(&res->spinlock);
1762         wake_up(&res->wq);
1763 
1764         return ret;
1765 }
1766 
1767 /*
1768  * locks that can be taken here:
1769  * dlm->spinlock
1770  * res->spinlock
1771  * mle->spinlock
1772  * dlm->master_list
1773  *
1774  * if possible, TRIM THIS DOWN!!!
1775  */
1776 int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
1777                               void **ret_data)
1778 {
1779         struct dlm_ctxt *dlm = data;
1780         struct dlm_master_list_entry *mle = NULL;
1781         struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
1782         struct dlm_lock_resource *res = NULL;
1783         char *name;
1784         unsigned int namelen, hash;
1785         u32 flags;
1786         int master_request = 0, have_lockres_ref = 0;
1787         int ret = 0;
1788 
1789         if (!dlm_grab(dlm))
1790                 return 0;
1791 
1792         name = assert->name;
1793         namelen = assert->namelen;
1794         hash = dlm_lockid_hash(name, namelen);
1795         flags = be32_to_cpu(assert->flags);
1796 
1797         if (namelen > DLM_LOCKID_NAME_MAX) {
1798                 mlog(ML_ERROR, "Invalid name length!");
1799                 goto done;
1800         }
1801 
1802         spin_lock(&dlm->spinlock);
1803 
1804         if (flags)
1805                 mlog(0, "assert_master with flags: %u\n", flags);
1806 
1807         /* find the MLE */
1808         spin_lock(&dlm->master_lock);
1809         if (!dlm_find_mle(dlm, &mle, name, namelen)) {
1810                 /* not an error, could be master just re-asserting */
1811                 mlog(0, "just got an assert_master from %u, but no "
1812                      "MLE for it! (%.*s)\n", assert->node_idx,
1813                      namelen, name);
1814         } else {
1815                 int bit = find_next_bit (mle->maybe_map, O2NM_MAX_NODES, 0);
1816                 if (bit >= O2NM_MAX_NODES) {
1817                         /* not necessarily an error, though less likely.
1818                          * could be master just re-asserting. */
1819                         mlog(0, "no bits set in the maybe_map, but %u "
1820                              "is asserting! (%.*s)\n", assert->node_idx,
1821                              namelen, name);
1822                 } else if (bit != assert->node_idx) {
1823                         if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1824                                 mlog(0, "master %u was found, %u should "
1825                                      "back off\n", assert->node_idx, bit);
1826                         } else {
1827                                 /* with the fix for bug 569, a higher node
1828                                  * number winning the mastery will respond
1829                                  * YES to mastery requests, but this node
1830                                  * had no way of knowing.  let it pass. */
1831                                 mlog(0, "%u is the lowest node, "
1832                                      "%u is asserting. (%.*s)  %u must "
1833                                      "have begun after %u won.\n", bit,
1834                                      assert->node_idx, namelen, name, bit,
1835                                      assert->node_idx);
1836                         }
1837                 }
1838                 if (mle->type == DLM_MLE_MIGRATION) {
1839                         if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1840                                 mlog(0, "%s:%.*s: got cleanup assert"
1841                                      " from %u for migration\n",
1842                                      dlm->name, namelen, name,
1843                                      assert->node_idx);
1844                         } else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
1845                                 mlog(0, "%s:%.*s: got unrelated assert"
1846                                      " from %u for migration, ignoring\n",
1847                                      dlm->name, namelen, name,
1848                                      assert->node_idx);
1849                                 __dlm_put_mle(mle);
1850                                 spin_unlock(&dlm->master_lock);
1851                                 spin_unlock(&dlm->spinlock);
1852                                 goto done;
1853                         }
1854                 }
1855         }
1856         spin_unlock(&dlm->master_lock);
1857 
1858         /* ok everything checks out with the MLE
1859          * now check to see if there is a lockres */
1860         res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1861         if (res) {
1862                 spin_lock(&res->spinlock);
1863                 if (res->state & DLM_LOCK_RES_RECOVERING)  {
1864                         mlog(ML_ERROR, "%u asserting but %.*s is "
1865                              "RECOVERING!\n", assert->node_idx, namelen, name);
1866                         goto kill;
1867                 }
1868                 if (!mle) {
1869                         if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
1870                             res->owner != assert->node_idx) {
1871                                 mlog(ML_ERROR, "DIE! Mastery assert from %u, "
1872                                      "but current owner is %u! (%.*s)\n",
1873                                      assert->node_idx, res->owner, namelen,
1874                                      name);
1875                                 __dlm_print_one_lock_resource(res);
1876                                 BUG();
1877                         }
1878                 } else if (mle->type != DLM_MLE_MIGRATION) {
1879                         if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1880                                 /* owner is just re-asserting */
1881                                 if (res->owner == assert->node_idx) {
1882                                         mlog(0, "owner %u re-asserting on "
1883                                              "lock %.*s\n", assert->node_idx,
1884                                              namelen, name);
1885                                         goto ok;
1886                                 }
1887                                 mlog(ML_ERROR, "got assert_master from "
1888                                      "node %u, but %u is the owner! "
1889                                      "(%.*s)\n", assert->node_idx,
1890                                      res->owner, namelen, name);
1891                                 goto kill;
1892                         }
1893                         if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1894                                 mlog(ML_ERROR, "got assert from %u, but lock "
1895                                      "with no owner should be "
1896                                      "in-progress! (%.*s)\n",
1897                                      assert->node_idx,
1898                                      namelen, name);
1899                                 goto kill;
1900                         }
1901                 } else /* mle->type == DLM_MLE_MIGRATION */ {
1902                         /* should only be getting an assert from new master */
1903                         if (assert->node_idx != mle->new_master) {
1904                                 mlog(ML_ERROR, "got assert from %u, but "
1905                                      "new master is %u, and old master "
1906                                      "was %u (%.*s)\n",
1907                                      assert->node_idx, mle->new_master,
1908                                      mle->master, namelen, name);
1909                                 goto kill;
1910                         }
1911 
1912                 }
1913 ok:
1914                 spin_unlock(&res->spinlock);
1915         }
1916 
1917         // mlog(0, "woo!  got an assert_master from node %u!\n",
1918         //           assert->node_idx);
1919         if (mle) {
1920                 int extra_ref = 0;
1921                 int nn = -1;
1922                 int rr, err = 0;
1923 
1924                 spin_lock(&mle->spinlock);
1925                 if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
1926                         extra_ref = 1;
1927                 else {
1928                         /* MASTER mle: if any bits set in the response map
1929                          * then the calling node needs to re-assert to clear
1930                          * up nodes that this node contacted */
1931                         while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
1932                                                     nn+1)) < O2NM_MAX_NODES) {
1933                                 if (nn != dlm->node_num && nn != assert->node_idx) {
1934                                         master_request = 1;
1935                                         break;
1936                                 }
1937                         }
1938                 }
1939                 mle->master = assert->node_idx;
1940                 atomic_set(&mle->woken, 1);
1941                 wake_up(&mle->wq);
1942                 spin_unlock(&mle->spinlock);
1943 
1944                 if (res) {
1945                         int wake = 0;
1946                         spin_lock(&res->spinlock);
1947                         if (mle->type == DLM_MLE_MIGRATION) {
1948                                 mlog(0, "finishing off migration of lockres %.*s, "
1949                                         "from %u to %u\n",
1950                                         res->lockname.len, res->lockname.name,
1951                                         dlm->node_num, mle->new_master);
1952                                 res->state &= ~DLM_LOCK_RES_MIGRATING;
1953                                 wake = 1;
1954                                 dlm_change_lockres_owner(dlm, res, mle->new_master);
1955                                 BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1956                         } else {
1957                                 dlm_change_lockres_owner(dlm, res, mle->master);
1958                         }
1959                         spin_unlock(&res->spinlock);
1960                         have_lockres_ref = 1;
1961                         if (wake)
1962                                 wake_up(&res->wq);
1963                 }
1964 
1965                 /* master is known, detach if not already detached.
1966                  * ensures that only one assert_master call will happen
1967                  * on this mle. */
1968                 spin_lock(&dlm->master_lock);
1969 
1970                 rr = kref_read(&mle->mle_refs);
1971                 if (mle->inuse > 0) {
1972                         if (extra_ref && rr < 3)
1973                                 err = 1;
1974                         else if (!extra_ref && rr < 2)
1975                                 err = 1;
1976                 } else {
1977                         if (extra_ref && rr < 2)
1978                                 err = 1;
1979                         else if (!extra_ref && rr < 1)
1980                                 err = 1;
1981                 }
1982                 if (err) {
1983                         mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
1984                              "that will mess up this node, refs=%d, extra=%d, "
1985                              "inuse=%d\n", dlm->name, namelen, name,
1986                              assert->node_idx, rr, extra_ref, mle->inuse);
1987                         dlm_print_one_mle(mle);
1988                 }
1989                 __dlm_unlink_mle(dlm, mle);
1990                 __dlm_mle_detach_hb_events(dlm, mle);
1991                 __dlm_put_mle(mle);
1992                 if (extra_ref) {
1993                         /* the assert master message now balances the extra
1994                          * ref given by the master / migration request message.
1995                          * if this is the last put, it will be removed
1996                          * from the list. */
1997                         __dlm_put_mle(mle);
1998                 }
1999                 spin_unlock(&dlm->master_lock);
2000         } else if (res) {
2001                 if (res->owner != assert->node_idx) {
2002                         mlog(0, "assert_master from %u, but current "
2003                              "owner is %u (%.*s), no mle\n", assert->node_idx,
2004                              res->owner, namelen, name);
2005                 }
2006         }
2007         spin_unlock(&dlm->spinlock);
2008 
2009 done:
2010         ret = 0;
2011         if (res) {
2012                 spin_lock(&res->spinlock);
2013                 res->state |= DLM_LOCK_RES_SETREF_INPROG;
2014                 spin_unlock(&res->spinlock);
2015                 *ret_data = (void *)res;
2016         }
2017         dlm_put(dlm);
2018         if (master_request) {
2019                 mlog(0, "need to tell master to reassert\n");
2020                 /* positive. negative would shoot down the node. */
2021                 ret |= DLM_ASSERT_RESPONSE_REASSERT;
2022                 if (!have_lockres_ref) {
2023                         mlog(ML_ERROR, "strange, got assert from %u, MASTER "
2024                              "mle present here for %s:%.*s, but no lockres!\n",
2025                              assert->node_idx, dlm->name, namelen, name);
2026                 }
2027         }
2028         if (have_lockres_ref) {
2029                 /* let the master know we have a reference to the lockres */
2030                 ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
2031                 mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
2032                      dlm->name, namelen, name, assert->node_idx);
2033         }
2034         return ret;
2035 
2036 kill:
2037         /* kill the caller! */
2038         mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
2039              "and killing the other node now!  This node is OK and can continue.\n");
2040         __dlm_print_one_lock_resource(res);
2041         spin_unlock(&res->spinlock);
2042         spin_lock(&dlm->master_lock);
2043         if (mle)
2044                 __dlm_put_mle(mle);
2045         spin_unlock(&dlm->master_lock);
2046         spin_unlock(&dlm->spinlock);
2047         *ret_data = (void *)res;
2048         dlm_put(dlm);
2049         return -EINVAL;
2050 }
2051 
2052 void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
2053 {
2054         struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
2055 
2056         if (ret_data) {
2057                 spin_lock(&res->spinlock);
2058                 res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
2059                 spin_unlock(&res->spinlock);
2060                 wake_up(&res->wq);
2061                 dlm_lockres_put(res);
2062         }
2063         return;
2064 }
2065 
2066 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
2067                                struct dlm_lock_resource *res,
2068                                int ignore_higher, u8 request_from, u32 flags)
2069 {
2070         struct dlm_work_item *item;
2071         item = kzalloc(sizeof(*item), GFP_ATOMIC);
2072         if (!item)
2073                 return -ENOMEM;
2074 
2075 
2076         /* queue up work for dlm_assert_master_worker */
2077         dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
2078         item->u.am.lockres = res; /* already have a ref */
2079         /* can optionally ignore node numbers higher than this node */
2080         item->u.am.ignore_higher = ignore_higher;
2081         item->u.am.request_from = request_from;
2082         item->u.am.flags = flags;
2083 
2084         if (ignore_higher)
2085                 mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
2086                      res->lockname.name);
2087 
2088         spin_lock(&dlm->work_lock);
2089         list_add_tail(&item->list, &dlm->work_list);
2090         spin_unlock(&dlm->work_lock);
2091 
2092         queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2093         return 0;
2094 }
2095 
2096 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
2097 {
2098         struct dlm_ctxt *dlm = data;
2099         int ret = 0;
2100         struct dlm_lock_resource *res;
2101         unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
2102         int ignore_higher;
2103         int bit;
2104         u8 request_from;
2105         u32 flags;
2106 
2107         dlm = item->dlm;
2108         res = item->u.am.lockres;
2109         ignore_higher = item->u.am.ignore_higher;
2110         request_from = item->u.am.request_from;
2111         flags = item->u.am.flags;
2112 
2113         spin_lock(&dlm->spinlock);
2114         memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
2115         spin_unlock(&dlm->spinlock);
2116 
2117         clear_bit(dlm->node_num, nodemap);
2118         if (ignore_higher) {
2119                 /* if is this just to clear up mles for nodes below
2120                  * this node, do not send the message to the original
2121                  * caller or any node number higher than this */
2122                 clear_bit(request_from, nodemap);
2123                 bit = dlm->node_num;
2124                 while (1) {
2125                         bit = find_next_bit(nodemap, O2NM_MAX_NODES,
2126                                             bit+1);
2127                         if (bit >= O2NM_MAX_NODES)
2128                                 break;
2129                         clear_bit(bit, nodemap);
2130                 }
2131         }
2132 
2133         /*
2134          * If we're migrating this lock to someone else, we are no
2135          * longer allowed to assert out own mastery.  OTOH, we need to
2136          * prevent migration from starting while we're still asserting
2137          * our dominance.  The reserved ast delays migration.
2138          */
2139         spin_lock(&res->spinlock);
2140         if (res->state & DLM_LOCK_RES_MIGRATING) {
2141                 mlog(0, "Someone asked us to assert mastery, but we're "
2142                      "in the middle of migration.  Skipping assert, "
2143                      "the new master will handle that.\n");
2144                 spin_unlock(&res->spinlock);
2145                 goto put;
2146         } else
2147                 __dlm_lockres_reserve_ast(res);
2148         spin_unlock(&res->spinlock);
2149 
2150         /* this call now finishes out the nodemap
2151          * even if one or more nodes die */
2152         mlog(0, "worker about to master %.*s here, this=%u\n",
2153                      res->lockname.len, res->lockname.name, dlm->node_num);
2154         ret = dlm_do_assert_master(dlm, res, nodemap, flags);
2155         if (ret < 0) {
2156                 /* no need to restart, we are done */
2157                 if (!dlm_is_host_down(ret))
2158                         mlog_errno(ret);
2159         }
2160 
2161         /* Ok, we've asserted ourselves.  Let's let migration start. */
2162         dlm_lockres_release_ast(dlm, res);
2163 
2164 put:
2165         dlm_lockres_drop_inflight_worker(dlm, res);
2166 
2167         dlm_lockres_put(res);
2168 
2169         mlog(0, "finished with dlm_assert_master_worker\n");
2170 }
2171 
2172 /* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
2173  * We cannot wait for node recovery to complete to begin mastering this
2174  * lockres because this lockres is used to kick off recovery! ;-)
2175  * So, do a pre-check on all living nodes to see if any of those nodes
2176  * think that $RECOVERY is currently mastered by a dead node.  If so,
2177  * we wait a short time to allow that node to get notified by its own
2178  * heartbeat stack, then check again.  All $RECOVERY lock resources
2179  * mastered by dead nodes are purged when the hearbeat callback is
2180  * fired, so we can know for sure that it is safe to continue once
2181  * the node returns a live node or no node.  */
2182 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
2183                                        struct dlm_lock_resource *res)
2184 {
2185         struct dlm_node_iter iter;
2186         int nodenum;
2187         int ret = 0;
2188         u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
2189 
2190         spin_lock(&dlm->spinlock);
2191         dlm_node_iter_init(dlm->domain_map, &iter);
2192         spin_unlock(&dlm->spinlock);
2193 
2194         while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
2195                 /* do not send to self */
2196                 if (nodenum == dlm->node_num)
2197                         continue;
2198                 ret = dlm_do_master_requery(dlm, res, nodenum, &master);
2199                 if (ret < 0) {
2200                         mlog_errno(ret);
2201                         if (!dlm_is_host_down(ret))
2202                                 BUG();
2203                         /* host is down, so answer for that node would be
2204                          * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
2205                         ret = 0;
2206                 }
2207 
2208                 if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
2209                         /* check to see if this master is in the recovery map */
2210                         spin_lock(&dlm->spinlock);
2211                         if (test_bit(master, dlm->recovery_map)) {
2212                                 mlog(ML_NOTICE, "%s: node %u has not seen "
2213                                      "node %u go down yet, and thinks the "
2214                                      "dead node is mastering the recovery "
2215                                      "lock.  must wait.\n", dlm->name,
2216                                      nodenum, master);
2217                                 ret = -EAGAIN;
2218                         }
2219                         spin_unlock(&dlm->spinlock);
2220                         mlog(0, "%s: reco lock master is %u\n", dlm->name,
2221                              master);
2222                         break;
2223                 }
2224         }
2225         return ret;
2226 }
2227 
2228 /*
2229  * DLM_DEREF_LOCKRES_MSG
2230  */
2231 
2232 int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2233 {
2234         struct dlm_deref_lockres deref;
2235         int ret = 0, r;
2236         const char *lockname;
2237         unsigned int namelen;
2238 
2239         lockname = res->lockname.name;
2240         namelen = res->lockname.len;
2241         BUG_ON(namelen > O2NM_MAX_NAME_LEN);
2242 
2243         memset(&deref, 0, sizeof(deref));
2244         deref.node_idx = dlm->node_num;
2245         deref.namelen = namelen;
2246         memcpy(deref.name, lockname, namelen);
2247 
2248         ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
2249                                  &deref, sizeof(deref), res->owner, &r);
2250         if (ret < 0)
2251                 mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF to node %u\n",
2252                      dlm->name, namelen, lockname, ret, res->owner);
2253         else if (r < 0) {
2254                 /* BAD.  other node says I did not have a ref. */
2255                 mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n",
2256                      dlm->name, namelen, lockname, res->owner, r);
2257                 dlm_print_one_lock_resource(res);
2258                 if (r == -ENOMEM)
2259                         BUG();
2260         } else
2261                 ret = r;
2262 
2263         return ret;
2264 }
2265 
2266 int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
2267                               void **ret_data)
2268 {
2269         struct dlm_ctxt *dlm = data;
2270         struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
2271         struct dlm_lock_resource *res = NULL;
2272         char *name;
2273         unsigned int namelen;
2274         int ret = -EINVAL;
2275         u8 node;
2276         unsigned int hash;
2277         struct dlm_work_item *item;
2278         int cleared = 0;
2279         int dispatch = 0;
2280 
2281         if (!dlm_grab(dlm))
2282                 return 0;
2283 
2284         name = deref->name;
2285         namelen = deref->namelen;
2286         node = deref->node_idx;
2287 
2288         if (namelen > DLM_LOCKID_NAME_MAX) {
2289                 mlog(ML_ERROR, "Invalid name length!");
2290                 goto done;
2291         }
2292         if (deref->node_idx >= O2NM_MAX_NODES) {
2293                 mlog(ML_ERROR, "Invalid node number: %u\n", node);
2294                 goto done;
2295         }
2296 
2297         hash = dlm_lockid_hash(name, namelen);
2298 
2299         spin_lock(&dlm->spinlock);
2300         res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2301         if (!res) {
2302                 spin_unlock(&dlm->spinlock);
2303                 mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
2304                      dlm->name, namelen, name);
2305                 goto done;
2306         }
2307         spin_unlock(&dlm->spinlock);
2308 
2309         spin_lock(&res->spinlock);
2310         if (res->state & DLM_LOCK_RES_SETREF_INPROG)
2311                 dispatch = 1;
2312         else {
2313                 BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2314                 if (test_bit(node, res->refmap)) {
2315                         dlm_lockres_clear_refmap_bit(dlm, res, node);
2316                         cleared = 1;
2317                 }
2318         }
2319         spin_unlock(&res->spinlock);
2320 
2321         if (!dispatch) {
2322                 if (cleared)
2323                         dlm_lockres_calc_usage(dlm, res);
2324                 else {
2325                         mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2326                         "but it is already dropped!\n", dlm->name,
2327                         res->lockname.len, res->lockname.name, node);
2328                         dlm_print_one_lock_resource(res);
2329                 }
2330                 ret = DLM_DEREF_RESPONSE_DONE;
2331                 goto done;
2332         }
2333 
2334         item = kzalloc(sizeof(*item), GFP_NOFS);
2335         if (!item) {
2336                 ret = -ENOMEM;
2337                 mlog_errno(ret);
2338                 goto done;
2339         }
2340 
2341         dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
2342         item->u.dl.deref_res = res;
2343         item->u.dl.deref_node = node;
2344 
2345         spin_lock(&dlm->work_lock);
2346         list_add_tail(&item->list, &dlm->work_list);
2347         spin_unlock(&dlm->work_lock);
2348 
2349         queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2350         return DLM_DEREF_RESPONSE_INPROG;
2351 
2352 done:
2353         if (res)
2354                 dlm_lockres_put(res);
2355         dlm_put(dlm);
2356 
2357         return ret;
2358 }
2359 
2360 int dlm_deref_lockres_done_handler(struct o2net_msg *msg, u32 len, void *data,
2361                               void **ret_data)
2362 {
2363         struct dlm_ctxt *dlm = data;
2364         struct dlm_deref_lockres_done *deref
2365                         = (struct dlm_deref_lockres_done *)msg->buf;
2366         struct dlm_lock_resource *res = NULL;
2367         char *name;
2368         unsigned int namelen;
2369         int ret = -EINVAL;
2370         u8 node;
2371         unsigned int hash;
2372 
2373         if (!dlm_grab(dlm))
2374                 return 0;
2375 
2376         name = deref->name;
2377         namelen = deref->namelen;
2378         node = deref->node_idx;
2379 
2380         if (namelen > DLM_LOCKID_NAME_MAX) {
2381                 mlog(ML_ERROR, "Invalid name length!");
2382                 goto done;
2383         }
2384         if (deref->node_idx >= O2NM_MAX_NODES) {
2385                 mlog(ML_ERROR, "Invalid node number: %u\n", node);
2386                 goto done;
2387         }
2388 
2389         hash = dlm_lockid_hash(name, namelen);
2390 
2391         spin_lock(&dlm->spinlock);
2392         res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2393         if (!res) {
2394                 spin_unlock(&dlm->spinlock);
2395                 mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
2396                      dlm->name, namelen, name);
2397                 goto done;
2398         }
2399 
2400         spin_lock(&res->spinlock);
2401         if (!(res->state & DLM_LOCK_RES_DROPPING_REF)) {
2402                 spin_unlock(&res->spinlock);
2403                 spin_unlock(&dlm->spinlock);
2404                 mlog(ML_NOTICE, "%s:%.*s: node %u sends deref done "
2405                         "but it is already derefed!\n", dlm->name,
2406                         res->lockname.len, res->lockname.name, node);
2407                 ret = 0;
2408                 goto done;
2409         }
2410 
2411         __dlm_do_purge_lockres(dlm, res);
2412         spin_unlock(&res->spinlock);
2413         wake_up(&res->wq);
2414 
2415         spin_unlock(&dlm->spinlock);
2416 
2417         ret = 0;
2418 done:
2419         if (res)
2420                 dlm_lockres_put(res);
2421         dlm_put(dlm);
2422         return ret;
2423 }
2424 
2425 static void dlm_drop_lockres_ref_done(struct dlm_ctxt *dlm,
2426                 struct dlm_lock_resource *res, u8 node)
2427 {
2428         struct dlm_deref_lockres_done deref;
2429         int ret = 0, r;
2430         const char *lockname;
2431         unsigned int namelen;
2432 
2433         lockname = res->lockname.name;
2434         namelen = res->lockname.len;
2435         BUG_ON(namelen > O2NM_MAX_NAME_LEN);
2436 
2437         memset(&deref, 0, sizeof(deref));
2438         deref.node_idx = dlm->node_num;
2439         deref.namelen = namelen;
2440         memcpy(deref.name, lockname, namelen);
2441 
2442         ret = o2net_send_message(DLM_DEREF_LOCKRES_DONE, dlm->key,
2443                                  &deref, sizeof(deref), node, &r);
2444         if (ret < 0) {
2445                 mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF DONE "
2446                                 " to node %u\n", dlm->name, namelen,
2447                                 lockname, ret, node);
2448         } else if (r < 0) {
2449                 /* ignore the error */
2450                 mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n",
2451                      dlm->name, namelen, lockname, node, r);
2452                 dlm_print_one_lock_resource(res);
2453         }
2454 }
2455 
2456 static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
2457 {
2458         struct dlm_ctxt *dlm;
2459         struct dlm_lock_resource *res;
2460         u8 node;
2461         u8 cleared = 0;
2462 
2463         dlm = item->dlm;
2464         res = item->u.dl.deref_res;
2465         node = item->u.dl.deref_node;
2466 
2467         spin_lock(&res->spinlock);
2468         BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2469         __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
2470         if (test_bit(node, res->refmap)) {
2471                 dlm_lockres_clear_refmap_bit(dlm, res, node);
2472                 cleared = 1;
2473         }
2474         spin_unlock(&res->spinlock);
2475 
2476         dlm_drop_lockres_ref_done(dlm, res, node);
2477 
2478         if (cleared) {
2479                 mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
2480                      dlm->name, res->lockname.len, res->lockname.name, node);
2481                 dlm_lockres_calc_usage(dlm, res);
2482         } else {
2483                 mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2484                      "but it is already dropped!\n", dlm->name,
2485                      res->lockname.len, res->lockname.name, node);
2486                 dlm_print_one_lock_resource(res);
2487         }
2488 
2489         dlm_lockres_put(res);
2490 }
2491 
2492 /*
2493  * A migratable resource is one that is :
2494  * 1. locally mastered, and,
2495  * 2. zero local locks, and,
2496  * 3. one or more non-local locks, or, one or more references
2497  * Returns 1 if yes, 0 if not.
2498  */
2499 static int dlm_is_lockres_migratable(struct dlm_ctxt *dlm,
2500                                       struct dlm_lock_resource *res)
2501 {
2502         enum dlm_lockres_list idx;
2503         int nonlocal = 0, node_ref;
2504         struct list_head *queue;
2505         struct dlm_lock *lock;
2506         u64 cookie;
2507 
2508         assert_spin_locked(&res->spinlock);
2509 
2510         /* delay migration when the lockres is in MIGRATING state */
2511         if (res->state & DLM_LOCK_RES_MIGRATING)
2512                 return 0;
2513 
2514         /* delay migration when the lockres is in RECOCERING state */
2515         if (res->state & (DLM_LOCK_RES_RECOVERING|
2516                         DLM_LOCK_RES_RECOVERY_WAITING))
2517                 return 0;
2518 
2519         if (res->owner != dlm->node_num)
2520                 return 0;
2521 
2522         for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
2523                 queue = dlm_list_idx_to_ptr(res, idx);
2524                 list_for_each_entry(lock, queue, list) {
2525                         if (lock->ml.node != dlm->node_num) {
2526                                 nonlocal++;
2527                                 continue;
2528                         }
2529                         cookie = be64_to_cpu(lock->ml.cookie);
2530                         mlog(0, "%s: Not migratable res %.*s, lock %u:%llu on "
2531                              "%s list\n", dlm->name, res->lockname.len,
2532                              res->lockname.name,
2533                              dlm_get_lock_cookie_node(cookie),
2534                              dlm_get_lock_cookie_seq(cookie),
2535                              dlm_list_in_text(idx));
2536                         return 0;
2537                 }
2538         }
2539 
2540         if (!nonlocal) {
2541                 node_ref = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
2542                 if (node_ref >= O2NM_MAX_NODES)
2543                         return 0;
2544         }
2545 
2546         mlog(0, "%s: res %.*s, Migratable\n", dlm->name, res->lockname.len,
2547              res->lockname.name);
2548 
2549         return 1;
2550 }
2551 
2552 /*
2553  * DLM_MIGRATE_LOCKRES
2554  */
2555 
2556 
2557 static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
2558                                struct dlm_lock_resource *res, u8 target)
2559 {
2560         struct dlm_master_list_entry *mle = NULL;
2561         struct dlm_master_list_entry *oldmle = NULL;
2562         struct dlm_migratable_lockres *mres = NULL;
2563         int ret = 0;
2564         const char *name;
2565         unsigned int namelen;
2566         int mle_added = 0;
2567         int wake = 0;
2568 
2569         if (!dlm_grab(dlm))
2570                 return -EINVAL;
2571 
2572         BUG_ON(target == O2NM_MAX_NODES);
2573 
2574         name = res->lockname.name;
2575         namelen = res->lockname.len;
2576 
2577         mlog(0, "%s: Migrating %.*s to node %u\n", dlm->name, namelen, name,
2578              target);
2579 
2580         /* preallocate up front. if this fails, abort */
2581         ret = -ENOMEM;
2582         mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS);
2583         if (!mres) {
2584                 mlog_errno(ret);
2585                 goto leave;
2586         }
2587 
2588         mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
2589         if (!mle) {
2590                 mlog_errno(ret);
2591                 goto leave;
2592         }
2593         ret = 0;
2594 
2595         /*
2596          * clear any existing master requests and
2597          * add the migration mle to the list
2598          */
2599         spin_lock(&dlm->spinlock);
2600         spin_lock(&dlm->master_lock);
2601         ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
2602                                     namelen, target, dlm->node_num);
2603         /* get an extra reference on the mle.
2604          * otherwise the assert_master from the new
2605          * master will destroy this.
2606          */
2607         if (ret != -EEXIST)
2608                 dlm_get_mle_inuse(mle);
2609 
2610         spin_unlock(&dlm->master_lock);
2611         spin_unlock(&dlm->spinlock);
2612 
2613         if (ret == -EEXIST) {
2614                 mlog(0, "another process is already migrating it\n");
2615                 goto fail;
2616         }
2617         mle_added = 1;
2618 
2619         /*
2620          * set the MIGRATING flag and flush asts
2621          * if we fail after this we need to re-dirty the lockres
2622          */
2623         if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
2624                 mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
2625                      "the target went down.\n", res->lockname.len,
2626                      res->lockname.name, target);
2627                 spin_lock(&res->spinlock);
2628                 res->state &= ~DLM_LOCK_RES_MIGRATING;
2629                 wake = 1;
2630                 spin_unlock(&res->spinlock);
2631                 ret = -EINVAL;
2632         }
2633 
2634 fail:
2635         if (ret != -EEXIST && oldmle) {
2636                 /* master is known, detach if not already detached */
2637                 dlm_mle_detach_hb_events(dlm, oldmle);
2638                 dlm_put_mle(oldmle);
2639         }
2640 
2641         if (ret < 0) {
2642                 if (mle_added) {
2643                         dlm_mle_detach_hb_events(dlm, mle);
2644                         dlm_put_mle(mle);
2645                         dlm_put_mle_inuse(mle);
2646                 } else if (mle) {
2647                         kmem_cache_free(dlm_mle_cache, mle);
2648                         mle = NULL;
2649                 }
2650                 goto leave;
2651         }
2652 
2653         /*
2654          * at this point, we have a migration target, an mle
2655          * in the master list, and the MIGRATING flag set on
2656          * the lockres
2657          */
2658 
2659         /* now that remote nodes are spinning on the MIGRATING flag,
2660          * ensure that all assert_master work is flushed. */
2661         flush_workqueue(dlm->dlm_worker);
2662 
2663         /* notify new node and send all lock state */
2664         /* call send_one_lockres with migration flag.
2665          * this serves as notice to the target node that a
2666          * migration is starting. */
2667         ret = dlm_send_one_lockres(dlm, res, mres, target,
2668                                    DLM_MRES_MIGRATION);
2669 
2670         if (ret < 0) {
2671                 mlog(0, "migration to node %u failed with %d\n",
2672                      target, ret);
2673                 /* migration failed, detach and clean up mle */
2674                 dlm_mle_detach_hb_events(dlm, mle);
2675                 dlm_put_mle(mle);
2676                 dlm_put_mle_inuse(mle);
2677                 spin_lock(&res->spinlock);
2678                 res->state &= ~DLM_LOCK_RES_MIGRATING;
2679                 wake = 1;
2680                 spin_unlock(&res->spinlock);
2681                 if (dlm_is_host_down(ret))
2682                         dlm_wait_for_node_death(dlm, target,
2683                                                 DLM_NODE_DEATH_WAIT_MAX);
2684                 goto leave;
2685         }
2686 
2687         /* at this point, the target sends a message to all nodes,
2688          * (using dlm_do_migrate_request).  this node is skipped since
2689          * we had to put an mle in the list to begin the process.  this
2690          * node now waits for target to do an assert master.  this node
2691          * will be the last one notified, ensuring that the migration
2692          * is complete everywhere.  if the target dies while this is
2693          * going on, some nodes could potentially see the target as the
2694          * master, so it is important that my recovery finds the migration
2695          * mle and sets the master to UNKNOWN. */
2696 
2697 
2698         /* wait for new node to assert master */
2699         while (1) {
2700                 ret = wait_event_interruptible_timeout(mle->wq,
2701                                         (atomic_read(&mle->woken) == 1),
2702                                         msecs_to_jiffies(5000));
2703 
2704                 if (ret >= 0) {
2705                         if (atomic_read(&mle->woken) == 1 ||
2706                             res->owner == target)
2707                                 break;
2708 
2709                         mlog(0, "%s:%.*s: timed out during migration\n",
2710                              dlm->name, res->lockname.len, res->lockname.name);
2711                         /* avoid hang during shutdown when migrating lockres
2712                          * to a node which also goes down */
2713                         if (dlm_is_node_dead(dlm, target)) {
2714                                 mlog(0, "%s:%.*s: expected migration "
2715                                      "target %u is no longer up, restarting\n",
2716                                      dlm->name, res->lockname.len,
2717                                      res->lockname.name, target);
2718                                 ret = -EINVAL;
2719                                 /* migration failed, detach and clean up mle */
2720                                 dlm_mle_detach_hb_events(dlm, mle);
2721                                 dlm_put_mle(mle);
2722                                 dlm_put_mle_inuse(mle);
2723                                 spin_lock(&res->spinlock);
2724                                 res->state &= ~DLM_LOCK_RES_MIGRATING;
2725                                 wake = 1;
2726                                 spin_unlock(&res->spinlock);
2727                                 goto leave;
2728                         }
2729                 } else
2730                         mlog(0, "%s:%.*s: caught signal during migration\n",
2731                              dlm->name, res->lockname.len, res->lockname.name);
2732         }
2733 
2734         /* all done, set the owner, clear the flag */
2735         spin_lock(&res->spinlock);
2736         dlm_set_lockres_owner(dlm, res, target);
2737         res->state &= ~DLM_LOCK_RES_MIGRATING;
2738         dlm_remove_nonlocal_locks(dlm, res);
2739         spin_unlock(&res->spinlock);
2740         wake_up(&res->wq);
2741 
2742         /* master is known, detach if not already detached */
2743         dlm_mle_detach_hb_events(dlm, mle);
2744         dlm_put_mle_inuse(mle);
2745         ret = 0;
2746 
2747         dlm_lockres_calc_usage(dlm, res);
2748 
2749 leave:
2750         /* re-dirty the lockres if we failed */
2751         if (ret < 0)
2752                 dlm_kick_thread(dlm, res);
2753 
2754         /* wake up waiters if the MIGRATING flag got set
2755          * but migration failed */
2756         if (wake)
2757                 wake_up(&res->wq);
2758 
2759         if (mres)
2760                 free_page((unsigned long)mres);
2761 
2762         dlm_put(dlm);
2763 
2764         mlog(0, "%s: Migrating %.*s to %u, returns %d\n", dlm->name, namelen,
2765              name, target, ret);
2766         return ret;
2767 }
2768 
2769 #define DLM_MIGRATION_RETRY_MS  100
2770 
2771 /*
2772  * Should be called only after beginning the domain leave process.
2773  * There should not be any remaining locks on nonlocal lock resources,
2774  * and there should be no local locks left on locally mastered resources.
2775  *
2776  * Called with the dlm spinlock held, may drop it to do migration, but
2777  * will re-acquire before exit.
2778  *
2779  * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped
2780  */
2781 int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2782 {
2783         int ret;
2784         int lock_dropped = 0;
2785         u8 target = O2NM_MAX_NODES;
2786 
2787         assert_spin_locked(&dlm->spinlock);
2788 
2789         spin_lock(&res->spinlock);
2790         if (dlm_is_lockres_migratable(dlm, res))
2791                 target = dlm_pick_migration_target(dlm, res);
2792         spin_unlock(&res->spinlock);
2793 
2794         if (target == O2NM_MAX_NODES)
2795                 goto leave;
2796 
2797         /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
2798         spin_unlock(&dlm->spinlock);
2799         lock_dropped = 1;
2800         ret = dlm_migrate_lockres(dlm, res, target);
2801         if (ret)
2802                 mlog(0, "%s: res %.*s, Migrate to node %u failed with %d\n",
2803                      dlm->name, res->lockname.len, res->lockname.name,
2804                      target, ret);
2805         spin_lock(&dlm->spinlock);
2806 leave:
2807         return lock_dropped;
2808 }
2809 
2810 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2811 {
2812         int ret;
2813         spin_lock(&dlm->ast_lock);
2814         spin_lock(&lock->spinlock);
2815         ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
2816         spin_unlock(&lock->spinlock);
2817         spin_unlock(&dlm->ast_lock);
2818         return ret;
2819 }
2820 
2821 static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2822                                      struct dlm_lock_resource *res,
2823                                      u8 mig_target)
2824 {
2825         int can_proceed;
2826         spin_lock(&res->spinlock);
2827         can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
2828         spin_unlock(&res->spinlock);
2829 
2830         /* target has died, so make the caller break out of the
2831          * wait_event, but caller must recheck the domain_map */
2832         spin_lock(&dlm->spinlock);
2833         if (!test_bit(mig_target, dlm->domain_map))
2834                 can_proceed = 1;
2835         spin_unlock(&dlm->spinlock);
2836         return can_proceed;
2837 }
2838 
2839 static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
2840                                 struct dlm_lock_resource *res)
2841 {
2842         int ret;
2843         spin_lock(&res->spinlock);
2844         ret = !!(res->state & DLM_LOCK_RES_DIRTY);
2845         spin_unlock(&res->spinlock);
2846         return ret;
2847 }
2848 
2849 
2850 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2851                                        struct dlm_lock_resource *res,
2852                                        u8 target)
2853 {
2854         int ret = 0;
2855 
2856         mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
2857                res->lockname.len, res->lockname.name, dlm->node_num,
2858                target);
2859         /* need to set MIGRATING flag on lockres.  this is done by
2860          * ensuring that all asts have been flushed for this lockres. */
2861         spin_lock(&res->spinlock);
2862         BUG_ON(res->migration_pending);
2863         res->migration_pending = 1;
2864         /* strategy is to reserve an extra ast then release
2865          * it below, letting the release do all of the work */
2866         __dlm_lockres_reserve_ast(res);
2867         spin_unlock(&res->spinlock);
2868 
2869         /* now flush all the pending asts */
2870         dlm_kick_thread(dlm, res);
2871         /* before waiting on DIRTY, block processes which may
2872          * try to dirty the lockres before MIGRATING is set */
2873         spin_lock(&res->spinlock);
2874         BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
2875         res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
2876         spin_unlock(&res->spinlock);
2877         /* now wait on any pending asts and the DIRTY state */
2878         wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2879         dlm_lockres_release_ast(dlm, res);
2880 
2881         mlog(0, "about to wait on migration_wq, dirty=%s\n",
2882                res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
2883         /* if the extra ref we just put was the final one, this
2884          * will pass thru immediately.  otherwise, we need to wait
2885          * for the last ast to finish. */
2886 again:
2887         ret = wait_event_interruptible_timeout(dlm->migration_wq,
2888                    dlm_migration_can_proceed(dlm, res, target),
2889                    msecs_to_jiffies(1000));
2890         if (ret < 0) {
2891                 mlog(0, "woken again: migrating? %s, dead? %s\n",
2892                        res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2893                        test_bit(target, dlm->domain_map) ? "no":"yes");
2894         } else {
2895                 mlog(0, "all is well: migrating? %s, dead? %s\n",
2896                        res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2897                        test_bit(target, dlm->domain_map) ? "no":"yes");
2898         }
2899         if (!dlm_migration_can_proceed(dlm, res, target)) {
2900                 mlog(0, "trying again...\n");
2901                 goto again;
2902         }
2903 
2904         ret = 0;
2905         /* did the target go down or die? */
2906         spin_lock(&dlm->spinlock);
2907         if (!test_bit(target, dlm->domain_map)) {
2908                 mlog(ML_ERROR, "aha. migration target %u just went down\n",
2909                      target);
2910                 ret = -EHOSTDOWN;
2911         }
2912         spin_unlock(&dlm->spinlock);
2913 
2914         /*
2915          * if target is down, we need to clear DLM_LOCK_RES_BLOCK_DIRTY for
2916          * another try; otherwise, we are sure the MIGRATING state is there,
2917          * drop the unneeded state which blocked threads trying to DIRTY
2918          */
2919         spin_lock(&res->spinlock);
2920         BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
2921         res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
2922         if (!ret)
2923                 BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
2924         else
2925                 res->migration_pending = 0;
2926         spin_unlock(&res->spinlock);
2927 
2928         /*
2929          * at this point:
2930          *
2931          *   o the DLM_LOCK_RES_MIGRATING flag is set if target not down
2932          *   o there are no pending asts on this lockres
2933          *   o all processes trying to reserve an ast on this
2934          *     lockres must wait for the MIGRATING flag to clear
2935          */
2936         return ret;
2937 }
2938 
2939 /* last step in the migration process.
2940  * original master calls this to free all of the dlm_lock
2941  * structures that used to be for other nodes. */
2942 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2943                                       struct dlm_lock_resource *res)
2944 {
2945         struct list_head *queue = &res->granted;
2946         int i, bit;
2947         struct dlm_lock *lock, *next;
2948 
2949         assert_spin_locked(&res->spinlock);
2950 
2951         BUG_ON(res->owner == dlm->node_num);
2952 
2953         for (i=0; i<3; i++) {
2954                 list_for_each_entry_safe(lock, next, queue, list) {
2955                         if (lock->ml.node != dlm->node_num) {
2956                                 mlog(0, "putting lock for node %u\n",
2957                                      lock->ml.node);
2958                                 /* be extra careful */
2959                                 BUG_ON(!list_empty(&lock->ast_list));
2960                                 BUG_ON(!list_empty(&lock->bast_list));
2961                                 BUG_ON(lock->ast_pending);
2962                                 BUG_ON(lock->bast_pending);
2963                                 dlm_lockres_clear_refmap_bit(dlm, res,
2964                                                              lock->ml.node);
2965                                 list_del_init(&lock->list);
2966                                 dlm_lock_put(lock);
2967                                 /* In a normal unlock, we would have added a
2968                                  * DLM_UNLOCK_FREE_LOCK action. Force it. */
2969                                 dlm_lock_put(lock);
2970                         }
2971                 }
2972                 queue++;
2973         }
2974         bit = 0;
2975         while (1) {
2976                 bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
2977                 if (bit >= O2NM_MAX_NODES)
2978                         break;
2979                 /* do not clear the local node reference, if there is a
2980                  * process holding this, let it drop the ref itself */
2981                 if (bit != dlm->node_num) {
2982                         mlog(0, "%s:%.*s: node %u had a ref to this "
2983                              "migrating lockres, clearing\n", dlm->name,
2984                              res->lockname.len, res->lockname.name, bit);
2985                         dlm_lockres_clear_refmap_bit(dlm, res, bit);
2986                 }
2987                 bit++;
2988         }
2989 }
2990 
2991 /*
2992  * Pick a node to migrate the lock resource to. This function selects a
2993  * potential target based first on the locks and then on refmap. It skips
2994  * nodes that are in the process of exiting the domain.
2995  */
2996 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
2997                                     struct dlm_lock_resource *res)
2998 {
2999         enum dlm_lockres_list idx;
3000         struct list_head *queue = &res->granted;
3001         struct dlm_lock *lock;
3002         int noderef;
3003         u8 nodenum = O2NM_MAX_NODES;
3004 
3005         assert_spin_locked(&dlm->spinlock);
3006         assert_spin_locked(&res->spinlock);
3007 
3008         /* Go through all the locks */
3009         for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
3010                 queue = dlm_list_idx_to_ptr(res, idx);
3011                 list_for_each_entry(lock, queue, list) {
3012                         if (lock->ml.node == dlm->node_num)
3013                                 continue;
3014                         if (test_bit(lock->ml.node, dlm->exit_domain_map))
3015                                 continue;
3016                         nodenum = lock->ml.node;
3017                         goto bail;
3018                 }
3019         }
3020 
3021         /* Go thru the refmap */
3022         noderef = -1;
3023         while (1) {
3024                 noderef = find_next_bit(res->refmap, O2NM_MAX_NODES,
3025                                         noderef + 1);
3026                 if (noderef >= O2NM_MAX_NODES)
3027                         break;
3028                 if (noderef == dlm->node_num)
3029                         continue;
3030                 if (test_bit(noderef, dlm->exit_domain_map))
3031                         continue;
3032                 nodenum = noderef;
3033                 goto bail;
3034         }
3035 
3036 bail:
3037         return nodenum;
3038 }
3039 
3040 /* this is called by the new master once all lockres
3041  * data has been received */
3042 static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
3043                                   struct dlm_lock_resource *res,
3044                                   u8 master, u8 new_master,
3045                                   struct dlm_node_iter *iter)
3046 {
3047         struct dlm_migrate_request migrate;
3048         int ret, skip, status = 0;
3049         int nodenum;
3050 
3051         memset(&migrate, 0, sizeof(migrate));
3052         migrate.namelen = res->lockname.len;
3053         memcpy(migrate.name, res->lockname.name, migrate.namelen);
3054         migrate.new_master = new_master;
3055         migrate.master = master;
3056 
3057         ret = 0;
3058 
3059         /* send message to all nodes, except the master and myself */
3060         while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
3061                 if (nodenum == master ||
3062                     nodenum == new_master)
3063                         continue;
3064 
3065                 /* We could race exit domain. If exited, skip. */
3066                 spin_lock(&dlm->spinlock);
3067                 skip = (!test_bit(nodenum, dlm->domain_map));
3068                 spin_unlock(&dlm->spinlock);
3069                 if (skip) {
3070                         clear_bit(nodenum, iter->node_map);
3071                         continue;
3072                 }
3073 
3074                 ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
3075                                          &migrate, sizeof(migrate), nodenum,
3076                                          &status);
3077                 if (ret < 0) {
3078                         mlog(ML_ERROR, "%s: res %.*s, Error %d send "
3079                              "MIGRATE_REQUEST to node %u\n", dlm->name,
3080                              migrate.namelen, migrate.name, ret, nodenum);
3081                         if (!dlm_is_host_down(ret)) {
3082                                 mlog(ML_ERROR, "unhandled error=%d!\n", ret);
3083                                 BUG();
3084                         }
3085                         clear_bit(nodenum, iter->node_map);
3086                         ret = 0;
3087                 } else if (status < 0) {
3088                         mlog(0, "migrate request (node %u) returned %d!\n",
3089                              nodenum, status);
3090                         ret = status;
3091                 } else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
3092                         /* during the migration request we short-circuited
3093                          * the mastery of the lockres.  make sure we have
3094                          * a mastery ref for nodenum */
3095                         mlog(0, "%s:%.*s: need ref for node %u\n",
3096                              dlm->name, res->lockname.len, res->lockname.name,
3097                              nodenum);
3098                         spin_lock(&res->spinlock);
3099                         dlm_lockres_set_refmap_bit(dlm, res, nodenum);
3100                         spin_unlock(&res->spinlock);
3101                 }
3102         }
3103 
3104         if (ret < 0)
3105                 mlog_errno(ret);
3106 
3107         mlog(0, "returning ret=%d\n", ret);
3108         return ret;
3109 }
3110 
3111 
3112 /* if there is an existing mle for this lockres, we now know who the master is.
3113  * (the one who sent us *this* message) we can clear it up right away.
3114  * since the process that put the mle on the list still has a reference to it,
3115  * we can unhash it now, set the master and wake the process.  as a result,
3116  * we will have no mle in the list to start with.  now we can add an mle for
3117  * the migration and this should be the only one found for those scanning the
3118  * list.  */
3119 int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
3120                                 void **ret_data)
3121 {
3122         struct dlm_ctxt *dlm = data;
3123         struct dlm_lock_resource *res = NULL;
3124         struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
3125         struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
3126         const char *name;
3127         unsigned int namelen, hash;
3128         int ret = 0;
3129 
3130         if (!dlm_grab(dlm))
3131                 return 0;
3132 
3133         name = migrate->name;
3134         namelen = migrate->namelen;
3135         hash = dlm_lockid_hash(name, namelen);
3136 
3137         /* preallocate.. if this fails, abort */
3138         mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
3139 
3140         if (!mle) {
3141                 ret = -ENOMEM;
3142                 goto leave;
3143         }
3144 
3145         /* check for pre-existing lock */
3146         spin_lock(&dlm->spinlock);
3147         res = __dlm_lookup_lockres(dlm, name, namelen, hash);
3148         if (res) {
3149                 spin_lock(&res->spinlock);
3150                 if (res->state & DLM_LOCK_RES_RECOVERING) {
3151                         /* if all is working ok, this can only mean that we got
3152                         * a migrate request from a node that we now see as
3153                         * dead.  what can we do here?  drop it to the floor? */
3154                         spin_unlock(&res->spinlock);
3155                         mlog(ML_ERROR, "Got a migrate request, but the "
3156                              "lockres is marked as recovering!");
3157                         kmem_cache_free(dlm_mle_cache, mle);
3158                         ret = -EINVAL; /* need a better solution */
3159                         goto unlock;
3160                 }
3161                 res->state |= DLM_LOCK_RES_MIGRATING;
3162                 spin_unlock(&res->spinlock);
3163         }
3164 
3165         spin_lock(&dlm->master_lock);
3166         /* ignore status.  only nonzero status would BUG. */
3167         ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
3168                                     name, namelen,
3169                                     migrate->new_master,
3170                                     migrate->master);
3171 
3172         if (ret < 0)
3173                 kmem_cache_free(dlm_mle_cache, mle);
3174 
3175         spin_unlock(&dlm->master_lock);
3176 unlock:
3177         spin_unlock(&dlm->spinlock);
3178 
3179         if (oldmle) {
3180                 /* master is known, detach if not already detached */
3181                 dlm_mle_detach_hb_events(dlm, oldmle);
3182                 dlm_put_mle(oldmle);
3183         }
3184 
3185         if (res)
3186                 dlm_lockres_put(res);
3187 leave:
3188         dlm_put(dlm);
3189         return ret;
3190 }
3191 
3192 /* must be holding dlm->spinlock and dlm->master_lock
3193  * when adding a migration mle, we can clear any other mles
3194  * in the master list because we know with certainty that
3195  * the master is "master".  so we remove any old mle from
3196  * the list after setting it's master field, and then add
3197  * the new migration mle.  this way we can hold with the rule
3198  * of having only one mle for a given lock name at all times. */
3199 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
3200                                  struct dlm_lock_resource *res,
3201                                  struct dlm_master_list_entry *mle,
3202                                  struct dlm_master_list_entry **oldmle,
3203                                  const char *name, unsigned int namelen,
3204                                  u8 new_master, u8 master)
3205 {
3206         int found;
3207         int ret = 0;
3208 
3209         *oldmle = NULL;
3210 
3211         assert_spin_locked(&dlm->spinlock);
3212         assert_spin_locked(&dlm->master_lock);
3213 
3214         /* caller is responsible for any ref taken here on oldmle */
3215         found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
3216         if (found) {
3217                 struct dlm_master_list_entry *tmp = *oldmle;
3218                 spin_lock(&tmp->spinlock);
3219                 if (tmp->type == DLM_MLE_MIGRATION) {
3220                         if (master == dlm->node_num) {
3221                                 /* ah another process raced me to it */
3222                                 mlog(0, "tried to migrate %.*s, but some "
3223                                      "process beat me to it\n",
3224                                      namelen, name);
3225                                 spin_unlock(&tmp->spinlock);
3226                                 return -EEXIST;
3227                         } else {
3228                                 /* bad.  2 NODES are trying to migrate! */
3229                                 mlog(ML_ERROR, "migration error  mle: "
3230                                      "master=%u new_master=%u // request: "
3231                                      "master=%u new_master=%u // "
3232                                      "lockres=%.*s\n",
3233                                      tmp->master, tmp->new_master,
3234                                      master, new_master,
3235                                      namelen, name);
3236                                 BUG();
3237                         }
3238                 } else {
3239                         /* this is essentially what assert_master does */
3240                         tmp->master = master;
3241                         atomic_set(&tmp->woken, 1);
3242                         wake_up(&tmp->wq);
3243                         /* remove it so that only one mle will be found */
3244                         __dlm_unlink_mle(dlm, tmp);
3245                         __dlm_mle_detach_hb_events(dlm, tmp);
3246                         if (tmp->type == DLM_MLE_MASTER) {
3247                                 ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
3248                                 mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
3249                                                 "telling master to get ref "
3250                                                 "for cleared out mle during "
3251                                                 "migration\n", dlm->name,
3252                                                 namelen, name, master,
3253                                                 new_master);
3254                         }
3255                 }
3256                 spin_unlock(&tmp->spinlock);
3257         }
3258 
3259         /* now add a migration mle to the tail of the list */
3260         dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
3261         mle->new_master = new_master;
3262         /* the new master will be sending an assert master for this.
3263          * at that point we will get the refmap reference */
3264         mle->master = master;
3265         /* do this for consistency with other mle types */
3266         set_bit(new_master, mle->maybe_map);
3267         __dlm_insert_mle(dlm, mle);
3268 
3269         return ret;
3270 }
3271 
3272 /*
3273  * Sets the owner of the lockres, associated to the mle, to UNKNOWN
3274  */
3275 static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm,
3276                                         struct dlm_master_list_entry *mle)
3277 {
3278         struct dlm_lock_resource *res;
3279 
3280         /* Find the lockres associated to the mle and set its owner to UNK */
3281         res = __dlm_lookup_lockres(dlm, mle->mname, mle->mnamelen,
3282                                    mle->mnamehash);
3283         if (res) {
3284                 spin_unlock(&dlm->master_lock);
3285 
3286                 /* move lockres onto recovery list */
3287                 spin_lock(&res->spinlock);
3288                 dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
3289                 dlm_move_lockres_to_recovery_list(dlm, res);
3290                 spin_unlock(&res->spinlock);
3291                 dlm_lockres_put(res);
3292 
3293                 /* about to get rid of mle, detach from heartbeat */
3294                 __dlm_mle_detach_hb_events(dlm, mle);
3295 
3296                 /* dump the mle */
3297                 spin_lock(&dlm->master_lock);
3298                 __dlm_put_mle(mle);
3299                 spin_unlock(&dlm->master_lock);
3300         }
3301 
3302         return res;
3303 }
3304 
3305 static void dlm_clean_migration_mle(struct dlm_ctxt *dlm,
3306                                     struct dlm_master_list_entry *mle)
3307 {
3308         __dlm_mle_detach_hb_events(dlm, mle);
3309 
3310         spin_lock(&mle->spinlock);
3311         __dlm_unlink_mle(dlm, mle);
3312         atomic_set(&mle->woken, 1);
3313         spin_unlock(&mle->spinlock);
3314 
3315         wake_up(&mle->wq);
3316 }
3317 
3318 static void dlm_clean_block_mle(struct dlm_ctxt *dlm,
3319                                 struct dlm_master_list_entry *mle, u8 dead_node)
3320 {
3321         int bit;
3322 
3323         BUG_ON(mle->type != DLM_MLE_BLOCK);
3324 
3325         spin_lock(&mle->spinlock);
3326         bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
3327         if (bit != dead_node) {
3328                 mlog(0, "mle found, but dead node %u would not have been "
3329                      "master\n", dead_node);
3330                 spin_unlock(&mle->spinlock);
3331         } else {
3332                 /* Must drop the refcount by one since the assert_master will
3333                  * never arrive. This may result in the mle being unlinked and
3334                  * freed, but there may still be a process waiting in the
3335                  * dlmlock path which is fine. */
3336                 mlog(0, "node %u was expected master\n", dead_node);
3337                 atomic_set(&mle->woken, 1);
3338                 spin_unlock(&mle->spinlock);
3339                 wake_up(&mle->wq);
3340 
3341                 /* Do not need events any longer, so detach from heartbeat */
3342                 __dlm_mle_detach_hb_events(dlm, mle);
3343                 __dlm_put_mle(mle);
3344         }
3345 }
3346 
3347 void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
3348 {
3349         struct dlm_master_list_entry *mle;
3350         struct dlm_lock_resource *res;
3351         struct hlist_head *bucket;
3352         struct hlist_node *tmp;
3353         unsigned int i;
3354 
3355         mlog(0, "dlm=%s, dead node=%u\n", dlm->name, dead_node);
3356 top:
3357         assert_spin_locked(&dlm->spinlock);
3358 
3359         /* clean the master list */
3360         spin_lock(&dlm->master_lock);
3361         for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3362                 bucket = dlm_master_hash(dlm, i);
3363                 hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
3364                         BUG_ON(mle->type != DLM_MLE_BLOCK &&
3365                                mle->type != DLM_MLE_MASTER &&
3366                                mle->type != DLM_MLE_MIGRATION);
3367 
3368                         /* MASTER mles are initiated locally. The waiting
3369                          * process will notice the node map change shortly.
3370                          * Let that happen as normal. */
3371                         if (mle->type == DLM_MLE_MASTER)
3372                                 continue;
3373 
3374                         /* BLOCK mles are initiated by other nodes. Need to
3375                          * clean up if the dead node would have been the
3376                          * master. */
3377                         if (mle->type == DLM_MLE_BLOCK) {
3378                                 dlm_clean_block_mle(dlm, mle, dead_node);
3379                                 continue;
3380                         }
3381 
3382                         /* Everything else is a MIGRATION mle */
3383 
3384                         /* The rule for MIGRATION mles is that the master
3385                          * becomes UNKNOWN if *either* the original or the new
3386                          * master dies. All UNKNOWN lockres' are sent to
3387                          * whichever node becomes the recovery master. The new
3388                          * master is responsible for determining if there is
3389                          * still a master for this lockres, or if he needs to
3390                          * take over mastery. Either way, this node should
3391                          * expect another message to resolve this. */
3392 
3393                         if (mle->master != dead_node &&
3394                             mle->new_master != dead_node)
3395                                 continue;
3396 
3397                         if (mle->new_master == dead_node && mle->inuse) {
3398                                 mlog(ML_NOTICE, "%s: target %u died during "
3399                                                 "migration from %u, the MLE is "
3400                                                 "still keep used, ignore it!\n",
3401                                                 dlm->name, dead_node,
3402                                                 mle->master);
3403                                 continue;
3404                         }
3405 
3406                         /* If we have reached this point, this mle needs to be
3407                          * removed from the list and freed. */
3408                         dlm_clean_migration_mle(dlm, mle);
3409 
3410                         mlog(0, "%s: node %u died during migration from "
3411                              "%u to %u!\n", dlm->name, dead_node, mle->master,
3412                              mle->new_master);
3413 
3414                         /* If we find a lockres associated with the mle, we've
3415                          * hit this rare case that messes up our lock ordering.
3416                          * If so, we need to drop the master lock so that we can
3417                          * take the lockres lock, meaning that we will have to
3418                          * restart from the head of list. */
3419                         res = dlm_reset_mleres_owner(dlm, mle);
3420                         if (res)
3421                                 /* restart */
3422                                 goto top;
3423 
3424                         /* This may be the last reference */
3425                         __dlm_put_mle(mle);
3426                 }
3427         }
3428         spin_unlock(&dlm->master_lock);
3429 }
3430 
3431 int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
3432                          u8 old_master)
3433 {
3434         struct dlm_node_iter iter;
3435         int ret = 0;
3436 
3437         spin_lock(&dlm->spinlock);
3438         dlm_node_iter_init(dlm->domain_map, &iter);
3439         clear_bit(old_master, iter.node_map);
3440         clear_bit(dlm->node_num, iter.node_map);
3441         spin_unlock(&dlm->spinlock);
3442 
3443         /* ownership of the lockres is changing.  account for the
3444          * mastery reference here since old_master will briefly have
3445          * a reference after the migration completes */
3446         spin_lock(&res->spinlock);
3447         dlm_lockres_set_refmap_bit(dlm, res, old_master);
3448         spin_unlock(&res->spinlock);
3449 
3450         mlog(0, "now time to do a migrate request to other nodes\n");
3451         ret = dlm_do_migrate_request(dlm, res, old_master,
3452                                      dlm->node_num, &iter);
3453         if (ret < 0) {
3454                 mlog_errno(ret);
3455                 goto leave;
3456         }
3457 
3458         mlog(0, "doing assert master of %.*s to all except the original node\n",
3459              res->lockname.len, res->lockname.name);
3460         /* this call now finishes out the nodemap
3461          * even if one or more nodes die */
3462         ret = dlm_do_assert_master(dlm, res, iter.node_map,
3463                                    DLM_ASSERT_MASTER_FINISH_MIGRATION);
3464         if (ret < 0) {
3465                 /* no longer need to retry.  all living nodes contacted. */
3466                 mlog_errno(ret);
3467                 ret = 0;
3468         }
3469 
3470         memset(iter.node_map, 0, sizeof(iter.node_map));
3471         set_bit(old_master, iter.node_map);
3472         mlog(0, "doing assert master of %.*s back to %u\n",
3473              res->lockname.len, res->lockname.name, old_master);
3474         ret = dlm_do_assert_master(dlm, res, iter.node_map,
3475                                    DLM_ASSERT_MASTER_FINISH_MIGRATION);
3476         if (ret < 0) {
3477                 mlog(0, "assert master to original master failed "
3478                      "with %d.\n", ret);
3479                 /* the only nonzero status here would be because of
3480                  * a dead original node.  we're done. */
3481                 ret = 0;
3482         }
3483 
3484         /* all done, set the owner, clear the flag */
3485         spin_lock(&res->spinlock);
3486         dlm_set_lockres_owner(dlm, res, dlm->node_num);
3487         res->state &= ~DLM_LOCK_RES_MIGRATING;
3488         spin_unlock(&res->spinlock);
3489         /* re-dirty it on the new master */
3490         dlm_kick_thread(dlm, res);
3491         wake_up(&res->wq);
3492 leave:
3493         return ret;
3494 }
3495 
3496 /*
3497  * LOCKRES AST REFCOUNT
3498  * this is integral to migration
3499  */
3500 
3501 /* for future intent to call an ast, reserve one ahead of time.
3502  * this should be called only after waiting on the lockres
3503  * with dlm_wait_on_lockres, and while still holding the
3504  * spinlock after the call. */
3505 void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
3506 {
3507         assert_spin_locked(&res->spinlock);
3508         if (res->state & DLM_LOCK_RES_MIGRATING) {
3509                 __dlm_print_one_lock_resource(res);
3510         }
3511         BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3512 
3513         atomic_inc(&res->asts_reserved);
3514 }
3515 
3516 /*
3517  * used to drop the reserved ast, either because it went unused,
3518  * or because the ast/bast was actually called.
3519  *
3520  * also, if there is a pending migration on this lockres,
3521  * and this was the last pending ast on the lockres,
3522  * atomically set the MIGRATING flag before we drop the lock.
3523  * this is how we ensure that migration can proceed with no
3524  * asts in progress.  note that it is ok if the state of the
3525  * queues is such that a lock should be granted in the future
3526  * or that a bast should be fired, because the new master will
3527  * shuffle the lists on this lockres as soon as it is migrated.
3528  */
3529 void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
3530                              struct dlm_lock_resource *res)
3531 {
3532         if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
3533                 return;
3534 
3535         if (!res->migration_pending) {
3536                 spin_unlock(&res->spinlock);
3537                 return;
3538         }
3539 
3540         BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3541         res->migration_pending = 0;
3542         res->state |= DLM_LOCK_RES_MIGRATING;
3543         spin_unlock(&res->spinlock);
3544         wake_up(&res->wq);
3545         wake_up(&dlm->migration_wq);
3546 }
3547 
3548 void dlm_force_free_mles(struct dlm_ctxt *dlm)
3549 {
3550         int i;
3551         struct hlist_head *bucket;
3552         struct dlm_master_list_entry *mle;
3553         struct hlist_node *tmp;
3554 
3555         /*
3556          * We notified all other nodes that we are exiting the domain and
3557          * marked the dlm state to DLM_CTXT_LEAVING. If any mles are still
3558          * around we force free them and wake any processes that are waiting
3559          * on the mles
3560          */
3561         spin_lock(&dlm->spinlock);
3562         spin_lock(&dlm->master_lock);
3563 
3564         BUG_ON(dlm->dlm_state != DLM_CTXT_LEAVING);
3565         BUG_ON((find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 0) < O2NM_MAX_NODES));
3566 
3567         for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3568                 bucket = dlm_master_hash(dlm, i);
3569                 hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
3570                         if (mle->type != DLM_MLE_BLOCK) {
3571                                 mlog(ML_ERROR, "bad mle: %p\n", mle);
3572                                 dlm_print_one_mle(mle);
3573                         }
3574                         atomic_set(&mle->woken, 1);
3575                         wake_up(&mle->wq);
3576 
3577                         __dlm_unlink_mle(dlm, mle);
3578                         __dlm_mle_detach_hb_events(dlm, mle);
3579                         __dlm_put_mle(mle);
3580                 }
3581         }
3582         spin_unlock(&dlm->master_lock);
3583         spin_unlock(&dlm->spinlock);
3584 }
3585 

~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~

kernel.org | git.kernel.org | LWN.net | Project Home | Wiki (Japanese) | Wiki (English) | SVN repository | Mail admin

Linux® is a registered trademark of Linus Torvalds in the United States and other countries.
TOMOYO® is a registered trademark of NTT DATA CORPORATION.

osdn.jp