~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~

TOMOYO Linux Cross Reference
Linux/net/ceph/mon_client.c

Version: ~ [ linux-5.3 ] ~ [ linux-5.2.15 ] ~ [ linux-5.1.21 ] ~ [ linux-5.0.21 ] ~ [ linux-4.20.17 ] ~ [ linux-4.19.73 ] ~ [ linux-4.18.20 ] ~ [ linux-4.17.19 ] ~ [ linux-4.16.18 ] ~ [ linux-4.15.18 ] ~ [ linux-4.14.144 ] ~ [ linux-4.13.16 ] ~ [ linux-4.12.14 ] ~ [ linux-4.11.12 ] ~ [ linux-4.10.17 ] ~ [ linux-4.9.193 ] ~ [ linux-4.8.17 ] ~ [ linux-4.7.10 ] ~ [ linux-4.6.7 ] ~ [ linux-4.5.7 ] ~ [ linux-4.4.193 ] ~ [ linux-4.3.6 ] ~ [ linux-4.2.8 ] ~ [ linux-4.1.52 ] ~ [ linux-4.0.9 ] ~ [ linux-3.19.8 ] ~ [ linux-3.18.140 ] ~ [ linux-3.17.8 ] ~ [ linux-3.16.73 ] ~ [ linux-3.15.10 ] ~ [ linux-3.14.79 ] ~ [ linux-3.13.11 ] ~ [ linux-3.12.74 ] ~ [ linux-3.11.10 ] ~ [ linux-3.10.108 ] ~ [ linux-3.9.11 ] ~ [ linux-3.8.13 ] ~ [ linux-3.7.10 ] ~ [ linux-3.6.11 ] ~ [ linux-3.5.7 ] ~ [ linux-3.4.113 ] ~ [ linux-3.3.8 ] ~ [ linux-3.2.102 ] ~ [ linux-3.1.10 ] ~ [ linux-3.0.101 ] ~ [ linux-2.6.32.71 ] ~ [ linux-2.6.0 ] ~ [ linux-2.4.37.11 ] ~ [ unix-v6-master ] ~ [ ccs-tools-1.8.5 ] ~ [ policy-sample ] ~
Architecture: ~ [ i386 ] ~ [ alpha ] ~ [ m68k ] ~ [ mips ] ~ [ ppc ] ~ [ sparc ] ~ [ sparc64 ] ~

  1 #include <linux/ceph/ceph_debug.h>
  2 
  3 #include <linux/module.h>
  4 #include <linux/types.h>
  5 #include <linux/slab.h>
  6 #include <linux/random.h>
  7 #include <linux/sched.h>
  8 
  9 #include <linux/ceph/mon_client.h>
 10 #include <linux/ceph/libceph.h>
 11 #include <linux/ceph/debugfs.h>
 12 #include <linux/ceph/decode.h>
 13 #include <linux/ceph/auth.h>
 14 
 15 /*
 16  * Interact with Ceph monitor cluster.  Handle requests for new map
 17  * versions, and periodically resend as needed.  Also implement
 18  * statfs() and umount().
 19  *
 20  * A small cluster of Ceph "monitors" are responsible for managing critical
 21  * cluster configuration and state information.  An odd number (e.g., 3, 5)
 22  * of cmon daemons use a modified version of the Paxos part-time parliament
 23  * algorithm to manage the MDS map (mds cluster membership), OSD map, and
 24  * list of clients who have mounted the file system.
 25  *
 26  * We maintain an open, active session with a monitor at all times in order to
 27  * receive timely MDSMap updates.  We periodically send a keepalive byte on the
 28  * TCP socket to ensure we detect a failure.  If the connection does break, we
 29  * randomly hunt for a new monitor.  Once the connection is reestablished, we
 30  * resend any outstanding requests.
 31  */
 32 
 33 static const struct ceph_connection_operations mon_con_ops;
 34 
 35 static int __validate_auth(struct ceph_mon_client *monc);
 36 
 37 /*
 38  * Decode a monmap blob (e.g., during mount).
 39  */
 40 struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
 41 {
 42         struct ceph_monmap *m = NULL;
 43         int i, err = -EINVAL;
 44         struct ceph_fsid fsid;
 45         u32 epoch, num_mon;
 46         u16 version;
 47         u32 len;
 48 
 49         ceph_decode_32_safe(&p, end, len, bad);
 50         ceph_decode_need(&p, end, len, bad);
 51 
 52         dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));
 53 
 54         ceph_decode_16_safe(&p, end, version, bad);
 55 
 56         ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
 57         ceph_decode_copy(&p, &fsid, sizeof(fsid));
 58         epoch = ceph_decode_32(&p);
 59 
 60         num_mon = ceph_decode_32(&p);
 61         ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);
 62 
 63         if (num_mon >= CEPH_MAX_MON)
 64                 goto bad;
 65         m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS);
 66         if (m == NULL)
 67                 return ERR_PTR(-ENOMEM);
 68         m->fsid = fsid;
 69         m->epoch = epoch;
 70         m->num_mon = num_mon;
 71         ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
 72         for (i = 0; i < num_mon; i++)
 73                 ceph_decode_addr(&m->mon_inst[i].addr);
 74 
 75         dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
 76              m->num_mon);
 77         for (i = 0; i < m->num_mon; i++)
 78                 dout("monmap_decode  mon%d is %s\n", i,
 79                      ceph_pr_addr(&m->mon_inst[i].addr.in_addr));
 80         return m;
 81 
 82 bad:
 83         dout("monmap_decode failed with %d\n", err);
 84         kfree(m);
 85         return ERR_PTR(err);
 86 }
 87 
 88 /*
 89  * return true if *addr is included in the monmap.
 90  */
 91 int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
 92 {
 93         int i;
 94 
 95         for (i = 0; i < m->num_mon; i++)
 96                 if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
 97                         return 1;
 98         return 0;
 99 }
