~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~

TOMOYO Linux Cross Reference
Linux/fs/ocfs2/cluster/heartbeat.c

Version: ~ [ linux-5.2-rc1 ] ~ [ linux-5.1.2 ] ~ [ linux-5.0.16 ] ~ [ linux-4.20.17 ] ~ [ linux-4.19.43 ] ~ [ linux-4.18.20 ] ~ [ linux-4.17.19 ] ~ [ linux-4.16.18 ] ~ [ linux-4.15.18 ] ~ [ linux-4.14.119 ] ~ [ linux-4.13.16 ] ~ [ linux-4.12.14 ] ~ [ linux-4.11.12 ] ~ [ linux-4.10.17 ] ~ [ linux-4.9.176 ] ~ [ linux-4.8.17 ] ~ [ linux-4.7.10 ] ~ [ linux-4.6.7 ] ~ [ linux-4.5.7 ] ~ [ linux-4.4.179 ] ~ [ linux-4.3.6 ] ~ [ linux-4.2.8 ] ~ [ linux-4.1.52 ] ~ [ linux-4.0.9 ] ~ [ linux-3.19.8 ] ~ [ linux-3.18.139 ] ~ [ linux-3.17.8 ] ~ [ linux-3.16.67 ] ~ [ linux-3.15.10 ] ~ [ linux-3.14.79 ] ~ [ linux-3.13.11 ] ~ [ linux-3.12.74 ] ~ [ linux-3.11.10 ] ~ [ linux-3.10.108 ] ~ [ linux-3.9.11 ] ~ [ linux-3.8.13 ] ~ [ linux-3.7.10 ] ~ [ linux-3.6.11 ] ~ [ linux-3.5.7 ] ~ [ linux-3.4.113 ] ~ [ linux-3.3.8 ] ~ [ linux-3.2.102 ] ~ [ linux-3.1.10 ] ~ [ linux-3.0.101 ] ~ [ linux-2.6.39.4 ] ~ [ linux-2.6.38.8 ] ~ [ linux-2.6.37.6 ] ~ [ linux-2.6.36.4 ] ~ [ linux-2.6.35.14 ] ~ [ linux-2.6.34.15 ] ~ [ linux-2.6.33.20 ] ~ [ linux-2.6.32.71 ] ~ [ linux-2.6.0 ] ~ [ linux-2.4.37.11 ] ~ [ unix-v6-master ] ~ [ ccs-tools-1.8.5 ] ~ [ policy-sample ] ~
Architecture: ~ [ i386 ] ~ [ alpha ] ~ [ m68k ] ~ [ mips ] ~ [ ppc ] ~ [ sparc ] ~ [ sparc64 ] ~

  1 /* -*- mode: c; c-basic-offset: 8; -*-
  2  * vim: noexpandtab sw=8 ts=8 sts=0:
  3  *
  4  * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
  5  *
  6  * This program is free software; you can redistribute it and/or
  7  * modify it under the terms of the GNU General Public
  8  * License as published by the Free Software Foundation; either
  9  * version 2 of the License, or (at your option) any later version.
 10  *
 11  * This program is distributed in the hope that it will be useful,
 12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
 13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 14  * General Public License for more details.
 15  *
 16  * You should have received a copy of the GNU General Public
 17  * License along with this program; if not, write to the
 18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 19  * Boston, MA 02111-1307, USA.
 20  */
 21 
 22 #include <linux/kernel.h>
 23 #include <linux/sched.h>
 24 #include <linux/jiffies.h>
 25 #include <linux/module.h>
 26 #include <linux/fs.h>
 27 #include <linux/bio.h>
 28 #include <linux/blkdev.h>
 29 #include <linux/delay.h>
 30 #include <linux/file.h>
 31 #include <linux/kthread.h>
 32 #include <linux/configfs.h>
 33 #include <linux/random.h>
 34 #include <linux/crc32.h>
 35 #include <linux/time.h>
 36 #include <linux/debugfs.h>
 37 #include <linux/slab.h>
 38 #include <linux/bitmap.h>
 39 #include <linux/ktime.h>
 40 #include "heartbeat.h"
 41 #include "tcp.h"
 42 #include "nodemanager.h"
 43 #include "quorum.h"
 44 
 45 #include "masklog.h"
 46 
 47 
 48 /*
 49  * The first heartbeat pass had one global thread that would serialize all hb
 50  * callback calls.  This global serializing sem should only be removed once
 51  * we've made sure that all callees can deal with being called concurrently
 52  * from multiple hb region threads.
 53  */
 /* Held for write by o2hb_run_event_list() while node event callbacks run. */
 54 static DECLARE_RWSEM(o2hb_callback_sem);
 55 
 56 /*
 57  * multiple hb threads are watching multiple regions.  A node is live
 58  * whenever any of the threads sees activity from the node in its region.
 59  */
    /*
     * o2hb_live_lock is taken in this file around accesses to
     * o2hb_all_regions, the failed/quorum region bitmaps and
     * o2hb_node_events.
     */
 60 static DEFINE_SPINLOCK(o2hb_live_lock);
 61 static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
 62 static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
    /* Pending node events; o2hb_run_event_list() pops them under o2hb_live_lock. */
 63 static LIST_HEAD(o2hb_node_events);
 64 static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);
 65 
 66 /*
 67  * In global heartbeat, we maintain a series of region bitmaps.
 68  *      - o2hb_region_bitmap allows us to limit the region number to max region.
 69  *      - o2hb_live_region_bitmap tracks live regions (seen steady iterations).
 70  *      - o2hb_quorum_region_bitmap tracks live regions that have seen all nodes
 71  *              heartbeat on it.
 72  *      - o2hb_failed_region_bitmap tracks the regions that have seen io timeouts.
 73  */
 74 static unsigned long o2hb_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
 75 static unsigned long o2hb_live_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
 76 static unsigned long o2hb_quorum_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
    /*
     * With global heartbeat active, a region's bit is set here by
     * o2hb_write_timeout() (only if the region is a quorum region) and
     * cleared again by o2hb_arm_timeout(); both do so under o2hb_live_lock.
     */
 77 static unsigned long o2hb_failed_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
 78 
 /*
  * Selector values, presumably stored in o2hb_debug_buf.db_type to say which
  * piece of state a debug buffer exposes -- TODO confirm against the debugfs
  * code that consumes them.
  */
 79 #define O2HB_DB_TYPE_LIVENODES          0
 80 #define O2HB_DB_TYPE_LIVEREGIONS        1
 81 #define O2HB_DB_TYPE_QUORUMREGIONS      2
 82 #define O2HB_DB_TYPE_FAILEDREGIONS      3
 83 #define O2HB_DB_TYPE_REGION_LIVENODES   4
 84 #define O2HB_DB_TYPE_REGION_NUMBER      5
 85 #define O2HB_DB_TYPE_REGION_ELAPSED_TIME        6
 86 #define O2HB_DB_TYPE_REGION_PINNED      7
    /*
     * Descriptor for one debug buffer.
     * NOTE(review): db_size/db_len look like the size and length of db_data,
     * but nothing in the surrounding code shows how they are filled -- confirm.
     */
 87 struct o2hb_debug_buf {
 88         int db_type;
 89         int db_size;
 90         int db_len;
 91         void *db_data;
 92 };
 93 
 /* Global (not per-region) debug buffers. */
 94 static struct o2hb_debug_buf *o2hb_db_livenodes;
 95 static struct o2hb_debug_buf *o2hb_db_liveregions;
 96 static struct o2hb_debug_buf *o2hb_db_quorumregions;
 97 static struct o2hb_debug_buf *o2hb_db_failedregions;
 98 
    /*
     * Name strings -- presumably the debugfs directory and file names
     * (linux/debugfs.h is included by this file); TODO confirm at the
     * call sites.
     */
 99 #define O2HB_DEBUG_DIR                  "o2hb"
100 #define O2HB_DEBUG_LIVENODES            "livenodes"
101 #define O2HB_DEBUG_LIVEREGIONS          "live_regions"
102 #define O2HB_DEBUG_QUORUMREGIONS        "quorum_regions"
103 #define O2HB_DEBUG_FAILEDREGIONS        "failed_regions"
104 #define O2HB_DEBUG_REGION_NUMBER        "num"
105 #define O2HB_DEBUG_REGION_ELAPSED_TIME  "elapsed_time_in_ms"
106 #define O2HB_DEBUG_REGION_PINNED        "pinned"
107 
108 static struct dentry *o2hb_debug_dir;
109 static struct dentry *o2hb_debug_livenodes;
110 static struct dentry *o2hb_debug_liveregions;
111 static struct dentry *o2hb_debug_quorumregions;
112 static struct dentry *o2hb_debug_failedregions;
113 
    /*
     * The dead-threshold and heartbeat-mode setters only apply a change
     * while this list is empty (checked under o2hb_live_lock).
     */
114 static LIST_HEAD(o2hb_all_regions);
115 
/*
 * One callback list per callback type.  o2hb_fire_callbacks() walks a list
 * through the hc_item member of struct o2hb_callback_func.
 */
116 static struct o2hb_callback {
117         struct list_head list;
118 } o2hb_callbacks[O2HB_NUM_CB];
119 
    /* Forward declaration; o2hb_run_event_list() BUGs on an IS_ERR() result. */
120 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);
121 
122 #define O2HB_DEFAULT_BLOCK_BITS       9
123 
    /*
     * Heartbeat modes.  O2HB_HEARTBEAT_NUM_MODES is the number of modes and
     * the exclusive upper bound accepted by o2hb_global_heartbeat_mode_set().
     */
124 enum o2hb_heartbeat_modes {
125         O2HB_HEARTBEAT_LOCAL            = 0,
126         O2HB_HEARTBEAT_GLOBAL,
127         O2HB_HEARTBEAT_NUM_MODES,
128 };
129 
    /* Printable name of each mode, indexed by enum o2hb_heartbeat_modes. */
130 char *o2hb_heartbeat_mode_desc[O2HB_HEARTBEAT_NUM_MODES] = {
131                 "local",        /* O2HB_HEARTBEAT_LOCAL */
132                 "global",       /* O2HB_HEARTBEAT_GLOBAL */
133 };
134 
    /* Both are only changed while o2hb_all_regions is empty (see the setters). */
135 unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;
136 unsigned int o2hb_heartbeat_mode = O2HB_HEARTBEAT_LOCAL;
137 
138 /*
139  * o2hb_dependent_users tracks the number of registered callbacks that depend
140  * on heartbeat. o2net and o2dlm are two entities that register this callback.
141  * However only o2dlm depends on the heartbeat. It does not want the heartbeat
142  * to stop while a dlm domain is still active.
143  */
144 unsigned int o2hb_dependent_users;
145 
146 /*
147  * In global heartbeat mode, all regions are pinned if there are one or more
148  * dependent users and the quorum region count is <= O2HB_PIN_CUT_OFF. All
149  * regions are unpinned if the region count exceeds the cut off or the number
150  * of dependent users falls to zero.
151  */
152 #define O2HB_PIN_CUT_OFF                3
153 
154 /*
155  * In local heartbeat mode, we assume the dlm domain name to be the same as
156  * region uuid. This is true for domains created for the file system but not
157  * necessarily true for userdlm domains. This is a known limitation.
158  *
159  * In global heartbeat mode, we pin/unpin all o2hb regions. This solution
160  * works for both file system and userdlm domains.
161  */
    /* Forward declarations only; see the definitions for the pin/unpin details. */
162 static int o2hb_region_pin(const char *region_uuid);
163 static void o2hb_region_unpin(const char *region_uuid);
164 
165 /* Only sets a new threshold if there are no active regions.
166  *
167  * No locking or otherwise interesting code is required for reading
168  * o2hb_dead_threshold as it can't change once regions are active and
169  * it's not interesting to anyone until then anyway. */
170 static void o2hb_dead_threshold_set(unsigned int threshold)
171 {
172         if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
173                 spin_lock(&o2hb_live_lock);
174                 if (list_empty(&o2hb_all_regions))
175                         o2hb_dead_threshold = threshold;
176                 spin_unlock(&o2hb_live_lock);
177         }
178 }
179 
180 static int o2hb_global_heartbeat_mode_set(unsigned int hb_mode)
181 {
182         int ret = -1;
183 
184         if (hb_mode < O2HB_HEARTBEAT_NUM_MODES) {
185                 spin_lock(&o2hb_live_lock);
186                 if (list_empty(&o2hb_all_regions)) {
187                         o2hb_heartbeat_mode = hb_mode;
188                         ret = 0;
189                 }
190                 spin_unlock(&o2hb_live_lock);
191         }
192 
193         return ret;
194 }
195 
/*
 * One queued node event.  o2hb_run_event_list() takes these off
 * o2hb_node_events and fires the callbacks selected by hn_event_type,
 * passing hn_node and hn_node_num to each callback.
 */
196 struct o2hb_node_event {
197         struct list_head        hn_item;        /* link on o2hb_node_events */
198         enum o2hb_callback_type hn_event_type;  /* e.g. O2HB_NODE_UP_CB */
199         struct o2nm_node        *hn_node;
200         int                     hn_node_num;
201 };
202 
/*
 * Per-node heartbeat slot of a region (o2hb_region.hr_slots is indexed by
 * node number).
 */
