~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~

TOMOYO Linux Cross Reference
Linux/fs/dlm/lockspace.c

Version: ~ [ linux-5.4-rc7 ] ~ [ linux-5.3.11 ] ~ [ linux-5.2.21 ] ~ [ linux-5.1.21 ] ~ [ linux-5.0.21 ] ~ [ linux-4.20.17 ] ~ [ linux-4.19.84 ] ~ [ linux-4.18.20 ] ~ [ linux-4.17.19 ] ~ [ linux-4.16.18 ] ~ [ linux-4.15.18 ] ~ [ linux-4.14.154 ] ~ [ linux-4.13.16 ] ~ [ linux-4.12.14 ] ~ [ linux-4.11.12 ] ~ [ linux-4.10.17 ] ~ [ linux-4.9.201 ] ~ [ linux-4.8.17 ] ~ [ linux-4.7.10 ] ~ [ linux-4.6.7 ] ~ [ linux-4.5.7 ] ~ [ linux-4.4.201 ] ~ [ linux-4.3.6 ] ~ [ linux-4.2.8 ] ~ [ linux-4.1.52 ] ~ [ linux-4.0.9 ] ~ [ linux-3.19.8 ] ~ [ linux-3.18.140 ] ~ [ linux-3.17.8 ] ~ [ linux-3.16.77 ] ~ [ linux-3.15.10 ] ~ [ linux-3.14.79 ] ~ [ linux-3.13.11 ] ~ [ linux-3.12.74 ] ~ [ linux-3.11.10 ] ~ [ linux-3.10.108 ] ~ [ linux-3.9.11 ] ~ [ linux-3.8.13 ] ~ [ linux-3.7.10 ] ~ [ linux-3.6.11 ] ~ [ linux-3.5.7 ] ~ [ linux-3.4.113 ] ~ [ linux-3.3.8 ] ~ [ linux-3.2.102 ] ~ [ linux-3.1.10 ] ~ [ linux-3.0.101 ] ~ [ linux-2.6.32.71 ] ~ [ linux-2.6.0 ] ~ [ linux-2.4.37.11 ] ~ [ unix-v6-master ] ~ [ ccs-tools-1.8.5 ] ~ [ policy-sample ] ~
Architecture: ~ [ i386 ] ~ [ alpha ] ~ [ m68k ] ~ [ mips ] ~ [ ppc ] ~ [ sparc ] ~ [ sparc64 ] ~

  1 /******************************************************************************
  2 *******************************************************************************
  3 **
  4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
  5 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
  6 **
  7 **  This copyrighted material is made available to anyone wishing to use,
  8 **  modify, copy, or redistribute it subject to the terms and conditions
  9 **  of the GNU General Public License v.2.
 10 **
 11 *******************************************************************************
 12 ******************************************************************************/
 13 
 14 #include <linux/module.h>
 15 
 16 #include "dlm_internal.h"
 17 #include "lockspace.h"
 18 #include "member.h"
 19 #include "recoverd.h"
 20 #include "dir.h"
 21 #include "lowcomms.h"
 22 #include "config.h"
 23 #include "memory.h"
 24 #include "lock.h"
 25 #include "recover.h"
 26 #include "requestqueue.h"
 27 #include "user.h"
 28 #include "ast.h"
 29 
/* Count of lockspaces created via dlm_new_lockspace(); updated under ls_lock. */
static int                      ls_count;
/* Held by dlm_new_lockspace() and dlm_release_lockspace() around their work. */
static struct mutex             ls_lock;
/* List of lockspaces, linked through dlm_ls.ls_list. */
static struct list_head         lslist;
/* Taken around lslist traversal/modification and ls_count/ls_create_count
   updates on individual lockspaces. */