100 
101 /*
102  * Send an auth request.
103  */
104 static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
105 {
106         monc->pending_auth = 1;
107         monc->m_auth->front.iov_len = len;
108         monc->m_auth->hdr.front_len = cpu_to_le32(len);
109         ceph_msg_revoke(monc->m_auth);
110         ceph_msg_get(monc->m_auth);  /* keep our ref */
111         ceph_con_send(&monc->con, monc->m_auth);
112 }
113 
114 /*
115  * Close monitor session, if any.
116  */
117 static void __close_session(struct ceph_mon_client *monc)
118 {
119         dout("__close_session closing mon%d\n", monc->cur_mon);
120         ceph_msg_revoke(monc->m_auth);
121         ceph_msg_revoke_incoming(monc->m_auth_reply);
122         ceph_msg_revoke(monc->m_subscribe);
123         ceph_msg_revoke_incoming(monc->m_subscribe_ack);
124         ceph_con_close(&monc->con);
125         monc->cur_mon = -1;
126         monc->pending_auth = 0;
127         ceph_auth_reset(monc->auth);
128 }
129 
130 /*
131  * Open a session with a (new) monitor.
132  */
133 static int __open_session(struct ceph_mon_client *monc)
134 {
135         char r;
136         int ret;
137 
138         if (monc->cur_mon < 0) {
139                 get_random_bytes(&r, 1);
140                 monc->cur_mon = r % monc->monmap->num_mon;
141                 dout("open_session num=%d r=%d -> mon%d\n",
142                      monc->monmap->num_mon, r, monc->cur_mon);
143                 monc->sub_sent = 0;
144                 monc->sub_renew_after = jiffies;  /* i.e., expired */
145                 monc->want_next_osdmap = !!monc->want_next_osdmap;
146 
147                 dout("open_session mon%d opening\n", monc->cur_mon);
148                 ceph_con_open(&monc->con,
149                               CEPH_ENTITY_TYPE_MON, monc->cur_mon,
150                               &monc->monmap->mon_inst[monc->cur_mon].addr);
151 
152                 /* send an initial keepalive to ensure our timestamp is
153                  * valid by the time we are in an OPENED state */
154                 ceph_con_keepalive(&monc->con);
155 
156                 /* initiatiate authentication handshake */
157                 ret = ceph_auth_build_hello(monc->auth,
158                                             monc->m_auth->front.iov_base,
159                                             monc->m_auth->front_alloc_len);
160                 __send_prepared_auth_request(monc, ret);
161         } else {
162                 dout("open_session mon%d already open\n", monc->cur_mon);
163         }
164         return 0;
165 }
166 
167 static bool __sub_expired(struct ceph_mon_client *monc)
168 {
169         return time_after_eq(jiffies, monc->sub_renew_after);
170 }
171 
172 /*
173  * Reschedule delayed work timer.
174  */
175 static void __schedule_delayed(struct ceph_mon_client *monc)
176 {
177         struct ceph_options *opt = monc->client->options;
178         unsigned long delay;
179 
180         if (monc->cur_mon < 0 || __sub_expired(monc)) {
181                 delay = 10 * HZ;
182         } else {
183                 delay = 20 * HZ;
184                 if (opt->monc_ping_timeout > 0)
185                         delay = min(delay, opt->monc_ping_timeout / 3);
186         }
187         dout("__schedule_delayed after %lu\n", delay);
188         schedule_delayed_work(&monc->delayed_work,
189                               round_jiffies_relative(delay));
190 }
191 
192 /*
193  * Send subscribe request for mdsmap and/or osdmap.
194  */
195 static void __send_subscribe(struct ceph_mon_client *monc)
196 {
197         dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n",
198              (unsigned int)monc->sub_sent, __sub_expired(monc),
199              monc->want_next_osdmap);
200         if ((__sub_expired(monc) && !monc->sub_sent) ||
201             monc->want_next_osdmap == 1) {
202                 struct ceph_msg *msg = monc->m_subscribe;
203                 struct ceph_mon_subscribe_item *i;
204                 void *p, *end;
205                 int num;
206 
207                 p = msg->front.iov_base;
208                 end = p + msg->front_alloc_len;
209 
210                 num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap;
211                 ceph_encode_32(&p, num);
212 
213                 if (monc->want_next_osdmap) {
214                         dout("__send_subscribe to 'osdmap' %u\n",
215                              (unsigned int)monc->have_osdmap);
216                         ceph_encode_string(&p, end, "osdmap", 6);
217                         i = p;
218                         i->have = cpu_to_le64(monc->have_osdmap);
219                         i->onetime = 1;
220                         p += sizeof(*i);
221                         monc->want_next_osdmap = 2;  /* requested */
222                 }
223                 if (monc->want_mdsmap) {
224                         dout("__send_subscribe to 'mdsmap' %u+\n",
225                              (unsigned int)monc->have_mdsmap);
226                         ceph_encode_string(&p, end, "mdsmap", 6);
227                         i = p;
228                         i->have = cpu_to_le64(monc->have_mdsmap);
229                         i->onetime = 0;
230                         p += sizeof(*i);
231                 }
232                 ceph_encode_string(&p, end, "monmap", 6);
233                 i = p;
234                 i->have = 0;
235                 i->onetime = 0;
236                 p += sizeof(*i);
237 
238                 msg->front.iov_len = p - msg->front.iov_base;
239                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
240                 ceph_msg_revoke(msg);
241                 ceph_con_send(&monc->con, ceph_msg_get(msg));
242 
243                 monc->sub_sent = jiffies | 1;  /* never 0 */
244         }
245 }
246 
247 static void handle_subscribe_ack(struct ceph_mon_client *monc,
248                                  struct ceph_msg *msg)
249 {
250         unsigned int seconds;
251         struct ceph_mon_subscribe_ack *h = msg->front.iov_base;
252 
253         if (msg->front.iov_len < sizeof(*h))
254                 goto bad;
255         seconds = le32_to_cpu(h->duration);
256 
257         mutex_lock(&monc->mutex);
258         if (monc->hunting) {
259                 pr_info("mon%d %s session established\n",
260                         monc->cur_mon,
261                         ceph_pr_addr(&monc->con.peer_addr.in_addr));
262                 monc->hunting = false;
263         }
264         dout("handle_subscribe_ack after %d seconds\n", seconds);
265         monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1;
266         monc->sub_sent = 0;
267         mutex_unlock(&monc->mutex);
268         return;
269 bad:
270         pr_err("got corrupt subscribe-ack msg\n");
271         ceph_msg_dump(msg);
272 }
273 
274 /*
275  * Keep track of which maps we have
276  */
277 int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got)
278 {
279         mutex_lock(&monc->mutex);
280         monc->have_mdsmap = got;
281         mutex_unlock(&monc->mutex);
282         return 0;
283 }
284 EXPORT_SYMBOL(ceph_monc_got_mdsmap);
285 
286 int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
287 {
288         mutex_lock(&monc->mutex);
289         monc->have_osdmap = got;
290         monc->want_next_osdmap = 0;
291         mutex_unlock(&monc->mutex);
292         return 0;
293 }
294 
295 /*
296  * Register interest in the next osdmap
297  */
298 void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
299 {
300         dout("request_next_osdmap have %u\n", monc->have_osdmap);
301         mutex_lock(&monc->mutex);
302         if (!monc->want_next_osdmap)
303                 monc->want_next_osdmap = 1;
304         if (monc->want_next_osdmap < 2)
305                 __send_subscribe(monc);
306         mutex_unlock(&monc->mutex);
307 }
308 EXPORT_SYMBOL(ceph_monc_request_next_osdmap);
309 
310 /*
311  * Wait for an osdmap with a given epoch.
312  *
313  * @epoch: epoch to wait for
314  * @timeout: in jiffies, 0 means "wait forever"
315  */
316 int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
317                           unsigned long timeout)
318 {
319         unsigned long started = jiffies;
320         long ret;
321 
322         mutex_lock(&monc->mutex);
323         while (monc->have_osdmap < epoch) {
324                 mutex_unlock(&monc->mutex);
325 
326                 if (timeout && time_after_eq(jiffies, started + timeout))
327                         return -ETIMEDOUT;
328 
329                 ret = wait_event_interruptible_timeout(monc->client->auth_wq,
330                                                 monc->have_osdmap >= epoch,
331                                                 ceph_timeout_jiffies(timeout));
332                 if (ret < 0)
333                         return ret;
334 
335                 mutex_lock(&monc->mutex);
336         }
337 
338         mutex_unlock(&monc->mutex);
339         return 0;
340 }
341 EXPORT_SYMBOL(ceph_monc_wait_osdmap);
342 
343 /*
344  *
345  */
346 int ceph_monc_open_session(struct ceph_mon_client *monc)
347 {
348         mutex_lock(&monc->mutex);
349         __open_session(monc);
350         __schedule_delayed(monc);
351         mutex_unlock(&monc->mutex);
352         return 0;
353 }
354 EXPORT_SYMBOL(ceph_monc_open_session);
355 
356 /*
357  * We require the fsid and global_id in order to initialize our
358  * debugfs dir.
359  */
360 static bool have_debugfs_info(struct ceph_mon_client *monc)
361 {
362         dout("have_debugfs_info fsid %d globalid %lld\n",
363              (int)monc->client->have_fsid, monc->auth->global_id);
364         return monc->client->have_fsid && monc->auth->global_id > 0;
365 }
366 
367 /*
368  * The monitor responds with mount ack indicate mount success.  The
369  * included client ticket allows the client to talk to MDSs and OSDs.
370  */
371 static void ceph_monc_handle_map(struct ceph_mon_client *monc,
372                                  struct ceph_msg *msg)
373 {
374         struct ceph_client *client = monc->client;
375         struct ceph_monmap *monmap = NULL, *old = monc->monmap;
376         void *p, *end;
377         int had_debugfs_info, init_debugfs = 0;
378 
379         mutex_lock(&monc->mutex);
380 
381         had_debugfs_info = have_debugfs_info(monc);
382 
383         dout("handle_monmap\n");
384         p = msg->front.iov_base;
385         end = p + msg->front.iov_len;
386 
387         monmap = ceph_monmap_decode(p, end);
388         if (IS_ERR(monmap)) {
389                 pr_err("problem decoding monmap, %d\n",
390                        (int)PTR_ERR(monmap));
391                 goto out;
392         }
393 
394         if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
395                 kfree(monmap);
396                 goto out;
397         }
398 
399         client->monc.monmap = monmap;
400         kfree(old);
401 
402         if (!client->have_fsid) {
403                 client->have_fsid = true;
404                 if (!had_debugfs_info && have_debugfs_info(monc)) {
405                         pr_info("client%lld fsid %pU\n",
406                                 ceph_client_id(monc->client),
407                                 &monc->client->fsid);
408                         init_debugfs = 1;
409                 }
410                 mutex_unlock(&monc->mutex);
411 
412                 if (init_debugfs) {
413                         /*
414                          * do debugfs initialization without mutex to avoid
415                          * creating a locking dependency
416                          */
417                         ceph_debugfs_client_init(monc->client);
418                 }
419 
420                 goto out_unlocked;
421         }
422 out:
423         mutex_unlock(&monc->mutex);
424 out_unlocked:
425         wake_up_all(&client->auth_wq);
426 }
427 
428 /*
429  * generic requests (currently statfs, mon_get_version)
430  */
431 static struct ceph_mon_generic_request *__lookup_generic_req(
432         struct ceph_mon_client *monc, u64 tid)
433 {
434         struct ceph_mon_generic_request *req;
435         struct rb_node *n = monc->generic_request_tree.rb_node;
436 
437         while (n) {
438                 req = rb_entry(n, struct ceph_mon_generic_request, node);
439                 if (tid < req->tid)
440                         n = n->rb_left;
441                 else if (tid > req->tid)
442                         n = n->rb_right;
443                 else
444                         return req;
445         }
446         return NULL;
447 }
448 
449 static void __insert_generic_request(struct ceph_mon_client *monc,
450                             struct ceph_mon_generic_request *new)
451 {
452         struct rb_node **p = &monc->generic_request_tree.rb_node;
453         struct rb_node *parent = NULL;
454         struct ceph_mon_generic_request *req = NULL;
455 
456         while (*p) {
457                 parent = *p;
458                 req = rb_entry(parent, struct ceph_mon_generic_request, node);
459                 if (new->tid < req->tid)
460                         p = &(*p)->rb_left;
461                 else if (new->tid > req->tid)
462                         p = &(*p)->rb_right;
463                 else
464                         BUG();
465         }
466 
467         rb_link_node(&new->node, parent, p);
468         rb_insert_color(&new->node, &monc->generic_request_tree);
469 }
470 
471 static void release_generic_request(struct kref *kref)
472 {
473         struct ceph_mon_generic_request *req =
474                 container_of(kref, struct ceph_mon_generic_request, kref);
475 
476         if (req->reply)
477                 ceph_msg_put(req->reply);
478         if (req->request)
479                 ceph_msg_put(req->request);
480 
481         kfree(req);
482 }
483 
484 static void put_generic_request(struct ceph_mon_generic_request *req)
485 {
486         kref_put(&req->kref, release_generic_request);
487 }
488 
489 static void get_generic_request(struct ceph_mon_generic_request *req)
490 {
491         kref_get(&req->kref);
492 }
493 
494 static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
495                                          struct ceph_msg_header *hdr,
496                                          int *skip)
497 {
498         struct ceph_mon_client *monc = con->private;
499         struct ceph_mon_generic_request *req;
500         u64 tid = le64_to_cpu(hdr->tid);
501         struct ceph_msg *m;
502 
503         mutex_lock(&monc->mutex);
504         req = __lookup_generic_req(monc, tid);
505         if (!req) {
506                 dout("get_generic_reply %lld dne\n", tid);
507                 *skip = 1;
508                 m = NULL;
509         } else {
510                 dout("get_generic_reply %lld got %p\n", tid, req->reply);
511                 *skip = 0;
512                 m = ceph_msg_get(req->reply);
513                 /*
514                  * we don't need to track the connection reading into
515                  * this reply because we only have one open connection
516                  * at a time, ever.
517                  */
518         }
519         mutex_unlock(&monc->mutex);
520         return m;
521 }
522 
523 static int __do_generic_request(struct ceph_mon_client *monc, u64 tid,
524                                 struct ceph_mon_generic_request *req)
525 {
526         int err;
527 
528         /* register request */
529         req->tid = tid != 0 ? tid : ++monc->last_tid;
530         req->request->hdr.tid = cpu_to_le64(req->tid);
531         __insert_generic_request(monc, req);
532         monc->num_generic_requests++;
533         ceph_con_send(&monc->con, ceph_msg_get(req->request));
534         mutex_unlock(&monc->mutex);
535 
536         err = wait_for_completion_interruptible(&req->completion);
537 
538         mutex_lock(&monc->mutex);
539         rb_erase(&req->node, &monc->generic_request_tree);
540         monc->num_generic_requests--;
541 
542         if (!err)
543                 err = req->result;
544         return err;
545 }
546 
547 static int do_generic_request(struct ceph_mon_client *monc,
548                               struct ceph_mon_generic_request *req)
549 {
550         int err;
551 
552         mutex_lock(&monc->mutex);
553         err = __do_generic_request(monc, 0, req);
554         mutex_unlock(&monc->mutex);
555 
556         return err;
557 }
558 
559 /*
560  * statfs
561  */
562 static void handle_statfs_reply(struct ceph_mon_client *monc,
563                                 struct ceph_msg *msg)
564 {
565         struct ceph_mon_generic_request *req;
566         struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
567         u64 tid = le64_to_cpu(msg->hdr.tid);
568 
569         if (msg->front.iov_len != sizeof(*reply))
570                 goto bad;
571         dout("handle_statfs_reply %p tid %llu\n", msg, tid);
572 
573         mutex_lock(&monc->mutex);
574         req = __lookup_generic_req(monc, tid);
575         if (req) {
576                 *(struct ceph_statfs *)req->buf = reply->st;
577                 req->result = 0;
578                 get_generic_request(req);
579         }
580         mutex_unlock(&monc->mutex);
581         if (req) {
582                 complete_all(&req->completion);
583                 put_generic_request(req);
584         }
585         return;
586 
587 bad:
588         pr_err("corrupt statfs reply, tid %llu\n", tid);
589         ceph_msg_dump(msg);
590 }
591 
592 /*
593  * Do a synchronous statfs().
594  */
595 int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
596 {
597         struct ceph_mon_generic_request *req;
598         struct ceph_mon_statfs *h;
599         int err;
600 
601         req = kzalloc(sizeof(*req), GFP_NOFS);
602         if (!req)
603                 return -ENOMEM;
604 
605         kref_init(&req->kref);
606         req->buf = buf;
607         init_completion(&req->completion);
608 
609         err = -ENOMEM;
610         req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
611                                     true);
612         if (!req->request)
613                 goto out;
614         req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS,
615                                   true);
616         if (!req->reply)
617                 goto out;
618 
619         /* fill out request */
620         h = req->request->front.iov_base;
621         h->monhdr.have_version = 0;
622         h->monhdr.session_mon = cpu_to_le16(-1);
623         h->monhdr.session_mon_tid = 0;
624         h->fsid = monc->monmap->fsid;
625 
626         err = do_generic_request(monc, req);
627 
628 out:
629         put_generic_request(req);
630         return err;
631 }
632 EXPORT_SYMBOL(ceph_monc_do_statfs);
633 
634 static void handle_get_version_reply(struct ceph_mon_client *monc,
635                                      struct ceph_msg *msg)
636 {
637         struct ceph_mon_generic_request *req;
638         u64 tid = le64_to_cpu(msg->hdr.tid);
639         void *p = msg->front.iov_base;
640         void *end = p + msg->front_alloc_len;
641         u64 handle;
642 
643         dout("%s %p tid %llu\n", __func__, msg, tid);
644 
645         ceph_decode_need(&p, end, 2*sizeof(u64), bad);
646         handle = ceph_decode_64(&p);
647         if (tid != 0 && tid != handle)
648                 goto bad;
649 
650         mutex_lock(&monc->mutex);
651         req = __lookup_generic_req(monc, handle);
652         if (req) {
653                 *(u64 *)req->buf = ceph_decode_64(&p);
654                 req->result = 0;
655                 get_generic_request(req);
656         }
657         mutex_unlock(&monc->mutex);
658         if (req) {
659                 complete_all(&req->completion);
660                 put_generic_request(req);
661         }
662 
663         return;
664 bad:
665         pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
666         ceph_msg_dump(msg);
667 }
668 
669 /*
670  * Send MMonGetVersion and wait for the reply.
671  *
672  * @what: one of "mdsmap", "osdmap" or "monmap"
673  */
674 int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what,
675                              u64 *newest)
676 {
677         struct ceph_mon_generic_request *req;
678         void *p, *end;
679         u64 tid;
680         int err;
681 
682         req = kzalloc(sizeof(*req), GFP_NOFS);
683         if (!req)
684                 return -ENOMEM;
685 
686         kref_init(&req->kref);
687         req->buf = newest;
688         init_completion(&req->completion);
689 
690         req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
691                                     sizeof(u64) + sizeof(u32) + strlen(what),
692                                     GFP_NOFS, true);
693         if (!req->request) {
694                 err = -ENOMEM;
695                 goto out;
696         }
697 
698         req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 1024,
699                                   GFP_NOFS, true);
700         if (!req->reply) {
701                 err = -ENOMEM;
702                 goto out;
703         }
704 
705         p = req->request->front.iov_base;
706         end = p + req->request->front_alloc_len;
707 
708         /* fill out request */
709         mutex_lock(&monc->mutex);
710         tid = ++monc->last_tid;
711         ceph_encode_64(&p, tid); /* handle */
712         ceph_encode_string(&p, end, what, strlen(what));
713 
714         err = __do_generic_request(monc, tid, req);
715 
716         mutex_unlock(&monc->mutex);
717 out:
718         put_generic_request(req);
719         return err;
720 }
721 EXPORT_SYMBOL(ceph_monc_do_get_version);
722 
723 /*
724  * Resend pending generic requests.
725  */
726 static void __resend_generic_request(struct ceph_mon_client *monc)
727 {
728         struct ceph_mon_generic_request *req;
729         struct rb_node *p;
730 
731         for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
732                 req = rb_entry(p, struct ceph_mon_generic_request, node);
733                 ceph_msg_revoke(req->request);
734                 ceph_msg_revoke_incoming(req->reply);
735                 ceph_con_send(&monc->con, ceph_msg_get(req->request));
736         }
737 }
738 
739 /*
740  * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
741  * renew/retry subscription as needed (in case it is timing out, or we
742  * got an ENOMEM).  And keep the monitor connection alive.
743  */
744 static void delayed_work(struct work_struct *work)
745 {
746         struct ceph_mon_client *monc =
747                 container_of(work, struct ceph_mon_client, delayed_work.work);
748 
749         dout("monc delayed_work\n");
750         mutex_lock(&monc->mutex);
751         if (monc->hunting) {
752                 __close_session(monc);
753                 __open_session(monc);  /* continue hunting */
754         } else {
755                 struct ceph_options *opt = monc->client->options;
756                 int is_auth = ceph_auth_is_authenticated(monc->auth);
757                 if (ceph_con_keepalive_expired(&monc->con,
758                                                opt->monc_ping_timeout)) {
759                         dout("monc keepalive timeout\n");
760                         is_auth = 0;
761                         __close_session(monc);
762                         monc->hunting = true;
763                         __open_session(monc);
764                 }
765 
766                 if (!monc->hunting) {
767                         ceph_con_keepalive(&monc->con);
768                         __validate_auth(monc);
769                 }
770 
771                 if (is_auth)
772                         __send_subscribe(monc);
773         }
774         __schedule_delayed(monc);
775         mutex_unlock(&monc->mutex);
776 }
777 
778 /*
779  * On startup, we build a temporary monmap populated with the IPs
780  * provided by mount(2).
781  */
782 static int build_initial_monmap(struct ceph_mon_client *monc)
783 {
784         struct ceph_options *opt = monc->client->options;
785         struct ceph_entity_addr *mon_addr = opt->mon_addr;
786         int num_mon = opt->num_mon;
787         int i;
788 
789         /* build initial monmap */
790         monc->monmap = kzalloc(sizeof(*monc->monmap) +
791                                num_mon*sizeof(monc->monmap->mon_inst[0]),
792                                GFP_KERNEL);
793         if (!monc->monmap)
794                 return -ENOMEM;
795         for (i = 0; i < num_mon; i++) {
796                 monc->monmap->mon_inst[i].addr = mon_addr[i];
797                 monc->monmap->mon_inst[i].addr.nonce = 0;
798                 monc->monmap->mon_inst[i].name.type =
799                         CEPH_ENTITY_TYPE_MON;
800                 monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
801         }
802         monc->monmap->num_mon = num_mon;
803         return 0;
804 }
805 
806 int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
807 {
808         int err = 0;
809 
810         dout("init\n");
811         memset(monc, 0, sizeof(*monc));
812         monc->client = cl;
813         monc->monmap = NULL;
814         mutex_init(&monc->mutex);
815 
816         err = build_initial_monmap(monc);
817         if (err)
818                 goto out;
819 
820         /* connection */
821         /* authentication */
822         monc->auth = ceph_auth_init(cl->options->name,
823                                     cl->options->key);
824         if (IS_ERR(monc->auth)) {
825                 err = PTR_ERR(monc->auth);
826                 goto out_monmap;
827         }
828         monc->auth->want_keys =
829                 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
830                 CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
831 
832         /* msgs */
833         err = -ENOMEM;
834         monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
835                                      sizeof(struct ceph_mon_subscribe_ack),
836                                      GFP_NOFS, true);
837         if (!monc->m_subscribe_ack)
838                 goto out_auth;
839 
840         monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS,
841                                          true);
842         if (!monc->m_subscribe)
843                 goto out_subscribe_ack;
844 
845         monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS,
846                                           true);
847         if (!monc->m_auth_reply)
848                 goto out_subscribe;
849 
850         monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true);
851         monc->pending_auth = 0;
852         if (!monc->m_auth)
853                 goto out_auth_reply;
854 
855         ceph_con_init(&monc->con, monc, &mon_con_ops,
856                       &monc->client->msgr);
857 
858         monc->cur_mon = -1;
859         monc->hunting = true;
860         monc->sub_renew_after = jiffies;
861         monc->sub_sent = 0;
862 
863         INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
864         monc->generic_request_tree = RB_ROOT;
865         monc->num_generic_requests = 0;
866         monc->last_tid = 0;
867 
868         monc->have_mdsmap = 0;
869         monc->have_osdmap = 0;
870         monc->want_next_osdmap = 1;
871         return 0;
872 
873 out_auth_reply:
874         ceph_msg_put(monc->m_auth_reply);
875 out_subscribe:
876         ceph_msg_put(monc->m_subscribe);
877 out_subscribe_ack:
878         ceph_msg_put(monc->m_subscribe_ack);
879 out_auth:
880         ceph_auth_destroy(monc->auth);
881 out_monmap:
882         kfree(monc->monmap);
883 out:
884         return err;
885 }
886 EXPORT_SYMBOL(ceph_monc_init);
887 
/*
 * Tear down a monitor client set up by ceph_monc_init().
 *
 * Order of operations: cancel the delayed work, close the session under
 * monc->mutex, flush the messenger queue, then release the auth handle,
 * the four preallocated messages and the monmap.
 *
 * NOTE(review): the delayed work is cancelled before the session is closed;
 * mon_fault() can call __schedule_delayed() under the mutex, so confirm
 * nothing can re-queue the work between the cancel and the flush below.
 */