203 struct o2hb_disk_slot {
            /* in-memory copy of this slot's on-disk heartbeat block */
204         struct o2hb_disk_heartbeat_block *ds_raw_block;
            /* compared against hb_node by o2hb_check_own_slot() */
205         u8                      ds_node_num;
            /* compared against hb_seq; zero means no timestamp seen yet */
206         u64                     ds_last_time;
            /* compared against hb_generation by o2hb_check_own_slot() */
207         u64                     ds_last_generation;
            /* NOTE(review): sample counters; their use is not visible here */
208         u16                     ds_equal_samples;
209         u16                     ds_changed_samples;
            /* presumably the link on o2hb_live_slots[] -- TODO confirm */
210         struct list_head        ds_live_item;
211 };
212 
213 /* each thread owns a region.. when we're asked to tear down the region
214  * we ask the thread to stop, who cleans up the region */
215 struct o2hb_region {
            /* config item; its name is printed as the region name in log messages */
216         struct config_item      hr_item;
217 
218         struct list_head        hr_all_item;
219         unsigned                hr_unclean_stop:1,
220                                 hr_aborted_start:1,
221                                 hr_item_pinned:1,
222                                 hr_item_dropped:1,
223                                 hr_node_deleted:1;
224 
225         /* NOTE(review): the original comment said "protected by the
             * hr_callback_sem", but this struct has no such member; presumably
             * o2hb_callback_sem is meant -- confirm. */
226         struct task_struct      *hr_task;
227 
228         unsigned int            hr_blocks;
            /* first block of the region; added to the slot number to form the bio sector */
229         unsigned long long      hr_start_block;
230 
            /* log2 of the block size; bios shift by (hr_block_bits - 9) to get sectors */
231         unsigned int            hr_block_bits;
            /* bytes zeroed and checksummed per heartbeat block */
232         unsigned int            hr_block_bytes;
233 
234         unsigned int            hr_slots_per_page;
235         unsigned int            hr_num_pages;
236 
            /* pages backing the slots; page index is slot / hr_slots_per_page */
237         struct page             **hr_slot_data;
238         struct block_device     *hr_bdev;
            /* indexed by node number */
239         struct o2hb_disk_slot   *hr_slots;
240 
241         /* live node map of this region */
242         unsigned long           hr_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
            /* bit index of this region in the global region bitmaps */
243         unsigned int            hr_region_num;
244 
245         struct dentry           *hr_debug_dir;
246         struct dentry           *hr_debug_livenodes;
247         struct dentry           *hr_debug_regnum;
248         struct dentry           *hr_debug_elapsed_time;
249         struct dentry           *hr_debug_pinned;
250         struct o2hb_debug_buf   *hr_db_livenodes;
251         struct o2hb_debug_buf   *hr_db_regnum;
252         struct o2hb_debug_buf   *hr_db_elapsed_time;
253         struct o2hb_debug_buf   *hr_db_pinned;
254 
255         /* let the person setting up hb wait for it to return until it
256          * has reached a 'steady' state.  This will be fixed when we have
257          * a more complete api that doesn't lead to this sort of fragility. */
258         atomic_t                hr_steady_iterations;
259 
260         /* terminate o2hb thread if it does not reach steady state
261          * (hr_steady_iterations == 0) within hr_unsteady_iterations */
262         atomic_t                hr_unsteady_iterations;
263 
264         char                    hr_dev_name[BDEVNAME_SIZE];
265 
266         unsigned int            hr_timeout_ms;
267 
268         /* randomized as the region goes up and down so that a node
269          * recognizes a node going up and down in one iteration */
270         u64                     hr_generation;
271 
272         struct delayed_work     hr_write_timeout_work;
            /* jiffies value used by o2hb_write_timeout() to report elapsed time */
273         unsigned long           hr_last_timeout_start;
274 
275         /* negotiate timer, used to negotiate extending hb timeout. */
276         struct delayed_work     hr_nego_timeout_work;
            /* nodes that reported a hung write; zeroed by o2hb_arm_timeout() */
277         unsigned long           hr_nego_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
278 
279         /* Used during o2hb_check_slot to hold a copy of the block
280          * being checked because we temporarily have to zero out the
281          * crc field. */
282         struct o2hb_disk_heartbeat_block *hr_tmp_block;
283 
284         /* Message key for negotiate timeout message. */
285         unsigned int            hr_key;
286         struct list_head        hr_handler_list;
287 
288         /* last hb status, 0 for success, other value for error. */
289         int                     hr_last_hb_status;
290 };
291 
/*
 * Tracks a batch of in-flight bios so that the submitter can wait for all
 * of them (see o2hb_bio_wait_init(), o2hb_bio_wait_dec(), o2hb_wait_on_io()).
 */
292 struct o2hb_bio_wait_ctxt {
            /* starts at 1, +1 per submitted bio, -1 per completion and per waiter */
293         atomic_t          wc_num_reqs;
            /* completed when wc_num_reqs drops to zero */
294         struct completion wc_io_complete;
            /* 0, or the bi_error of a failed bio as recorded by o2hb_bio_end_io() */
295         int               wc_error;
296 };
297 
/* Negotiation fires at half of the maximum write timeout. */
298 #define O2HB_NEGO_TIMEOUT_MS (O2HB_MAX_WRITE_TIMEOUT_MS/2)
299 
    /* Message types passed to o2hb_send_nego_msg(). */
300 enum {
301         O2HB_NEGO_TIMEOUT_MSG = 1,
302         O2HB_NEGO_APPROVE_MSG = 2,
303 };
304 
    /* Payload of a negotiate message: the sending node's number. */
305 struct o2hb_nego_msg {
306         u8 node_num;
307 };
308 
309 static void o2hb_write_timeout(struct work_struct *work)
310 {
311         int failed, quorum;
312         struct o2hb_region *reg =
313                 container_of(work, struct o2hb_region,
314                              hr_write_timeout_work.work);
315 
316         mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
317              "milliseconds\n", reg->hr_dev_name,
318              jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
319 
320         if (o2hb_global_heartbeat_active()) {
321                 spin_lock(&o2hb_live_lock);
322                 if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
323                         set_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
324                 failed = bitmap_weight(o2hb_failed_region_bitmap,
325                                         O2NM_MAX_REGIONS);
326                 quorum = bitmap_weight(o2hb_quorum_region_bitmap,
327                                         O2NM_MAX_REGIONS);
328                 spin_unlock(&o2hb_live_lock);
329 
330                 mlog(ML_HEARTBEAT, "Number of regions %d, failed regions %d\n",
331                      quorum, failed);
332 
333                 /*
334                  * Fence if the number of failed regions >= half the number
335                  * of  quorum regions
336                  */
337                 if ((failed << 1) < quorum)
338                         return;
339         }
340 
341         o2quo_disk_timeout();
342 }
343 
/*
 * (Re)arm both timers of @reg: the write timeout
 * (O2HB_MAX_WRITE_TIMEOUT_MS) and the negotiate timeout
 * (O2HB_NEGO_TIMEOUT_MS).
 *
 * Does nothing while hr_steady_iterations is non-zero.  With global
 * heartbeat active, the region's bit in o2hb_failed_region_bitmap is
 * cleared first.  hr_nego_node_bitmap is zeroed at the end.
 */
344 static void o2hb_arm_timeout(struct o2hb_region *reg)
345 {
346         /* Arm writeout only after thread reaches steady state */
347         if (atomic_read(&reg->hr_steady_iterations) != 0)
348                 return;
349 
350         mlog(ML_HEARTBEAT, "Queue write timeout for %u ms\n",
351              O2HB_MAX_WRITE_TIMEOUT_MS);
352 
353         if (o2hb_global_heartbeat_active()) {
354                 spin_lock(&o2hb_live_lock);
355                 clear_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
356                 spin_unlock(&o2hb_live_lock);
357         }
            /* cancel any pending instance, then queue a fresh full-length timeout */
358         cancel_delayed_work(&reg->hr_write_timeout_work);
359         schedule_delayed_work(&reg->hr_write_timeout_work,
360                               msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
361 
362         cancel_delayed_work(&reg->hr_nego_timeout_work);
363         /* negotiate timeout must be less than write timeout. */
364         schedule_delayed_work(&reg->hr_nego_timeout_work,
365                               msecs_to_jiffies(O2HB_NEGO_TIMEOUT_MS));
            /* forget which nodes reported a hung write in the previous round */
366         memset(reg->hr_nego_node_bitmap, 0, sizeof(reg->hr_nego_node_bitmap));
367 }
368 
/*
 * Cancel the write-timeout and negotiate-timeout works of @reg using the
 * _sync cancel variant for both.
 */
369 static void o2hb_disarm_timeout(struct o2hb_region *reg)
370 {
371         cancel_delayed_work_sync(&reg->hr_write_timeout_work);
372         cancel_delayed_work_sync(&reg->hr_nego_timeout_work);
373 }
374 
375 static int o2hb_send_nego_msg(int key, int type, u8 target)
376 {
377         struct o2hb_nego_msg msg;
378         int status, ret;
379 
380         msg.node_num = o2nm_this_node();
381 again:
382         ret = o2net_send_message(type, key, &msg, sizeof(msg),
383                         target, &status);
384 
385         if (ret == -EAGAIN || ret == -ENOMEM) {
386                 msleep(100);
387                 goto again;
388         }
389 
390         return ret;
391 }
392 
393 static void o2hb_nego_timeout(struct work_struct *work)
394 {
395         unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
396         int master_node, i, ret;
397         struct o2hb_region *reg;
398 
399         reg = container_of(work, struct o2hb_region, hr_nego_timeout_work.work);
400         /* don't negotiate timeout if last hb failed since it is very
401          * possible io failed. Should let write timeout fence self.
402          */
403         if (reg->hr_last_hb_status)
404                 return;
405 
406         o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
407         /* lowest node as master node to make negotiate decision. */
408         master_node = find_next_bit(live_node_bitmap, O2NM_MAX_NODES, 0);
409 
410         if (master_node == o2nm_this_node()) {
411                 if (!test_bit(master_node, reg->hr_nego_node_bitmap)) {
412                         printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%s).\n",
413                                 o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000,
414                                 config_item_name(&reg->hr_item), reg->hr_dev_name);
415                         set_bit(master_node, reg->hr_nego_node_bitmap);
416                 }
417                 if (memcmp(reg->hr_nego_node_bitmap, live_node_bitmap,
418                                 sizeof(reg->hr_nego_node_bitmap))) {
419                         /* check negotiate bitmap every second to do timeout
420                          * approve decision.
421                          */
422                         schedule_delayed_work(&reg->hr_nego_timeout_work,
423                                 msecs_to_jiffies(1000));
424 
425                         return;
426                 }
427 
428                 printk(KERN_NOTICE "o2hb: all nodes hb write hung, maybe region %s (%s) is down.\n",
429                         config_item_name(&reg->hr_item), reg->hr_dev_name);
430                 /* approve negotiate timeout request. */
431                 o2hb_arm_timeout(reg);
432 
433                 i = -1;
434                 while ((i = find_next_bit(live_node_bitmap,
435                                 O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
436                         if (i == master_node)
437                                 continue;
438 
439                         mlog(ML_HEARTBEAT, "send NEGO_APPROVE msg to node %d\n", i);
440                         ret = o2hb_send_nego_msg(reg->hr_key,
441                                         O2HB_NEGO_APPROVE_MSG, i);
442                         if (ret)
443                                 mlog(ML_ERROR, "send NEGO_APPROVE msg to node %d fail %d\n",
444                                         i, ret);
445                 }
446         } else {
447                 /* negotiate timeout with master node. */
448                 printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%s), negotiate timeout with node %d.\n",
449                         o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000, config_item_name(&reg->hr_item),
450                         reg->hr_dev_name, master_node);
451                 ret = o2hb_send_nego_msg(reg->hr_key, O2HB_NEGO_TIMEOUT_MSG,
452                                 master_node);
453                 if (ret)
454                         mlog(ML_ERROR, "send NEGO_TIMEOUT msg to node %d fail %d\n",
455                                 master_node, ret);
456         }
457 }
458 
459 static int o2hb_nego_timeout_handler(struct o2net_msg *msg, u32 len, void *data,
460                                 void **ret_data)
461 {
462         struct o2hb_region *reg = data;
463         struct o2hb_nego_msg *nego_msg;
464 
465         nego_msg = (struct o2hb_nego_msg *)msg->buf;
466         printk(KERN_NOTICE "o2hb: receive negotiate timeout message from node %d on region %s (%s).\n",
467                 nego_msg->node_num, config_item_name(&reg->hr_item), reg->hr_dev_name);
468         if (nego_msg->node_num < O2NM_MAX_NODES)
469                 set_bit(nego_msg->node_num, reg->hr_nego_node_bitmap);
470         else
471                 mlog(ML_ERROR, "got nego timeout message from bad node.\n");
472 
473         return 0;
474 }
475 
/*
 * Message handler for a negotiate-approve message.  @data is the
 * struct o2hb_region the handler was registered for.  Logs the approval
 * and re-arms the region's timeouts via o2hb_arm_timeout().
 * Always returns 0.
 */
476 static int o2hb_nego_approve_handler(struct o2net_msg *msg, u32 len, void *data,
477                                 void **ret_data)
478 {
479         struct o2hb_region *reg = data;
480 
481         printk(KERN_NOTICE "o2hb: negotiate timeout approved by master node on region %s (%s).\n",
482                 config_item_name(&reg->hr_item), reg->hr_dev_name);
483         o2hb_arm_timeout(reg);
484         return 0;
485 }
486 
/*
 * Prepare @wc for a new batch of bios.  The request count starts at 1;
 * that initial reference is dropped by o2hb_wait_on_io().
 */
