
TOMOYO Linux Cross Reference
Linux/fs/ocfs2/cluster/heartbeat.c


  1 // SPDX-License-Identifier: GPL-2.0-or-later
  2 /* -*- mode: c; c-basic-offset: 8; -*-
  3  * vim: noexpandtab sw=8 ts=8 sts=0:
  4  *
  5  * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
  6  */
  7 
  8 #include <linux/kernel.h>
  9 #include <linux/sched.h>
 10 #include <linux/jiffies.h>
 11 #include <linux/module.h>
 12 #include <linux/fs.h>
 13 #include <linux/bio.h>
 14 #include <linux/blkdev.h>
 15 #include <linux/delay.h>
 16 #include <linux/file.h>
 17 #include <linux/kthread.h>
 18 #include <linux/configfs.h>
 19 #include <linux/random.h>
 20 #include <linux/crc32.h>
 21 #include <linux/time.h>
 22 #include <linux/debugfs.h>
 23 #include <linux/slab.h>
 24 #include <linux/bitmap.h>
 25 #include <linux/ktime.h>
 26 #include "heartbeat.h"
 27 #include "tcp.h"
 28 #include "nodemanager.h"
 29 #include "quorum.h"
 30 
 31 #include "masklog.h"
 32 
 33 
 34 /*
 35  * The first heartbeat pass had one global thread that would serialize all hb
 36  * callback calls.  This global serializing sem should only be removed once
 37  * we've made sure that all callees can deal with being called concurrently
 38  * from multiple hb region threads.
 39  */
 40 static DECLARE_RWSEM(o2hb_callback_sem);
 41 
 42 /*
 43  * multiple hb threads are watching multiple regions.  A node is live
 44  * whenever any of the threads sees activity from the node in its region.
 45  */
 46 static DEFINE_SPINLOCK(o2hb_live_lock);
 47 static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
 48 static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
 49 static LIST_HEAD(o2hb_node_events);
 50 static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);
 51 
 52 /*
 53  * In global heartbeat, we maintain a series of region bitmaps.
 54  *      - o2hb_region_bitmap allows us to limit the region number to max region.
 55  *      - o2hb_live_region_bitmap tracks live regions (seen steady iterations).
 56  *      - o2hb_quorum_region_bitmap tracks live regions that have seen all nodes
 57  *              heartbeat on it.
 58  *      - o2hb_failed_region_bitmap tracks the regions that have seen io timeouts.
 59  */
 60 static unsigned long o2hb_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
 61 static unsigned long o2hb_live_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
 62 static unsigned long o2hb_quorum_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
 63 static unsigned long o2hb_failed_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
 64 
 65 #define O2HB_DB_TYPE_LIVENODES          0
 66 #define O2HB_DB_TYPE_LIVEREGIONS        1
 67 #define O2HB_DB_TYPE_QUORUMREGIONS      2
 68 #define O2HB_DB_TYPE_FAILEDREGIONS      3
 69 #define O2HB_DB_TYPE_REGION_LIVENODES   4
 70 #define O2HB_DB_TYPE_REGION_NUMBER      5
 71 #define O2HB_DB_TYPE_REGION_ELAPSED_TIME        6
 72 #define O2HB_DB_TYPE_REGION_PINNED      7
 73 struct o2hb_debug_buf {
 74         int db_type;
 75         int db_size;
 76         int db_len;
 77         void *db_data;
 78 };
 79 
 80 static struct o2hb_debug_buf *o2hb_db_livenodes;
 81 static struct o2hb_debug_buf *o2hb_db_liveregions;
 82 static struct o2hb_debug_buf *o2hb_db_quorumregions;
 83 static struct o2hb_debug_buf *o2hb_db_failedregions;
 84 
 85 #define O2HB_DEBUG_DIR                  "o2hb"
 86 #define O2HB_DEBUG_LIVENODES            "livenodes"
 87 #define O2HB_DEBUG_LIVEREGIONS          "live_regions"
 88 #define O2HB_DEBUG_QUORUMREGIONS        "quorum_regions"
 89 #define O2HB_DEBUG_FAILEDREGIONS        "failed_regions"
 90 #define O2HB_DEBUG_REGION_NUMBER        "num"
 91 #define O2HB_DEBUG_REGION_ELAPSED_TIME  "elapsed_time_in_ms"
 92 #define O2HB_DEBUG_REGION_PINNED        "pinned"
 93 
 94 static struct dentry *o2hb_debug_dir;
 95 static struct dentry *o2hb_debug_livenodes;
 96 static struct dentry *o2hb_debug_liveregions;
 97 static struct dentry *o2hb_debug_quorumregions;
 98 static struct dentry *o2hb_debug_failedregions;
 99 
100 static LIST_HEAD(o2hb_all_regions);
101 
102 static struct o2hb_callback {
103         struct list_head list;
104 } o2hb_callbacks[O2HB_NUM_CB];
105 
106 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);
107 
108 #define O2HB_DEFAULT_BLOCK_BITS       9
109 
110 enum o2hb_heartbeat_modes {
111         O2HB_HEARTBEAT_LOCAL            = 0,
112         O2HB_HEARTBEAT_GLOBAL,
113         O2HB_HEARTBEAT_NUM_MODES,
114 };
115 
116 static const char *o2hb_heartbeat_mode_desc[O2HB_HEARTBEAT_NUM_MODES] = {
117         "local",        /* O2HB_HEARTBEAT_LOCAL */
118         "global",       /* O2HB_HEARTBEAT_GLOBAL */
119 };
120 
121 unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;
122 static unsigned int o2hb_heartbeat_mode = O2HB_HEARTBEAT_LOCAL;
123 
124 /*
125  * o2hb_dependent_users tracks the number of registered callbacks that depend
126  * on heartbeat. o2net and o2dlm are two entities that register this callback.
127  * However only o2dlm depends on the heartbeat. It does not want the heartbeat
128  * to stop while a dlm domain is still active.
129  */
130 static unsigned int o2hb_dependent_users;
131 
132 /*
133  * In global heartbeat mode, all regions are pinned if there are one or more
134  * dependent users and the quorum region count is <= O2HB_PIN_CUT_OFF. All
135  * regions are unpinned if the region count exceeds the cut off or the number
136  * of dependent users falls to zero.
137  */
138 #define O2HB_PIN_CUT_OFF                3
139 
140 /*
141  * In local heartbeat mode, we assume the dlm domain name to be the same as
142  * region uuid. This is true for domains created for the file system but not
143  * necessarily true for userdlm domains. This is a known limitation.
144  *
145  * In global heartbeat mode, we pin/unpin all o2hb regions. This solution
146  * works for both file system and userdlm domains.
147  */
148 static int o2hb_region_pin(const char *region_uuid);
149 static void o2hb_region_unpin(const char *region_uuid);
150 
151 /* Only sets a new threshold if there are no active regions.
152  *
153  * No locking or otherwise interesting code is required for reading
154  * o2hb_dead_threshold as it can't change once regions are active and
155  * it's not interesting to anyone until then anyway. */
156 static void o2hb_dead_threshold_set(unsigned int threshold)
157 {
158         if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
159                 spin_lock(&o2hb_live_lock);
160                 if (list_empty(&o2hb_all_regions))
161                         o2hb_dead_threshold = threshold;
162                 spin_unlock(&o2hb_live_lock);
163         }
164 }
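     /*
      * The threshold is counted in heartbeat iterations, not milliseconds: a
      * node is declared dead after o2hb_dead_threshold consecutive samples in
      * which its slot did not change, which corresponds to roughly
      * o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS milliseconds (see
      * o2hb_check_slot() and the hb_dead_ms field written in
      * o2hb_prepare_block()).
      */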
165 
166 static int o2hb_global_heartbeat_mode_set(unsigned int hb_mode)
167 {
168         int ret = -1;
169 
170         if (hb_mode < O2HB_HEARTBEAT_NUM_MODES) {
171                 spin_lock(&o2hb_live_lock);
172                 if (list_empty(&o2hb_all_regions)) {
173                         o2hb_heartbeat_mode = hb_mode;
174                         ret = 0;
175                 }
176                 spin_unlock(&o2hb_live_lock);
177         }
178 
179         return ret;
180 }
181 
182 struct o2hb_node_event {
183         struct list_head        hn_item;
184         enum o2hb_callback_type hn_event_type;
185         struct o2nm_node        *hn_node;
186         int                     hn_node_num;
187 };
188 
189 struct o2hb_disk_slot {
190         struct o2hb_disk_heartbeat_block *ds_raw_block;
191         u8                      ds_node_num;
192         u64                     ds_last_time;
193         u64                     ds_last_generation;
194         u16                     ds_equal_samples;
195         u16                     ds_changed_samples;
196         struct list_head        ds_live_item;
197 };
198 
199 /* each thread owns a region.. when we're asked to tear down the region
 200  * we ask the thread to stop, and the thread then cleans up the region */
201 struct o2hb_region {
202         struct config_item      hr_item;
203 
204         struct list_head        hr_all_item;
205         unsigned                hr_unclean_stop:1,
206                                 hr_aborted_start:1,
207                                 hr_item_pinned:1,
208                                 hr_item_dropped:1,
209                                 hr_node_deleted:1;
210 
211         /* protected by the hr_callback_sem */
212         struct task_struct      *hr_task;
213 
214         unsigned int            hr_blocks;
215         unsigned long long      hr_start_block;
216 
217         unsigned int            hr_block_bits;
218         unsigned int            hr_block_bytes;
219 
220         unsigned int            hr_slots_per_page;
221         unsigned int            hr_num_pages;
222 
223         struct page             **hr_slot_data;
224         struct block_device     *hr_bdev;
225         struct o2hb_disk_slot   *hr_slots;
226 
227         /* live node map of this region */
228         unsigned long           hr_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
229         unsigned int            hr_region_num;
230 
231         struct dentry           *hr_debug_dir;
232         struct dentry           *hr_debug_livenodes;
233         struct dentry           *hr_debug_regnum;
234         struct dentry           *hr_debug_elapsed_time;
235         struct dentry           *hr_debug_pinned;
236         struct o2hb_debug_buf   *hr_db_livenodes;
237         struct o2hb_debug_buf   *hr_db_regnum;
238         struct o2hb_debug_buf   *hr_db_elapsed_time;
239         struct o2hb_debug_buf   *hr_db_pinned;
240 
 241         /* let whoever is setting up hb wait until it has reached a
 242          * 'steady' state before returning.  This will be fixed when we have
243          * a more complete api that doesn't lead to this sort of fragility. */
244         atomic_t                hr_steady_iterations;
245 
246         /* terminate o2hb thread if it does not reach steady state
247          * (hr_steady_iterations == 0) within hr_unsteady_iterations */
248         atomic_t                hr_unsteady_iterations;
249 
250         char                    hr_dev_name[BDEVNAME_SIZE];
251 
252         unsigned int            hr_timeout_ms;
253 
 254         /* randomized as the region goes up and down so that other nodes
 255          * can recognize this node going down and back up within one iteration */
256         u64                     hr_generation;
257 
258         struct delayed_work     hr_write_timeout_work;
259         unsigned long           hr_last_timeout_start;
260 
261         /* negotiate timer, used to negotiate extending hb timeout. */
262         struct delayed_work     hr_nego_timeout_work;
263         unsigned long           hr_nego_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
264 
265         /* Used during o2hb_check_slot to hold a copy of the block
266          * being checked because we temporarily have to zero out the
267          * crc field. */
268         struct o2hb_disk_heartbeat_block *hr_tmp_block;
269 
270         /* Message key for negotiate timeout message. */
271         unsigned int            hr_key;
272         struct list_head        hr_handler_list;
273 
274         /* last hb status, 0 for success, other value for error. */
275         int                     hr_last_hb_status;
276 };
277 
278 struct o2hb_bio_wait_ctxt {
279         atomic_t          wc_num_reqs;
280         struct completion wc_io_complete;
281         int               wc_error;
282 };
283 
284 #define O2HB_NEGO_TIMEOUT_MS (O2HB_MAX_WRITE_TIMEOUT_MS/2)
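     /*
      * Negotiation starts at half the write timeout so that a node whose
      * writes are hanging still has time to reach agreement with the other
      * live nodes before its own write timeout fires and it fences itself.
      */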
285 
286 enum {
287         O2HB_NEGO_TIMEOUT_MSG = 1,
288         O2HB_NEGO_APPROVE_MSG = 2,
289 };
290 
291 struct o2hb_nego_msg {
292         u8 node_num;
293 };
294 
295 static void o2hb_write_timeout(struct work_struct *work)
296 {
297         int failed, quorum;
298         struct o2hb_region *reg =
299                 container_of(work, struct o2hb_region,
300                              hr_write_timeout_work.work);
301 
302         mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
303              "milliseconds\n", reg->hr_dev_name,
304              jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
305 
306         if (o2hb_global_heartbeat_active()) {
307                 spin_lock(&o2hb_live_lock);
308                 if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
309                         set_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
310                 failed = bitmap_weight(o2hb_failed_region_bitmap,
311                                         O2NM_MAX_REGIONS);
312                 quorum = bitmap_weight(o2hb_quorum_region_bitmap,
313                                         O2NM_MAX_REGIONS);
314                 spin_unlock(&o2hb_live_lock);
315 
316                 mlog(ML_HEARTBEAT, "Number of regions %d, failed regions %d\n",
317                      quorum, failed);
318 
319                 /*
320                  * Fence if the number of failed regions >= half the number
 321                  * of quorum regions
322                  */
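                     /* e.g. with 4 quorum regions: 1 failed gives (1 << 1) < 4, so we
                      * return; 2 failed gives (2 << 1) == 4 and we fall through to fence. */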
323                 if ((failed << 1) < quorum)
324                         return;
325         }
326 
327         o2quo_disk_timeout();
328 }
329 
330 static void o2hb_arm_timeout(struct o2hb_region *reg)
331 {
332         /* Arm writeout only after thread reaches steady state */
333         if (atomic_read(&reg->hr_steady_iterations) != 0)
334                 return;
335 
336         mlog(ML_HEARTBEAT, "Queue write timeout for %u ms\n",
337              O2HB_MAX_WRITE_TIMEOUT_MS);
338 
339         if (o2hb_global_heartbeat_active()) {
340                 spin_lock(&o2hb_live_lock);
341                 clear_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
342                 spin_unlock(&o2hb_live_lock);
343         }
344         cancel_delayed_work(&reg->hr_write_timeout_work);
345         schedule_delayed_work(&reg->hr_write_timeout_work,
346                               msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
347 
348         cancel_delayed_work(&reg->hr_nego_timeout_work);
349         /* negotiate timeout must be less than write timeout. */
350         schedule_delayed_work(&reg->hr_nego_timeout_work,
351                               msecs_to_jiffies(O2HB_NEGO_TIMEOUT_MS));
352         memset(reg->hr_nego_node_bitmap, 0, sizeof(reg->hr_nego_node_bitmap));
353 }
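     /*
      * o2hb_do_disk_heartbeat() re-arms these timers after each iteration in
      * which our own slot checked out, so the write timeout only fires when
      * writes have stopped completing for a full O2HB_MAX_WRITE_TIMEOUT_MS,
      * with the negotiate timer firing halfway through that window.
      */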
354 
355 static void o2hb_disarm_timeout(struct o2hb_region *reg)
356 {
357         cancel_delayed_work_sync(&reg->hr_write_timeout_work);
358         cancel_delayed_work_sync(&reg->hr_nego_timeout_work);
359 }
360 
361 static int o2hb_send_nego_msg(int key, int type, u8 target)
362 {
363         struct o2hb_nego_msg msg;
364         int status, ret;
365 
366         msg.node_num = o2nm_this_node();
367 again:
368         ret = o2net_send_message(type, key, &msg, sizeof(msg),
369                         target, &status);
370 
371         if (ret == -EAGAIN || ret == -ENOMEM) {
372                 msleep(100);
373                 goto again;
374         }
375 
376         return ret;
377 }
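     /*
      * Negotiation overview: the lowest-numbered live node acts as master.  A
      * node whose writes hang sends O2HB_NEGO_TIMEOUT_MSG to the master; the
      * master records each such node (including itself) in hr_nego_node_bitmap.
      * Once every live node has reported a hung write, the master re-arms its
      * own timeouts and sends O2HB_NEGO_APPROVE_MSG to the others, which re-arm
      * theirs in o2hb_nego_approve_handler().  Until then, the master simply
      * re-checks the bitmap every second.
      */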
378 
379 static void o2hb_nego_timeout(struct work_struct *work)
380 {
381         unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
382         int master_node, i, ret;
383         struct o2hb_region *reg;
384 
385         reg = container_of(work, struct o2hb_region, hr_nego_timeout_work.work);
 386         /* don't negotiate a timeout if the last hb failed, since the io
 387          * itself very likely failed; let the write timeout fence us instead.
388          */
389         if (reg->hr_last_hb_status)
390                 return;
391 
392         o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
393         /* lowest node as master node to make negotiate decision. */
394         master_node = find_next_bit(live_node_bitmap, O2NM_MAX_NODES, 0);
395 
396         if (master_node == o2nm_this_node()) {
397                 if (!test_bit(master_node, reg->hr_nego_node_bitmap)) {
398                         printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%s).\n",
399                                 o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000,
400                                 config_item_name(&reg->hr_item), reg->hr_dev_name);
401                         set_bit(master_node, reg->hr_nego_node_bitmap);
402                 }
403                 if (memcmp(reg->hr_nego_node_bitmap, live_node_bitmap,
404                                 sizeof(reg->hr_nego_node_bitmap))) {
405                         /* check negotiate bitmap every second to do timeout
406                          * approve decision.
407                          */
408                         schedule_delayed_work(&reg->hr_nego_timeout_work,
409                                 msecs_to_jiffies(1000));
410 
411                         return;
412                 }
413 
414                 printk(KERN_NOTICE "o2hb: all nodes hb write hung, maybe region %s (%s) is down.\n",
415                         config_item_name(&reg->hr_item), reg->hr_dev_name);
416                 /* approve negotiate timeout request. */
417                 o2hb_arm_timeout(reg);
418 
419                 i = -1;
420                 while ((i = find_next_bit(live_node_bitmap,
421                                 O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
422                         if (i == master_node)
423                                 continue;
424 
425                         mlog(ML_HEARTBEAT, "send NEGO_APPROVE msg to node %d\n", i);
426                         ret = o2hb_send_nego_msg(reg->hr_key,
427                                         O2HB_NEGO_APPROVE_MSG, i);
428                         if (ret)
429                                 mlog(ML_ERROR, "send NEGO_APPROVE msg to node %d fail %d\n",
430                                         i, ret);
431                 }
432         } else {
433                 /* negotiate timeout with master node. */
434                 printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%s), negotiate timeout with node %d.\n",
435                         o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000, config_item_name(&reg->hr_item),
436                         reg->hr_dev_name, master_node);
437                 ret = o2hb_send_nego_msg(reg->hr_key, O2HB_NEGO_TIMEOUT_MSG,
438                                 master_node);
439                 if (ret)
440                         mlog(ML_ERROR, "send NEGO_TIMEOUT msg to node %d fail %d\n",
441                                 master_node, ret);
442         }
443 }
444 
445 static int o2hb_nego_timeout_handler(struct o2net_msg *msg, u32 len, void *data,
446                                 void **ret_data)
447 {
448         struct o2hb_region *reg = data;
449         struct o2hb_nego_msg *nego_msg;
450 
451         nego_msg = (struct o2hb_nego_msg *)msg->buf;
452         printk(KERN_NOTICE "o2hb: receive negotiate timeout message from node %d on region %s (%s).\n",
453                 nego_msg->node_num, config_item_name(&reg->hr_item), reg->hr_dev_name);
454         if (nego_msg->node_num < O2NM_MAX_NODES)
455                 set_bit(nego_msg->node_num, reg->hr_nego_node_bitmap);
456         else
457                 mlog(ML_ERROR, "got nego timeout message from bad node.\n");
458 
459         return 0;
460 }
461 
462 static int o2hb_nego_approve_handler(struct o2net_msg *msg, u32 len, void *data,
463                                 void **ret_data)
464 {
465         struct o2hb_region *reg = data;
466 
467         printk(KERN_NOTICE "o2hb: negotiate timeout approved by master node on region %s (%s).\n",
468                 config_item_name(&reg->hr_item), reg->hr_dev_name);
469         o2hb_arm_timeout(reg);
470         return 0;
471 }
472 
473 static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
474 {
475         atomic_set(&wc->wc_num_reqs, 1);
476         init_completion(&wc->wc_io_complete);
477         wc->wc_error = 0;
478 }
479 
480 /* Used in error paths too */
481 static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
482                                      unsigned int num)
483 {
484         /* sadly atomic_sub_and_test() isn't available on all platforms.  The
485          * good news is that the fast path only completes one at a time */
486         while(num--) {
487                 if (atomic_dec_and_test(&wc->wc_num_reqs)) {
488                         BUG_ON(num > 0);
489                         complete(&wc->wc_io_complete);
490                 }
491         }
492 }
493 
494 static void o2hb_wait_on_io(struct o2hb_bio_wait_ctxt *wc)
495 {
496         o2hb_bio_wait_dec(wc, 1);
497         wait_for_completion(&wc->wc_io_complete);
498 }
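     /*
      * The request count is biased by one: o2hb_bio_wait_init() starts it at
      * 1 and each submitted bio adds another.  The submitter's call to
      * o2hb_wait_on_io() drops that initial reference, so wc_io_complete is
      * only signalled once the last in-flight bio has ended.
      */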
499 
500 static void o2hb_bio_end_io(struct bio *bio)
501 {
502         struct o2hb_bio_wait_ctxt *wc = bio->bi_private;
503 
504         if (bio->bi_status) {
505                 mlog(ML_ERROR, "IO Error %d\n", bio->bi_status);
506                 wc->wc_error = blk_status_to_errno(bio->bi_status);
507         }
508 
509         o2hb_bio_wait_dec(wc, 1);
510         bio_put(bio);
511 }
512 
513 /* Setup a Bio to cover I/O against num_slots slots starting at
514  * start_slot. */
515 static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
516                                       struct o2hb_bio_wait_ctxt *wc,
517                                       unsigned int *current_slot,
518                                       unsigned int max_slots, int op,
519                                       int op_flags)
520 {
521         int len, current_page;
522         unsigned int vec_len, vec_start;
523         unsigned int bits = reg->hr_block_bits;
524         unsigned int spp = reg->hr_slots_per_page;
525         unsigned int cs = *current_slot;
526         struct bio *bio;
527         struct page *page;
528 
529         /* Testing has shown this allocation to take long enough under
530          * GFP_KERNEL that the local node can get fenced. It would be
531          * nicest if we could pre-allocate these bios and avoid this
 532          * altogether. */
533         bio = bio_alloc(GFP_ATOMIC, 16);
534         if (!bio) {
535                 mlog(ML_ERROR, "Could not alloc slots BIO!\n");
536                 bio = ERR_PTR(-ENOMEM);
537                 goto bail;
538         }
539 
540         /* Must put everything in 512 byte sectors for the bio... */
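             /* (bits - 9) converts heartbeat blocks to 512 byte sectors: with the
              * default 512 byte blocks (hr_block_bits == 9) the shift is zero, while
              * 4 KB blocks (bits == 12) put each slot 8 sectors apart. */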
541         bio->bi_iter.bi_sector = (reg->hr_start_block + cs) << (bits - 9);
542         bio_set_dev(bio, reg->hr_bdev);
543         bio->bi_private = wc;
544         bio->bi_end_io = o2hb_bio_end_io;
545         bio_set_op_attrs(bio, op, op_flags);
546 
547         vec_start = (cs << bits) % PAGE_SIZE;
548         while(cs < max_slots) {
549                 current_page = cs / spp;
550                 page = reg->hr_slot_data[current_page];
551 
552                 vec_len = min(PAGE_SIZE - vec_start,
553                               (max_slots-cs) * (PAGE_SIZE/spp) );
554 
555                 mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
556                      current_page, vec_len, vec_start);
557 
558                 len = bio_add_page(bio, page, vec_len, vec_start);
559                 if (len != vec_len) break;
560 
561                 cs += vec_len / (PAGE_SIZE/spp);
562                 vec_start = 0;
563         }
564 
565 bail:
566         *current_slot = cs;
567         return bio;
568 }
569 
570 static int o2hb_read_slots(struct o2hb_region *reg,
571                            unsigned int begin_slot,
572                            unsigned int max_slots)
573 {
574         unsigned int current_slot = begin_slot;
575         int status;
576         struct o2hb_bio_wait_ctxt wc;
577         struct bio *bio;
578 
579         o2hb_bio_wait_init(&wc);
580 
581         while(current_slot < max_slots) {
582                 bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots,
583                                          REQ_OP_READ, 0);
584                 if (IS_ERR(bio)) {
585                         status = PTR_ERR(bio);
586                         mlog_errno(status);
587                         goto bail_and_wait;
588                 }
589 
590                 atomic_inc(&wc.wc_num_reqs);
591                 submit_bio(bio);
592         }
593 
594         status = 0;
595 
596 bail_and_wait:
597         o2hb_wait_on_io(&wc);
598         if (wc.wc_error && !status)
599                 status = wc.wc_error;
600 
601         return status;
602 }
603 
604 static int o2hb_issue_node_write(struct o2hb_region *reg,
605                                  struct o2hb_bio_wait_ctxt *write_wc)
606 {
607         int status;
608         unsigned int slot;
609         struct bio *bio;
610 
611         o2hb_bio_wait_init(write_wc);
612 
613         slot = o2nm_this_node();
614 
615         bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1, REQ_OP_WRITE,
616                                  REQ_SYNC);
617         if (IS_ERR(bio)) {
618                 status = PTR_ERR(bio);
619                 mlog_errno(status);
620                 goto bail;
621         }
622 
623         atomic_inc(&write_wc->wc_num_reqs);
624         submit_bio(bio);
625 
626         status = 0;
627 bail:
628         return status;
629 }
630 
631 static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
632                                      struct o2hb_disk_heartbeat_block *hb_block)
633 {
634         __le32 old_cksum;
635         u32 ret;
636 
637         /* We want to compute the block crc with a 0 value in the
638          * hb_cksum field. Save it off here and replace after the
639          * crc. */
640         old_cksum = hb_block->hb_cksum;
641         hb_block->hb_cksum = 0;
642 
643         ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);
644 
645         hb_block->hb_cksum = old_cksum;
646 
647         return ret;
648 }
649 
650 static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
651 {
652         mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "
653              "cksum = 0x%x, generation 0x%llx\n",
654              (long long)le64_to_cpu(hb_block->hb_seq),
655              hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),
656              (long long)le64_to_cpu(hb_block->hb_generation));
657 }
658 
659 static int o2hb_verify_crc(struct o2hb_region *reg,
660                            struct o2hb_disk_heartbeat_block *hb_block)
661 {
662         u32 read, computed;
663 
664         read = le32_to_cpu(hb_block->hb_cksum);
665         computed = o2hb_compute_block_crc_le(reg, hb_block);
666 
667         return read == computed;
668 }
669 
670 /*
671  * Compare the slot data with what we wrote in the last iteration.
672  * If the match fails, print an appropriate error message. This is to
 673  * detect errors like... another node heartbeating on the same slot, a
674  * flaky device that is losing writes, etc.
675  * Returns 1 if check succeeds, 0 otherwise.
676  */
677 static int o2hb_check_own_slot(struct o2hb_region *reg)
678 {
679         struct o2hb_disk_slot *slot;
680         struct o2hb_disk_heartbeat_block *hb_block;
681         char *errstr;
682 
683         slot = &reg->hr_slots[o2nm_this_node()];
684         /* Don't check on our 1st timestamp */
685         if (!slot->ds_last_time)
686                 return 0;
687 
688         hb_block = slot->ds_raw_block;
689         if (le64_to_cpu(hb_block->hb_seq) == slot->ds_last_time &&
690             le64_to_cpu(hb_block->hb_generation) == slot->ds_last_generation &&
691             hb_block->hb_node == slot->ds_node_num)
692                 return 1;
693 
694 #define ERRSTR1         "Another node is heartbeating on device"
695 #define ERRSTR2         "Heartbeat generation mismatch on device"
696 #define ERRSTR3         "Heartbeat sequence mismatch on device"
697 
698         if (hb_block->hb_node != slot->ds_node_num)
699                 errstr = ERRSTR1;
700         else if (le64_to_cpu(hb_block->hb_generation) !=
701                  slot->ds_last_generation)
702                 errstr = ERRSTR2;
703         else
704                 errstr = ERRSTR3;
705 
706         mlog(ML_ERROR, "%s (%s): expected(%u:0x%llx, 0x%llx), "
707              "ondisk(%u:0x%llx, 0x%llx)\n", errstr, reg->hr_dev_name,
708              slot->ds_node_num, (unsigned long long)slot->ds_last_generation,
709              (unsigned long long)slot->ds_last_time, hb_block->hb_node,
710              (unsigned long long)le64_to_cpu(hb_block->hb_generation),
711              (unsigned long long)le64_to_cpu(hb_block->hb_seq));
712 
713         return 0;
714 }
715 
716 static inline void o2hb_prepare_block(struct o2hb_region *reg,
717                                       u64 generation)
718 {
719         int node_num;
720         u64 cputime;
721         struct o2hb_disk_slot *slot;
722         struct o2hb_disk_heartbeat_block *hb_block;
723 
724         node_num = o2nm_this_node();
725         slot = &reg->hr_slots[node_num];
726 
727         hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
728         memset(hb_block, 0, reg->hr_block_bytes);
729         /* TODO: time stuff */
730         cputime = ktime_get_real_seconds();
731         if (!cputime)
732                 cputime = 1;
733 
734         hb_block->hb_seq = cpu_to_le64(cputime);
735         hb_block->hb_node = node_num;
736         hb_block->hb_generation = cpu_to_le64(generation);
737         hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS);
738 
739         /* This step must always happen last! */
740         hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
741                                                                    hb_block));
742 
743         mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
744              (long long)generation,
745              le32_to_cpu(hb_block->hb_cksum));
746 }
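     /*
      * hb_seq carries wall-clock seconds purely as an ever-changing value for
      * peers to sample, and hb_dead_ms advertises our dead window so that
      * o2hb_check_slot() on other nodes can warn about mismatched
      * O2CB_HEARTBEAT_THRESHOLD settings.
      */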
747 
748 static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
749                                 struct o2nm_node *node,
750                                 int idx)
751 {
752         struct o2hb_callback_func *f;
753 
754         list_for_each_entry(f, &hbcall->list, hc_item) {
755                 mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
756                 (f->hc_func)(node, idx, f->hc_data);
757         }
758 }
759 
760 /* Will run the list in order until we process the passed event */
761 static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
762 {
763         struct o2hb_callback *hbcall;
764         struct o2hb_node_event *event;
765 
766         /* Holding callback sem assures we don't alter the callback
767          * lists when doing this, and serializes ourselves with other
768          * processes wanting callbacks. */
769         down_write(&o2hb_callback_sem);
770 
771         spin_lock(&o2hb_live_lock);
772         while (!list_empty(&o2hb_node_events)
773                && !list_empty(&queued_event->hn_item)) {
774                 event = list_entry(o2hb_node_events.next,
775                                    struct o2hb_node_event,
776                                    hn_item);
777                 list_del_init(&event->hn_item);
778                 spin_unlock(&o2hb_live_lock);
779 
780                 mlog(ML_HEARTBEAT, "Node %s event for %d\n",
781                      event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
782                      event->hn_node_num);
783 
784                 hbcall = hbcall_from_type(event->hn_event_type);
785 
786                 /* We should *never* have gotten on to the list with a
787                  * bad type... This isn't something that we should try
788                  * to recover from. */
789                 BUG_ON(IS_ERR(hbcall));
790 
791                 o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);
792 
793                 spin_lock(&o2hb_live_lock);
794         }
795         spin_unlock(&o2hb_live_lock);
796 
797         up_write(&o2hb_callback_sem);
798 }
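     /*
      * Events are queued under o2hb_live_lock and drained here while holding
      * o2hb_callback_sem for writing, so node up/down callbacks never run
      * concurrently with callback (un)registration.  A region thread that
      * races in helps drain the shared o2hb_node_events list until its own
      * event has been processed.
      */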
799 
800 static void o2hb_queue_node_event(struct o2hb_node_event *event,
801                                   enum o2hb_callback_type type,
802                                   struct o2nm_node *node,
803                                   int node_num)
804 {
805         assert_spin_locked(&o2hb_live_lock);
806 
807         BUG_ON((!node) && (type != O2HB_NODE_DOWN_CB));
808 
809         event->hn_event_type = type;
810         event->hn_node = node;
811         event->hn_node_num = node_num;
812 
813         mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
814              type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);
815 
816         list_add_tail(&event->hn_item, &o2hb_node_events);
817 }
818 
819 static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
820 {
821         struct o2hb_node_event event =
822                 { .hn_item = LIST_HEAD_INIT(event.hn_item), };
823         struct o2nm_node *node;
824         int queued = 0;
825 
826         node = o2nm_get_node_by_num(slot->ds_node_num);
827         if (!node)
828                 return;
829 
830         spin_lock(&o2hb_live_lock);
831         if (!list_empty(&slot->ds_live_item)) {
832                 mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
833                      slot->ds_node_num);
834 
835                 list_del_init(&slot->ds_live_item);
836 
837                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
838                         clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
839 
840                         o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
841                                               slot->ds_node_num);
842                         queued = 1;
843                 }
844         }
845         spin_unlock(&o2hb_live_lock);
846 
847         if (queued)
848                 o2hb_run_event_list(&event);
849 
850         o2nm_node_put(node);
851 }
852 
853 static void o2hb_set_quorum_device(struct o2hb_region *reg)
854 {
855         if (!o2hb_global_heartbeat_active())
856                 return;
857 
858         /* Prevent race with o2hb_heartbeat_group_drop_item() */
859         if (kthread_should_stop())
860                 return;
861 
862         /* Tag region as quorum only after thread reaches steady state */
863         if (atomic_read(&reg->hr_steady_iterations) != 0)
864                 return;
865 
866         spin_lock(&o2hb_live_lock);
867 
868         if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
869                 goto unlock;
870 
871         /*
872          * A region can be added to the quorum only when it sees all
873          * live nodes heartbeat on it. In other words, the region has been
874          * added to all nodes.
875          */
876         if (memcmp(reg->hr_live_node_bitmap, o2hb_live_node_bitmap,
877                    sizeof(o2hb_live_node_bitmap)))
878                 goto unlock;
879 
880         printk(KERN_NOTICE "o2hb: Region %s (%s) is now a quorum device\n",
881                config_item_name(&reg->hr_item), reg->hr_dev_name);
882 
883         set_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
884 
885         /*
886          * If global heartbeat active, unpin all regions if the
887          * region count > CUT_OFF
888          */
889         if (bitmap_weight(o2hb_quorum_region_bitmap,
890                            O2NM_MAX_REGIONS) > O2HB_PIN_CUT_OFF)
891                 o2hb_region_unpin(NULL);
892 unlock:
893         spin_unlock(&o2hb_live_lock);
894 }
895 
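     /*
      * Per-slot liveness state machine: a dead slot comes alive once it has
      * shown O2HB_LIVE_THRESHOLD changed samples, which sets the node in the
      * region's hr_live_node_bitmap and, for the first region to see it, in
      * the global o2hb_live_node_bitmap (firing the NODE_UP callbacks).  A
      * live slot goes dead after o2hb_dead_threshold unchanged samples, or
      * immediately if its generation changes; NODE_DOWN callbacks fire when
      * the last region drops the node.  The return value reports whether
      * global membership changed.
      */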
896 static int o2hb_check_slot(struct o2hb_region *reg,
897                            struct o2hb_disk_slot *slot)
898 {
899         int changed = 0, gen_changed = 0;
900         struct o2hb_node_event event =
901                 { .hn_item = LIST_HEAD_INIT(event.hn_item), };
902         struct o2nm_node *node;
903         struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
904         u64 cputime;
905         unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
906         unsigned int slot_dead_ms;
907         int tmp;
908         int queued = 0;
909 
910         memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);
911 
912         /*
913          * If a node is no longer configured but is still in the livemap, we
914          * may need to clear that bit from the livemap.
915          */
916         node = o2nm_get_node_by_num(slot->ds_node_num);
917         if (!node) {
918                 spin_lock(&o2hb_live_lock);
919                 tmp = test_bit(slot->ds_node_num, o2hb_live_node_bitmap);
920                 spin_unlock(&o2hb_live_lock);
921                 if (!tmp)
922                         return 0;
923         }
924 
925         if (!o2hb_verify_crc(reg, hb_block)) {
926                 /* all paths from here will drop o2hb_live_lock for
927                  * us. */
928                 spin_lock(&o2hb_live_lock);
929 
930                 /* Don't print an error on the console in this case -
931                  * a freshly formatted heartbeat area will not have a
932                  * crc set on it. */
933                 if (list_empty(&slot->ds_live_item))
934                         goto out;
935 
936                 /* The node is live but pushed out a bad crc. We
937                  * consider it a transient miss but don't populate any
938                  * other values as they may be junk. */
939                 mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
940                      slot->ds_node_num, reg->hr_dev_name);
941                 o2hb_dump_slot(hb_block);
942 
943                 slot->ds_equal_samples++;
944                 goto fire_callbacks;
945         }
946 
947         /* we don't care if these wrap.. the state transitions below
948          * clear at the right places */
949         cputime = le64_to_cpu(hb_block->hb_seq);
950         if (slot->ds_last_time != cputime)
951                 slot->ds_changed_samples++;
952         else
953                 slot->ds_equal_samples++;
954         slot->ds_last_time = cputime;
955 
956         /* The node changed heartbeat generations. We assume this to
957          * mean it dropped off but came back before we timed out. We
958          * want to consider it down for the time being but don't want
959          * to lose any changed_samples state we might build up to
960          * considering it live again. */
961         if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
962                 gen_changed = 1;
963                 slot->ds_equal_samples = 0;
964                 mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
965                      "to 0x%llx)\n", slot->ds_node_num,
966                      (long long)slot->ds_last_generation,
967                      (long long)le64_to_cpu(hb_block->hb_generation));
968         }
969 
970         slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
971 
972         mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
973              "seq %llu last %llu changed %u equal %u\n",
974              slot->ds_node_num, (long long)slot->ds_last_generation,
975              le32_to_cpu(hb_block->hb_cksum),
976              (unsigned long long)le64_to_cpu(hb_block->hb_seq),
977              (unsigned long long)slot->ds_last_time, slot->ds_changed_samples,
978              slot->ds_equal_samples);
979 
980         spin_lock(&o2hb_live_lock);
981 
982 fire_callbacks:
983         /* dead nodes only come to life after some number of
984          * changes at any time during their dead time */
985         if (list_empty(&slot->ds_live_item) &&
986             slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
987                 mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
988                      slot->ds_node_num, (long long)slot->ds_last_generation);
989 
990                 set_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
991 
992                 /* first on the list generates a callback */
993                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
994                         mlog(ML_HEARTBEAT, "o2hb: Add node %d to live nodes "
995                              "bitmap\n", slot->ds_node_num);
996                         set_bit(slot->ds_node_num, o2hb_live_node_bitmap);
997 
998                         o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
999                                               slot->ds_node_num);
1000 
1001                         changed = 1;
1002                         queued = 1;
1003                 }
1004 
1005                 list_add_tail(&slot->ds_live_item,
1006                               &o2hb_live_slots[slot->ds_node_num]);
1007 
1008                 slot->ds_equal_samples = 0;
1009 
1010                 /* We want to be sure that all nodes agree on the
1011                  * number of milliseconds before a node will be
1012                  * considered dead. The self-fencing timeout is
1013                  * computed from this value, and a discrepancy might
1014                  * result in heartbeat calling a node dead when it
1015                  * hasn't self-fenced yet. */
1016                 slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms);
1017                 if (slot_dead_ms && slot_dead_ms != dead_ms) {
1018                         /* TODO: Perhaps we can fail the region here. */
1019                         mlog(ML_ERROR, "Node %d on device %s has a dead count "
1020                              "of %u ms, but our count is %u ms.\n"
1021                              "Please double check your configuration values "
1022                              "for 'O2CB_HEARTBEAT_THRESHOLD'\n",
1023                              slot->ds_node_num, reg->hr_dev_name, slot_dead_ms,
1024                              dead_ms);
1025                 }
1026                 goto out;
1027         }
1028 
1029         /* if the list is dead, we're done.. */
1030         if (list_empty(&slot->ds_live_item))
1031                 goto out;
1032 
1033         /* live nodes only go dead after enough consecutive missed
1034          * samples..  reset the missed counter whenever we see
1035          * activity */
1036         if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
1037                 mlog(ML_HEARTBEAT, "Node %d left my region\n",
1038                      slot->ds_node_num);
1039 
1040                 clear_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
1041 
1042                 /* last off the live_slot generates a callback */
1043                 list_del_init(&slot->ds_live_item);
1044                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
1045                         mlog(ML_HEARTBEAT, "o2hb: Remove node %d from live "
1046                              "nodes bitmap\n", slot->ds_node_num);
1047                         clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
1048 
1049                         /* node can be null */
1050                         o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB,
1051                                               node, slot->ds_node_num);
1052 
1053                         changed = 1;
1054                         queued = 1;
1055                 }
1056 
1057                 /* We don't clear this because the node is still
1058                  * actually writing new blocks. */
1059                 if (!gen_changed)
1060                         slot->ds_changed_samples = 0;
1061                 goto out;
1062         }
1063         if (slot->ds_changed_samples) {
1064                 slot->ds_changed_samples = 0;
1065                 slot->ds_equal_samples = 0;
1066         }
1067 out:
1068         spin_unlock(&o2hb_live_lock);
1069 
1070         if (queued)
1071                 o2hb_run_event_list(&event);
1072 
1073         if (node)
1074                 o2nm_node_put(node);
1075         return changed;
1076 }
1077 
1078 static int o2hb_highest_node(unsigned long *nodes, int numbits)
1079 {
1080         return find_last_bit(nodes, numbits);
1081 }
1082 
1083 static int o2hb_lowest_node(unsigned long *nodes, int numbits)
1084 {
1085         return find_first_bit(nodes, numbits);
1086 }
1087 
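     /*
      * One heartbeat iteration: read every configured (or still-live) slot,
      * verify nobody else is writing in our slot, write our own freshly
      * stamped block, run the liveness state machine on each slot, then wait
      * for our write to hit disk before counting down hr_steady_iterations.
      * The timers are only re-armed when our own slot looked sane, and the
      * thread gives up (hr_aborted_start) if it cannot stabilize within
      * hr_unsteady_iterations.
      */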
1088 static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
1089 {
1090         int i, ret, highest_node, lowest_node;
1091         int membership_change = 0, own_slot_ok = 0;
1092         unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
1093         unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
1094         struct o2hb_bio_wait_ctxt write_wc;
1095 
1096         ret = o2nm_configured_node_map(configured_nodes,
1097                                        sizeof(configured_nodes));
1098         if (ret) {
1099                 mlog_errno(ret);
1100                 goto bail;
1101         }
1102 
1103         /*
1104          * If a node is not configured but is in the livemap, we still need
1105          * to read the slot so as to be able to remove it from the livemap.
1106          */
1107         o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
1108         i = -1;
1109         while ((i = find_next_bit(live_node_bitmap,
1110                                   O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
1111                 set_bit(i, configured_nodes);
1112         }
1113 
1114         highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
1115         lowest_node = o2hb_lowest_node(configured_nodes, O2NM_MAX_NODES);
1116         if (highest_node >= O2NM_MAX_NODES || lowest_node >= O2NM_MAX_NODES) {
1117                 mlog(ML_NOTICE, "o2hb: No configured nodes found!\n");
1118                 ret = -EINVAL;
1119                 goto bail;
1120         }
1121 
1122         /* No sense in reading the slots of nodes that don't exist
1123          * yet. Of course, if the node definitions have holes in them
1124          * then we're reading an empty slot anyway... Consider this
1125          * best-effort. */
1126         ret = o2hb_read_slots(reg, lowest_node, highest_node + 1);
1127         if (ret < 0) {
1128                 mlog_errno(ret);
1129                 goto bail;
1130         }
1131 
1132         /* With an up to date view of the slots, we can check that no
1133          * other node has been improperly configured to heartbeat in
1134          * our slot. */
1135         own_slot_ok = o2hb_check_own_slot(reg);
1136 
1137         /* fill in the proper info for our next heartbeat */
1138         o2hb_prepare_block(reg, reg->hr_generation);
1139 
1140         ret = o2hb_issue_node_write(reg, &write_wc);
1141         if (ret < 0) {
1142                 mlog_errno(ret);
1143                 goto bail;
1144         }
1145 
1146         i = -1;
1147         while((i = find_next_bit(configured_nodes,
1148                                  O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
1149                 membership_change |= o2hb_check_slot(reg, &reg->hr_slots[i]);
1150         }
1151 
1152         /*
1153          * We have to be sure we've advertised ourselves on disk
1154          * before we can go to steady state.  This ensures that
1155          * people we find in our steady state have seen us.
1156          */
1157         o2hb_wait_on_io(&write_wc);
1158         if (write_wc.wc_error) {
1159                 /* Do not re-arm the write timeout on I/O error - we
1160                  * can't be sure that the new block ever made it to
1161                  * disk */
1162                 mlog(ML_ERROR, "Write error %d on device \"%s\"\n",
1163                      write_wc.wc_error, reg->hr_dev_name);
1164                 ret = write_wc.wc_error;
1165                 goto bail;
1166         }
1167 
1168         /* Skip disarming the timeout if own slot has stale/bad data */
1169         if (own_slot_ok) {
1170                 o2hb_set_quorum_device(reg);
1171                 o2hb_arm_timeout(reg);
1172                 reg->hr_last_timeout_start = jiffies;
1173         }
1174 
1175 bail:
1176         /* let the person who launched us know when things are steady */
1177         if (atomic_read(&reg->hr_steady_iterations) != 0) {
1178                 if (!ret && own_slot_ok && !membership_change) {
1179                         if (atomic_dec_and_test(&reg->hr_steady_iterations))
1180                                 wake_up(&o2hb_steady_queue);
1181                 }
1182         }
1183 
1184         if (atomic_read(&reg->hr_steady_iterations) != 0) {
1185                 if (atomic_dec_and_test(&reg->hr_unsteady_iterations)) {
1186                         printk(KERN_NOTICE "o2hb: Unable to stabilize "
1187                                "heartbeat on region %s (%s)\n",
1188                                config_item_name(&reg->hr_item),
1189                                reg->hr_dev_name);
1190                         atomic_set(&reg->hr_steady_iterations, 0);
1191                         reg->hr_aborted_start = 1;
1192                         wake_up(&o2hb_steady_queue);
1193                         ret = -EIO;
1194                 }
1195         }
1196 
1197         return ret;
1198 }
1199 
1200 /*
1201  * we ride the region ref that the region dir holds.  before the region
1202  * dir is removed and drops it ref it will wait to tear down this
1203  * thread.
1204  */
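     /*
      * Main loop: pin this node via o2nm_depend_this_node(), then heartbeat
      * once every hr_timeout_ms, subtracting however long the disk I/O itself
      * took.  On a clean shutdown the thread runs NODE_DOWN events for any
      * slots still live in this region and writes a zero-generation block so
      * peers notice the departure instead of waiting out the dead threshold.
      */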
1205 static int o2hb_thread(void *data)
1206 {
1207         int i, ret;
1208         struct o2hb_region *reg = data;
1209         struct o2hb_bio_wait_ctxt write_wc;
1210         ktime_t before_hb, after_hb;
1211         unsigned int elapsed_msec;
1212 
1213         mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");
1214 
1215         set_user_nice(current, MIN_NICE);
1216 
1217         /* Pin node */
1218         ret = o2nm_depend_this_node();
1219         if (ret) {
1220                 mlog(ML_ERROR, "Node has been deleted, ret = %d\n", ret);
1221                 reg->hr_node_deleted = 1;
1222                 wake_up(&o2hb_steady_queue);
1223                 return 0;
1224         }
1225 
1226         while (!kthread_should_stop() &&
1227                !reg->hr_unclean_stop && !reg->hr_aborted_start) {
1228                 /* We track the time spent inside
1229                  * o2hb_do_disk_heartbeat so that we avoid more than
1230                  * hr_timeout_ms between disk writes. On busy systems
1231                  * this should result in a heartbeat which is less
1232                  * likely to time itself out. */
1233                 before_hb = ktime_get_real();
1234 
1235                 ret = o2hb_do_disk_heartbeat(reg);
1236                 reg->hr_last_hb_status = ret;
1237 
1238                 after_hb = ktime_get_real();
1239 
1240                 elapsed_msec = (unsigned int)
1241                                 ktime_ms_delta(after_hb, before_hb);
1242 
1243                 mlog(ML_HEARTBEAT,
1244                      "start = %lld, end = %lld, msec = %u, ret = %d\n",
1245                      before_hb, after_hb, elapsed_msec, ret);
1246 
1247                 if (!kthread_should_stop() &&
1248                     elapsed_msec < reg->hr_timeout_ms) {
1249                         /* the kthread api has blocked signals for us so no
1250                          * need to record the return value. */
1251                         msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
1252                 }
1253         }
1254 
1255         o2hb_disarm_timeout(reg);
1256 
1257         /* unclean stop is only used in very bad situation */
1258         for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
1259                 o2hb_shutdown_slot(&reg->hr_slots[i]);
1260 
1261         /* Explicit down notification - avoid forcing the other nodes
1262          * to timeout on this region when we could just as easily
1263          * write a clear generation - thus indicating to them that
1264          * this node has left this region.
1265          */
1266         if (!reg->hr_unclean_stop && !reg->hr_aborted_start) {
1267                 o2hb_prepare_block(reg, 0);
1268                 ret = o2hb_issue_node_write(reg, &write_wc);
1269                 if (ret == 0)
1270                         o2hb_wait_on_io(&write_wc);
1271                 else
1272                         mlog_errno(ret);
1273         }
1274 
1275         /* Unpin node */
1276         o2nm_undepend_this_node();
1277 
1278         mlog(ML_HEARTBEAT|ML_KTHREAD, "o2hb thread exiting\n");
1279 
1280         return 0;
1281 }
1282 
1283 #ifdef CONFIG_DEBUG_FS
1284 static int o2hb_debug_open(struct inode *inode, struct file *file)
1285 {
1286         struct o2hb_debug_buf *db = inode->i_private;
1287         struct o2hb_region *reg;
1288         unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1289         unsigned long lts;
1290         char *buf = NULL;
1291         int i = -1;
1292         int out = 0;
1293 
1294         /* max_nodes should be the largest bitmap we pass here */
1295         BUG_ON(sizeof(map) < db->db_size);
1296 
1297         buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
1298         if (!buf)
1299                 goto bail;
1300 
1301         switch (db->db_type) {
1302         case O2HB_DB_TYPE_LIVENODES:
1303         case O2HB_DB_TYPE_LIVEREGIONS:
1304         case O2HB_DB_TYPE_QUORUMREGIONS:
1305         case O2HB_DB_TYPE_FAILEDREGIONS:
1306                 spin_lock(&o2hb_live_lock);
1307                 memcpy(map, db->db_data, db->db_size);
1308                 spin_unlock(&o2hb_live_lock);
1309                 break;
1310 
1311         case O2HB_DB_TYPE_REGION_LIVENODES:
1312                 spin_lock(&o2hb_live_lock);
1313                 reg = (struct o2hb_region *)db->db_data;
1314                 memcpy(map, reg->hr_live_node_bitmap, db->db_size);
1315                 spin_unlock(&o2hb_live_lock);
1316                 break;
1317 
1318         case O2HB_DB_TYPE_REGION_NUMBER:
1319                 reg = (struct o2hb_region *)db->db_data;
1320                 out += snprintf(buf + out, PAGE_SIZE - out, "%d\n",
1321                                 reg->hr_region_num);
1322                 goto done;
1323 
1324         case O2HB_DB_TYPE_REGION_ELAPSED_TIME:
1325                 reg = (struct o2hb_region *)db->db_data;
1326                 lts = reg->hr_last_timeout_start;
1327                 /* If 0, it has never been set before */
1328                 if (lts)
1329                         lts = jiffies_to_msecs(jiffies - lts);
1330                 out += snprintf(buf + out, PAGE_SIZE - out, "%lu\n", lts);
1331                 goto done;
1332 
1333         case O2HB_DB_TYPE_REGION_PINNED:
1334                 reg = (struct o2hb_region *)db->db_data;
1335                 out += snprintf(buf + out, PAGE_SIZE - out, "%u\n",
1336                                 !!reg->hr_item_pinned);
1337                 goto done;
1338 
1339         default:
1340                 goto done;
1341         }
1342 
1343         while ((i = find_next_bit(map, db->db_len, i + 1)) < db->db_len)
1344                 out += snprintf(buf + out, PAGE_SIZE - out, "%d ", i);
1345         out += snprintf(buf + out, PAGE_SIZE - out, "\n");
1346 
1347 done:
1348         i_size_write(inode, out);
1349 
1350         file->private_data = buf;
1351 
1352         return 0;
1353 bail:
1354         return -ENOMEM;
1355 }
1356 
1357 static int o2hb_debug_release(struct inode *inode, struct file *file)
1358 {
1359         kfree(file->private_data);
1360         return 0;
1361 }
1362 
1363 static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
1364                                  size_t nbytes, loff_t *ppos)
1365 {
1366         return simple_read_from_buffer(buf, nbytes, ppos, file->private_data,
1367                                        i_size_read(file->f_mapping->host));
1368 }
1369 #else
1370 static int o2hb_debug_open(struct inode *inode, struct file *file)
1371 {
1372         return 0;
1373 }
1374 static int o2hb_debug_release(struct inode *inode, struct file *file)
1375 {
1376         return 0;
1377 }
1378 static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
1379                                size_t nbytes, loff_t *ppos)
1380 {
1381         return 0;
1382 }
1383 #endif  /* CONFIG_DEBUG_FS */
1384 
1385 static const struct file_operations o2hb_debug_fops = {
1386         .open =         o2hb_debug_open,
1387         .release =      o2hb_debug_release,
1388         .read =         o2hb_debug_read,
1389         .llseek =       generic_file_llseek,
1390 };
1391 
1392 void o2hb_exit(void)
1393 {
1394         debugfs_remove(o2hb_debug_failedregions);
1395         debugfs_remove(o2hb_debug_quorumregions);
1396         debugfs_remove(o2hb_debug_liveregions);
1397         debugfs_remove(o2hb_debug_livenodes);
1398         debugfs_remove(o2hb_debug_dir);
1399         kfree(o2hb_db_livenodes);
1400         kfree(o2hb_db_liveregions);
1401         kfree(o2hb_db_quorumregions);
1402         kfree(o2hb_db_failedregions);
1403 }
1404 
1405 static struct dentry *o2hb_debug_create(const char *name, struct dentry *dir,
1406                                         struct o2hb_debug_buf **db, int db_len,
1407                                         int type, int size, int len, void *data)
1408 {
1409         *db = kmalloc(db_len, GFP_KERNEL);
1410         if (!*db)
1411                 return NULL;
1412 
1413         (*db)->db_type = type;
1414         (*db)->db_size = size;
1415         (*db)->db_len = len;
1416         (*db)->db_data = data;
1417 
1418         return debugfs_create_file(name, S_IFREG|S_IRUSR, dir, *db,
1419                                    &o2hb_debug_fops);
1420 }
1421 
1422 static int o2hb_debug_init(void)
1423 {
1424         int ret = -ENOMEM;
1425 
1426         o2hb_debug_dir = debugfs_create_dir(O2HB_DEBUG_DIR, NULL);
1427         if (!o2hb_debug_dir) {
1428                 mlog_errno(ret);
1429                 goto bail;
1430         }
1431 
1432         o2hb_debug_livenodes = o2hb_debug_create(O2HB_DEBUG_LIVENODES,
1433                                                  o2hb_debug_dir,
1434                                                  &o2hb_db_livenodes,
1435                                                  sizeof(*o2hb_db_livenodes),
1436                                                  O2HB_DB_TYPE_LIVENODES,
1437                                                  sizeof(o2hb_live_node_bitmap),
1438                                                  O2NM_MAX_NODES,
1439                                                  o2hb_live_node_bitmap);
1440         if (!o2hb_debug_livenodes) {
1441                 mlog_errno(ret);
1442                 goto bail;
1443         }
1444 
1445         o2hb_debug_liveregions = o2hb_debug_create(O2HB_DEBUG_LIVEREGIONS,
1446                                                    o2hb_debug_dir,
1447                                                    &o2hb_db_liveregions,
1448                                                    sizeof(*o2hb_db_liveregions),
1449                                                    O2HB_DB_TYPE_LIVEREGIONS,
1450                                                    sizeof(o2hb_live_region_bitmap),
1451                                                    O2NM_MAX_REGIONS,
1452                                                    o2hb_live_region_bitmap);
1453         if (!o2hb_debug_liveregions) {
1454                 mlog_errno(ret);
1455                 goto bail;
1456         }
1457 
1458         o2hb_debug_quorumregions =
1459                         o2hb_debug_create(O2HB_DEBUG_QUORUMREGIONS,
1460                                           o2hb_debug_dir,
1461                                           &o2hb_db_quorumregions,
1462                                           sizeof(*o2hb_db_quorumregions),
1463                                           O2HB_DB_TYPE_QUORUMREGIONS,
1464                                           sizeof(o2hb_quorum_region_bitmap),
1465                                           O2NM_MAX_REGIONS,
1466                                           o2hb_quorum_region_bitmap);
1467         if (!o2hb_debug_quorumregions) {
1468                 mlog_errno(ret);
1469                 goto bail;
1470         }
1471 
1472         o2hb_debug_failedregions =
1473                         o2hb_debug_create(O2HB_DEBUG_FAILEDREGIONS,
1474                                           o2hb_debug_dir,
1475                                           &o2hb_db_failedregions,
1476                                           sizeof(*o2hb_db_failedregions),
1477                                           O2HB_DB_TYPE_FAILEDREGIONS,
1478                                           sizeof(o2hb_failed_region_bitmap),
1479                                           O2NM_MAX_REGIONS,
1480                                           o2hb_failed_region_bitmap);
1481         if (!o2hb_debug_failedregions) {
1482                 mlog_errno(ret);
1483                 goto bail;
1484         }
1485 
1486         ret = 0;
1487 bail:
1488         if (ret)
1489                 o2hb_exit();
1490 
1491         return ret;
1492 }
1493 
1494 int o2hb_init(void)
1495 {
1496         int i;
1497 
1498         for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
1499                 INIT_LIST_HEAD(&o2hb_callbacks[i].list);
1500 
1501         for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
1502                 INIT_LIST_HEAD(&o2hb_live_slots[i]);
1503 
1504         INIT_LIST_HEAD(&o2hb_node_events);
1505 
1506         memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
1507         memset(o2hb_region_bitmap, 0, sizeof(o2hb_region_bitmap));
1508         memset(o2hb_live_region_bitmap, 0, sizeof(o2hb_live_region_bitmap));
1509         memset(o2hb_quorum_region_bitmap, 0, sizeof(o2hb_quorum_region_bitmap));
1510         memset(o2hb_failed_region_bitmap, 0, sizeof(o2hb_failed_region_bitmap));
1511 
1512         o2hb_dependent_users = 0;
1513 
1514         return o2hb_debug_init();
1515 }
1516 
1517 /* if we're already in a callback then we're already serialized by the sem */
1518 static void o2hb_fill_node_map_from_callback(unsigned long *map,
1519                                              unsigned bytes)
1520 {
1521         BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
1522 
1523         memcpy(map, &o2hb_live_node_bitmap, bytes);
1524 }
1525 
1526 /*
1527  * get a map of all nodes that are heartbeating in any region
1528  */
1529 void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
1530 {
1531         /* callers want to serialize this map and callbacks so that they
1532          * can trust that they don't miss nodes coming to the party */
1533         down_read(&o2hb_callback_sem);
1534         spin_lock(&o2hb_live_lock);
1535         o2hb_fill_node_map_from_callback(map, bytes);
1536         spin_unlock(&o2hb_live_lock);
1537         up_read(&o2hb_callback_sem);
1538 }
1539 EXPORT_SYMBOL_GPL(o2hb_fill_node_map);
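
/*
 * Illustrative sketch, not part of heartbeat.c: how a consumer might use
 * the exported o2hb_fill_node_map() to see which nodes are currently
 * heartbeating.  The function name and printk text are invented for this
 * example; the bitmap sizing follows the BITS_TO_LONGS(O2NM_MAX_NODES)
 * convention used throughout this file.
 */
static void example_dump_live_nodes(void)
{
        unsigned long live_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
        int node;

        /* Snapshot of the global live-node map, serialized with callbacks. */
        o2hb_fill_node_map(live_map, sizeof(live_map));

        for_each_set_bit(node, live_map, O2NM_MAX_NODES)
                printk(KERN_INFO "example: node %d is heartbeating\n", node);
}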
1540 
1541 /*
1542  * heartbeat configfs bits.  The heartbeat set is a default set under
1543  * the cluster set in nodemanager.c.
1544  */
1545 
1546 static struct o2hb_region *to_o2hb_region(struct config_item *item)
1547 {
1548         return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
1549 }
1550 
1551 /* drop_item only drops its ref after killing the thread; nothing should
1552  * be using the region anymore.  This has to clean up any state that
1553  * attributes might have built up. */
1554 static void o2hb_region_release(struct config_item *item)
1555 {
1556         int i;
1557         struct page *page;
1558         struct o2hb_region *reg = to_o2hb_region(item);
1559 
1560         mlog(ML_HEARTBEAT, "hb region release (%s)\n", reg->hr_dev_name);
1561 
1562         kfree(reg->hr_tmp_block);
1563 
1564         if (reg->hr_slot_data) {
1565                 for (i = 0; i < reg->hr_num_pages; i++) {
1566                         page = reg->hr_slot_data[i];
1567                         if (page)
1568                                 __free_page(page);
1569                 }
1570                 kfree(reg->hr_slot_data);
1571         }
1572 
1573         if (reg->hr_bdev)
1574                 blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
1575 
1576         kfree(reg->hr_slots);
1577 
1578         debugfs_remove(reg->hr_debug_livenodes);
1579         debugfs_remove(reg->hr_debug_regnum);
1580         debugfs_remove(reg->hr_debug_elapsed_time);
1581         debugfs_remove(reg->hr_debug_pinned);
1582         debugfs_remove(reg->hr_debug_dir);
1583         kfree(reg->hr_db_livenodes);
1584         kfree(reg->hr_db_regnum);
1585         kfree(reg->hr_db_elapsed_time);
1586         kfree(reg->hr_db_pinned);
1587 
1588         spin_lock(&o2hb_live_lock);
1589         list_del(&reg->hr_all_item);
1590         spin_unlock(&o2hb_live_lock);
1591 
1592         o2net_unregister_handler_list(&reg->hr_handler_list);
1593         kfree(reg);
1594 }
1595 
1596 static int o2hb_read_block_input(struct o2hb_region *reg,
1597                                  const char *page,
1598                                  unsigned long *ret_bytes,
1599                                  unsigned int *ret_bits)
1600 {
1601         unsigned long bytes;
1602         char *p = (char *)page;
1603 
1604         bytes = simple_strtoul(p, &p, 0);
1605         if (!p || (*p && (*p != '\n')))
1606                 return -EINVAL;
1607 
1608         /* Heartbeat and fs min / max block sizes are the same. */
1609         if (bytes > 4096 || bytes < 512)
1610                 return -ERANGE;
1611         if (hweight16(bytes) != 1)
1612                 return -EINVAL;
1613 
1614         if (ret_bytes)
1615                 *ret_bytes = bytes;
1616         if (ret_bits)
1617                 *ret_bits = ffs(bytes) - 1;
1618 
1619         return 0;
1620 }
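
/*
 * Worked example of the validation above (annotation, not kernel code):
 * only power-of-two sizes between 512 and 4096 bytes are accepted, so
 * "512", "1024", "2048" and "4096" succeed.  An input of "4096" yields
 * *ret_bytes = 4096 and *ret_bits = ffs(4096) - 1 = 12, while "4000"
 * fails the hweight16() power-of-two check (-EINVAL) and "8192" is
 * rejected as out of range (-ERANGE).
 */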
1621 
1622 static ssize_t o2hb_region_block_bytes_show(struct config_item *item,
1623                                             char *page)
1624 {
1625         return sprintf(page, "%u\n", to_o2hb_region(item)->hr_block_bytes);
1626 }
1627 
1628 static ssize_t o2hb_region_block_bytes_store(struct config_item *item,
1629                                              const char *page,
1630                                              size_t count)
1631 {
1632         struct o2hb_region *reg = to_o2hb_region(item);
1633         int status;
1634         unsigned long block_bytes;
1635         unsigned int block_bits;
1636 
1637         if (reg->hr_bdev)
1638                 return -EINVAL;
1639 
1640         status = o2hb_read_block_input(reg, page, &block_bytes,
1641                                        &block_bits);
1642         if (status)
1643                 return status;
1644 
1645         reg->hr_block_bytes = (unsigned int)block_bytes;
1646         reg->hr_block_bits = block_bits;
1647 
1648         return count;
1649 }
1650 
1651 static ssize_t o2hb_region_start_block_show(struct config_item *item,
1652                                             char *page)
1653 {
1654         return sprintf(page, "%llu\n", to_o2hb_region(item)->hr_start_block);
1655 }
1656 
1657 static ssize_t o2hb_region_start_block_store(struct config_item *item,
1658                                              const char *page,
1659                                              size_t count)
1660 {
1661         struct o2hb_region *reg = to_o2hb_region(item);
1662         unsigned long long tmp;
1663         char *p = (char *)page;
1664 
1665         if (reg->hr_bdev)
1666                 return -EINVAL;
1667 
1668         tmp = simple_strtoull(p, &p, 0);
1669         if (!p || (*p && (*p != '\n')))
1670                 return -EINVAL;
1671 
1672         reg->hr_start_block = tmp;
1673 
1674         return count;
1675 }
1676 
1677 static ssize_t o2hb_region_blocks_show(struct config_item *item, char *page)
1678 {
1679         return sprintf(page, "%d\n", to_o2hb_region(item)->hr_blocks);
1680 }
1681 
1682 static ssize_t o2hb_region_blocks_store(struct config_item *item,
1683                                         const char *page,
1684                                         size_t count)
1685 {
1686         struct o2hb_region *reg = to_o2hb_region(item);
1687         unsigned long tmp;
1688         char *p = (char *)page;
1689 
1690         if (reg->hr_bdev)
1691                 return -EINVAL;
1692 
1693         tmp = simple_strtoul(p, &p, 0);
1694         if (!p || (*p && (*p != '\n')))
1695                 return -EINVAL;
1696 
1697         if (tmp > O2NM_MAX_NODES || tmp == 0)
1698                 return -ERANGE;
1699 
1700         reg->hr_blocks = (unsigned int)tmp;
1701 
1702         return count;
1703 }
1704 
1705 static ssize_t o2hb_region_dev_show(struct config_item *item, char *page)
1706 {
1707         unsigned int ret = 0;
1708 
1709         if (to_o2hb_region(item)->hr_bdev)
1710                 ret = sprintf(page, "%s\n", to_o2hb_region(item)->hr_dev_name);
1711 
1712         return ret;
1713 }
1714 
1715 static void o2hb_init_region_params(struct o2hb_region *reg)
1716 {
1717         reg->hr_slots_per_page = PAGE_SIZE >> reg->hr_block_bits;
1718         reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;
1719 
1720         mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
1721              reg->hr_start_block, reg->hr_blocks);
1722         mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
1723              reg->hr_block_bytes, reg->hr_block_bits);
1724         mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
1725         mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
1726 }
1727 
1728 static int o2hb_map_slot_data(struct o2hb_region *reg)
1729 {
1730         int i, j;
1731         unsigned int last_slot;
1732         unsigned int spp = reg->hr_slots_per_page;
1733         struct page *page;
1734         char *raw;
1735         struct o2hb_disk_slot *slot;
1736 
1737         reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
1738         if (reg->hr_tmp_block == NULL)
1739                 return -ENOMEM;
1740 
1741         reg->hr_slots = kcalloc(reg->hr_blocks,
1742                                 sizeof(struct o2hb_disk_slot), GFP_KERNEL);
1743         if (reg->hr_slots == NULL)
1744                 return -ENOMEM;
1745 
1746         for(i = 0; i < reg->hr_blocks; i++) {
1747                 slot = &reg->hr_slots[i];
1748                 slot->ds_node_num = i;
1749                 INIT_LIST_HEAD(&slot->ds_live_item);
1750                 slot->ds_raw_block = NULL;
1751         }
1752 
1753         reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
1754         mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
1755                            "at %u blocks per page\n",
1756              reg->hr_num_pages, reg->hr_blocks, spp);
1757 
1758         reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
1759                                     GFP_KERNEL);
1760         if (!reg->hr_slot_data)
1761                 return -ENOMEM;
1762 
1763         for(i = 0; i < reg->hr_num_pages; i++) {
1764                 page = alloc_page(GFP_KERNEL);
1765                 if (!page)
1766                         return -ENOMEM;
1767 
1768                 reg->hr_slot_data[i] = page;
1769 
1770                 last_slot = i * spp;
1771                 raw = page_address(page);
1772                 for (j = 0;
1773                      (j < spp) && ((j + last_slot) < reg->hr_blocks);
1774                      j++) {
1775                         BUG_ON((j + last_slot) >= reg->hr_blocks);
1776 
1777                         slot = &reg->hr_slots[j + last_slot];
1778                         slot->ds_raw_block =
1779                                 (struct o2hb_disk_heartbeat_block *) raw;
1780 
1781                         raw += reg->hr_block_bytes;
1782                 }
1783         }
1784 
1785         return 0;
1786 }
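
/*
 * Worked example of the slot layout above (annotation, not kernel code):
 * with PAGE_SIZE = 4096 and hr_block_bytes = 512 (hr_block_bits = 9),
 * o2hb_init_region_params() gives hr_slots_per_page = 4096 >> 9 = 8.
 * A region with hr_blocks = 32 then needs (32 + 8 - 1) / 8 = 4 pages,
 * and slot i gets its ds_raw_block from page i / 8 at byte offset
 * (i % 8) * 512 within that page.
 */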
1787 
1788 /* Read in all the slots available and populate the tracking
1789  * structures so that we can start with a baseline idea of what's
1790  * there. */
1791 static int o2hb_populate_slot_data(struct o2hb_region *reg)
1792 {
1793         int ret, i;
1794         struct o2hb_disk_slot *slot;
1795         struct o2hb_disk_heartbeat_block *hb_block;
1796 
1797         ret = o2hb_read_slots(reg, 0, reg->hr_blocks);
1798         if (ret)
1799                 goto out;
1800 
1801         /* We only want to get an idea of the values initially in each
1802          * slot, so we do no verification - o2hb_check_slot will
1803          * actually determine if each configured slot is valid and
1804          * whether any values have changed. */
1805         for(i = 0; i < reg->hr_blocks; i++) {
1806                 slot = &reg->hr_slots[i];
1807                 hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block;
1808 
1809                 /* Only fill the values that o2hb_check_slot uses to
1810                  * determine changing slots */
1811                 slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
1812                 slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
1813         }
1814 
1815 out:
1816         return ret;
1817 }
1818 
1819 /* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */
1820 static ssize_t o2hb_region_dev_store(struct config_item *item,
1821                                      const char *page,
1822                                      size_t count)
1823 {
1824         struct o2hb_region *reg = to_o2hb_region(item);
1825         struct task_struct *hb_task;
1826         long fd;
1827         int sectsize;
1828         char *p = (char *)page;
1829         struct fd f;
1830         struct inode *inode;
1831         ssize_t ret = -EINVAL;
1832         int live_threshold;
1833 
1834         if (reg->hr_bdev)
1835                 goto out;
1836 
1837         /* We can't heartbeat until our node number has been
1838          * configured. */
1839         if (o2nm_this_node() == O2NM_MAX_NODES)
1840                 goto out;
1841 
1842         fd = simple_strtol(p, &p, 0);
1843         if (!p || (*p && (*p != '\n')))
1844                 goto out;
1845 
1846         if (fd < 0 || fd >= INT_MAX)
1847                 goto out;
1848 
1849         f = fdget(fd);
1850         if (f.file == NULL)
1851                 goto out;
1852 
1853         if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
1854             reg->hr_block_bytes == 0)
1855                 goto out2;
1856 
1857         inode = igrab(f.file->f_mapping->host);
1858         if (inode == NULL)
1859                 goto out2;
1860 
1861         if (!S_ISBLK(inode->i_mode))
1862                 goto out3;
1863 
1864         reg->hr_bdev = I_BDEV(f.file->f_mapping->host);
1865         ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, NULL);
1866         if (ret) {
1867                 reg->hr_bdev = NULL;
1868                 goto out3;
1869         }
1870         inode = NULL;
1871 
1872         bdevname(reg->hr_bdev, reg->hr_dev_name);
1873 
1874         sectsize = bdev_logical_block_size(reg->hr_bdev);
1875         if (sectsize != reg->hr_block_bytes) {
1876                 mlog(ML_ERROR,
1877                      "blocksize %u incorrect for device, expected %d",
1878                      reg->hr_block_bytes, sectsize);
1879                 ret = -EINVAL;
1880                 goto out3;
1881         }
1882 
1883         o2hb_init_region_params(reg);
1884 
1885         /* Generation of zero is invalid */
1886         do {
1887                 get_random_bytes(&reg->hr_generation,
1888                                  sizeof(reg->hr_generation));
1889         } while (reg->hr_generation == 0);
1890 
1891         ret = o2hb_map_slot_data(reg);
1892         if (ret) {
1893                 mlog_errno(ret);
1894                 goto out3;
1895         }
1896 
1897         ret = o2hb_populate_slot_data(reg);
1898         if (ret) {
1899                 mlog_errno(ret);
1900                 goto out3;
1901         }
1902 
1903         INIT_DELAYED_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout);
1904         INIT_DELAYED_WORK(&reg->hr_nego_timeout_work, o2hb_nego_timeout);
1905 
1906         /*
1907          * A node is considered live after it has beat LIVE_THRESHOLD
1908          * times.  We're not steady until we've given them a chance
1909          * _after_ our first read.
1910          * The default threshold is the bare minimum so as to limit the
1911          * delay during mounts.  For global heartbeat, the threshold is
1912          * doubled for the first region.
1913          */
1914         live_threshold = O2HB_LIVE_THRESHOLD;
1915         if (o2hb_global_heartbeat_active()) {
1916                 spin_lock(&o2hb_live_lock);
1917                 if (bitmap_weight(o2hb_region_bitmap, O2NM_MAX_REGIONS) == 1)
1918                         live_threshold <<= 1;
1919                 spin_unlock(&o2hb_live_lock);
1920         }
1921         ++live_threshold;
1922         atomic_set(&reg->hr_steady_iterations, live_threshold);
1923         /* unsteady_iterations is triple the steady_iterations */
1924         atomic_set(&reg->hr_unsteady_iterations, (live_threshold * 3));
1925 
1926         hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
1927                               reg->hr_item.ci_name);
1928         if (IS_ERR(hb_task)) {
1929                 ret = PTR_ERR(hb_task);
1930                 mlog_errno(ret);
1931                 goto out3;
1932         }
1933 
1934         spin_lock(&o2hb_live_lock);
1935         reg->hr_task = hb_task;
1936         spin_unlock(&o2hb_live_lock);
1937 
1938         ret = wait_event_interruptible(o2hb_steady_queue,
1939                                 atomic_read(&reg->hr_steady_iterations) == 0 ||
1940                                 reg->hr_node_deleted);
1941         if (ret) {
1942                 atomic_set(&reg->hr_steady_iterations, 0);
1943                 reg->hr_aborted_start = 1;
1944         }
1945 
1946         if (reg->hr_aborted_start) {
1947                 ret = -EIO;
1948                 goto out3;
1949         }
1950 
1951         if (reg->hr_node_deleted) {
1952                 ret = -EINVAL;
1953                 goto out3;
1954         }
1955 
1956         /* Ok, we were woken.  Make sure it wasn't by drop_item() */
1957         spin_lock(&o2hb_live_lock);
1958         hb_task = reg->hr_task;
1959         if (o2hb_global_heartbeat_active())
1960                 set_bit(reg->hr_region_num, o2hb_live_region_bitmap);
1961         spin_unlock(&o2hb_live_lock);
1962 
1963         if (hb_task)
1964                 ret = count;
1965         else
1966                 ret = -EIO;
1967 
1968         if (hb_task && o2hb_global_heartbeat_active())
1969                 printk(KERN_NOTICE "o2hb: Heartbeat started on region %s (%s)\n",
1970                        config_item_name(&reg->hr_item), reg->hr_dev_name);
1971 
1972 out3:
1973         iput(inode);
1974 out2:
1975         fdput(f);
1976 out:
1977         if (ret < 0) {
1978                 if (reg->hr_bdev) {
1979                         blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
1980                         reg->hr_bdev = NULL;
1981                 }
1982         }
1983         return ret;
1984 }
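
/*
 * Illustrative userspace sketch, not part of heartbeat.c: starting a
 * region the way o2hb_region_dev_store() above expects.  The configfs
 * layout (e.g. /sys/kernel/config/cluster/<cluster>/heartbeat/<region>)
 * and the function name are assumptions for this example; the point is
 * that "dev" is written the decimal *file descriptor* of an already-open
 * block device, after block_bytes, start_block and blocks have been set.
 */
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

static int example_start_region(const char *region_dir, const char *blkdev)
{
        char path[256], buf[32];
        int devfd, attrfd, n;

        devfd = open(blkdev, O_RDWR);   /* device this region heartbeats on */
        if (devfd < 0)
                return -1;

        snprintf(path, sizeof(path), "%s/dev", region_dir);
        attrfd = open(path, O_WRONLY);
        if (attrfd < 0) {
                close(devfd);
                return -1;
        }

        n = snprintf(buf, sizeof(buf), "%d\n", devfd);
        if (write(attrfd, buf, n) != n) {       /* blocks until the region is steady */
                close(attrfd);
                close(devfd);
                return -1;
        }

        close(attrfd);
        close(devfd);   /* the kernel took its own reference via blkdev_get() */
        return 0;
}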
1985 
1986 static ssize_t o2hb_region_pid_show(struct config_item *item, char *page)
1987 {
1988         struct o2hb_region *reg = to_o2hb_region(item);
1989         pid_t pid = 0;
1990 
1991         spin_lock(&o2hb_live_lock);
1992         if (reg->hr_task)
1993                 pid = task_pid_nr(reg->hr_task);
1994         spin_unlock(&o2hb_live_lock);
1995 
1996         if (!pid)
1997                 return 0;
1998 
1999         return sprintf(page, "%u\n", pid);
2000 }
2001 
2002 CONFIGFS_ATTR(o2hb_region_, block_bytes);
2003 CONFIGFS_ATTR(o2hb_region_, start_block);
2004 CONFIGFS_ATTR(o2hb_region_, blocks);
2005 CONFIGFS_ATTR(o2hb_region_, dev);
2006 CONFIGFS_ATTR_RO(o2hb_region_, pid);
2007 
2008 static struct configfs_attribute *o2hb_region_attrs[] = {
2009         &o2hb_region_attr_block_bytes,
2010         &o2hb_region_attr_start_block,
2011         &o2hb_region_attr_blocks,
2012         &o2hb_region_attr_dev,
2013         &o2hb_region_attr_pid,
2014         NULL,
2015 };
2016 
2017 static struct configfs_item_operations o2hb_region_item_ops = {
2018         .release                = o2hb_region_release,
2019 };
2020 
2021 static const struct config_item_type o2hb_region_type = {
2022         .ct_item_ops    = &o2hb_region_item_ops,
2023         .ct_attrs       = o2hb_region_attrs,
2024         .ct_owner       = THIS_MODULE,
2025 };
2026 
2027 /* heartbeat set */
2028 
2029 struct o2hb_heartbeat_group {
2030         struct config_group hs_group;
2031         /* some stuff? */
2032 };
2033 
2034 static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
2035 {
2036         return group ?
2037                 container_of(group, struct o2hb_heartbeat_group, hs_group)
2038                 : NULL;
2039 }
2040 
2041 static int o2hb_debug_region_init(struct o2hb_region *reg, struct dentry *dir)
2042 {
2043         int ret = -ENOMEM;
2044 
2045         reg->hr_debug_dir =
2046                 debugfs_create_dir(config_item_name(&reg->hr_item), dir);
2047         if (!reg->hr_debug_dir) {
2048                 mlog_errno(ret);
2049                 goto bail;
2050         }
2051 
2052         reg->hr_debug_livenodes =
2053                         o2hb_debug_create(O2HB_DEBUG_LIVENODES,
2054                                           reg->hr_debug_dir,
2055                                           &(reg->hr_db_livenodes),
2056                                           sizeof(*(reg->hr_db_livenodes)),
2057                                           O2HB_DB_TYPE_REGION_LIVENODES,
2058                                           sizeof(reg->hr_live_node_bitmap),
2059                                           O2NM_MAX_NODES, reg);
2060         if (!reg->hr_debug_livenodes) {
2061                 mlog_errno(ret);
2062                 goto bail;
2063         }
2064 
2065         reg->hr_debug_regnum =
2066                         o2hb_debug_create(O2HB_DEBUG_REGION_NUMBER,
2067                                           reg->hr_debug_dir,
2068                                           &(reg->hr_db_regnum),
2069                                           sizeof(*(reg->hr_db_regnum)),
2070                                           O2HB_DB_TYPE_REGION_NUMBER,
2071                                           0, O2NM_MAX_NODES, reg);
2072         if (!reg->hr_debug_regnum) {
2073                 mlog_errno(ret);
2074                 goto bail;
2075         }
2076 
2077         reg->hr_debug_elapsed_time =
2078                         o2hb_debug_create(O2HB_DEBUG_REGION_ELAPSED_TIME,
2079                                           reg->hr_debug_dir,
2080                                           &(reg->hr_db_elapsed_time),
2081                                           sizeof(*(reg->hr_db_elapsed_time)),
2082                                           O2HB_DB_TYPE_REGION_ELAPSED_TIME,
2083                                           0, 0, reg);
2084         if (!reg->hr_debug_elapsed_time) {
2085                 mlog_errno(ret);
2086                 goto bail;
2087         }
2088 
2089         reg->hr_debug_pinned =
2090                         o2hb_debug_create(O2HB_DEBUG_REGION_PINNED,
2091                                           reg->hr_debug_dir,
2092                                           &(reg->hr_db_pinned),
2093                                           sizeof(*(reg->hr_db_pinned)),
2094                                           O2HB_DB_TYPE_REGION_PINNED,
2095                                           0, 0, reg);
2096         if (!reg->hr_debug_pinned) {
2097                 mlog_errno(ret);
2098                 goto bail;
2099         }
2100 
2101         ret = 0;
2102 bail:
2103         return ret;
2104 }
2105 
2106 static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
2107                                                           const char *name)
2108 {
2109         struct o2hb_region *reg = NULL;
2110         int ret;
2111 
2112         reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
2113         if (reg == NULL)
2114                 return ERR_PTR(-ENOMEM);
2115 
2116         if (strlen(name) > O2HB_MAX_REGION_NAME_LEN) {
2117                 ret = -ENAMETOOLONG;
2118                 goto free;
2119         }
2120 
2121         spin_lock(&o2hb_live_lock);
2122         reg->hr_region_num = 0;
2123         if (o2hb_global_heartbeat_active()) {
2124                 reg->hr_region_num = find_first_zero_bit(o2hb_region_bitmap,
2125                                                          O2NM_MAX_REGIONS);
2126                 if (reg->hr_region_num >= O2NM_MAX_REGIONS) {
2127                         spin_unlock(&o2hb_live_lock);
2128                         ret = -EFBIG;
2129                         goto free;
2130                 }
2131                 set_bit(reg->hr_region_num, o2hb_region_bitmap);
2132         }
2133         list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
2134         spin_unlock(&o2hb_live_lock);
2135 
2136         config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
2137 
2138         /* Generate the msg key the same way the dlm does.  For local
2139          * heartbeat the name is also the same, so use a different initial
2140          * crc value to avoid a message key conflict.
2141          */
2142         reg->hr_key = crc32_le(reg->hr_region_num + O2NM_MAX_REGIONS,
2143                 name, strlen(name));
2144         INIT_LIST_HEAD(&reg->hr_handler_list);
2145         ret = o2net_register_handler(O2HB_NEGO_TIMEOUT_MSG, reg->hr_key,
2146                         sizeof(struct o2hb_nego_msg),
2147                         o2hb_nego_timeout_handler,
2148                         reg, NULL, &reg->hr_handler_list);
2149         if (ret)
2150                 goto free;
2151 
2152         ret = o2net_register_handler(O2HB_NEGO_APPROVE_MSG, reg->hr_key,
2153                         sizeof(struct o2hb_nego_msg),
2154                         o2hb_nego_approve_handler,
2155                         reg, NULL, &reg->hr_handler_list);
2156         if (ret)
2157                 goto unregister_handler;
2158 
2159         ret = o2hb_debug_region_init(reg, o2hb_debug_dir);
2160         if (ret) {
2161                 config_item_put(&reg->hr_item);
2162                 goto unregister_handler;
2163         }
2164 
2165         return &reg->hr_item;
2166 
2167 unregister_handler:
2168         o2net_unregister_handler_list(&reg->hr_handler_list);
2169 free:
2170         kfree(reg);
2171         return ERR_PTR(ret);
2172 }
2173 
2174 static void o2hb_heartbeat_group_drop_item(struct config_group *group,
2175                                            struct config_item *item)
2176 {
2177         struct task_struct *hb_task;
2178         struct o2hb_region *reg = to_o2hb_region(item);
2179         int quorum_region = 0;
2180 
2181         /* stop the thread when the user removes the region dir */
2182         spin_lock(&o2hb_live_lock);
2183         hb_task = reg->hr_task;
2184         reg->hr_task = NULL;
2185         reg->hr_item_dropped = 1;
2186         spin_unlock(&o2hb_live_lock);
2187 
2188         if (hb_task)
2189                 kthread_stop(hb_task);
2190 
2191         if (o2hb_global_heartbeat_active()) {
2192                 spin_lock(&o2hb_live_lock);
2193                 clear_bit(reg->hr_region_num, o2hb_region_bitmap);
2194                 clear_bit(reg->hr_region_num, o2hb_live_region_bitmap);
2195                 if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
2196                         quorum_region = 1;
2197                 clear_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
2198                 spin_unlock(&o2hb_live_lock);
2199                 printk(KERN_NOTICE "o2hb: Heartbeat %s on region %s (%s)\n",
2200                        ((atomic_read(&reg->hr_steady_iterations) == 0) ?
2201                         "stopped" : "start aborted"), config_item_name(item),
2202                        reg->hr_dev_name);
2203         }
2204 
2205         /*
2206          * If we're racing a dev_write(), we need to wake them.  They will
2207          * check reg->hr_task.
2208          */
2209         if (atomic_read(&reg->hr_steady_iterations) != 0) {
2210                 reg->hr_aborted_start = 1;
2211                 atomic_set(&reg->hr_steady_iterations, 0);
2212                 wake_up(&o2hb_steady_queue);
2213         }
2214 
2215         config_item_put(item);
2216 
2217         if (!o2hb_global_heartbeat_active() || !quorum_region)
2218                 return;
2219 
2220         /*
2221          * If global heartbeat is active and there are dependent users,
2222          * pin all regions if the quorum region count is <= CUT_OFF.
2223          */
2224         spin_lock(&o2hb_live_lock);
2225 
2226         if (!o2hb_dependent_users)
2227                 goto unlock;
2228 
2229         if (bitmap_weight(o2hb_quorum_region_bitmap,
2230                            O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
2231                 o2hb_region_pin(NULL);
2232 
2233 unlock:
2234         spin_unlock(&o2hb_live_lock);
2235 }
2236 
2237 static ssize_t o2hb_heartbeat_group_dead_threshold_show(struct config_item *item,
2238                 char *page)
2239 {
2240         return sprintf(page, "%u\n", o2hb_dead_threshold);
2241 }
2242 
2243 static ssize_t o2hb_heartbeat_group_dead_threshold_store(struct config_item *item,
2244                 const char *page, size_t count)
2245 {
2246         unsigned long tmp;
2247         char *p = (char *)page;
2248 
2249         tmp = simple_strtoul(p, &p, 10);
2250         if (!p || (*p && (*p != '\n')))
2251                 return -EINVAL;
2252 
2253         /* this will validate ranges for us. */
2254         o2hb_dead_threshold_set((unsigned int) tmp);
2255 
2256         return count;
2257 }
2258 
2259 static ssize_t o2hb_heartbeat_group_mode_show(struct config_item *item,
2260                 char *page)
2261 {
2262         return sprintf(page, "%s\n",
2263                        o2hb_heartbeat_mode_desc[o2hb_heartbeat_mode]);
2264 }
2265 
2266 static ssize_t o2hb_heartbeat_group_mode_store(struct config_item *item,
2267                 const char *page, size_t count)
2268 {
2269         unsigned int i;
2270         int ret;
2271         size_t len;
2272 
2273         len = (page[count - 1] == '\n') ? count - 1 : count;
2274         if (!len)
2275                 return -EINVAL;
2276 
2277         for (i = 0; i < O2HB_HEARTBEAT_NUM_MODES; ++i) {
2278                 if (strncasecmp(page, o2hb_heartbeat_mode_desc[i], len))
2279                         continue;
2280 
2281                 ret = o2hb_global_heartbeat_mode_set(i);
2282                 if (!ret)
2283                         printk(KERN_NOTICE "o2hb: Heartbeat mode set to %s\n",
2284                                o2hb_heartbeat_mode_desc[i]);
2285                 return count;
2286         }
2287 
2288         return -EINVAL;
2289 
2290 }
2291 
2292 CONFIGFS_ATTR(o2hb_heartbeat_group_, dead_threshold);
2293 CONFIGFS_ATTR(o2hb_heartbeat_group_, mode);
2294 
2295 static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
2296         &o2hb_heartbeat_group_attr_dead_threshold,
2297         &o2hb_heartbeat_group_attr_mode,
2298         NULL,
2299 };
2300 
2301 static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
2302         .make_item      = o2hb_heartbeat_group_make_item,
2303         .drop_item      = o2hb_heartbeat_group_drop_item,
2304 };
2305 
2306 static const struct config_item_type o2hb_heartbeat_group_type = {
2307         .ct_group_ops   = &o2hb_heartbeat_group_group_ops,
2308         .ct_attrs       = o2hb_heartbeat_group_attrs,
2309         .ct_owner       = THIS_MODULE,
2310 };
2311 
2312 /* this is just here to avoid touching group in heartbeat.h which the
2313  * entire damn world #includes */
2314 struct config_group *o2hb_alloc_hb_set(void)
2315 {
2316         struct o2hb_heartbeat_group *hs = NULL;
2317         struct config_group *ret = NULL;
2318 
2319         hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
2320         if (hs == NULL)
2321                 goto out;
2322 
2323         config_group_init_type_name(&hs->hs_group, "heartbeat",
2324                                     &o2hb_heartbeat_group_type);
2325 
2326         ret = &hs->hs_group;
2327 out:
2328         if (ret == NULL)
2329                 kfree(hs);
2330         return ret;
2331 }
2332 
2333 void o2hb_free_hb_set(struct config_group *group)
2334 {
2335         struct o2hb_heartbeat_group *hs = to_o2hb_heartbeat_group(group);
2336         kfree(hs);
2337 }
2338 
2339 /* hb callback registration and issuing */
2340 
2341 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
2342 {
2343         if (type == O2HB_NUM_CB)
2344                 return ERR_PTR(-EINVAL);
2345 
2346         return &o2hb_callbacks[type];
2347 }
2348 
2349 void o2hb_setup_callback(struct o2hb_callback_func *hc,
2350                          enum o2hb_callback_type type,
2351                          o2hb_cb_func *func,
2352                          void *data,
2353                          int priority)
2354 {
2355         INIT_LIST_HEAD(&hc->hc_item);
2356         hc->hc_func = func;
2357         hc->hc_data = data;
2358         hc->hc_priority = priority;
2359         hc->hc_type = type;
2360         hc->hc_magic = O2HB_CB_MAGIC;
2361 }
2362 EXPORT_SYMBOL_GPL(o2hb_setup_callback);
2363 
2364 /*
2365  * In local heartbeat mode, region_uuid passed matches the dlm domain name.
2366  * In global heartbeat mode, region_uuid passed is NULL.
2367  *
2368  * In local, we only pin the matching region. In global we pin all the active
2369  * regions.
2370  */
2371 static int o2hb_region_pin(const char *region_uuid)
2372 {
2373         int ret = 0, found = 0;
2374         struct o2hb_region *reg;
2375         char *uuid;
2376 
2377         assert_spin_locked(&o2hb_live_lock);
2378 
2379         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2380                 if (reg->hr_item_dropped)
2381                         continue;
2382 
2383                 uuid = config_item_name(&reg->hr_item);
2384 
2385                 /* local heartbeat */
2386                 if (region_uuid) {
2387                         if (strcmp(region_uuid, uuid))
2388                                 continue;
2389                         found = 1;
2390                 }
2391 
2392                 if (reg->hr_item_pinned || reg->hr_item_dropped)
2393                         goto skip_pin;
2394 
2395                 /* Ignore ENOENT only for local hb (userdlm domain) */
2396                 ret = o2nm_depend_item(&reg->hr_item);
2397                 if (!ret) {
2398                         mlog(ML_CLUSTER, "Pin region %s\n", uuid);
2399                         reg->hr_item_pinned = 1;
2400                 } else {
2401                         if (ret == -ENOENT && found)
2402                                 ret = 0;
2403                         else {
2404                                 mlog(ML_ERROR, "Pin region %s fails with %d\n",
2405                                      uuid, ret);
2406                                 break;
2407                         }
2408                 }
2409 skip_pin:
2410                 if (found)
2411                         break;
2412         }
2413 
2414         return ret;
2415 }
2416 
2417 /*
2418  * In local heartbeat mode, region_uuid passed matches the dlm domain name.
2419  * In global heartbeat mode, region_uuid passed is NULL.
2420  *
2421  * In local, we only unpin the matching region. In global we unpin all the
2422  * active regions.
2423  */
2424 static void o2hb_region_unpin(const char *region_uuid)
2425 {
2426         struct o2hb_region *reg;
2427         char *uuid;
2428         int found = 0;
2429 
2430         assert_spin_locked(&o2hb_live_lock);
2431 
2432         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2433                 if (reg->hr_item_dropped)
2434                         continue;
2435 
2436                 uuid = config_item_name(&reg->hr_item);
2437                 if (region_uuid) {
2438                         if (strcmp(region_uuid, uuid))
2439                                 continue;
2440                         found = 1;
2441                 }
2442 
2443                 if (reg->hr_item_pinned) {
2444                         mlog(ML_CLUSTER, "Unpin region %s\n", uuid);
2445                         o2nm_undepend_item(&reg->hr_item);
2446                         reg->hr_item_pinned = 0;
2447                 }
2448                 if (found)
2449                         break;
2450         }
2451 }
2452 
2453 static int o2hb_region_inc_user(const char *region_uuid)
2454 {
2455         int ret = 0;
2456 
2457         spin_lock(&o2hb_live_lock);
2458 
2459         /* local heartbeat */
2460         if (!o2hb_global_heartbeat_active()) {
2461             ret = o2hb_region_pin(region_uuid);
2462             goto unlock;
2463         }
2464 
2465         /*
2466          * if global heartbeat is active and this is the first dependent user,
2467          * pin all regions if the quorum region count is <= CUT_OFF
2468          */
2469         o2hb_dependent_users++;
2470         if (o2hb_dependent_users > 1)
2471                 goto unlock;
2472 
2473         if (bitmap_weight(o2hb_quorum_region_bitmap,
2474                            O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
2475                 ret = o2hb_region_pin(NULL);
2476 
2477 unlock:
2478         spin_unlock(&o2hb_live_lock);
2479         return ret;
2480 }
2481 
2482 static void o2hb_region_dec_user(const char *region_uuid)
2483 {
2484         spin_lock(&o2hb_live_lock);
2485 
2486         /* local heartbeat */
2487         if (!o2hb_global_heartbeat_active()) {
2488             o2hb_region_unpin(region_uuid);
2489             goto unlock;
2490         }
2491 
2492         /*
2493          * if global heartbeat is active and there are no dependent users,
2494          * unpin all quorum regions
2495          */
2496         o2hb_dependent_users--;
2497         if (!o2hb_dependent_users)
2498                 o2hb_region_unpin(NULL);
2499 
2500 unlock:
2501         spin_unlock(&o2hb_live_lock);
2502 }
2503 
2504 int o2hb_register_callback(const char *region_uuid,
2505                            struct o2hb_callback_func *hc)
2506 {
2507         struct o2hb_callback_func *f;
2508         struct o2hb_callback *hbcall;
2509         int ret;
2510 
2511         BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
2512         BUG_ON(!list_empty(&hc->hc_item));
2513 
2514         hbcall = hbcall_from_type(hc->hc_type);
2515         if (IS_ERR(hbcall)) {
2516                 ret = PTR_ERR(hbcall);
2517                 goto out;
2518         }
2519 
2520         if (region_uuid) {
2521                 ret = o2hb_region_inc_user(region_uuid);
2522                 if (ret) {
2523                         mlog_errno(ret);
2524                         goto out;
2525                 }
2526         }
2527 
2528         down_write(&o2hb_callback_sem);
2529 
2530         list_for_each_entry(f, &hbcall->list, hc_item) {
2531                 if (hc->hc_priority < f->hc_priority) {
2532                         list_add_tail(&hc->hc_item, &f->hc_item);
2533                         break;
2534                 }
2535         }
2536         if (list_empty(&hc->hc_item))
2537                 list_add_tail(&hc->hc_item, &hbcall->list);
2538 
2539         up_write(&o2hb_callback_sem);
2540         ret = 0;
2541 out:
2542         mlog(ML_CLUSTER, "returning %d on behalf of %p for funcs %p\n",
2543              ret, __builtin_return_address(0), hc);
2544         return ret;
2545 }
2546 EXPORT_SYMBOL_GPL(o2hb_register_callback);
2547 
2548 void o2hb_unregister_callback(const char *region_uuid,
2549                               struct o2hb_callback_func *hc)
2550 {
2551         BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
2552 
2553         mlog(ML_CLUSTER, "on behalf of %p for funcs %p\n",
2554              __builtin_return_address(0), hc);
2555 
2556         /* XXX Can this happen _with_ a region reference? */
2557         if (list_empty(&hc->hc_item))
2558                 return;
2559 
2560         if (region_uuid)
2561                 o2hb_region_dec_user(region_uuid);
2562 
2563         down_write(&o2hb_callback_sem);
2564 
2565         list_del_init(&hc->hc_item);
2566 
2567         up_write(&o2hb_callback_sem);
2568 }
2569 EXPORT_SYMBOL_GPL(o2hb_unregister_callback);
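
/*
 * Illustrative sketch, not part of heartbeat.c: registering for node-down
 * events the way consumers such as the dlm do.  The callback signature
 * follows the o2hb_cb_func typedef and O2HB_NODE_DOWN_CB from heartbeat.h;
 * the function names and the priority value 0 are invented for this
 * example.  Passing a region_uuid also bumps the region's user count
 * (o2hb_region_inc_user), pinning it until the callback is unregistered.
 */
static struct o2hb_callback_func example_down_cb;

static void example_node_down(struct o2nm_node *node, int node_num, void *data)
{
        /* Invoked from the heartbeat event path when a node stops beating. */
        printk(KERN_INFO "example: node %d went down\n", node_num);
}

static int example_register(const char *region_uuid)
{
        /* Lower hc_priority values are placed earlier in the callback list. */
        o2hb_setup_callback(&example_down_cb, O2HB_NODE_DOWN_CB,
                            example_node_down, NULL, 0);
        return o2hb_register_callback(region_uuid, &example_down_cb);
}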
2570 
2571 int o2hb_check_node_heartbeating_no_sem(u8 node_num)
2572 {
2573         unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2574 
2575         spin_lock(&o2hb_live_lock);
2576         o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
2577         spin_unlock(&o2hb_live_lock);
2578         if (!test_bit(node_num, testing_map)) {
2579                 mlog(ML_HEARTBEAT,
2580                      "node (%u) does not have heartbeating enabled.\n",
2581                      node_num);
2582                 return 0;
2583         }
2584 
2585         return 1;
2586 }
2587 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_no_sem);
2588 
2589 int o2hb_check_node_heartbeating_from_callback(u8 node_num)
2590 {
2591         unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2592 
2593         o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
2594         if (!test_bit(node_num, testing_map)) {
2595                 mlog(ML_HEARTBEAT,
2596                      "node (%u) does not have heartbeating enabled.\n",
2597                      node_num);
2598                 return 0;
2599         }
2600 
2601         return 1;
2602 }
2603 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);
2604 
2605 /*
2606  * this is just a hack until we get the plumbing which flips file systems
2607  * read-only and drops the hb ref instead of killing the node dead.
2608  */
2609 void o2hb_stop_all_regions(void)
2610 {
2611         struct o2hb_region *reg;
2612 
2613         mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");
2614 
2615         spin_lock(&o2hb_live_lock);
2616 
2617         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
2618                 reg->hr_unclean_stop = 1;
2619 
2620         spin_unlock(&o2hb_live_lock);
2621 }
2622 EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);
2623 
2624 int o2hb_get_all_regions(char *region_uuids, u8 max_regions)
2625 {
2626         struct o2hb_region *reg;
2627         int numregs = 0;
2628         char *p;
2629 
2630         spin_lock(&o2hb_live_lock);
2631 
2632         p = region_uuids;
2633         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2634                 if (reg->hr_item_dropped)
2635                         continue;
2636 
2637                 mlog(0, "Region: %s\n", config_item_name(&reg->hr_item));
2638                 if (numregs < max_regions) {
2639                         memcpy(p, config_item_name(&reg->hr_item),
2640                                O2HB_MAX_REGION_NAME_LEN);
2641                         p += O2HB_MAX_REGION_NAME_LEN;
2642                 }
2643                 numregs++;
2644         }
2645 
2646         spin_unlock(&o2hb_live_lock);
2647 
2648         return numregs;
2649 }
2650 EXPORT_SYMBOL_GPL(o2hb_get_all_regions);
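
/*
 * Annotation (not part of heartbeat.c): callers of o2hb_get_all_regions()
 * must supply a buffer of at least max_regions * O2HB_MAX_REGION_NAME_LEN
 * bytes.  Each region name is copied into a fixed-width slot of
 * O2HB_MAX_REGION_NAME_LEN bytes, and the return value is the total number
 * of active regions, which can exceed max_regions if more regions exist
 * than were asked for.
 */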
2651 
2652 int o2hb_global_heartbeat_active(void)
2653 {
2654         return (o2hb_heartbeat_mode == O2HB_HEARTBEAT_GLOBAL);
2655 }
2656 EXPORT_SYMBOL(o2hb_global_heartbeat_active);
2657 