static spinlock_t               lslist_lock;
/* The dlm_scand kthread, set by dlm_scand_start(). */
static struct task_struct *     scand_task;
 35 
 36 
 37 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
 38 {
 39         ssize_t ret = len;
 40         int n;
 41         int rc = kstrtoint(buf, 0, &n);
 42 
 43         if (rc)
 44                 return rc;
 45         ls = dlm_find_lockspace_local(ls->ls_local_handle);
 46         if (!ls)
 47                 return -EINVAL;
 48 
 49         switch (n) {
 50         case 0:
 51                 dlm_ls_stop(ls);
 52                 break;
 53         case 1:
 54                 dlm_ls_start(ls);
 55                 break;
 56         default:
 57                 ret = -EINVAL;
 58         }
 59         dlm_put_lockspace(ls);
 60         return ret;
 61 }
 62 
 63 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
 64 {
 65         int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
 66 
 67         if (rc)
 68                 return rc;
 69         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
 70         wake_up(&ls->ls_uevent_wait);
 71         return len;
 72 }
 73 
 74 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
 75 {
 76         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
 77 }
 78 
 79 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
 80 {
 81         int rc = kstrtouint(buf, 0, &ls->ls_global_id);
 82 
 83         if (rc)
 84                 return rc;
 85         return len;
 86 }
 87 
 88 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
 89 {
 90         return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
 91 }
 92 
 93 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
 94 {
 95         int val;
 96         int rc = kstrtoint(buf, 0, &val);
 97 
 98         if (rc)
 99                 return rc;
100         if (val == 1)
101                 set_bit(LSFL_NODIR, &ls->ls_flags);
102         return len;
103 }
104 
105 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
106 {
107         uint32_t status = dlm_recover_status(ls);
108         return snprintf(buf, PAGE_SIZE, "%x\n", status);
109 }
110 
111 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
112 {
113         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
114 }
115 
116 struct dlm_attr {
117         struct attribute attr;
118         ssize_t (*show)(struct dlm_ls *, char *);
119         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
120 };
121 
122 static struct dlm_attr dlm_attr_control = {
123         .attr  = {.name = "control", .mode = S_IWUSR},
124         .store = dlm_control_store
125 };
126 
127 static struct dlm_attr dlm_attr_event = {
128         .attr  = {.name = "event_done", .mode = S_IWUSR},
129         .store = dlm_event_store
130 };
131 
132 static struct dlm_attr dlm_attr_id = {
133         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
134         .show  = dlm_id_show,
135         .store = dlm_id_store
136 };
137 
138 static struct dlm_attr dlm_attr_nodir = {
139         .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
140         .show  = dlm_nodir_show,
141         .store = dlm_nodir_store
142 };
143 
144 static struct dlm_attr dlm_attr_recover_status = {
145         .attr  = {.name = "recover_status", .mode = S_IRUGO},
146         .show  = dlm_recover_status_show
147 };
148 
149 static struct dlm_attr dlm_attr_recover_nodeid = {
150         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
151         .show  = dlm_recover_nodeid_show
152 };
153 
154 static struct attribute *dlm_attrs[] = {
155         &dlm_attr_control.attr,
156         &dlm_attr_event.attr,
157         &dlm_attr_id.attr,
158         &dlm_attr_nodir.attr,
159         &dlm_attr_recover_status.attr,
160         &dlm_attr_recover_nodeid.attr,
161         NULL,
162 };
163 
164 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
165                              char *buf)
166 {
167         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
168         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
169         return a->show ? a->show(ls, buf) : 0;
170 }
171 
172 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
173                               const char *buf, size_t len)
174 {
175         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
176         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
177         return a->store ? a->store(ls, buf, len) : len;
178 }
179 
180 static void lockspace_kobj_release(struct kobject *k)
181 {
182         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
183         kfree(ls);
184 }
185 
186 static const struct sysfs_ops dlm_attr_ops = {
187         .show  = dlm_attr_show,
188         .store = dlm_attr_store,
189 };
190 
191 static struct kobj_type dlm_ktype = {
192         .default_attrs = dlm_attrs,
193         .sysfs_ops     = &dlm_attr_ops,
194         .release       = lockspace_kobj_release,
195 };
196 
/* The "dlm" kset created in dlm_lockspace_init(); parent of lockspace kobjects. */
static struct kset *dlm_kset;
198 
199 static int do_uevent(struct dlm_ls *ls, int in)
200 {
201         int error;
202 
203         if (in)
204                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
205         else
206                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
207 
208         log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
209 
210         /* dlm_controld will see the uevent, do the necessary group management
211            and then write to sysfs to wake us */
212 
213         error = wait_event_interruptible(ls->ls_uevent_wait,
214                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
215 
216         log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);
217 
218         if (error)
219                 goto out;
220 
221         error = ls->ls_uevent_result;
222  out:
223         if (error)
224                 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
225                           error, ls->ls_uevent_result);
226         return error;
227 }
228 
229 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
230                       struct kobj_uevent_env *env)
231 {
232         struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
233 
234         add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
235         return 0;
236 }
237 
238 static const struct kset_uevent_ops dlm_uevent_ops = {
239         .uevent = dlm_uevent,
240 };
241 
242 int __init dlm_lockspace_init(void)
243 {
244         ls_count = 0;
245         mutex_init(&ls_lock);
246         INIT_LIST_HEAD(&lslist);
247         spin_lock_init(&lslist_lock);
248 
249         dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
250         if (!dlm_kset) {
251                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
252                 return -ENOMEM;
253         }
254         return 0;
255 }
256 
257 void dlm_lockspace_exit(void)
258 {
259         kset_unregister(dlm_kset);
260 }
261 
262 static struct dlm_ls *find_ls_to_scan(void)
263 {
264         struct dlm_ls *ls;
265 
266         spin_lock(&lslist_lock);
267         list_for_each_entry(ls, &lslist, ls_list) {
268                 if (time_after_eq(jiffies, ls->ls_scan_time +
269                                             dlm_config.ci_scan_secs * HZ)) {
270                         spin_unlock(&lslist_lock);
271                         return ls;
272                 }
273         }
274         spin_unlock(&lslist_lock);
275         return NULL;
276 }
277 
278 static int dlm_scand(void *data)
279 {
280         struct dlm_ls *ls;
281 
282         while (!kthread_should_stop()) {
283                 ls = find_ls_to_scan();
284                 if (ls) {
285                         if (dlm_lock_recovery_try(ls)) {
286                                 ls->ls_scan_time = jiffies;
287                                 dlm_scan_rsbs(ls);
288                                 dlm_scan_timeout(ls);
289                                 dlm_scan_waiters(ls);
290                                 dlm_unlock_recovery(ls);
291                         } else {
292                                 ls->ls_scan_time += HZ;
293                         }
294                         continue;
295                 }
296                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
297         }
298         return 0;
299 }
300 
301 static int dlm_scand_start(void)
302 {
303         struct task_struct *p;
304         int error = 0;
305 
306         p = kthread_run(dlm_scand, NULL, "dlm_scand");
307         if (IS_ERR(p))
308                 error = PTR_ERR(p);
309         else
310                 scand_task = p;
311         return error;
312 }
313 
314 static void dlm_scand_stop(void)
315 {
316         kthread_stop(scand_task);
317 }
318 
319 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
320 {
321         struct dlm_ls *ls;
322 
323         spin_lock(&lslist_lock);
324 
325         list_for_each_entry(ls, &lslist, ls_list) {
326                 if (ls->ls_global_id == id) {
327                         ls->ls_count++;
328                         goto out;
329                 }
330         }
331         ls = NULL;
332  out:
333         spin_unlock(&lslist_lock);
334         return ls;
335 }
336 
337 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
338 {
339         struct dlm_ls *ls;
340 
341         spin_lock(&lslist_lock);
342         list_for_each_entry(ls, &lslist, ls_list) {
343                 if (ls->ls_local_handle == lockspace) {
344                         ls->ls_count++;
345                         goto out;
346                 }
347         }
348         ls = NULL;
349  out:
350         spin_unlock(&lslist_lock);
351         return ls;
352 }
353 
354 struct dlm_ls *dlm_find_lockspace_device(int minor)
355 {
356         struct dlm_ls *ls;
357 
358         spin_lock(&lslist_lock);
359         list_for_each_entry(ls, &lslist, ls_list) {
360                 if (ls->ls_device.minor == minor) {
361                         ls->ls_count++;
362                         goto out;
363                 }
364         }
365         ls = NULL;
366  out:
367         spin_unlock(&lslist_lock);
368         return ls;
369 }
370 
371 void dlm_put_lockspace(struct dlm_ls *ls)
372 {
373         spin_lock(&lslist_lock);
374         ls->ls_count--;
375         spin_unlock(&lslist_lock);
376 }
377 
378 static void remove_lockspace(struct dlm_ls *ls)
379 {
380         for (;;) {
381                 spin_lock(&lslist_lock);
382                 if (ls->ls_count == 0) {
383                         WARN_ON(ls->ls_create_count != 0);
384                         list_del(&ls->ls_list);
385                         spin_unlock(&lslist_lock);
386                         return;
387                 }
388                 spin_unlock(&lslist_lock);
389                 ssleep(1);
390         }
391 }
392 
/*
 * Start the threads shared by all lockspaces: dlm_scand first, then
 * lowcomms.  If lowcomms fails to start, dlm_scand is stopped again.
 * Returns 0 on success or the error from the step that failed.
 */