487 static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
488 {
489         atomic_set(&wc->wc_num_reqs, 1);
490         init_completion(&wc->wc_io_complete);
491         wc->wc_error = 0;
492 }
493 
494 /* Used in error paths too */
495 static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
496                                      unsigned int num)
497 {
498         /* sadly atomic_sub_and_test() isn't available on all platforms.  The
499          * good news is that the fast path only completes one at a time */
500         while(num--) {
501                 if (atomic_dec_and_test(&wc->wc_num_reqs)) {
502                         BUG_ON(num > 0);
503                         complete(&wc->wc_io_complete);
504                 }
505         }
506 }
507 
/*
 * Drop one reference on @wc (the one taken by o2hb_bio_wait_init()) and
 * block until wc_io_complete is completed.  @reg is not used.
 */
508 static void o2hb_wait_on_io(struct o2hb_region *reg,
509                             struct o2hb_bio_wait_ctxt *wc)
510 {
511         o2hb_bio_wait_dec(wc, 1);
512         wait_for_completion(&wc->wc_io_complete);
513 }
514 
515 static void o2hb_bio_end_io(struct bio *bio)
516 {
517         struct o2hb_bio_wait_ctxt *wc = bio->bi_private;
518 
519         if (bio->bi_error) {
520                 mlog(ML_ERROR, "IO Error %d\n", bio->bi_error);
521                 wc->wc_error = bio->bi_error;
522         }
523 
524         o2hb_bio_wait_dec(wc, 1);
525         bio_put(bio);
526 }
527 
528 /* Setup a Bio to cover I/O against the slots from *current_slot up to
529  * (but not including) max_slots. */
/*
 * Build one bio for heartbeat slot I/O.
 *
 * @reg:          region whose slot pages and block device are used
 * @wc:           wait context stored in bi_private for o2hb_bio_end_io()
 * @current_slot: in: first slot to cover; out: first slot NOT covered by
 *                the returned bio (unchanged if allocation failed)
 * @max_slots:    exclusive upper bound of slots to cover
 * @op, @op_flags: passed to bio_set_op_attrs()
 *
 * Pages are added until max_slots is reached or bio_add_page() accepts
 * less than requested, so one call may cover only part of the range.
 * The bio is not submitted here.
 *
 * Returns the bio, or ERR_PTR(-ENOMEM) if it could not be allocated.
 */
530 static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
531                                       struct o2hb_bio_wait_ctxt *wc,
532                                       unsigned int *current_slot,
533                                       unsigned int max_slots, int op,
534                                       int op_flags)
535 {
536         int len, current_page;
537         unsigned int vec_len, vec_start;
538         unsigned int bits = reg->hr_block_bits;
539         unsigned int spp = reg->hr_slots_per_page;
540         unsigned int cs = *current_slot;
541         struct bio *bio;
542         struct page *page;
543 
544         /* Testing has shown this allocation to take long enough under
545          * GFP_KERNEL that the local node can get fenced. It would be
546          * nicest if we could pre-allocate these bios and avoid this
547          * all together. */
548         bio = bio_alloc(GFP_ATOMIC, 16);
549         if (!bio) {
550                 mlog(ML_ERROR, "Could not alloc slots BIO!\n");
551                 bio = ERR_PTR(-ENOMEM);
552                 goto bail;
553         }
554 
555         /* Must put everything in 512 byte sectors for the bio... */
556         bio->bi_iter.bi_sector = (reg->hr_start_block + cs) << (bits - 9);
557         bio->bi_bdev = reg->hr_bdev;
558         bio->bi_private = wc;
559         bio->bi_end_io = o2hb_bio_end_io;
560         bio_set_op_attrs(bio, op, op_flags);
561 
            /* byte offset of the first slot within its page */
562         vec_start = (cs << bits) % PAGE_SIZE;
563         while(cs < max_slots) {
564                 current_page = cs / spp;
565                 page = reg->hr_slot_data[current_page];
566 
                    /* rest of this page, or just the remaining slots if fewer;
                     * PAGE_SIZE/spp is the number of bytes one slot occupies */
567                 vec_len = min(PAGE_SIZE - vec_start,
568                               (max_slots-cs) * (PAGE_SIZE/spp) );
569 
570                 mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
571                      current_page, vec_len, vec_start);
572 
573                 len = bio_add_page(bio, page, vec_len, vec_start);
                    /* bio took less than asked: stop without advancing cs */
574                 if (len != vec_len) break;
575 
576                 cs += vec_len / (PAGE_SIZE/spp);
                    /* every page after the first is used from its start */
577                 vec_start = 0;
578         }
579 
580 bail:
581         *current_slot = cs;
582         return bio;
583 }
584 
585 static int o2hb_read_slots(struct o2hb_region *reg,
586                            unsigned int max_slots)
587 {
588         unsigned int current_slot=0;
589         int status;
590         struct o2hb_bio_wait_ctxt wc;
591         struct bio *bio;
592 
593         o2hb_bio_wait_init(&wc);
594 
595         while(current_slot < max_slots) {
596                 bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots,
597                                          REQ_OP_READ, 0);
598                 if (IS_ERR(bio)) {
599                         status = PTR_ERR(bio);
600                         mlog_errno(status);
601                         goto bail_and_wait;
602                 }
603 
604                 atomic_inc(&wc.wc_num_reqs);
605                 submit_bio(bio);
606         }
607 
608         status = 0;
609 
610 bail_and_wait:
611         o2hb_wait_on_io(reg, &wc);
612         if (wc.wc_error && !status)
613                 status = wc.wc_error;
614 
615         return status;
616 }
617 
618 static int o2hb_issue_node_write(struct o2hb_region *reg,
619                                  struct o2hb_bio_wait_ctxt *write_wc)
620 {
621         int status;
622         unsigned int slot;
623         struct bio *bio;
624 
625         o2hb_bio_wait_init(write_wc);
626 
627         slot = o2nm_this_node();
628 
629         bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1, REQ_OP_WRITE,
630                                  REQ_SYNC);
631         if (IS_ERR(bio)) {
632                 status = PTR_ERR(bio);
633                 mlog_errno(status);
634                 goto bail;
635         }
636 
637         atomic_inc(&write_wc->wc_num_reqs);
638         submit_bio(bio);
639 
640         status = 0;
641 bail:
642         return status;
643 }
644 
645 static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
646                                      struct o2hb_disk_heartbeat_block *hb_block)
647 {
648         __le32 old_cksum;
649         u32 ret;
650 
651         /* We want to compute the block crc with a 0 value in the
652          * hb_cksum field. Save it off here and replace after the
653          * crc. */
654         old_cksum = hb_block->hb_cksum;
655         hb_block->hb_cksum = 0;
656 
657         ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);
658 
659         hb_block->hb_cksum = old_cksum;
660 
661         return ret;
662 }
663 
/*
 * Log the sequence, node number, checksum and generation of @hb_block at
 * ML_ERROR level.  Multi-byte fields are converted from little-endian
 * before printing.
 */
664 static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
665 {
666         mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "
667              "cksum = 0x%x, generation 0x%llx\n",
668              (long long)le64_to_cpu(hb_block->hb_seq),
669              hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),
670              (long long)le64_to_cpu(hb_block->hb_generation));
671 }
672 
673 static int o2hb_verify_crc(struct o2hb_region *reg,
674                            struct o2hb_disk_heartbeat_block *hb_block)
675 {
676         u32 read, computed;
677 
678         read = le32_to_cpu(hb_block->hb_cksum);
679         computed = o2hb_compute_block_crc_le(reg, hb_block);
680 
681         return read == computed;
682 }
683 
684 /*
685  * Compare the slot data with what we wrote in the last iteration.
686  * If the match fails, print an appropriate error message. This is to
687  * detect errors like... another node hearting on the same slot,
688  * flaky device that is losing writes, etc.
689  * Returns 1 if check succeeds, 0 otherwise.
690  */
691 static int o2hb_check_own_slot(struct o2hb_region *reg)
692 {
693         struct o2hb_disk_slot *slot;
694         struct o2hb_disk_heartbeat_block *hb_block;
695         char *errstr;
696 
697         slot = &reg->hr_slots[o2nm_this_node()];
698         /* Don't check on our 1st timestamp */
699         if (!slot->ds_last_time)
700                 return 0;
701 
702         hb_block = slot->ds_raw_block;
703         if (le64_to_cpu(hb_block->hb_seq) == slot->ds_last_time &&
704             le64_to_cpu(hb_block->hb_generation) == slot->ds_last_generation &&
705             hb_block->hb_node == slot->ds_node_num)
706                 return 1;
707 
708 #define ERRSTR1         "Another node is heartbeating on device"
709 #define ERRSTR2         "Heartbeat generation mismatch on device"
710 #define ERRSTR3         "Heartbeat sequence mismatch on device"
711 
712         if (hb_block->hb_node != slot->ds_node_num)
713                 errstr = ERRSTR1;
714         else if (le64_to_cpu(hb_block->hb_generation) !=
715                  slot->ds_last_generation)
716                 errstr = ERRSTR2;
717         else
718                 errstr = ERRSTR3;
719 
720         mlog(ML_ERROR, "%s (%s): expected(%u:0x%llx, 0x%llx), "
721              "ondisk(%u:0x%llx, 0x%llx)\n", errstr, reg->hr_dev_name,
722              slot->ds_node_num, (unsigned long long)slot->ds_last_generation,
723              (unsigned long long)slot->ds_last_time, hb_block->hb_node,
724              (unsigned long long)le64_to_cpu(hb_block->hb_generation),
725              (unsigned long long)le64_to_cpu(hb_block->hb_seq));
726 
727         return 0;
728 }
729 
730 static inline void o2hb_prepare_block(struct o2hb_region *reg,
731                                       u64 generation)
732 {
733         int node_num;
734         u64 cputime;
735         struct o2hb_disk_slot *slot;
736         struct o2hb_disk_heartbeat_block *hb_block;
737 
738         node_num = o2nm_this_node();
739         slot = &reg->hr_slots[node_num];
740 
741         hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
742         memset(hb_block, 0, reg->hr_block_bytes);
743         /* TODO: time stuff */
744         cputime = ktime_get_real_seconds();
745         if (!cputime)
746                 cputime = 1;
747 
748         hb_block->hb_seq = cpu_to_le64(cputime);
749         hb_block->hb_node = node_num;
750         hb_block->hb_generation = cpu_to_le64(generation);
751         hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS);
752 
753         /* This step must always happen last! */
754         hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
755                                                                    hb_block));
756 
757         mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
758              (long long)generation,
759              le32_to_cpu(hb_block->hb_cksum));
760 }
761 
/*
 * Invoke every callback registered on @hbcall, in list order, passing
 * @node, @idx and the callback's own hc_data.
 */