888 void ceph_monc_stop(struct ceph_mon_client *monc)
889 {
890         dout("stop\n");
891         cancel_delayed_work_sync(&monc->delayed_work);
892 
893         mutex_lock(&monc->mutex);
894         __close_session(monc);
895 
896         mutex_unlock(&monc->mutex);
897 
898         /*
899          * flush msgr queue before we destroy ourselves to ensure that:
900          *  - any work that references our embedded con is finished.
901          *  - any osd_client or other work that may reference an authorizer
902          *    finishes before we shut down the auth subsystem.
903          */
904         ceph_msgr_flush();
905 
906         ceph_auth_destroy(monc->auth);
907 
/* drop the references taken by ceph_msg_new() in ceph_monc_init() */
908         ceph_msg_put(monc->m_auth);
909         ceph_msg_put(monc->m_auth_reply);
910         ceph_msg_put(monc->m_subscribe);
911         ceph_msg_put(monc->m_subscribe_ack);
912 
913         kfree(monc->monmap);
914 }
915 EXPORT_SYMBOL(ceph_monc_stop);
916 
917 static void handle_auth_reply(struct ceph_mon_client *monc,
918                               struct ceph_msg *msg)
919 {
920         int ret;
921         int was_auth = 0;
922         int had_debugfs_info, init_debugfs = 0;
923 
924         mutex_lock(&monc->mutex);
925         had_debugfs_info = have_debugfs_info(monc);
926         was_auth = ceph_auth_is_authenticated(monc->auth);
927         monc->pending_auth = 0;
928         ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
929                                      msg->front.iov_len,
930                                      monc->m_auth->front.iov_base,
931                                      monc->m_auth->front_alloc_len);
932         if (ret < 0) {
933                 monc->client->auth_err = ret;
934                 wake_up_all(&monc->client->auth_wq);
935         } else if (ret > 0) {
936                 __send_prepared_auth_request(monc, ret);
937         } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
938                 dout("authenticated, starting session\n");
939 
940                 monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
941                 monc->client->msgr.inst.name.num =
942                                         cpu_to_le64(monc->auth->global_id);
943 
944                 __send_subscribe(monc);
945                 __resend_generic_request(monc);
946         }
947 
948         if (!had_debugfs_info && have_debugfs_info(monc)) {
949                 pr_info("client%lld fsid %pU\n",
950                         ceph_client_id(monc->client),
951                         &monc->client->fsid);
952                 init_debugfs = 1;
953         }
954         mutex_unlock(&monc->mutex);
955 
956         if (init_debugfs) {
957                 /*
958                  * do debugfs initialization without mutex to avoid
959                  * creating a locking dependency
960                  */
961                 ceph_debugfs_client_init(monc->client);
962         }
963 }
964 
965 static int __validate_auth(struct ceph_mon_client *monc)
966 {
967         int ret;
968 
969         if (monc->pending_auth)
970                 return 0;
971 
972         ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
973                               monc->m_auth->front_alloc_len);
974         if (ret <= 0)
975                 return ret; /* either an error, or no need to authenticate */
976         __send_prepared_auth_request(monc, ret);
977         return 0;
978 }
979 
980 int ceph_monc_validate_auth(struct ceph_mon_client *monc)
981 {
982         int ret;
983 
984         mutex_lock(&monc->mutex);
985         ret = __validate_auth(monc);
986         mutex_unlock(&monc->mutex);
987         return ret;
988 }
989 EXPORT_SYMBOL(ceph_monc_validate_auth);
990 
991 /*
992  * handle incoming message
993  */
994 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
995 {
996         struct ceph_mon_client *monc = con->private;
997         int type = le16_to_cpu(msg->hdr.type);
998 
999         if (!monc)
1000                 return;
1001 
1002         switch (type) {
1003         case CEPH_MSG_AUTH_REPLY:
1004                 handle_auth_reply(monc, msg);
1005                 break;
1006 
1007         case CEPH_MSG_MON_SUBSCRIBE_ACK:
1008                 handle_subscribe_ack(monc, msg);
1009                 break;
1010 
1011         case CEPH_MSG_STATFS_REPLY:
1012                 handle_statfs_reply(monc, msg);
1013                 break;
1014 
1015         case CEPH_MSG_MON_GET_VERSION_REPLY:
1016                 handle_get_version_reply(monc, msg);
1017                 break;
1018 
1019         case CEPH_MSG_MON_MAP:
1020                 ceph_monc_handle_map(monc, msg);
1021                 break;
1022 
1023         case CEPH_MSG_OSD_MAP:
1024                 ceph_osdc_handle_map(&monc->client->osdc, msg);
1025                 break;
1026 
1027         default:
1028                 /* can the chained handler handle it? */
1029                 if (monc->client->extra_mon_dispatch &&
1030                     monc->client->extra_mon_dispatch(monc->client, msg) == 0)
1031                         break;
1032                         
1033                 pr_err("received unknown message type %d %s\n", type,
1034                        ceph_msg_type_name(type));
1035         }
1036         ceph_msg_put(msg);
1037 }
1038 
1039 /*
1040  * Allocate memory for incoming message
1041  */
1042 static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
1043                                       struct ceph_msg_header *hdr,
1044                                       int *skip)
1045 {
1046         struct ceph_mon_client *monc = con->private;
1047         int type = le16_to_cpu(hdr->type);
1048         int front_len = le32_to_cpu(hdr->front_len);
1049         struct ceph_msg *m = NULL;
1050 
1051         *skip = 0;
1052 
1053         switch (type) {
1054         case CEPH_MSG_MON_SUBSCRIBE_ACK:
1055                 m = ceph_msg_get(monc->m_subscribe_ack);
1056                 break;
1057         case CEPH_MSG_STATFS_REPLY:
1058                 return get_generic_reply(con, hdr, skip);
1059         case CEPH_MSG_AUTH_REPLY:
1060                 m = ceph_msg_get(monc->m_auth_reply);
1061                 break;
1062         case CEPH_MSG_MON_GET_VERSION_REPLY:
1063                 if (le64_to_cpu(hdr->tid) != 0)
1064                         return get_generic_reply(con, hdr, skip);
1065 
1066                 /*
1067                  * Older OSDs don't set reply tid even if the orignal
1068                  * request had a non-zero tid.  Workaround this weirdness
1069                  * by falling through to the allocate case.
1070                  */
1071         case CEPH_MSG_MON_MAP:
1072         case CEPH_MSG_MDS_MAP:
1073         case CEPH_MSG_OSD_MAP:
1074                 m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1075                 if (!m)
1076                         return NULL;    /* ENOMEM--return skip == 0 */
1077                 break;
1078         }
1079 
1080         if (!m) {
1081                 pr_info("alloc_msg unknown type %d\n", type);
1082                 *skip = 1;
1083         } else if (front_len > m->front_alloc_len) {
1084                 pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
1085                         front_len, m->front_alloc_len,
1086                         (unsigned int)con->peer_name.type,
1087                         le64_to_cpu(con->peer_name.num));
1088                 ceph_msg_put(m);
1089                 m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1090         }
1091 
1092         return m;
1093 }
1094 
1095 /*
1096  * If the monitor connection resets, pick a new monitor and resubmit
1097  * any pending requests.
1098  */
1099 static void mon_fault(struct ceph_connection *con)
1100 {
1101         struct ceph_mon_client *monc = con->private;
1102 
1103         if (!monc)
1104                 return;
1105 
1106         dout("mon_fault\n");
1107         mutex_lock(&monc->mutex);
1108         if (!con->private)
1109                 goto out;
1110 
1111         if (!monc->hunting)
1112                 pr_info("mon%d %s session lost, "
1113                         "hunting for new mon\n", monc->cur_mon,
1114                         ceph_pr_addr(&monc->con.peer_addr.in_addr));
1115 
1116         __close_session(monc);
1117         if (!monc->hunting) {
1118                 /* start hunting */
1119                 monc->hunting = true;
1120                 __open_session(monc);
1121         } else {
1122                 /* already hunting, let's wait a bit */
1123                 __schedule_delayed(monc);
1124         }
1125 out:
1126         mutex_unlock(&monc->mutex);
1127 }
1128 
1129 /*
1130  * We can ignore refcounting on the connection struct, as all references
1131  * will come from the messenger workqueue, which is drained prior to
1132  * mon_client destruction.
1133  */
/* ->get hook of mon_con_ops: hands @con back unchanged, taking no reference. */
1134 static struct ceph_connection *con_get(struct ceph_connection *con)
1135 {
1136         return con;
1137 }
1138 
/*
 * ->put hook of mon_con_ops: intentionally empty, because con_get() takes
 * no reference that would need dropping.
 */
1139 static void con_put(struct ceph_connection *con)
1140 {
1141 }
1142 
1143 static const struct ceph_connection_operations mon_con_ops = {
1144         .get = con_get,
1145         .put = con_put,
1146         .dispatch = dispatch,
1147         .fault = mon_fault,
1148         .alloc_msg = mon_alloc_msg,
1149 };
1150 

~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~

kernel.org | git.kernel.org | LWN.net | Project Home | Wiki (Japanese) | Wiki (English) | SVN repository | Mail admin

Linux® is a registered trademark of Linus Torvalds in the United States and other countries.
TOMOYO® is a registered trademark of NTT DATA CORPORATION.

osdn.jp