static int threads_start(void)
{
        int error;

        error = dlm_scand_start();
        if (error) {
                log_print("cannot start dlm_scand thread %d", error);
                return error;
        }

        /* Thread for sending/receiving messages for all lockspace's */
        error = dlm_lowcomms_start();
        if (error) {
                log_print("cannot start dlm lowcomms %d", error);
                dlm_scand_stop();
                return error;
        }

        return 0;
}
417 
/* Stop what threads_start() started: dlm_scand, then lowcomms. */
static void threads_stop(void)
{
        dlm_scand_stop();

        dlm_lowcomms_stop();
}
423 
424 static int new_lockspace(const char *name, const char *cluster,
425                          uint32_t flags, int lvblen,
426                          const struct dlm_lockspace_ops *ops, void *ops_arg,
427                          int *ops_result, dlm_lockspace_t **lockspace)
428 {
429         struct dlm_ls *ls;
430         int i, size, error;
431         int do_unreg = 0;
432         int namelen = strlen(name);
433 
434         if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
435                 return -EINVAL;
436 
437         if (!lvblen || (lvblen % 8))
438                 return -EINVAL;
439 
440         if (!try_module_get(THIS_MODULE))
441                 return -EINVAL;
442 
443         if (!dlm_user_daemon_available()) {
444                 log_print("dlm user daemon not available");
445                 error = -EUNATCH;
446                 goto out;
447         }
448 
449         if (ops && ops_result) {
450                 if (!dlm_config.ci_recover_callbacks)
451                         *ops_result = -EOPNOTSUPP;
452                 else
453                         *ops_result = 0;
454         }
455 
456         if (!cluster)
457                 log_print("dlm cluster name '%s' is being used without an application provided cluster name",
458                           dlm_config.ci_cluster_name);
459 
460         if (dlm_config.ci_recover_callbacks && cluster &&
461             strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
462                 log_print("dlm cluster name '%s' does not match "
463                           "the application cluster name '%s'",
464                           dlm_config.ci_cluster_name, cluster);
465                 error = -EBADR;
466                 goto out;
467         }
468 
469         error = 0;
470 
471         spin_lock(&lslist_lock);
472         list_for_each_entry(ls, &lslist, ls_list) {
473                 WARN_ON(ls->ls_create_count <= 0);
474                 if (ls->ls_namelen != namelen)
475                         continue;
476                 if (memcmp(ls->ls_name, name, namelen))
477                         continue;
478                 if (flags & DLM_LSFL_NEWEXCL) {
479                         error = -EEXIST;
480                         break;
481                 }
482                 ls->ls_create_count++;
483                 *lockspace = ls;
484                 error = 1;
485                 break;
486         }
487         spin_unlock(&lslist_lock);
488 
489         if (error)
490                 goto out;
491 
492         error = -ENOMEM;
493 
494         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
495         if (!ls)
496                 goto out;
497         memcpy(ls->ls_name, name, namelen);
498         ls->ls_namelen = namelen;
499         ls->ls_lvblen = lvblen;
500         ls->ls_count = 0;
501         ls->ls_flags = 0;
502         ls->ls_scan_time = jiffies;
503 
504         if (ops && dlm_config.ci_recover_callbacks) {
505                 ls->ls_ops = ops;
506                 ls->ls_ops_arg = ops_arg;
507         }
508 
509         if (flags & DLM_LSFL_TIMEWARN)
510                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
511 
512         /* ls_exflags are forced to match among nodes, and we don't
513            need to require all nodes to have some flags set */
514         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
515                                     DLM_LSFL_NEWEXCL));
516 
517         size = dlm_config.ci_rsbtbl_size;
518         ls->ls_rsbtbl_size = size;
519 
520         ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
521         if (!ls->ls_rsbtbl)
522                 goto out_lsfree;
523         for (i = 0; i < size; i++) {
524                 ls->ls_rsbtbl[i].keep.rb_node = NULL;
525                 ls->ls_rsbtbl[i].toss.rb_node = NULL;
526                 spin_lock_init(&ls->ls_rsbtbl[i].lock);
527         }
528 
529         spin_lock_init(&ls->ls_remove_spin);
530 
531         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
532                 ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
533                                                  GFP_KERNEL);
534                 if (!ls->ls_remove_names[i])
535                         goto out_rsbtbl;
536         }
537 
538         idr_init(&ls->ls_lkbidr);
539         spin_lock_init(&ls->ls_lkbidr_spin);
540 
541         INIT_LIST_HEAD(&ls->ls_waiters);
542         mutex_init(&ls->ls_waiters_mutex);
543         INIT_LIST_HEAD(&ls->ls_orphans);
544         mutex_init(&ls->ls_orphans_mutex);
545         INIT_LIST_HEAD(&ls->ls_timeout);
546         mutex_init(&ls->ls_timeout_mutex);
547 
548         INIT_LIST_HEAD(&ls->ls_new_rsb);
549         spin_lock_init(&ls->ls_new_rsb_spin);
550 
551         INIT_LIST_HEAD(&ls->ls_nodes);
552         INIT_LIST_HEAD(&ls->ls_nodes_gone);
553         ls->ls_num_nodes = 0;
554         ls->ls_low_nodeid = 0;
555         ls->ls_total_weight = 0;
556         ls->ls_node_array = NULL;
557 
558         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
559         ls->ls_stub_rsb.res_ls = ls;
560 
561         ls->ls_debug_rsb_dentry = NULL;
562         ls->ls_debug_waiters_dentry = NULL;
563 
564         init_waitqueue_head(&ls->ls_uevent_wait);
565         ls->ls_uevent_result = 0;
566         init_completion(&ls->ls_members_done);
567         ls->ls_members_result = -1;
568 
569         mutex_init(&ls->ls_cb_mutex);
570         INIT_LIST_HEAD(&ls->ls_cb_delay);
571 
572         ls->ls_recoverd_task = NULL;
573         mutex_init(&ls->ls_recoverd_active);
574         spin_lock_init(&ls->ls_recover_lock);
575         spin_lock_init(&ls->ls_rcom_spin);
576         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
577         ls->ls_recover_status = 0;
578         ls->ls_recover_seq = 0;
579         ls->ls_recover_args = NULL;
580         init_rwsem(&ls->ls_in_recovery);
581         init_rwsem(&ls->ls_recv_active);
582         INIT_LIST_HEAD(&ls->ls_requestqueue);
583         mutex_init(&ls->ls_requestqueue_mutex);
584         mutex_init(&ls->ls_clear_proc_locks);
585 
586         ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
587         if (!ls->ls_recover_buf)
588                 goto out_lkbidr;
589 
590         ls->ls_slot = 0;
591         ls->ls_num_slots = 0;
592         ls->ls_slots_size = 0;
593         ls->ls_slots = NULL;
594 
595         INIT_LIST_HEAD(&ls->ls_recover_list);
596         spin_lock_init(&ls->ls_recover_list_lock);
597         idr_init(&ls->ls_recover_idr);
598         spin_lock_init(&ls->ls_recover_idr_lock);
599         ls->ls_recover_list_count = 0;
600         ls->ls_local_handle = ls;
601         init_waitqueue_head(&ls->ls_wait_general);
602         INIT_LIST_HEAD(&ls->ls_root_list);
603         init_rwsem(&ls->ls_root_sem);
604 
605         spin_lock(&lslist_lock);
606         ls->ls_create_count = 1;
607         list_add(&ls->ls_list, &lslist);
608         spin_unlock(&lslist_lock);
609 
610         if (flags & DLM_LSFL_FS) {
611                 error = dlm_callback_start(ls);
612                 if (error) {
613                         log_error(ls, "can't start dlm_callback %d", error);
614                         goto out_delist;
615                 }
616         }
617 
618         init_waitqueue_head(&ls->ls_recover_lock_wait);
619 
620         /*
621          * Once started, dlm_recoverd first looks for ls in lslist, then
622          * initializes ls_in_recovery as locked in "down" mode.  We need
623          * to wait for the wakeup from dlm_recoverd because in_recovery
624          * has to start out in down mode.
625          */
626 
627         error = dlm_recoverd_start(ls);
628         if (error) {
629                 log_error(ls, "can't start dlm_recoverd %d", error);
630                 goto out_callback;
631         }
632 
633         wait_event(ls->ls_recover_lock_wait,
634                    test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
635 
636         ls->ls_kobj.kset = dlm_kset;
637         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
638                                      "%s", ls->ls_name);
639         if (error)
640                 goto out_recoverd;
641         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
642 
643         /* let kobject handle freeing of ls if there's an error */
644         do_unreg = 1;
645 
646         /* This uevent triggers dlm_controld in userspace to add us to the
647            group of nodes that are members of this lockspace (managed by the
648            cluster infrastructure.)  Once it's done that, it tells us who the
649            current lockspace members are (via configfs) and then tells the
650            lockspace to start running (via sysfs) in dlm_ls_start(). */
651 
652         error = do_uevent(ls, 1);
653         if (error)
654                 goto out_recoverd;
655 
656         wait_for_completion(&ls->ls_members_done);
657         error = ls->ls_members_result;
658         if (error)
659                 goto out_members;
660 
661         dlm_create_debug_file(ls);
662 
663         log_rinfo(ls, "join complete");
664         *lockspace = ls;
665         return 0;
666 
667  out_members:
668         do_uevent(ls, 0);
669         dlm_clear_members(ls);
670         kfree(ls->ls_node_array);
671  out_recoverd:
672         dlm_recoverd_stop(ls);
673  out_callback:
674         dlm_callback_stop(ls);
675  out_delist:
676         spin_lock(&lslist_lock);
677         list_del(&ls->ls_list);
678         spin_unlock(&lslist_lock);
679         idr_destroy(&ls->ls_recover_idr);
680         kfree(ls->ls_recover_buf);
681  out_lkbidr:
682         idr_destroy(&ls->ls_lkbidr);
683  out_rsbtbl:
684         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
685                 kfree(ls->ls_remove_names[i]);
686         vfree(ls->ls_rsbtbl);
687  out_lsfree:
688         if (do_unreg)
689                 kobject_put(&ls->ls_kobj);
690         else
691                 kfree(ls);
692  out:
693         module_put(THIS_MODULE);
694         return error;
695 }
696 
697 int dlm_new_lockspace(const char *name, const char *cluster,
698                       uint32_t flags, int lvblen,
699                       const struct dlm_lockspace_ops *ops, void *ops_arg,
700                       int *ops_result, dlm_lockspace_t **lockspace)
701 {
702         int error = 0;
703 
704         mutex_lock(&ls_lock);
705         if (!ls_count)
706                 error = threads_start();
707         if (error)
708                 goto out;
709 
710         error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
711                               ops_result, lockspace);
712         if (!error)
713                 ls_count++;
714         if (error > 0)
715                 error = 0;
716         if (!ls_count)
717                 threads_stop();
718  out:
719         mutex_unlock(&ls_lock);
720         return error;
721 }
722 
723 static int lkb_idr_is_local(int id, void *p, void *data)
724 {
725         struct dlm_lkb *lkb = p;
726 
727         return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
728 }
729 
/*
 * idr_for_each() callback that matches every entry: always returns 1, so
 * the iteration reports non-zero as soon as the idr holds anything.
 */