762 static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
763                                 struct o2nm_node *node,
764                                 int idx)
765 {
766         struct o2hb_callback_func *f;
767 
768         list_for_each_entry(f, &hbcall->list, hc_item) {
769                 mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
770                 (f->hc_func)(node, idx, f->hc_data);
771         }
772 }
773 
774 /* Will run the list in order until we process the passed event */
775 static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
776 {
777         struct o2hb_callback *hbcall;
778         struct o2hb_node_event *event;
779 
780         /* Holding callback sem assures we don't alter the callback
781          * lists when doing this, and serializes ourselves with other
782          * processes wanting callbacks. */
783         down_write(&o2hb_callback_sem);
784 
785         spin_lock(&o2hb_live_lock);
786         while (!list_empty(&o2hb_node_events)
787                && !list_empty(&queued_event->hn_item)) {
788                 event = list_entry(o2hb_node_events.next,
789                                    struct o2hb_node_event,
790                                    hn_item);
791                 list_del_init(&event->hn_item);
792                 spin_unlock(&o2hb_live_lock);
793 
794                 mlog(ML_HEARTBEAT, "Node %s event for %d\n",
795                      event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
796                      event->hn_node_num);
797 
798                 hbcall = hbcall_from_type(event->hn_event_type);
799 
800                 /* We should *never* have gotten on to the list with a
801                  * bad type... This isn't something that we should try
802                  * to recover from. */
803                 BUG_ON(IS_ERR(hbcall));
804 
805                 o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);
806 
807                 spin_lock(&o2hb_live_lock);
808         }
809         spin_unlock(&o2hb_live_lock);
810 
811         up_write(&o2hb_callback_sem);
812 }
813 
814 static void o2hb_queue_node_event(struct o2hb_node_event *event,
815                                   enum o2hb_callback_type type,
816                                   struct o2nm_node *node,
817                                   int node_num)
818 {
819         assert_spin_locked(&o2hb_live_lock);
820 
821         BUG_ON((!node) && (type != O2HB_NODE_DOWN_CB));
822 
823         event->hn_event_type = type;
824         event->hn_node = node;
825         event->hn_node_num = node_num;
826 
827         mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
828              type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);
829 
830         list_add_tail(&event->hn_item, &o2hb_node_events);
831 }
832 
833 static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
834 {
835         struct o2hb_node_event event =
836                 { .hn_item = LIST_HEAD_INIT(event.hn_item), };
837         struct o2nm_node *node;
838         int queued = 0;
839 
840         node = o2nm_get_node_by_num(slot->ds_node_num);
841         if (!node)
842                 return;
843 
844         spin_lock(&o2hb_live_lock);
845         if (!list_empty(&slot->ds_live_item)) {
846                 mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
847                      slot->ds_node_num);
848 
849                 list_del_init(&slot->ds_live_item);
850 
851                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
852                         clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
853 
854                         o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
855                                               slot->ds_node_num);
856                         queued = 1;
857                 }
858         }
859         spin_unlock(&o2hb_live_lock);
860 
861         if (queued)
862                 o2hb_run_event_list(&event);
863 
864         o2nm_node_put(node);
865 }
866 
867 static void o2hb_set_quorum_device(struct o2hb_region *reg)
868 {
869         if (!o2hb_global_heartbeat_active())
870                 return;
871 
872         /* Prevent race with o2hb_heartbeat_group_drop_item() */
873         if (kthread_should_stop())
874                 return;
875 
876         /* Tag region as quorum only after thread reaches steady state */
877         if (atomic_read(&reg->hr_steady_iterations) != 0)
878                 return;
879 
880         spin_lock(&o2hb_live_lock);
881 
882         if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
883                 goto unlock;
884 
885         /*
886          * A region can be added to the quorum only when it sees all
887          * live nodes heartbeat on it. In other words, the region has been
888          * added to all nodes.
889          */
890         if (memcmp(reg->hr_live_node_bitmap, o2hb_live_node_bitmap,
891                    sizeof(o2hb_live_node_bitmap)))
892                 goto unlock;
893 
894         printk(KERN_NOTICE "o2hb: Region %s (%s) is now a quorum device\n",
895                config_item_name(&reg->hr_item), reg->hr_dev_name);
896 
897         set_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
898 
899         /*
900          * If global heartbeat active, unpin all regions if the
901          * region count > CUT_OFF
902          */
903         if (bitmap_weight(o2hb_quorum_region_bitmap,
904                            O2NM_MAX_REGIONS) > O2HB_PIN_CUT_OFF)
905                 o2hb_region_unpin(NULL);
906 unlock:
907         spin_unlock(&o2hb_live_lock);
908 }
909 
910 static int o2hb_check_slot(struct o2hb_region *reg,
911                            struct o2hb_disk_slot *slot)
912 {
913         int changed = 0, gen_changed = 0;
914         struct o2hb_node_event event =
915                 { .hn_item = LIST_HEAD_INIT(event.hn_item), };
916         struct o2nm_node *node;
917         struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
918         u64 cputime;
919         unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
920         unsigned int slot_dead_ms;
921         int tmp;
922         int queued = 0;
923 
924         memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);
925 
926         /*
927          * If a node is no longer configured but is still in the livemap, we
928          * may need to clear that bit from the livemap.
929          */
930         node = o2nm_get_node_by_num(slot->ds_node_num);
931         if (!node) {
932                 spin_lock(&o2hb_live_lock);
933                 tmp = test_bit(slot->ds_node_num, o2hb_live_node_bitmap);
934                 spin_unlock(&o2hb_live_lock);
935                 if (!tmp)
936                         return 0;
937         }
938 
939         if (!o2hb_verify_crc(reg, hb_block)) {
940                 /* all paths from here will drop o2hb_live_lock for
941                  * us. */
942                 spin_lock(&o2hb_live_lock);
943 
944                 /* Don't print an error on the console in this case -
945                  * a freshly formatted heartbeat area will not have a
946                  * crc set on it. */
947                 if (list_empty(&slot->ds_live_item))
948                         goto out;
949 
950                 /* The node is live but pushed out a bad crc. We
951                  * consider it a transient miss but don't populate any
952                  * other values as they may be junk. */
953                 mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
954                      slot->ds_node_num, reg->hr_dev_name);
955                 o2hb_dump_slot(hb_block);
956 
957                 slot->ds_equal_samples++;
958                 goto fire_callbacks;
959         }
960 
961         /* we don't care if these wrap.. the state transitions below
962          * clear at the right places */
963         cputime = le64_to_cpu(hb_block->hb_seq);
964         if (slot->ds_last_time != cputime)
965                 slot->ds_changed_samples++;
966         else
967                 slot->ds_equal_samples++;
968         slot->ds_last_time = cputime;
969 
970         /* The node changed heartbeat generations. We assume this to
971          * mean it dropped off but came back before we timed out. We
972          * want to consider it down for the time being but don't want
973          * to lose any changed_samples state we might build up to
974          * considering it live again. */
975         if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
976                 gen_changed = 1;
977                 slot->ds_equal_samples = 0;
978                 mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
979                      "to 0x%llx)\n", slot->ds_node_num,
980                      (long long)slot->ds_last_generation,
981                      (long long)le64_to_cpu(hb_block->hb_generation));
982         }
983 
984         slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
985 
986         mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
987              "seq %llu last %llu changed %u equal %u\n",
988              slot->ds_node_num, (long long)slot->ds_last_generation,
989              le32_to_cpu(hb_block->hb_cksum),
990              (unsigned long long)le64_to_cpu(hb_block->hb_seq),
991              (unsigned long long)slot->ds_last_time, slot->ds_changed_samples,
992              slot->ds_equal_samples);
993 
994         spin_lock(&o2hb_live_lock);
995 
996 fire_callbacks:
997         /* dead nodes only come to life after some number of
998          * changes at any time during their dead time */
999         if (list_empty(&slot->ds_live_item) &&
1000             slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
1001                 mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
1002                      slot->ds_node_num, (long long)slot->ds_last_generation);
1003 
1004                 set_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
1005 
1006                 /* first on the list generates a callback */
1007                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
1008                         mlog(ML_HEARTBEAT, "o2hb: Add node %d to live nodes "
1009                              "bitmap\n", slot->ds_node_num);
1010                         set_bit(slot->ds_node_num, o2hb_live_node_bitmap);
1011 
1012                         o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
1013                                               slot->ds_node_num);
1014 
1015                         changed = 1;
1016                         queued = 1;
1017                 }
1018 
1019                 list_add_tail(&slot->ds_live_item,
1020                               &o2hb_live_slots[slot->ds_node_num]);
1021 
1022                 slot->ds_equal_samples = 0;
1023 
1024                 /* We want to be sure that all nodes agree on the
1025                  * number of milliseconds before a node will be
1026                  * considered dead. The self-fencing timeout is
1027                  * computed from this value, and a discrepancy might
1028                  * result in heartbeat calling a node dead when it
1029                  * hasn't self-fenced yet. */
1030                 slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms);
1031                 if (slot_dead_ms && slot_dead_ms != dead_ms) {
1032                         /* TODO: Perhaps we can fail the region here. */
1033                         mlog(ML_ERROR, "Node %d on device %s has a dead count "
1034                              "of %u ms, but our count is %u ms.\n"
1035                              "Please double check your configuration values "
1036                              "for 'O2CB_HEARTBEAT_THRESHOLD'\n",
1037                              slot->ds_node_num, reg->hr_dev_name, slot_dead_ms,
1038                              dead_ms);
1039                 }
1040                 goto out;
1041         }
1042 
1043         /* if the list is dead, we're done.. */
1044         if (list_empty(&slot->ds_live_item))
1045                 goto out;
1046 
1047         /* live nodes only go dead after enough consequtive missed
1048          * samples..  reset the missed counter whenever we see
1049          * activity */
1050         if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
1051                 mlog(ML_HEARTBEAT, "Node %d left my region\n",
1052                      slot->ds_node_num);
1053 
1054                 clear_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
1055 
1056                 /* last off the live_slot generates a callback */
1057                 list_del_init(&slot->ds_live_item);
1058                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
1059                         mlog(ML_HEARTBEAT, "o2hb: Remove node %d from live "
1060                              "nodes bitmap\n", slot->ds_node_num);
1061                         clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
1062 
1063                         /* node can be null */
1064                         o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB,
1065                                               node, slot->ds_node_num);
1066 
1067                         changed = 1;
1068                         queued = 1;
1069                 }
1070 
1071                 /* We don't clear this because the node is still
1072                  * actually writing new blocks. */
1073                 if (!gen_changed)
1074                         slot->ds_changed_samples = 0;
1075                 goto out;
1076         }
1077         if (slot->ds_changed_samples) {
1078                 slot->ds_changed_samples = 0;
1079                 slot->ds_equal_samples = 0;
1080         }
1081 out:
1082         spin_unlock(&o2hb_live_lock);
1083 
1084         if (queued)
1085                 o2hb_run_event_list(&event);
1086 
1087         if (node)
1088                 o2nm_node_put(node);
1089         return changed;
1090 }
1091 
/* Thin wrapper: returns find_last_bit() over the first @numbits bits of @nodes. */
static int o2hb_highest_node(unsigned long *nodes, int numbits)
{
        int highest = find_last_bit(nodes, numbits);

        return highest;
}
1096 
1097 static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
1098 {
1099         int i, ret, highest_node;
1100         int membership_change = 0, own_slot_ok = 0;
1101         unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
1102         unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
1103         struct o2hb_bio_wait_ctxt write_wc;
1104 
1105         ret = o2nm_configured_node_map(configured_nodes,
1106                                        sizeof(configured_nodes));
1107         if (ret) {
1108                 mlog_errno(ret);
1109                 goto bail;
1110         }
1111 
1112         /*
1113          * If a node is not configured but is in the livemap, we still need
1114          * to read the slot so as to be able to remove it from the livemap.
1115          */
1116         o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
1117         i = -1;
1118         while ((i = find_next_bit(live_node_bitmap,
1119                                   O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
1120                 set_bit(i, configured_nodes);
1121         }
1122 
1123         highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
1124         if (highest_node >= O2NM_MAX_NODES) {
1125                 mlog(ML_NOTICE, "o2hb: No configured nodes found!\n");
1126                 ret = -EINVAL;
1127                 goto bail;
1128         }
1129 
1130         /* No sense in reading the slots of nodes that don't exist
1131          * yet. Of course, if the node definitions have holes in them
1132          * then we're reading an empty slot anyway... Consider this
1133          * best-effort. */
1134         ret = o2hb_read_slots(reg, highest_node + 1);
1135         if (ret < 0) {
1136                 mlog_errno(ret);
1137                 goto bail;
1138         }
1139 
1140         /* With an up to date view of the slots, we can check that no
1141          * other node has been improperly configured to heartbeat in
1142          * our slot. */
1143         own_slot_ok = o2hb_check_own_slot(reg);
1144 
1145         /* fill in the proper info for our next heartbeat */
1146         o2hb_prepare_block(reg, reg->hr_generation);
1147 
1148         ret = o2hb_issue_node_write(reg, &write_wc);
1149         if (ret < 0) {
1150                 mlog_errno(ret);
1151                 goto bail;
1152         }
1153 
1154         i = -1;
1155         while((i = find_next_bit(configured_nodes,
1156                                  O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
1157                 membership_change |= o2hb_check_slot(reg, &reg->hr_slots[i]);
1158         }
1159 
1160         /*
1161          * We have to be sure we've advertised ourselves on disk
1162          * before we can go to steady state.  This ensures that
1163          * people we find in our steady state have seen us.
1164          */
1165         o2hb_wait_on_io(reg, &write_wc);
1166         if (write_wc.wc_error) {
1167                 /* Do not re-arm the write timeout on I/O error - we
1168                  * can't be sure that the new block ever made it to
1169                  * disk */
1170                 mlog(ML_ERROR, "Write error %d on device \"%s\"\n",
1171                      write_wc.wc_error, reg->hr_dev_name);
1172                 ret = write_wc.wc_error;
1173                 goto bail;
1174         }
1175 
1176         /* Skip disarming the timeout if own slot has stale/bad data */
1177         if (own_slot_ok) {
1178                 o2hb_set_quorum_device(reg);
1179                 o2hb_arm_timeout(reg);
1180                 reg->hr_last_timeout_start = jiffies;
1181         }
1182 
1183 bail:
1184         /* let the person who launched us know when things are steady */
1185         if (atomic_read(&reg->hr_steady_iterations) != 0) {
1186                 if (!ret && own_slot_ok && !membership_change) {
1187                         if (atomic_dec_and_test(&reg->hr_steady_iterations))
1188                                 wake_up(&o2hb_steady_queue);
1189                 }
1190         }
1191 
1192         if (atomic_read(&reg->hr_steady_iterations) != 0) {
1193                 if (atomic_dec_and_test(&reg->hr_unsteady_iterations)) {
1194                         printk(KERN_NOTICE "o2hb: Unable to stabilize "
1195                                "heartbeart on region %s (%s)\n",
1196                                config_item_name(&reg->hr_item),
1197                                reg->hr_dev_name);
1198                         atomic_set(&reg->hr_steady_iterations, 0);
1199                         reg->hr_aborted_start = 1;
1200                         wake_up(&o2hb_steady_queue);
1201                         ret = -EIO;
1202                 }
1203         }
1204 
1205         return ret;
1206 }
1207 
1208 /*
1209  * we ride the region ref that the region dir holds.  before the region
1210  * dir is removed and drops it ref it will wait to tear down this
1211  * thread.
1212  */
1213 static int o2hb_thread(void *data)
1214 {
1215         int i, ret;
1216         struct o2hb_region *reg = data;
1217         struct o2hb_bio_wait_ctxt write_wc;
1218         ktime_t before_hb, after_hb;
1219         unsigned int elapsed_msec;
1220 
1221         mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");
1222 
1223         set_user_nice(current, MIN_NICE);
1224 
1225         /* Pin node */
1226         ret = o2nm_depend_this_node();
1227         if (ret) {
1228                 mlog(ML_ERROR, "Node has been deleted, ret = %d\n", ret);
1229                 reg->hr_node_deleted = 1;
1230                 wake_up(&o2hb_steady_queue);
1231                 return 0;
1232         }
1233 
1234         while (!kthread_should_stop() &&
1235                !reg->hr_unclean_stop && !reg->hr_aborted_start) {
1236                 /* We track the time spent inside
1237                  * o2hb_do_disk_heartbeat so that we avoid more than
1238                  * hr_timeout_ms between disk writes. On busy systems
1239                  * this should result in a heartbeat which is less
1240                  * likely to time itself out. */
1241                 before_hb = ktime_get_real();
1242 
1243                 ret = o2hb_do_disk_heartbeat(reg);
1244                 reg->hr_last_hb_status = ret;
1245 
1246                 after_hb = ktime_get_real();
1247 
1248                 elapsed_msec = (unsigned int)
1249                                 ktime_ms_delta(after_hb, before_hb);
1250 
1251                 mlog(ML_HEARTBEAT,
1252                      "start = %lld, end = %lld, msec = %u, ret = %d\n",
1253                      before_hb, after_hb, elapsed_msec, ret);
1254 
1255                 if (!kthread_should_stop() &&
1256                     elapsed_msec < reg->hr_timeout_ms) {
1257                         /* the kthread api has blocked signals for us so no
1258                          * need to record the return value. */
1259                         msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
1260                 }
1261         }
1262 
1263         o2hb_disarm_timeout(reg);
1264 
1265         /* unclean stop is only used in very bad situation */
1266         for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
1267                 o2hb_shutdown_slot(&reg->hr_slots[i]);
1268 
1269         /* Explicit down notification - avoid forcing the other nodes
1270          * to timeout on this region when we could just as easily
1271          * write a clear generation - thus indicating to them that
1272          * this node has left this region.
1273          */
1274         if (!reg->hr_unclean_stop && !reg->hr_aborted_start) {
1275                 o2hb_prepare_block(reg, 0);
1276                 ret = o2hb_issue_node_write(reg, &write_wc);
1277                 if (ret == 0)
1278                         o2hb_wait_on_io(reg, &write_wc);
1279                 else
1280                         mlog_errno(ret);
1281         }
1282 
1283         /* Unpin node */
1284         o2nm_undepend_this_node();
1285 
1286         mlog(ML_HEARTBEAT|ML_KTHREAD, "o2hb thread exiting\n");
1287 
1288         return 0;
1289 }
1290 
1291 #ifdef CONFIG_DEBUG_FS
1292 static int o2hb_debug_open(struct inode *inode, struct file *file)
1293 {
1294         struct o2hb_debug_buf *db = inode->i_private;
1295         struct o2hb_region *reg;
1296         unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1297         unsigned long lts;
1298         char *buf = NULL;
1299         int i = -1;
1300         int out = 0;
1301 
1302         /* max_nodes should be the largest bitmap we pass here */
1303         BUG_ON(sizeof(map) < db->db_size);
1304 
1305         buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
1306         if (!buf)
1307                 goto bail;
1308 
1309         switch (db->db_type) {
1310         case O2HB_DB_TYPE_LIVENODES:
1311         case O2HB_DB_TYPE_LIVEREGIONS:
1312         case O2HB_DB_TYPE_QUORUMREGIONS:
1313         case O2HB_DB_TYPE_FAILEDREGIONS:
1314                 spin_lock(&o2hb_live_lock);
1315                 memcpy(map, db->db_data, db->db_size);
1316                 spin_unlock(&o2hb_live_lock);
1317                 break;
1318 
1319         case O2HB_DB_TYPE_REGION_LIVENODES:
1320                 spin_lock(&o2hb_live_lock);
1321                 reg = (struct o2hb_region *)db->db_data;
1322                 memcpy(map, reg->hr_live_node_bitmap, db->db_size);
1323                 spin_unlock(&o2hb_live_lock);
1324                 break;
1325 
1326         case O2HB_DB_TYPE_REGION_NUMBER:
1327                 reg = (struct o2hb_region *)db->db_data;
1328                 out += snprintf(buf + out, PAGE_SIZE - out, "%d\n",
1329                                 reg->hr_region_num);
1330                 goto done;
1331 
1332         case O2HB_DB_TYPE_REGION_ELAPSED_TIME:
1333                 reg = (struct o2hb_region *)db->db_data;
1334                 lts = reg->hr_last_timeout_start;
1335                 /* If 0, it has never been set before */
1336                 if (lts)
1337                         lts = jiffies_to_msecs(jiffies - lts);
1338                 out += snprintf(buf + out, PAGE_SIZE - out, "%lu\n", lts);
1339                 goto done;
1340 
1341         case O2HB_DB_TYPE_REGION_PINNED:
1342                 reg = (struct o2hb_region *)db->db_data;
1343                 out += snprintf(buf + out, PAGE_SIZE - out, "%u\n",
1344                                 !!reg->hr_item_pinned);
1345                 goto done;
1346 
1347         default:
1348                 goto done;
1349         }
1350 
1351         while ((i = find_next_bit(map, db->db_len, i + 1)) < db->db_len)
1352                 out += snprintf(buf + out, PAGE_SIZE - out, "%d ", i);
1353         out += snprintf(buf + out, PAGE_SIZE - out, "\n");
1354 
1355 done:
1356         i_size_write(inode, out);
1357 
1358         file->private_data = buf;
1359 
1360         return 0;
1361 bail:
1362         return -ENOMEM;
1363 }
1364 
1365 static int o2hb_debug_release(struct inode *inode, struct file *file)
1366 {
1367         kfree(file->private_data);
1368         return 0;
1369 }
1370 
1371 static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
1372                                  size_t nbytes, loff_t *ppos)
1373 {
1374         return simple_read_from_buffer(buf, nbytes, ppos, file->private_data,
1375                                        i_size_read(file->f_mapping->host));
1376 }
1377 #else
/* Stub used when CONFIG_DEBUG_FS is not set: does nothing and reports success. */
static int o2hb_debug_open(struct inode *inode, struct file *file)
{
        return 0;
}
/* Stub used when CONFIG_DEBUG_FS is not set: there is nothing to free. */
static int o2hb_debug_release(struct inode *inode, struct file *file)
{
        return 0;
}
1386 static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
1387                                size_t nbytes, loff_t *ppos)
1388 {
1389         return 0;
1390 }
1391 #endif  /* CONFIG_DEBUG_FS */
1392 
1393 static const struct file_operations o2hb_debug_fops = {
1394         .open =         o2hb_debug_open,
1395         .release =      o2hb_debug_release,
1396         .read =         o2hb_debug_read,
1397         .llseek =       generic_file_llseek,
1398 };
1399 
1400 void o2hb_exit(void)
1401 {
1402         debugfs_remove(o2hb_debug_failedregions);
1403         debugfs_remove(o2hb_debug_quorumregions);
1404         debugfs_remove(o2hb_debug_liveregions);
1405         debugfs_remove(o2hb_debug_livenodes);
1406         debugfs_remove(o2hb_debug_dir);
1407         kfree(o2hb_db_livenodes);
1408         kfree(o2hb_db_liveregions);
1409         kfree(o2hb_db_quorumregions);
1410         kfree(o2hb_db_failedregions);
1411 }
1412 
1413 static struct dentry *o2hb_debug_create(const char *name, struct dentry *dir,
1414                                         struct o2hb_debug_buf **db, int db_len,
1415                                         int type, int size, int len, void *data)
1416 {
1417         *db = kmalloc(db_len, GFP_KERNEL);
1418         if (!*db)
1419                 return NULL;
1420 
1421         (*db)->db_type = type;
1422         (*db)->db_size = size;
1423         (*db)->db_len = len;
1424         (*db)->db_data = data;
1425 
1426         return debugfs_create_file(name, S_IFREG|S_IRUSR, dir, *db,
1427                                    &o2hb_debug_fops);
1428 }
1429 
1430 static int o2hb_debug_init(void)
1431 {
1432         int ret = -ENOMEM;
1433 
1434         o2hb_debug_dir = debugfs_create_dir(O2HB_DEBUG_DIR, NULL);
1435         if (!o2hb_debug_dir) {
1436                 mlog_errno(ret);
1437                 goto bail;
1438         }
1439 
1440         o2hb_debug_livenodes = o2hb_debug_create(O2HB_DEBUG_LIVENODES,
1441                                                  o2hb_debug_dir,
1442                                                  &o2hb_db_livenodes,
1443                                                  sizeof(*o2hb_db_livenodes),
1444                                                  O2HB_DB_TYPE_LIVENODES,
1445                                                  sizeof(o2hb_live_node_bitmap),
1446                                                  O2NM_MAX_NODES,
1447                                                  o2hb_live_node_bitmap);
1448         if (!o2hb_debug_livenodes) {
1449                 mlog_errno(ret);
1450                 goto bail;
1451         }
1452 
1453         o2hb_debug_liveregions = o2hb_debug_create(O2HB_DEBUG_LIVEREGIONS,
1454                                                    o2hb_debug_dir,
1455                                                    &o2hb_db_liveregions,
1456                                                    sizeof(*o2hb_db_liveregions),
1457                                                    O2HB_DB_TYPE_LIVEREGIONS,
1458                                                    sizeof(o2hb_live_region_bitmap),
1459                                                    O2NM_MAX_REGIONS,
1460                                                    o2hb_live_region_bitmap);
1461         if (!o2hb_debug_liveregions) {
1462                 mlog_errno(ret);
1463                 goto bail;
1464         }
1465 
1466         o2hb_debug_quorumregions =
1467                         o2hb_debug_create(O2HB_DEBUG_QUORUMREGIONS,
1468                                           o2hb_debug_dir,
1469                                           &o2hb_db_quorumregions,
1470                                           sizeof(*o2hb_db_quorumregions),
1471                                           O2HB_DB_TYPE_QUORUMREGIONS,
1472                                           sizeof(o2hb_quorum_region_bitmap),
1473                                           O2NM_MAX_REGIONS,
1474                                           o2hb_quorum_region_bitmap);
1475         if (!o2hb_debug_quorumregions) {
1476                 mlog_errno(ret);
1477                 goto bail;
1478         }
1479 
1480         o2hb_debug_failedregions =
1481                         o2hb_debug_create(O2HB_DEBUG_FAILEDREGIONS,
1482                                           o2hb_debug_dir,
1483                                           &o2hb_db_failedregions,
1484                                           sizeof(*o2hb_db_failedregions),
1485                                           O2HB_DB_TYPE_FAILEDREGIONS,
1486                                           sizeof(o2hb_failed_region_bitmap),
1487                                           O2NM_MAX_REGIONS,
1488                                           o2hb_failed_region_bitmap);
1489         if (!o2hb_debug_failedregions) {
1490                 mlog_errno(ret);
1491                 goto bail;
1492         }
1493 
1494         ret = 0;
1495 bail:
1496         if (ret)
1497                 o2hb_exit();
1498 
1499         return ret;
1500 }
1501 
1502 int o2hb_init(void)
1503 {
1504         int i;
1505 
1506         for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
1507                 INIT_LIST_HEAD(&o2hb_callbacks[i].list);
1508 
1509         for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
1510                 INIT_LIST_HEAD(&o2hb_live_slots[i]);
1511 
1512         INIT_LIST_HEAD(&o2hb_node_events);
1513 
1514         memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
1515         memset(o2hb_region_bitmap, 0, sizeof(o2hb_region_bitmap));
1516         memset(o2hb_live_region_bitmap, 0, sizeof(o2hb_live_region_bitmap));
1517         memset(o2hb_quorum_region_bitmap, 0, sizeof(o2hb_quorum_region_bitmap));
1518         memset(o2hb_failed_region_bitmap, 0, sizeof(o2hb_failed_region_bitmap));
1519 
1520         o2hb_dependent_users = 0;
1521 
1522         return o2hb_debug_init();
1523 }
1524 
1525 /* if we're already in a callback then we're already serialized by the sem */
1526 static void o2hb_fill_node_map_from_callback(unsigned long *map,
1527                                              unsigned bytes)
1528 {
1529         BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
1530 
1531         memcpy(map, &o2hb_live_node_bitmap, bytes);
1532 }
1533 
/*
 * Get a map of all nodes that are heartbeating in any region.
 *
 * @map:   destination bitmap
 * @bytes: number of bytes to copy into @map; handed unchanged to
 *         o2hb_fill_node_map_from_callback(), which BUG()s when it is
 *         smaller than an O2NM_MAX_NODES-bit bitmap
 */
void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
{
        /* callers want to serialize this map and callbacks so that they
         * can trust that they don't miss nodes coming to the party */
        down_read(&o2hb_callback_sem);
        /* the copy itself is done with o2hb_live_lock held */
        spin_lock(&o2hb_live_lock);
        o2hb_fill_node_map_from_callback(map, bytes);
        spin_unlock(&o2hb_live_lock);
        up_read(&o2hb_callback_sem);
}
EXPORT_SYMBOL_GPL(o2hb_fill_node_map);
1548 
1549 /*
1550  * heartbeat configfs bits.  The heartbeat set is a default set under
1551  * the cluster set in nodemanager.c.
1552  */
1553 
1554 static struct o2hb_region *to_o2hb_region(struct config_item *item)
1555 {
1556         return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
1557 }
1558 
1559 /* drop_item only drops its ref after killing the thread, nothing should
1560  * be using the region anymore.  this has to clean up any state that
1561  * attributes might have built up. */
1562 static void o2hb_region_release(struct config_item *item)
1563 {
1564         int i;
1565         struct page *page;
1566         struct o2hb_region *reg = to_o2hb_region(item);
1567 
1568         mlog(ML_HEARTBEAT, "hb region release (%s)\n", reg->hr_dev_name);
1569 
1570         kfree(reg->hr_tmp_block);
1571 
1572         if (reg->hr_slot_data) {
1573                 for (i = 0; i < reg->hr_num_pages; i++) {
1574                         page = reg->hr_slot_data[i];
1575                         if (page)
1576                                 __free_page(page);
1577                 }
1578                 kfree(reg->hr_slot_data);
1579         }
1580 
1581         if (reg->hr_bdev)
1582                 blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
1583 
1584         kfree(reg->hr_slots);
1585 
1586         debugfs_remove(reg->hr_debug_livenodes);
1587         debugfs_remove(reg->hr_debug_regnum);
1588         debugfs_remove(reg->hr_debug_elapsed_time);
1589         debugfs_remove(reg->hr_debug_pinned);
1590         debugfs_remove(reg->hr_debug_dir);
1591         kfree(reg->hr_db_livenodes);
1592         kfree(reg->hr_db_regnum);
1593         kfree(reg->hr_db_elapsed_time);
1594         kfree(reg->hr_db_pinned);
1595 
1596         spin_lock(&o2hb_live_lock);
1597         list_del(&reg->hr_all_item);
1598         spin_unlock(&o2hb_live_lock);
1599 
1600         o2net_unregister_handler_list(&reg->hr_handler_list);
1601         kfree(reg);
1602 }
1603 
1604 static int o2hb_read_block_input(struct o2hb_region *reg,
1605                                  const char *page,
1606                                  unsigned long *ret_bytes,
1607                                  unsigned int *ret_bits)
1608 {
1609         unsigned long bytes;
1610         char *p = (char *)page;
1611 
1612         bytes = simple_strtoul(p, &p, 0);
1613         if (!p || (*p && (*p != '\n')))
1614                 return -EINVAL;
1615 
1616         /* Heartbeat and fs min / max block sizes are the same. */
1617         if (bytes > 4096 || bytes < 512)
1618                 return -ERANGE;
1619         if (hweight16(bytes) != 1)
1620                 return -EINVAL;
1621 
1622         if (ret_bytes)
1623                 *ret_bytes = bytes;
1624         if (ret_bits)
1625                 *ret_bits = ffs(bytes) - 1;
1626 
1627         return 0;
1628 }
1629 
1630 static ssize_t o2hb_region_block_bytes_show(struct config_item *item,
1631                                             char *page)
1632 {
1633         return sprintf(page, "%u\n", to_o2hb_region(item)->hr_block_bytes);
1634 }
1635 
1636 static ssize_t o2hb_region_block_bytes_store(struct config_item *item,
1637                                              const char *page,
1638                                              size_t count)
1639 {
1640         struct o2hb_region *reg = to_o2hb_region(item);
1641         int status;
1642         unsigned long block_bytes;
1643         unsigned int block_bits;
1644 
1645         if (reg->hr_bdev)
1646                 return -EINVAL;
1647 
1648         status = o2hb_read_block_input(reg, page, &block_bytes,
1649                                        &block_bits);
1650         if (status)
1651                 return status;
1652 
1653         reg->hr_block_bytes = (unsigned int)block_bytes;
1654         reg->hr_block_bits = block_bits;
1655 
1656         return count;
1657 }
1658 
1659 static ssize_t o2hb_region_start_block_show(struct config_item *item,
1660                                             char *page)
1661 {
1662         return sprintf(page, "%llu\n", to_o2hb_region(item)->hr_start_block);
1663 }
1664 
1665 static ssize_t o2hb_region_start_block_store(struct config_item *item,
1666                                              const char *page,
1667                                              size_t count)
1668 {
1669         struct o2hb_region *reg = to_o2hb_region(item);
1670         unsigned long long tmp;
1671         char *p = (char *)page;
1672 
1673         if (reg->hr_bdev)
1674                 return -EINVAL;
1675 
1676         tmp = simple_strtoull(p, &p, 0);
1677         if (!p || (*p && (*p != '\n')))
1678                 return -EINVAL;
1679 
1680         reg->hr_start_block = tmp;
1681 
1682         return count;
1683 }
1684 
1685 static ssize_t o2hb_region_blocks_show(struct config_item *item, char *page)
1686 {
1687         return sprintf(page, "%d\n", to_o2hb_region(item)->hr_blocks);
1688 }
1689 
1690 static ssize_t o2hb_region_blocks_store(struct config_item *item,
1691                                         const char *page,
1692                                         size_t count)
1693 {
1694         struct o2hb_region *reg = to_o2hb_region(item);
1695         unsigned long tmp;
1696         char *p = (char *)page;
1697 
1698         if (reg->hr_bdev)
1699                 return -EINVAL;
1700 
1701         tmp = simple_strtoul(p, &p, 0);
1702         if (!p || (*p && (*p != '\n')))
1703                 return -EINVAL;
1704 
1705         if (tmp > O2NM_MAX_NODES || tmp == 0)
1706                 return -ERANGE;
1707 
1708         reg->hr_blocks = (unsigned int)tmp;
1709 
1710         return count;
1711 }
1712 
1713 static ssize_t o2hb_region_dev_show(struct config_item *item, char *page)
1714 {
1715         unsigned int ret = 0;
1716 
1717         if (to_o2hb_region(item)->hr_bdev)
1718                 ret = sprintf(page, "%s\n", to_o2hb_region(item)->hr_dev_name);
1719 
1720         return ret;
1721 }
1722 
1723 static void o2hb_init_region_params(struct o2hb_region *reg)
1724 {
1725         reg->hr_slots_per_page = PAGE_SIZE >> reg->hr_block_bits;
1726         reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;
1727 
1728         mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
1729              reg->hr_start_block, reg->hr_blocks);
1730         mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
1731              reg->hr_block_bytes, reg->hr_block_bits);
1732         mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
1733         mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
1734 }
1735 
1736 static int o2hb_map_slot_data(struct o2hb_region *reg)
1737 {
1738         int i, j;
1739         unsigned int last_slot;
1740         unsigned int spp = reg->hr_slots_per_page;
1741         struct page *page;
1742         char *raw;
1743         struct o2hb_disk_slot *slot;
1744 
1745         reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
1746         if (reg->hr_tmp_block == NULL)
1747                 return -ENOMEM;
1748 
1749         reg->hr_slots = kcalloc(reg->hr_blocks,
1750                                 sizeof(struct o2hb_disk_slot), GFP_KERNEL);
1751         if (reg->hr_slots == NULL)
1752                 return -ENOMEM;
1753 
1754         for(i = 0; i < reg->hr_blocks; i++) {
1755                 slot = &reg->hr_slots[i];
1756                 slot->ds_node_num = i;
1757                 INIT_LIST_HEAD(&slot->ds_live_item);
1758                 slot->ds_raw_block = NULL;
1759         }
1760 
1761         reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
1762         mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
1763                            "at %u blocks per page\n",
1764              reg->hr_num_pages, reg->hr_blocks, spp);
1765 
1766         reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
1767                                     GFP_KERNEL);
1768         if (!reg->hr_slot_data)
1769                 return -ENOMEM;
1770 
1771         for(i = 0; i < reg->hr_num_pages; i++) {
1772                 page = alloc_page(GFP_KERNEL);
1773                 if (!page)
1774                         return -ENOMEM;
1775 
1776                 reg->hr_slot_data[i] = page;
1777 
1778                 last_slot = i * spp;
1779                 raw = page_address(page);
1780                 for (j = 0;
1781                      (j < spp) && ((j + last_slot) < reg->hr_blocks);
1782                      j++) {
1783                         BUG_ON((j + last_slot) >= reg->hr_blocks);
1784 
1785                         slot = &reg->hr_slots[j + last_slot];
1786                         slot->ds_raw_block =
1787                                 (struct o2hb_disk_heartbeat_block *) raw;
1788 
1789                         raw += reg->hr_block_bytes;
1790                 }
1791         }
1792 
1793         return 0;
1794 }
1795 
1796 /* Read in all the slots available and populate the tracking
1797  * structures so that we can start with a baseline idea of what's
1798  * there. */
1799 static int o2hb_populate_slot_data(struct o2hb_region *reg)
1800 {
1801         int ret, i;
1802         struct o2hb_disk_slot *slot;
1803         struct o2hb_disk_heartbeat_block *hb_block;
1804 
1805         ret = o2hb_read_slots(reg, reg->hr_blocks);
1806         if (ret)
1807                 goto out;
1808 
1809         /* We only want to get an idea of the values initially in each
1810          * slot, so we do no verification - o2hb_check_slot will
1811          * actually determine if each configured slot is valid and
1812          * whether any values have changed. */
1813         for(i = 0; i < reg->hr_blocks; i++) {
1814                 slot = &reg->hr_slots[i];
1815                 hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block;
1816 
1817                 /* Only fill the values that o2hb_check_slot uses to
1818                  * determine changing slots */
1819                 slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
1820                 slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
1821         }
1822 
1823 out:
1824         return ret;
1825 }
1826 
1827 /* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */
1828 static ssize_t o2hb_region_dev_store(struct config_item *item,
1829                                      const char *page,
1830                                      size_t count)
1831 {
1832         struct o2hb_region *reg = to_o2hb_region(item);
1833         struct task_struct *hb_task;
1834         long fd;
1835         int sectsize;
1836         char *p = (char *)page;
1837         struct fd f;
1838         struct inode *inode;
1839         ssize_t ret = -EINVAL;
1840         int live_threshold;
1841 
1842         if (reg->hr_bdev)
1843                 goto out;
1844 
1845         /* We can't heartbeat without having had our node number
1846          * configured yet. */
1847         if (o2nm_this_node() == O2NM_MAX_NODES)
1848                 goto out;
1849 
1850         fd = simple_strtol(p, &p, 0);
1851         if (!p || (*p && (*p != '\n')))
1852                 goto out;
1853 
1854         if (fd < 0 || fd >= INT_MAX)
1855                 goto out;
1856 
1857         f = fdget(fd);
1858         if (f.file == NULL)
1859                 goto out;
1860 
1861         if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
1862             reg->hr_block_bytes == 0)
1863                 goto out2;
1864 
1865         inode = igrab(f.file->f_mapping->host);
1866         if (inode == NULL)
1867                 goto out2;
1868 
1869         if (!S_ISBLK(inode->i_mode))
1870                 goto out3;
1871 
1872         reg->hr_bdev = I_BDEV(f.file->f_mapping->host);
1873         ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, NULL);
1874         if (ret) {
1875                 reg->hr_bdev = NULL;
1876                 goto out3;
1877         }
1878         inode = NULL;
1879 
1880         bdevname(reg->hr_bdev, reg->hr_dev_name);
1881 
1882         sectsize = bdev_logical_block_size(reg->hr_bdev);
1883         if (sectsize != reg->hr_block_bytes) {
1884                 mlog(ML_ERROR,
1885                      "blocksize %u incorrect for device, expected %d",
1886                      reg->hr_block_bytes, sectsize);
1887                 ret = -EINVAL;
1888                 goto out3;
1889         }
1890 
1891         o2hb_init_region_params(reg);
1892 
1893         /* Generation of zero is invalid */
1894         do {
1895                 get_random_bytes(&reg->hr_generation,
1896                                  sizeof(reg->hr_generation));
1897         } while (reg->hr_generation == 0);
1898 
1899         ret = o2hb_map_slot_data(reg);
1900         if (ret) {
1901                 mlog_errno(ret);
1902                 goto out3;
1903         }
1904 
1905         ret = o2hb_populate_slot_data(reg);
1906         if (ret) {
1907                 mlog_errno(ret);
1908                 goto out3;
1909         }
1910 
1911         INIT_DELAYED_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout);
1912         INIT_DELAYED_WORK(&reg->hr_nego_timeout_work, o2hb_nego_timeout);
1913 
1914         /*
1915          * A node is considered live after it has beat LIVE_THRESHOLD
1916          * times.  We're not steady until we've given them a chance
1917          * _after_ our first read.
1918          * The default threshold is bare minimum so as to limit the delay
1919          * during mounts. For global heartbeat, the threshold doubled for the
1920          * first region.
1921          */
1922         live_threshold = O2HB_LIVE_THRESHOLD;
1923         if (o2hb_global_heartbeat_active()) {
1924                 spin_lock(&o2hb_live_lock);
1925                 if (bitmap_weight(o2hb_region_bitmap, O2NM_MAX_REGIONS) == 1)
1926                         live_threshold <<= 1;
1927                 spin_unlock(&o2hb_live_lock);
1928         }
1929         ++live_threshold;
1930         atomic_set(&reg->hr_steady_iterations, live_threshold);
1931         /* unsteady_iterations is triple the steady_iterations */
1932         atomic_set(&reg->hr_unsteady_iterations, (live_threshold * 3));
1933 
1934         hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
1935                               reg->hr_item.ci_name);
1936         if (IS_ERR(hb_task)) {
1937                 ret = PTR_ERR(hb_task);
1938                 mlog_errno(ret);
1939                 goto out3;
1940         }
1941 
1942         spin_lock(&o2hb_live_lock);
1943         reg->hr_task = hb_task;
1944         spin_unlock(&o2hb_live_lock);
1945 
1946         ret = wait_event_interruptible(o2hb_steady_queue,
1947                                 atomic_read(&reg->hr_steady_iterations) == 0 ||
1948                                 reg->hr_node_deleted);
1949         if (ret) {
1950                 atomic_set(&reg->hr_steady_iterations, 0);
1951                 reg->hr_aborted_start = 1;
1952         }
1953 
1954         if (reg->hr_aborted_start) {
1955                 ret = -EIO;
1956                 goto out3;
1957         }
1958 
1959         if (reg->hr_node_deleted) {
1960                 ret = -EINVAL;
1961                 goto out3;
1962         }
1963 
1964         /* Ok, we were woken.  Make sure it wasn't by drop_item() */
1965         spin_lock(&o2hb_live_lock);
1966         hb_task = reg->hr_task;
1967         if (o2hb_global_heartbeat_active())
1968                 set_bit(reg->hr_region_num, o2hb_live_region_bitmap);
1969         spin_unlock(&o2hb_live_lock);
1970 
1971         if (hb_task)
1972                 ret = count;
1973         else
1974                 ret = -EIO;
1975 
1976         if (hb_task && o2hb_global_heartbeat_active())
1977                 printk(KERN_NOTICE "o2hb: Heartbeat started on region %s (%s)\n",
1978                        config_item_name(&reg->hr_item), reg->hr_dev_name);
1979 
1980 out3:
1981         iput(inode);
1982 out2:
1983         fdput(f);
1984 out:
1985         if (ret < 0) {
1986                 if (reg->hr_bdev) {
1987                         blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
1988                         reg->hr_bdev = NULL;
1989                 }
1990         }
1991         return ret;
1992 }
1993 
1994 static ssize_t o2hb_region_pid_show(struct config_item *item, char *page)
1995 {
1996         struct o2hb_region *reg = to_o2hb_region(item);
1997         pid_t pid = 0;
1998 
1999         spin_lock(&o2hb_live_lock);
2000         if (reg->hr_task)
2001                 pid = task_pid_nr(reg->hr_task);
2002         spin_unlock(&o2hb_live_lock);
2003 
2004         if (!pid)
2005                 return 0;
2006 
2007         return sprintf(page, "%u\n", pid);
2008 }
2009 
/*
 * Per-region configfs attributes, backed by the o2hb_region_<name>_show /
 * o2hb_region_<name>_store handlers above.  "pid" is declared read-only.
 */
CONFIGFS_ATTR(o2hb_region_, block_bytes);
CONFIGFS_ATTR(o2hb_region_, start_block);
CONFIGFS_ATTR(o2hb_region_, blocks);
CONFIGFS_ATTR(o2hb_region_, dev);
CONFIGFS_ATTR_RO(o2hb_region_, pid);

/* NULL-terminated list of the attributes attached to every region item. */
static struct configfs_attribute *o2hb_region_attrs[] = {
        &o2hb_region_attr_block_bytes,
        &o2hb_region_attr_start_block,
        &o2hb_region_attr_blocks,
        &o2hb_region_attr_dev,
        &o2hb_region_attr_pid,
        NULL,
};

/* Region items are torn down through o2hb_region_release(). */
static struct configfs_item_operations o2hb_region_item_ops = {
        .release                = o2hb_region_release,
};

/* Item type handed to config_item_init_type_name() for each new region. */
static struct config_item_type o2hb_region_type = {
        .ct_item_ops    = &o2hb_region_item_ops,
        .ct_attrs       = o2hb_region_attrs,
        .ct_owner       = THIS_MODULE,
};
2034 
2035 /* heartbeat set */
2036 
/*
 * Wrapper around the "heartbeat" config_group; currently it carries
 * nothing but the embedded group itself.
 */