static int lkb_idr_is_any(int id, void *p, void *data)
{
        const int matched = 1;

        return matched;
}
734 
735 static int lkb_idr_free(int id, void *p, void *data)
736 {
737         struct dlm_lkb *lkb = p;
738 
739         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
740                 dlm_free_lvb(lkb->lkb_lvbptr);
741 
742         dlm_free_lkb(lkb);
743         return 0;
744 }
745 
746 /* NOTE: We check the lkbidr here rather than the resource table.
747    This is because there may be LKBs queued as ASTs that have been unlinked
748    from their RSBs and are pending deletion once the AST has been delivered */
749 
750 static int lockspace_busy(struct dlm_ls *ls, int force)
751 {
752         int rv;
753 
754         spin_lock(&ls->ls_lkbidr_spin);
755         if (force == 0) {
756                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
757         } else if (force == 1) {
758                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
759         } else {
760                 rv = 0;
761         }
762         spin_unlock(&ls->ls_lkbidr_spin);
763         return rv;
764 }
765 
766 static int release_lockspace(struct dlm_ls *ls, int force)
767 {
768         struct dlm_rsb *rsb;
769         struct rb_node *n;
770         int i, busy, rv;
771 
772         busy = lockspace_busy(ls, force);
773 
774         spin_lock(&lslist_lock);
775         if (ls->ls_create_count == 1) {
776                 if (busy) {
777                         rv = -EBUSY;
778                 } else {
779                         /* remove_lockspace takes ls off lslist */
780                         ls->ls_create_count = 0;
781                         rv = 0;
782                 }
783         } else if (ls->ls_create_count > 1) {
784                 rv = --ls->ls_create_count;
785         } else {
786                 rv = -EINVAL;
787         }
788         spin_unlock(&lslist_lock);
789 
790         if (rv) {
791                 log_debug(ls, "release_lockspace no remove %d", rv);
792                 return rv;
793         }
794 
795         dlm_device_deregister(ls);
796 
797         if (force < 3 && dlm_user_daemon_available())
798                 do_uevent(ls, 0);
799 
800         dlm_recoverd_stop(ls);
801 
802         dlm_callback_stop(ls);
803 
804         remove_lockspace(ls);
805 
806         dlm_delete_debug_file(ls);
807 
808         idr_destroy(&ls->ls_recover_idr);
809         kfree(ls->ls_recover_buf);
810 
811         /*
812          * Free all lkb's in idr
813          */
814 
815         idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
816         idr_destroy(&ls->ls_lkbidr);
817 
818         /*
819          * Free all rsb's on rsbtbl[] lists
820          */
821 
822         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
823                 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
824                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
825                         rb_erase(n, &ls->ls_rsbtbl[i].keep);
826                         dlm_free_rsb(rsb);
827                 }
828 
829                 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
830                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
831                         rb_erase(n, &ls->ls_rsbtbl[i].toss);
832                         dlm_free_rsb(rsb);
833                 }
834         }
835 
836         vfree(ls->ls_rsbtbl);
837 
838         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
839                 kfree(ls->ls_remove_names[i]);
840 
841         while (!list_empty(&ls->ls_new_rsb)) {
842                 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
843                                        res_hashchain);
844                 list_del(&rsb->res_hashchain);
845                 dlm_free_rsb(rsb);
846         }
847 
848         /*
849          * Free structures on any other lists
850          */
851 
852         dlm_purge_requestqueue(ls);
853         kfree(ls->ls_recover_args);
854         dlm_clear_members(ls);
855         dlm_clear_members_gone(ls);
856         kfree(ls->ls_node_array);
857         log_rinfo(ls, "release_lockspace final free");
858         kobject_put(&ls->ls_kobj);
859         /* The ls structure will be freed when the kobject is done with */
860 
861         module_put(THIS_MODULE);
862         return 0;
863 }
864 
865 /*
866  * Called when a system has released all its locks and is not going to use the
867  * lockspace any longer.  We free everything we're managing for this lockspace.
868  * Remaining nodes will go through the recovery process as if we'd died.  The
869  * lockspace must continue to function as usual, participating in recoveries,
870  * until this returns.
871  *
872  * Force has 4 possible values:
873  * 0 - don't destroy locksapce if it has any LKBs
874  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
875  * 2 - destroy lockspace regardless of LKBs
876  * 3 - destroy lockspace as part of a forced shutdown
877  */
878 
879 int dlm_release_lockspace(void *lockspace, int force)
880 {
881         struct dlm_ls *ls;
882         int error;
883 
884         ls = dlm_find_lockspace_local(lockspace);
885         if (!ls)
886                 return -EINVAL;
887         dlm_put_lockspace(ls);
888 
889         mutex_lock(&ls_lock);
890         error = release_lockspace(ls, force);
891         if (!error)
892                 ls_count--;
893         if (!ls_count)
894                 threads_stop();
895         mutex_unlock(&ls_lock);
896 
897         return error;
898 }
899 
900 void dlm_stop_lockspaces(void)
901 {
902         struct dlm_ls *ls;
903         int count;
904 
905  restart:
906         count = 0;
907         spin_lock(&lslist_lock);
908         list_for_each_entry(ls, &lslist, ls_list) {
909                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
910                         count++;
911                         continue;
912                 }
913                 spin_unlock(&lslist_lock);
914                 log_error(ls, "no userland control daemon, stopping lockspace");
915                 dlm_ls_stop(ls);
916                 goto restart;
917         }
918         spin_unlock(&lslist_lock);
919 
920         if (count)
921                 log_print("dlm user daemon left %d lockspaces", count);
922 }
923 
924 

~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~

kernel.org | git.kernel.org | LWN.net | Project Home | Wiki (Japanese) | Wiki (English) | SVN repository | Mail admin

Linux® is a registered trademark of Linus Torvalds in the United States and other countries.
TOMOYO® is a registered trademark of NTT DATA CORPORATION.

osdn.jp