struct o2hb_heartbeat_group {
        struct config_group hs_group;
        /* some stuff? */
};
2041 
2042 static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
2043 {
2044         return group ?
2045                 container_of(group, struct o2hb_heartbeat_group, hs_group)
2046                 : NULL;
2047 }
2048 
2049 static int o2hb_debug_region_init(struct o2hb_region *reg, struct dentry *dir)
2050 {
2051         int ret = -ENOMEM;
2052 
2053         reg->hr_debug_dir =
2054                 debugfs_create_dir(config_item_name(&reg->hr_item), dir);
2055         if (!reg->hr_debug_dir) {
2056                 mlog_errno(ret);
2057                 goto bail;
2058         }
2059 
2060         reg->hr_debug_livenodes =
2061                         o2hb_debug_create(O2HB_DEBUG_LIVENODES,
2062                                           reg->hr_debug_dir,
2063                                           &(reg->hr_db_livenodes),
2064                                           sizeof(*(reg->hr_db_livenodes)),
2065                                           O2HB_DB_TYPE_REGION_LIVENODES,
2066                                           sizeof(reg->hr_live_node_bitmap),
2067                                           O2NM_MAX_NODES, reg);
2068         if (!reg->hr_debug_livenodes) {
2069                 mlog_errno(ret);
2070                 goto bail;
2071         }
2072 
2073         reg->hr_debug_regnum =
2074                         o2hb_debug_create(O2HB_DEBUG_REGION_NUMBER,
2075                                           reg->hr_debug_dir,
2076                                           &(reg->hr_db_regnum),
2077                                           sizeof(*(reg->hr_db_regnum)),
2078                                           O2HB_DB_TYPE_REGION_NUMBER,
2079                                           0, O2NM_MAX_NODES, reg);
2080         if (!reg->hr_debug_regnum) {
2081                 mlog_errno(ret);
2082                 goto bail;
2083         }
2084 
2085         reg->hr_debug_elapsed_time =
2086                         o2hb_debug_create(O2HB_DEBUG_REGION_ELAPSED_TIME,
2087                                           reg->hr_debug_dir,
2088                                           &(reg->hr_db_elapsed_time),
2089                                           sizeof(*(reg->hr_db_elapsed_time)),
2090                                           O2HB_DB_TYPE_REGION_ELAPSED_TIME,
2091                                           0, 0, reg);
2092         if (!reg->hr_debug_elapsed_time) {
2093                 mlog_errno(ret);
2094                 goto bail;
2095         }
2096 
2097         reg->hr_debug_pinned =
2098                         o2hb_debug_create(O2HB_DEBUG_REGION_PINNED,
2099                                           reg->hr_debug_dir,
2100                                           &(reg->hr_db_pinned),
2101                                           sizeof(*(reg->hr_db_pinned)),
2102                                           O2HB_DB_TYPE_REGION_PINNED,
2103                                           0, 0, reg);
2104         if (!reg->hr_debug_pinned) {
2105                 mlog_errno(ret);
2106                 goto bail;
2107         }
2108 
2109         ret = 0;
2110 bail:
2111         return ret;
2112 }
2113 
2114 static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
2115                                                           const char *name)
2116 {
2117         struct o2hb_region *reg = NULL;
2118         int ret;
2119 
2120         reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
2121         if (reg == NULL)
2122                 return ERR_PTR(-ENOMEM);
2123 
2124         if (strlen(name) > O2HB_MAX_REGION_NAME_LEN) {
2125                 ret = -ENAMETOOLONG;
2126                 goto free;
2127         }
2128 
2129         spin_lock(&o2hb_live_lock);
2130         reg->hr_region_num = 0;
2131         if (o2hb_global_heartbeat_active()) {
2132                 reg->hr_region_num = find_first_zero_bit(o2hb_region_bitmap,
2133                                                          O2NM_MAX_REGIONS);
2134                 if (reg->hr_region_num >= O2NM_MAX_REGIONS) {
2135                         spin_unlock(&o2hb_live_lock);
2136                         ret = -EFBIG;
2137                         goto free;
2138                 }
2139                 set_bit(reg->hr_region_num, o2hb_region_bitmap);
2140         }
2141         list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
2142         spin_unlock(&o2hb_live_lock);
2143 
2144         config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
2145 
2146         /* this is the same way to generate msg key as dlm, for local heartbeat,
2147          * name is also the same, so make initial crc value different to avoid
2148          * message key conflict.
2149          */
2150         reg->hr_key = crc32_le(reg->hr_region_num + O2NM_MAX_REGIONS,
2151                 name, strlen(name));
2152         INIT_LIST_HEAD(&reg->hr_handler_list);
2153         ret = o2net_register_handler(O2HB_NEGO_TIMEOUT_MSG, reg->hr_key,
2154                         sizeof(struct o2hb_nego_msg),
2155                         o2hb_nego_timeout_handler,
2156                         reg, NULL, &reg->hr_handler_list);
2157         if (ret)
2158                 goto free;
2159 
2160         ret = o2net_register_handler(O2HB_NEGO_APPROVE_MSG, reg->hr_key,
2161                         sizeof(struct o2hb_nego_msg),
2162                         o2hb_nego_approve_handler,
2163                         reg, NULL, &reg->hr_handler_list);
2164         if (ret)
2165                 goto unregister_handler;
2166 
2167         ret = o2hb_debug_region_init(reg, o2hb_debug_dir);
2168         if (ret) {
2169                 config_item_put(&reg->hr_item);
2170                 goto unregister_handler;
2171         }
2172 
2173         return &reg->hr_item;
2174 
2175 unregister_handler:
2176         o2net_unregister_handler_list(&reg->hr_handler_list);
2177 free:
2178         kfree(reg);
2179         return ERR_PTR(ret);
2180 }
2181 
2182 static void o2hb_heartbeat_group_drop_item(struct config_group *group,
2183                                            struct config_item *item)
2184 {
2185         struct task_struct *hb_task;
2186         struct o2hb_region *reg = to_o2hb_region(item);
2187         int quorum_region = 0;
2188 
2189         /* stop the thread when the user removes the region dir */
2190         spin_lock(&o2hb_live_lock);
2191         hb_task = reg->hr_task;
2192         reg->hr_task = NULL;
2193         reg->hr_item_dropped = 1;
2194         spin_unlock(&o2hb_live_lock);
2195 
2196         if (hb_task)
2197                 kthread_stop(hb_task);
2198 
2199         if (o2hb_global_heartbeat_active()) {
2200                 spin_lock(&o2hb_live_lock);
2201                 clear_bit(reg->hr_region_num, o2hb_region_bitmap);
2202                 clear_bit(reg->hr_region_num, o2hb_live_region_bitmap);
2203                 if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
2204                         quorum_region = 1;
2205                 clear_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
2206                 spin_unlock(&o2hb_live_lock);
2207                 printk(KERN_NOTICE "o2hb: Heartbeat %s on region %s (%s)\n",
2208                        ((atomic_read(&reg->hr_steady_iterations) == 0) ?
2209                         "stopped" : "start aborted"), config_item_name(item),
2210                        reg->hr_dev_name);
2211         }
2212 
2213         /*
2214          * If we're racing a dev_write(), we need to wake them.  They will
2215          * check reg->hr_task
2216          */
2217         if (atomic_read(&reg->hr_steady_iterations) != 0) {
2218                 reg->hr_aborted_start = 1;
2219                 atomic_set(&reg->hr_steady_iterations, 0);
2220                 wake_up(&o2hb_steady_queue);
2221         }
2222 
2223         config_item_put(item);
2224 
2225         if (!o2hb_global_heartbeat_active() || !quorum_region)
2226                 return;
2227 
2228         /*
2229          * If global heartbeat active and there are dependent users,
2230          * pin all regions if quorum region count <= CUT_OFF
2231          */
2232         spin_lock(&o2hb_live_lock);
2233 
2234         if (!o2hb_dependent_users)
2235                 goto unlock;
2236 
2237         if (bitmap_weight(o2hb_quorum_region_bitmap,
2238                            O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
2239                 o2hb_region_pin(NULL);
2240 
2241 unlock:
2242         spin_unlock(&o2hb_live_lock);
2243 }
2244 
2245 static ssize_t o2hb_heartbeat_group_threshold_show(struct config_item *item,
2246                 char *page)
2247 {
2248         return sprintf(page, "%u\n", o2hb_dead_threshold);
2249 }
2250 
2251 static ssize_t o2hb_heartbeat_group_threshold_store(struct config_item *item,
2252                 const char *page, size_t count)
2253 {
2254         unsigned long tmp;
2255         char *p = (char *)page;
2256 
2257         tmp = simple_strtoul(p, &p, 10);
2258         if (!p || (*p && (*p != '\n')))
2259                 return -EINVAL;
2260 
2261         /* this will validate ranges for us. */
2262         o2hb_dead_threshold_set((unsigned int) tmp);
2263 
2264         return count;
2265 }
2266 
2267 static ssize_t o2hb_heartbeat_group_mode_show(struct config_item *item,
2268                 char *page)
2269 {
2270         return sprintf(page, "%s\n",
2271                        o2hb_heartbeat_mode_desc[o2hb_heartbeat_mode]);
2272 }
2273 
2274 static ssize_t o2hb_heartbeat_group_mode_store(struct config_item *item,
2275                 const char *page, size_t count)
2276 {
2277         unsigned int i;
2278         int ret;
2279         size_t len;
2280 
2281         len = (page[count - 1] == '\n') ? count - 1 : count;
2282         if (!len)
2283                 return -EINVAL;
2284 
2285         for (i = 0; i < O2HB_HEARTBEAT_NUM_MODES; ++i) {
2286                 if (strncasecmp(page, o2hb_heartbeat_mode_desc[i], len))
2287                         continue;
2288 
2289                 ret = o2hb_global_heartbeat_mode_set(i);
2290                 if (!ret)
2291                         printk(KERN_NOTICE "o2hb: Heartbeat mode set to %s\n",
2292                                o2hb_heartbeat_mode_desc[i]);
2293                 return count;
2294         }
2295 
2296         return -EINVAL;
2297 
2298 }
2299 
/*
 * Attributes of the heartbeat group itself, backed by the
 * o2hb_heartbeat_group_<name>_show / _store handlers above.
 */
CONFIGFS_ATTR(o2hb_heartbeat_group_, threshold);
CONFIGFS_ATTR(o2hb_heartbeat_group_, mode);

/* NULL-terminated attribute list of the heartbeat group. */
static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
        &o2hb_heartbeat_group_attr_threshold,
        &o2hb_heartbeat_group_attr_mode,
        NULL,
};

/* Regions are created and removed as items of the heartbeat group. */
static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
        .make_item      = o2hb_heartbeat_group_make_item,
        .drop_item      = o2hb_heartbeat_group_drop_item,
};

/* Group type used by o2hb_alloc_hb_set() for the "heartbeat" group. */
static struct config_item_type o2hb_heartbeat_group_type = {
        .ct_group_ops   = &o2hb_heartbeat_group_group_ops,
        .ct_attrs       = o2hb_heartbeat_group_attrs,
        .ct_owner       = THIS_MODULE,
};
2319 
2320 /* this is just here to avoid touching group in heartbeat.h which the
2321  * entire damn world #includes */
2322 struct config_group *o2hb_alloc_hb_set(void)
2323 {
2324         struct o2hb_heartbeat_group *hs = NULL;
2325         struct config_group *ret = NULL;
2326 
2327         hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
2328         if (hs == NULL)
2329                 goto out;
2330 
2331         config_group_init_type_name(&hs->hs_group, "heartbeat",
2332                                     &o2hb_heartbeat_group_type);
2333 
2334         ret = &hs->hs_group;
2335 out:
2336         if (ret == NULL)
2337                 kfree(hs);
2338         return ret;
2339 }
2340 
/* Free a heartbeat group obtained from o2hb_alloc_hb_set(); NULL is fine. */
void o2hb_free_hb_set(struct config_group *group)
{
        kfree(to_o2hb_heartbeat_group(group));
}
2346 
2347 /* hb callback registration and issuing */
2348 
2349 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
2350 {
2351         if (type == O2HB_NUM_CB)
2352                 return ERR_PTR(-EINVAL);
2353 
2354         return &o2hb_callbacks[type];
2355 }
2356 
2357 void o2hb_setup_callback(struct o2hb_callback_func *hc,
2358                          enum o2hb_callback_type type,
2359                          o2hb_cb_func *func,
2360                          void *data,
2361                          int priority)
2362 {
2363         INIT_LIST_HEAD(&hc->hc_item);
2364         hc->hc_func = func;
2365         hc->hc_data = data;
2366         hc->hc_priority = priority;
2367         hc->hc_type = type;
2368         hc->hc_magic = O2HB_CB_MAGIC;
2369 }
2370 EXPORT_SYMBOL_GPL(o2hb_setup_callback);
2371 
2372 /*
2373  * In local heartbeat mode, region_uuid passed matches the dlm domain name.
2374  * In global heartbeat mode, region_uuid passed is NULL.
2375  *
2376  * In local, we only pin the matching region. In global we pin all the active
2377  * regions.
2378  */
2379 static int o2hb_region_pin(const char *region_uuid)
2380 {
2381         int ret = 0, found = 0;
2382         struct o2hb_region *reg;
2383         char *uuid;
2384 
2385         assert_spin_locked(&o2hb_live_lock);
2386 
2387         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2388                 if (reg->hr_item_dropped)
2389                         continue;
2390 
2391                 uuid = config_item_name(&reg->hr_item);
2392 
2393                 /* local heartbeat */
2394                 if (region_uuid) {
2395                         if (strcmp(region_uuid, uuid))
2396                                 continue;
2397                         found = 1;
2398                 }
2399 
2400                 if (reg->hr_item_pinned || reg->hr_item_dropped)
2401                         goto skip_pin;
2402 
2403                 /* Ignore ENOENT only for local hb (userdlm domain) */
2404                 ret = o2nm_depend_item(&reg->hr_item);
2405                 if (!ret) {
2406                         mlog(ML_CLUSTER, "Pin region %s\n", uuid);
2407                         reg->hr_item_pinned = 1;
2408                 } else {
2409                         if (ret == -ENOENT && found)
2410                                 ret = 0;
2411                         else {
2412                                 mlog(ML_ERROR, "Pin region %s fails with %d\n",
2413                                      uuid, ret);
2414                                 break;
2415                         }
2416                 }
2417 skip_pin:
2418                 if (found)
2419                         break;
2420         }
2421 
2422         return ret;
2423 }
2424 
2425 /*
2426  * In local heartbeat mode, region_uuid passed matches the dlm domain name.
2427  * In global heartbeat mode, region_uuid passed is NULL.
2428  *
2429  * In local, we only unpin the matching region. In global we unpin all the
2430  * active regions.
2431  */
2432 static void o2hb_region_unpin(const char *region_uuid)
2433 {
2434         struct o2hb_region *reg;
2435         char *uuid;
2436         int found = 0;
2437 
2438         assert_spin_locked(&o2hb_live_lock);
2439 
2440         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2441                 if (reg->hr_item_dropped)
2442                         continue;
2443 
2444                 uuid = config_item_name(&reg->hr_item);
2445                 if (region_uuid) {
2446                         if (strcmp(region_uuid, uuid))
2447                                 continue;
2448                         found = 1;
2449                 }
2450 
2451                 if (reg->hr_item_pinned) {
2452                         mlog(ML_CLUSTER, "Unpin region %s\n", uuid);
2453                         o2nm_undepend_item(&reg->hr_item);
2454                         reg->hr_item_pinned = 0;
2455                 }
2456                 if (found)
2457                         break;
2458         }
2459 }
2460 
2461 static int o2hb_region_inc_user(const char *region_uuid)
2462 {
2463         int ret = 0;
2464 
2465         spin_lock(&o2hb_live_lock);
2466 
2467         /* local heartbeat */
2468         if (!o2hb_global_heartbeat_active()) {
2469             ret = o2hb_region_pin(region_uuid);
2470             goto unlock;
2471         }
2472 
2473         /*
2474          * if global heartbeat active and this is the first dependent user,
2475          * pin all regions if quorum region count <= CUT_OFF
2476          */
2477         o2hb_dependent_users++;
2478         if (o2hb_dependent_users > 1)
2479                 goto unlock;
2480 
2481         if (bitmap_weight(o2hb_quorum_region_bitmap,
2482                            O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
2483                 ret = o2hb_region_pin(NULL);
2484 
2485 unlock:
2486         spin_unlock(&o2hb_live_lock);
2487         return ret;
2488 }
2489 
2490 void o2hb_region_dec_user(const char *region_uuid)
2491 {
2492         spin_lock(&o2hb_live_lock);
2493 
2494         /* local heartbeat */
2495         if (!o2hb_global_heartbeat_active()) {
2496             o2hb_region_unpin(region_uuid);
2497             goto unlock;
2498         }
2499 
2500         /*
2501          * if global heartbeat active and there are no dependent users,
2502          * unpin all quorum regions
2503          */
2504         o2hb_dependent_users--;
2505         if (!o2hb_dependent_users)
2506                 o2hb_region_unpin(NULL);
2507 
2508 unlock:
2509         spin_unlock(&o2hb_live_lock);
2510 }
2511 
2512 int o2hb_register_callback(const char *region_uuid,
2513                            struct o2hb_callback_func *hc)
2514 {
2515         struct o2hb_callback_func *f;
2516         struct o2hb_callback *hbcall;
2517         int ret;
2518 
2519         BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
2520         BUG_ON(!list_empty(&hc->hc_item));
2521 
2522         hbcall = hbcall_from_type(hc->hc_type);
2523         if (IS_ERR(hbcall)) {
2524                 ret = PTR_ERR(hbcall);
2525                 goto out;
2526         }
2527 
2528         if (region_uuid) {
2529                 ret = o2hb_region_inc_user(region_uuid);
2530                 if (ret) {
2531                         mlog_errno(ret);
2532                         goto out;
2533                 }
2534         }
2535 
2536         down_write(&o2hb_callback_sem);
2537 
2538         list_for_each_entry(f, &hbcall->list, hc_item) {
2539                 if (hc->hc_priority < f->hc_priority) {
2540                         list_add_tail(&hc->hc_item, &f->hc_item);
2541                         break;
2542                 }
2543         }
2544         if (list_empty(&hc->hc_item))
2545                 list_add_tail(&hc->hc_item, &hbcall->list);
2546 
2547         up_write(&o2hb_callback_sem);
2548         ret = 0;
2549 out:
2550         mlog(ML_CLUSTER, "returning %d on behalf of %p for funcs %p\n",
2551              ret, __builtin_return_address(0), hc);
2552         return ret;
2553 }
2554 EXPORT_SYMBOL_GPL(o2hb_register_callback);
2555 
2556 void o2hb_unregister_callback(const char *region_uuid,
2557                               struct o2hb_callback_func *hc)
2558 {
2559         BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
2560 
2561         mlog(ML_CLUSTER, "on behalf of %p for funcs %p\n",
2562              __builtin_return_address(0), hc);
2563 
2564         /* XXX Can this happen _with_ a region reference? */
2565         if (list_empty(&hc->hc_item))
2566                 return;
2567 
2568         if (region_uuid)
2569                 o2hb_region_dec_user(region_uuid);
2570 
2571         down_write(&o2hb_callback_sem);
2572 
2573         list_del_init(&hc->hc_item);
2574 
2575         up_write(&o2hb_callback_sem);
2576 }
2577 EXPORT_SYMBOL_GPL(o2hb_unregister_callback);
2578 
2579 int o2hb_check_node_heartbeating(u8 node_num)
2580 {
2581         unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2582 
2583         o2hb_fill_node_map(testing_map, sizeof(testing_map));
2584         if (!test_bit(node_num, testing_map)) {
2585                 mlog(ML_HEARTBEAT,
2586                      "node (%u) does not have heartbeating enabled.\n",
2587                      node_num);
2588                 return 0;
2589         }
2590 
2591         return 1;
2592 }
2593 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating);
2594 
2595 int o2hb_check_node_heartbeating_no_sem(u8 node_num)
2596 {
2597         unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2598 
2599         spin_lock(&o2hb_live_lock);
2600         o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
2601         spin_unlock(&o2hb_live_lock);
2602         if (!test_bit(node_num, testing_map)) {
2603                 mlog(ML_HEARTBEAT,
2604                      "node (%u) does not have heartbeating enabled.\n",
2605                      node_num);
2606                 return 0;
2607         }
2608 
2609         return 1;
2610 }
2611 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_no_sem);
2612 
2613 int o2hb_check_node_heartbeating_from_callback(u8 node_num)
2614 {
2615         unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2616 
2617         o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
2618         if (!test_bit(node_num, testing_map)) {
2619                 mlog(ML_HEARTBEAT,
2620                      "node (%u) does not have heartbeating enabled.\n",
2621                      node_num);
2622                 return 0;
2623         }
2624 
2625         return 1;
2626 }
2627 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);
2628 
2629 /* Makes sure our local node is configured with a node number, and is
2630  * heartbeating. */
2631 int o2hb_check_local_node_heartbeating(void)
2632 {
2633         u8 node_num;
2634 
2635         /* if this node was set then we have networking */
2636         node_num = o2nm_this_node();
2637         if (node_num == O2NM_MAX_NODES) {
2638                 mlog(ML_HEARTBEAT, "this node has not been configured.\n");
2639                 return 0;
2640         }
2641 
2642         return o2hb_check_node_heartbeating(node_num);
2643 }
2644 EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating);
2645 
2646 /*
2647  * this is just a hack until we get the plumbing which flips file systems
2648  * read only and drops the hb ref instead of killing the node dead.
2649  */
2650 void o2hb_stop_all_regions(void)
2651 {
2652         struct o2hb_region *reg;
2653 
2654         mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");
2655 
2656         spin_lock(&o2hb_live_lock);
2657 
2658         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
2659                 reg->hr_unclean_stop = 1;
2660 
2661         spin_unlock(&o2hb_live_lock);
2662 }
2663 EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);
2664 
2665 int o2hb_get_all_regions(char *region_uuids, u8 max_regions)
2666 {
2667         struct o2hb_region *reg;
2668         int numregs = 0;
2669         char *p;
2670 
2671         spin_lock(&o2hb_live_lock);
2672 
2673         p = region_uuids;
2674         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2675                 if (reg->hr_item_dropped)
2676                         continue;
2677 
2678                 mlog(0, "Region: %s\n", config_item_name(&reg->hr_item));
2679                 if (numregs < max_regions) {
2680                         memcpy(p, config_item_name(&reg->hr_item),
2681                                O2HB_MAX_REGION_NAME_LEN);
2682                         p += O2HB_MAX_REGION_NAME_LEN;
2683                 }
2684                 numregs++;
2685         }
2686 
2687         spin_unlock(&o2hb_live_lock);
2688 
2689         return numregs;
2690 }
2691 EXPORT_SYMBOL_GPL(o2hb_get_all_regions);
2692 
2693 int o2hb_global_heartbeat_active(void)
2694 {
2695         return (o2hb_heartbeat_mode == O2HB_HEARTBEAT_GLOBAL);
2696 }
2697 EXPORT_SYMBOL(o2hb_global_heartbeat_active);
2698 

~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~

kernel.org | git.kernel.org | LWN.net | Project Home | Wiki (Japanese) | Wiki (English) | SVN repository | Mail admin

Linux® is a registered trademark of Linus Torvalds in the United States and other countries.
TOMOYO® is a registered trademark of NTT DATA CORPORATION.

osdn.jp