#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>

#define OSD_OP_FRONT_LEN	4096
#define OSD_OPREPLY_FRONT_LEN	512

static struct kmem_cache	*ceph_osd_request_cache;

static const struct ceph_connection_operations osd_con_ops;

static void __send_queued(struct ceph_osd_client *osdc);
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
static void __register_request(struct ceph_osd_client *osdc,
			       struct ceph_osd_request *req);
static void __unregister_linger_request(struct ceph_osd_client *osdc,
					struct ceph_osd_request *req);
static void __send_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req);

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */

/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 */
static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
			u64 *objnum, u64 *objoff, u64 *objlen)
{
	u64 orig_len = *plen;
	int r;

	/* object extent? */
	r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
					  objoff, objlen);
	if (r < 0)
		return r;
	if (*objlen < orig_len) {
		*plen = *objlen;
		dout(" skipping last %llu, final file extent %llu~%llu\n",
		     orig_len - *plen, off, *plen);
	}

	dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);

	return 0;
}

static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
{
	memset(osd_data, 0, sizeof (*osd_data));
	osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
}

static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
			struct page **pages, u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
	osd_data->pages = pages;
	osd_data->length = length;
	osd_data->alignment = alignment;
	osd_data->pages_from_pool = pages_from_pool;
	osd_data->own_pages = own_pages;
}

static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
			struct ceph_pagelist *pagelist)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
	osd_data->pagelist = pagelist;
}

#ifdef CONFIG_BLOCK
static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
			struct bio *bio, size_t bio_length)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
	osd_data->bio = bio;
	osd_data->bio_length = bio_length;
}
#endif /* CONFIG_BLOCK */

#define osd_req_op_data(oreq, whch, typ, fld)		\
	({						\
		BUG_ON(whch >= (oreq)->r_num_ops);	\
		&(oreq)->r_ops[whch].typ.fld;		\
	})

static struct ceph_osd_data *
osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
{
	BUG_ON(which >= osd_req->r_num_ops);

	return &osd_req->r_ops[which].raw_data_in;
}

struct ceph_osd_data *
osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
			unsigned int which)
{
	return osd_req_op_data(osd_req, which, extent, osd_data);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data);

struct ceph_osd_data *
osd_req_op_cls_response_data(struct ceph_osd_request *osd_req,
			unsigned int which)
{
	return osd_req_op_data(osd_req, which, cls, response_data);
}
EXPORT_SYMBOL(osd_req_op_cls_response_data);	/* ??? */
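/*
 * Usage sketch (illustrative, not part of the original file): a
 * caller typically obtains a page vector and wires it into an op
 * with one of the setters below, e.g.
 *
 *	pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
 *	osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
 *					 false, false);
 *
 * The accessors above only return pointers into r_ops[]; the
 * pages/pagelist/bio setters below do the actual wiring.
 */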

void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages,
			u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_raw_data_in(osd_req, which);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);

void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages,
			u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);

void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);

#ifdef CONFIG_BLOCK
void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
			unsigned int which, struct bio *bio, size_t bio_length)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_bio_init(osd_data, bio, bio_length);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
#endif /* CONFIG_BLOCK */

static void osd_req_op_cls_request_info_pagelist(
			struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_info);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}

void osd_req_op_cls_request_data_pagelist(
			struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);

void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages, u64 length,
			u32 alignment, bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);

void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages, u64 length,
			u32 alignment, bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, response_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
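/*
 * Ownership note (summarizing the behavior implemented below, not a
 * new rule): when own_pages is true the page vector is released by
 * ceph_osd_data_release() during request teardown; callers passing
 * own_pages == false must free their pages themselves once the
 * request has completed.
 */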

static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
{
	switch (osd_data->type) {
	case CEPH_OSD_DATA_TYPE_NONE:
		return 0;
	case CEPH_OSD_DATA_TYPE_PAGES:
		return osd_data->length;
	case CEPH_OSD_DATA_TYPE_PAGELIST:
		return (u64)osd_data->pagelist->length;
#ifdef CONFIG_BLOCK
	case CEPH_OSD_DATA_TYPE_BIO:
		return (u64)osd_data->bio_length;
#endif /* CONFIG_BLOCK */
	default:
		WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
		return 0;
	}
}

static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
{
	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
		int num_pages;

		num_pages = calc_pages_for((u64)osd_data->alignment,
						(u64)osd_data->length);
		ceph_release_page_vector(osd_data->pages, num_pages);
	}
	ceph_osd_data_init(osd_data);
}

static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
			unsigned int which)
{
	struct ceph_osd_req_op *op;

	BUG_ON(which >= osd_req->r_num_ops);
	op = &osd_req->r_ops[which];

	switch (op->op) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
		ceph_osd_data_release(&op->extent.osd_data);
		break;
	case CEPH_OSD_OP_CALL:
		ceph_osd_data_release(&op->cls.request_info);
		ceph_osd_data_release(&op->cls.request_data);
		ceph_osd_data_release(&op->cls.response_data);
		break;
	default:
		break;
	}
}

/*
 * requests
 */
void ceph_osdc_release_request(struct kref *kref)
{
	struct ceph_osd_request *req;
	unsigned int which;

	req = container_of(kref, struct ceph_osd_request, r_kref);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply) {
		ceph_msg_revoke_incoming(req->r_reply);
		ceph_msg_put(req->r_reply);
	}

	for (which = 0; which < req->r_num_ops; which++)
		osd_req_op_data_release(req, which);

	ceph_put_snap_context(req->r_snapc);
	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else
		kmem_cache_free(ceph_osd_request_cache, req);
}
EXPORT_SYMBOL(ceph_osdc_release_request);
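/*
 * Lifecycle sketch (assumed typical usage): ceph_osdc_alloc_request()
 * below returns a request holding one reference; submission paths
 * take extra refs with ceph_osdc_get_request() and drop them with
 * ceph_osdc_put_request(), whose final put lands in
 * ceph_osdc_release_request() above.
 */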

struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
					       struct ceph_snap_context *snapc,
					       unsigned int num_ops,
					       bool use_mempool,
					       gfp_t gfp_flags)
{
	struct ceph_osd_request *req;
	struct ceph_msg *msg;
	size_t msg_size;

	BUILD_BUG_ON(CEPH_OSD_MAX_OP > U16_MAX);
	BUG_ON(num_ops > CEPH_OSD_MAX_OP);

	msg_size = 4 + 4 + 8 + 8 + 4 + 8;
	msg_size += 2 + 4 + 8 + 4 + 4;	/* oloc */
	msg_size += 1 + 8 + 4 + 4;	/* pg_t */
	msg_size += 4 + MAX_OBJ_NAME_SIZE;
	msg_size += 2 + num_ops * sizeof(struct ceph_osd_op);
	msg_size += 8;	/* snapid */
	msg_size += 8;	/* snap_seq */
	msg_size += 8 * (snapc ? snapc->num_snaps : 0);	/* snaps */
	msg_size += 4;

	if (use_mempool) {
		req = mempool_alloc(osdc->req_mempool, gfp_flags);
		memset(req, 0, sizeof(*req));
	} else {
		req = kmem_cache_zalloc(ceph_osd_request_cache, gfp_flags);
	}
	if (req == NULL)
		return NULL;

	req->r_osdc = osdc;
	req->r_mempool = use_mempool;
	req->r_num_ops = num_ops;

	kref_init(&req->r_kref);
	init_completion(&req->r_completion);
	init_completion(&req->r_safe_completion);
	RB_CLEAR_NODE(&req->r_node);
	INIT_LIST_HEAD(&req->r_unsafe_item);
	INIT_LIST_HEAD(&req->r_linger_item);
	INIT_LIST_HEAD(&req->r_linger_osd);
	INIT_LIST_HEAD(&req->r_req_lru_item);
	INIT_LIST_HEAD(&req->r_osd_item);

	/* create reply message */
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
				   OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
	if (!msg) {
		ceph_osdc_put_request(req);
		return NULL;
	}
	req->r_reply = msg;

	/* create request message; allow space for oid */
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true);
	if (!msg) {
		ceph_osdc_put_request(req);
		return NULL;
	}

	memset(msg->front.iov_base, 0, msg->front.iov_len);

	req->r_request = msg;

	return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);

static bool osd_req_opcode_valid(u16 opcode)
{
	switch (opcode) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_STAT:
	case CEPH_OSD_OP_MAPEXT:
	case CEPH_OSD_OP_MASKTRUNC:
	case CEPH_OSD_OP_SPARSE_READ:
	case CEPH_OSD_OP_NOTIFY:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_ASSERT_VER:
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
	case CEPH_OSD_OP_TRUNCATE:
	case CEPH_OSD_OP_ZERO:
	case CEPH_OSD_OP_DELETE:
	case CEPH_OSD_OP_APPEND:
	case CEPH_OSD_OP_STARTSYNC:
	case CEPH_OSD_OP_SETTRUNC:
	case CEPH_OSD_OP_TRIMTRUNC:
	case CEPH_OSD_OP_TMAPUP:
	case CEPH_OSD_OP_TMAPPUT:
	case CEPH_OSD_OP_TMAPGET:
	case CEPH_OSD_OP_CREATE:
	case CEPH_OSD_OP_ROLLBACK:
	case CEPH_OSD_OP_WATCH:
	case CEPH_OSD_OP_OMAPGETKEYS:
	case CEPH_OSD_OP_OMAPGETVALS:
	case CEPH_OSD_OP_OMAPGETHEADER:
	case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
	case CEPH_OSD_OP_OMAPSETVALS:
	case CEPH_OSD_OP_OMAPSETHEADER:
	case CEPH_OSD_OP_OMAPCLEAR:
	case CEPH_OSD_OP_OMAPRMKEYS:
	case CEPH_OSD_OP_OMAP_CMP:
	case CEPH_OSD_OP_CLONERANGE:
	case CEPH_OSD_OP_ASSERT_SRC_VERSION:
	case CEPH_OSD_OP_SRC_CMPXATTR:
	case CEPH_OSD_OP_GETXATTR:
	case CEPH_OSD_OP_GETXATTRS:
	case CEPH_OSD_OP_CMPXATTR:
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_SETXATTRS:
	case CEPH_OSD_OP_RESETXATTRS:
	case CEPH_OSD_OP_RMXATTR:
	case CEPH_OSD_OP_PULL:
	case CEPH_OSD_OP_PUSH:
	case CEPH_OSD_OP_BALANCEREADS:
	case CEPH_OSD_OP_UNBALANCEREADS:
	case CEPH_OSD_OP_SCRUB:
	case CEPH_OSD_OP_SCRUB_RESERVE:
	case CEPH_OSD_OP_SCRUB_UNRESERVE:
	case CEPH_OSD_OP_SCRUB_STOP:
	case CEPH_OSD_OP_SCRUB_MAP:
	case CEPH_OSD_OP_WRLOCK:
	case CEPH_OSD_OP_WRUNLOCK:
	case CEPH_OSD_OP_RDLOCK:
	case CEPH_OSD_OP_RDUNLOCK:
	case CEPH_OSD_OP_UPLOCK:
	case CEPH_OSD_OP_DNLOCK:
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_PGLS:
	case CEPH_OSD_OP_PGLS_FILTER:
		return true;
	default:
		return false;
	}
}

/*
 * This is an osd op init function for opcodes that have no data or
 * other information associated with them.  It also serves as a
 * common init routine for all the other init functions, below.
 */
static struct ceph_osd_req_op *
_osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
				u16 opcode)
{
	struct ceph_osd_req_op *op;

	BUG_ON(which >= osd_req->r_num_ops);
	BUG_ON(!osd_req_opcode_valid(opcode));

	op = &osd_req->r_ops[which];
	memset(op, 0, sizeof (*op));
	op->op = opcode;

	return op;
}

void osd_req_op_init(struct ceph_osd_request *osd_req,
				unsigned int which, u16 opcode)
{
	(void)_osd_req_op_init(osd_req, which, opcode);
}
EXPORT_SYMBOL(osd_req_op_init);

void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
				unsigned int which, u16 opcode,
				u64 offset, u64 length,
				u64 truncate_size, u32 truncate_seq)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
	size_t payload_len = 0;

	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);

	op->extent.offset = offset;
	op->extent.length = length;
	op->extent.truncate_size = truncate_size;
	op->extent.truncate_seq = truncate_seq;
	if (opcode == CEPH_OSD_OP_WRITE)
		payload_len += length;

	op->payload_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_extent_init);

void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
				unsigned int which, u64 length)
{
	struct ceph_osd_req_op *op;
	u64 previous;

	BUG_ON(which >= osd_req->r_num_ops);
	op = &osd_req->r_ops[which];
	previous = op->extent.length;

	if (length == previous)
		return;		/* Nothing to do */
	BUG_ON(length > previous);

	op->extent.length = length;
	op->payload_len -= previous - length;
}
EXPORT_SYMBOL(osd_req_op_extent_update);

void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
			u16 opcode, const char *class, const char *method)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
	struct ceph_pagelist *pagelist;
	size_t payload_len = 0;
	size_t size;

	BUG_ON(opcode != CEPH_OSD_OP_CALL);

	pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
	BUG_ON(!pagelist);
	ceph_pagelist_init(pagelist);

	op->cls.class_name = class;
	size = strlen(class);
	BUG_ON(size > (size_t) U8_MAX);
	op->cls.class_len = size;
	ceph_pagelist_append(pagelist, class, size);
	payload_len += size;

	op->cls.method_name = method;
	size = strlen(method);
	BUG_ON(size > (size_t) U8_MAX);
	op->cls.method_len = size;
	ceph_pagelist_append(pagelist, method, size);
	payload_len += size;

	osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);

	op->cls.argc = 0;	/* currently unused */

	op->payload_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_cls_init);

void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
				unsigned int which, u16 opcode,
				u64 cookie, u64 version, int flag)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);

	BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);

	op->watch.cookie = cookie;
	op->watch.ver = version;
	if (opcode == CEPH_OSD_OP_WATCH && flag)
		op->watch.flag = (u8)1;
}
EXPORT_SYMBOL(osd_req_op_watch_init);
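/*
 * Usage sketch for a class method call (illustrative; the class and
 * method names here are hypothetical examples, not values from this
 * file):
 *
 *	osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, "lock", "unlock");
 *	osd_req_op_cls_request_data_pagelist(req, 0, in_args);
 *	osd_req_op_cls_response_data_pages(req, 0, pages, PAGE_SIZE,
 *					   0, false, false);
 */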

static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
				struct ceph_osd_data *osd_data)
{
	u64 length = ceph_osd_data_length(osd_data);

	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
		BUG_ON(length > (u64) SIZE_MAX);
		if (length)
			ceph_msg_data_add_pages(msg, osd_data->pages,
					length, osd_data->alignment);
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
		BUG_ON(!length);
		ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
#ifdef CONFIG_BLOCK
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
		ceph_msg_data_add_bio(msg, osd_data->bio, length);
#endif
	} else {
		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
	}
}

static u64 osd_req_encode_op(struct ceph_osd_request *req,
			      struct ceph_osd_op *dst, unsigned int which)
{
	struct ceph_osd_req_op *src;
	struct ceph_osd_data *osd_data;
	u64 request_data_len = 0;
	u64 data_length;

	BUG_ON(which >= req->r_num_ops);
	src = &req->r_ops[which];
	if (WARN_ON(!osd_req_opcode_valid(src->op))) {
		pr_err("unrecognized osd opcode %d\n", src->op);

		return 0;
	}

	switch (src->op) {
	case CEPH_OSD_OP_STAT:
		osd_data = &src->raw_data_in;
		ceph_osdc_msg_data_add(req->r_reply, osd_data);
		break;
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
		if (src->op == CEPH_OSD_OP_WRITE)
			request_data_len = src->extent.length;
		dst->extent.offset = cpu_to_le64(src->extent.offset);
		dst->extent.length = cpu_to_le64(src->extent.length);
		dst->extent.truncate_size =
			cpu_to_le64(src->extent.truncate_size);
		dst->extent.truncate_seq =
			cpu_to_le32(src->extent.truncate_seq);
		osd_data = &src->extent.osd_data;
		if (src->op == CEPH_OSD_OP_WRITE)
			ceph_osdc_msg_data_add(req->r_request, osd_data);
		else
			ceph_osdc_msg_data_add(req->r_reply, osd_data);
		break;
	case CEPH_OSD_OP_CALL:
		dst->cls.class_len = src->cls.class_len;
		dst->cls.method_len = src->cls.method_len;
		osd_data = &src->cls.request_info;
		ceph_osdc_msg_data_add(req->r_request, osd_data);
		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST);
		request_data_len = osd_data->pagelist->length;

		osd_data = &src->cls.request_data;
		data_length = ceph_osd_data_length(osd_data);
		if (data_length) {
			BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
			dst->cls.indata_len = cpu_to_le32(data_length);
			ceph_osdc_msg_data_add(req->r_request, osd_data);
			src->payload_len += data_length;
			request_data_len += data_length;
		}
		osd_data = &src->cls.response_data;
		ceph_osdc_msg_data_add(req->r_reply, osd_data);
		break;
	case CEPH_OSD_OP_STARTSYNC:
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
		dst->watch.ver = cpu_to_le64(src->watch.ver);
		dst->watch.flag = src->watch.flag;
		break;
	default:
		pr_err("unsupported osd opcode %s\n",
			ceph_osd_op_name(src->op));
		WARN_ON(1);

		return 0;
	}
	dst->op = cpu_to_le16(src->op);
	dst->payload_len = cpu_to_le32(src->payload_len);

	return request_data_len;
}

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately.  (we
 * avoid synchronously deleting truncated objects because it's slow.)
 *
 * if @do_sync, include a 'startsync' command so that the osd will flush
 * data quickly.
 */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
					       struct ceph_file_layout *layout,
					       struct ceph_vino vino,
					       u64 off, u64 *plen, int num_ops,
					       int opcode, int flags,
					       struct ceph_snap_context *snapc,
					       u32 truncate_seq,
					       u64 truncate_size,
					       bool use_mempool)
{
	struct ceph_osd_request *req;
	u64 objnum = 0;
	u64 objoff = 0;
	u64 objlen = 0;
	u32 object_size;
	u64 object_base;
	int r;

	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);

	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
					GFP_NOFS);
	if (!req)
		return ERR_PTR(-ENOMEM);

	req->r_flags = flags;

	/* calculate max write size */
	r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
	if (r < 0) {
		ceph_osdc_put_request(req);
		return ERR_PTR(r);
	}

	object_size = le32_to_cpu(layout->fl_object_size);
	object_base = off - objoff;
	if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
		if (truncate_size <= object_base) {
			truncate_size = 0;
		} else {
			truncate_size -= object_base;
			if (truncate_size > object_size)
				truncate_size = object_size;
		}
	}

	osd_req_op_extent_init(req, 0, opcode, objoff, objlen,
				truncate_size, truncate_seq);

	/*
	 * A second op in the ops array means the caller wants to
	 * also issue a 'startsync' command so that the osd will
	 * flush data quickly.
	 */
	if (num_ops > 1)
		osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);

	req->r_file_layout = *layout;  /* keep a copy */

	snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx",
		vino.ino, objnum);
	req->r_oid_len = strlen(req->r_oid);

	return req;
}
EXPORT_SYMBOL(ceph_osdc_new_request);

/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 */
static void __insert_request(struct ceph_osd_client *osdc,
			     struct ceph_osd_request *new)
{
	struct rb_node **p = &osdc->requests.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd_request *req = NULL;

	while (*p) {
		parent = *p;
		req = rb_entry(parent, struct ceph_osd_request, r_node);
		if (new->r_tid < req->r_tid)
			p = &(*p)->rb_left;
		else if (new->r_tid > req->r_tid)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->r_node, parent, p);
	rb_insert_color(&new->r_node, &osdc->requests);
}

static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
						 u64 tid)
{
	struct ceph_osd_request *req;
	struct rb_node *n = osdc->requests.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_osd_request, r_node);
		if (tid < req->r_tid)
			n = n->rb_left;
		else if (tid > req->r_tid)
			n = n->rb_right;
		else
			return req;
	}
	return NULL;
}

static struct ceph_osd_request *
__lookup_request_ge(struct ceph_osd_client *osdc,
		    u64 tid)
{
	struct ceph_osd_request *req;
	struct rb_node *n = osdc->requests.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_osd_request, r_node);
		if (tid < req->r_tid) {
			if (!n->rb_left)
				return req;
			n = n->rb_left;
		} else if (tid > req->r_tid) {
			n = n->rb_right;
		} else {
			return req;
		}
	}
	return NULL;
}
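/*
 * Usage sketch (illustrative; ci and vino stand in for whatever the
 * caller's inode provides):
 *
 *	req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off,
 *				    &len, 1, CEPH_OSD_OP_READ,
 *				    CEPH_OSD_FLAG_READ, NULL, 0, 0, false);
 *
 * Note that len may come back smaller than requested if the extent
 * crosses an object boundary (see calc_layout()).
 */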

/*
 * Resubmit requests pending on the given osd.
 */
static void __kick_osd_requests(struct ceph_osd_client *osdc,
				struct ceph_osd *osd)
{
	struct ceph_osd_request *req, *nreq;
	LIST_HEAD(resend);
	int err;

	dout("__kick_osd_requests osd%d\n", osd->o_osd);
	err = __reset_osd(osdc, osd);
	if (err)
		return;
	/*
	 * Build up a list of requests to resend by traversing the
	 * osd's list of requests.  Requests for a given object are
	 * sent in tid order, and that is also the order they're
	 * kept on this list.  Therefore all requests that are in
	 * flight will be found first, followed by all requests that
	 * have not yet been sent.  And to resend requests while
	 * preserving this order we will want to put any sent
	 * requests back on the front of the osd client's unsent
	 * list.
	 *
	 * So we build a separate ordered list of already-sent
	 * requests for the affected osd and splice it onto the
	 * front of the osd client's unsent list.  Once we've seen a
	 * request that has not yet been sent we're done.  Those
	 * requests are already sitting right where they belong.
	 */
	list_for_each_entry(req, &osd->o_requests, r_osd_item) {
		if (!req->r_sent)
			break;
		list_move_tail(&req->r_req_lru_item, &resend);
		dout("requeueing %p tid %llu osd%d\n", req, req->r_tid,
		     osd->o_osd);
		if (!req->r_linger)
			req->r_flags |= CEPH_OSD_FLAG_RETRY;
	}
	list_splice(&resend, &osdc->req_unsent);

	/*
	 * Linger requests are re-registered before sending, which
	 * sets up a new tid for each.  We add them to the unsent
	 * list at the end to keep things in tid order.
	 */
	list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
				 r_linger_osd) {
		/*
		 * reregister request prior to unregistering linger so
		 * that r_osd is preserved.
		 */
		BUG_ON(!list_empty(&req->r_req_lru_item));
		__register_request(osdc, req);
		list_add_tail(&req->r_req_lru_item, &osdc->req_unsent);
		list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
		__unregister_linger_request(osdc, req);
		dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
		     osd->o_osd);
	}
}

/*
 * If the osd connection drops, we need to resubmit all requests.
 */
static void osd_reset(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;

	if (!osd)
		return;
	dout("osd_reset osd%d\n", osd->o_osd);
	osdc = osd->o_osdc;
	down_read(&osdc->map_sem);
	mutex_lock(&osdc->request_mutex);
	__kick_osd_requests(osdc, osd);
	__send_queued(osdc);
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
}
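/*
 * Worked example for the resend ordering in __kick_osd_requests()
 * above (illustrative tids): if tids 5 and 7 are in flight and tid 9
 * is still unsent, the loop collects [5, 7] on the local resend list
 * and splices them ahead of [9] on req_unsent, so tid order is
 * preserved end to end.
 */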

/*
 * Track open sessions with osds.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
{
	struct ceph_osd *osd;

	osd = kzalloc(sizeof(*osd), GFP_NOFS);
	if (!osd)
		return NULL;

	atomic_set(&osd->o_ref, 1);
	osd->o_osdc = osdc;
	osd->o_osd = onum;
	RB_CLEAR_NODE(&osd->o_node);
	INIT_LIST_HEAD(&osd->o_requests);
	INIT_LIST_HEAD(&osd->o_linger_requests);
	INIT_LIST_HEAD(&osd->o_osd_lru);
	osd->o_incarnation = 1;

	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);

	INIT_LIST_HEAD(&osd->o_keepalive_item);
	return osd;
}

static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
	if (atomic_inc_not_zero(&osd->o_ref)) {
		dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
		     atomic_read(&osd->o_ref));
		return osd;
	} else {
		dout("get_osd %p FAIL\n", osd);
		return NULL;
	}
}

static void put_osd(struct ceph_osd *osd)
{
	dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
	     atomic_read(&osd->o_ref) - 1);
	if (atomic_dec_and_test(&osd->o_ref)) {
		struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;

		/* free the osd unconditionally on the final put; only
		 * the authorizer teardown depends on one existing
		 * (the original freed osd only when an authorizer was
		 * present, leaking it otherwise) */
		if (osd->o_auth.authorizer)
			ceph_auth_destroy_authorizer(ac, osd->o_auth.authorizer);
		kfree(osd);
	}
}

/*
 * remove an osd from our map
 */
static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
	WARN_ON(!list_empty(&osd->o_requests));
	WARN_ON(!list_empty(&osd->o_linger_requests));

	list_del_init(&osd->o_osd_lru);
	rb_erase(&osd->o_node, &osdc->osds);
	RB_CLEAR_NODE(&osd->o_node);
}

static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	dout("%s %p osd%d\n", __func__, osd, osd->o_osd);

	if (!RB_EMPTY_NODE(&osd->o_node)) {
		ceph_con_close(&osd->o_con);
		__remove_osd(osdc, osd);
		put_osd(osd);
	}
}

static void remove_all_osds(struct ceph_osd_client *osdc)
{
	dout("%s %p\n", __func__, osdc);
	mutex_lock(&osdc->request_mutex);
	while (!RB_EMPTY_ROOT(&osdc->osds)) {
		struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
						struct ceph_osd, o_node);
		remove_osd(osdc, osd);
	}
	mutex_unlock(&osdc->request_mutex);
}

static void __move_osd_to_lru(struct ceph_osd_client *osdc,
			      struct ceph_osd *osd)
{
	dout("__move_osd_to_lru %p\n", osd);
	BUG_ON(!list_empty(&osd->o_osd_lru));
	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
}

static void __remove_osd_from_lru(struct ceph_osd *osd)
{
	dout("__remove_osd_from_lru %p\n", osd);
	if (!list_empty(&osd->o_osd_lru))
		list_del_init(&osd->o_osd_lru);
}

static void remove_old_osds(struct ceph_osd_client *osdc)
{
	struct ceph_osd *osd, *nosd;

	dout("__remove_old_osds %p\n", osdc);
	mutex_lock(&osdc->request_mutex);
	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
		if (time_before(jiffies, osd->lru_ttl))
			break;
		remove_osd(osdc, osd);
	}
	mutex_unlock(&osdc->request_mutex);
}
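/*
 * Idle-session flow (restating the helpers above, for orientation):
 * once an osd has no pending or linger requests it is parked on
 * osd_lru with lru_ttl = jiffies + osd_idle_ttl * HZ, and
 * handle_osds_timeout() below periodically calls remove_old_osds()
 * to close sessions whose ttl has expired.
 */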

/*
 * reset osd connect
 */
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	struct ceph_entity_addr *peer_addr;

	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
	if (list_empty(&osd->o_requests) &&
	    list_empty(&osd->o_linger_requests)) {
		remove_osd(osdc, osd);
		return -ENODEV;
	}

	peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
	if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
			!ceph_con_opened(&osd->o_con)) {
		struct ceph_osd_request *req;

		dout(" osd addr hasn't changed and connection never opened,"
		     " letting msgr retry\n");
		/* touch each r_stamp for handle_timeout()'s benefit */
		list_for_each_entry(req, &osd->o_requests, r_osd_item)
			req->r_stamp = jiffies;

		return -EAGAIN;
	}

	ceph_con_close(&osd->o_con);
	ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
	osd->o_incarnation++;

	return 0;
}

static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
{
	struct rb_node **p = &osdc->osds.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd *osd = NULL;

	dout("__insert_osd %p osd%d\n", new, new->o_osd);
	while (*p) {
		parent = *p;
		osd = rb_entry(parent, struct ceph_osd, o_node);
		if (new->o_osd < osd->o_osd)
			p = &(*p)->rb_left;
		else if (new->o_osd > osd->o_osd)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->o_node, parent, p);
	rb_insert_color(&new->o_node, &osdc->osds);
}

static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
{
	struct ceph_osd *osd;
	struct rb_node *n = osdc->osds.rb_node;

	while (n) {
		osd = rb_entry(n, struct ceph_osd, o_node);
		if (o < osd->o_osd)
			n = n->rb_left;
		else if (o > osd->o_osd)
			n = n->rb_right;
		else
			return osd;
	}
	return NULL;
}

static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
{
	schedule_delayed_work(&osdc->timeout_work,
			osdc->client->options->osd_keepalive_timeout * HZ);
}

static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
{
	cancel_delayed_work(&osdc->timeout_work);
}

/*
 * Register request, assign tid.  If this is the first request, set up
 * the timeout event.
 */
static void __register_request(struct ceph_osd_client *osdc,
			       struct ceph_osd_request *req)
{
	req->r_tid = ++osdc->last_tid;
	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
	dout("__register_request %p tid %lld\n", req, req->r_tid);
	__insert_request(osdc, req);
	ceph_osdc_get_request(req);
	osdc->num_requests++;
	if (osdc->num_requests == 1) {
		dout(" first request, scheduling timeout\n");
		__schedule_osd_timeout(osdc);
	}
}

/*
 * called under osdc->request_mutex
 */
static void __unregister_request(struct ceph_osd_client *osdc,
				 struct ceph_osd_request *req)
{
	if (RB_EMPTY_NODE(&req->r_node)) {
		dout("__unregister_request %p tid %lld not registered\n",
			req, req->r_tid);
		return;
	}

	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
	rb_erase(&req->r_node, &osdc->requests);
	osdc->num_requests--;

	if (req->r_osd) {
		/* make sure the original request isn't in flight. */
		ceph_msg_revoke(req->r_request);

		list_del_init(&req->r_osd_item);
		if (list_empty(&req->r_osd->o_requests) &&
		    list_empty(&req->r_osd->o_linger_requests)) {
			dout("moving osd to %p lru\n", req->r_osd);
			__move_osd_to_lru(osdc, req->r_osd);
		}
		if (list_empty(&req->r_linger_item))
			req->r_osd = NULL;
	}

	list_del_init(&req->r_req_lru_item);
	ceph_osdc_put_request(req);

	if (osdc->num_requests == 0) {
		dout(" no requests, canceling timeout\n");
		__cancel_osd_timeout(osdc);
	}
}

/*
 * Cancel a previously queued request message
 */
static void __cancel_request(struct ceph_osd_request *req)
{
	if (req->r_sent && req->r_osd) {
		ceph_msg_revoke(req->r_request);
		req->r_sent = 0;
	}
}

static void __register_linger_request(struct ceph_osd_client *osdc,
				    struct ceph_osd_request *req)
{
	dout("__register_linger_request %p\n", req);
	ceph_osdc_get_request(req);
	list_add_tail(&req->r_linger_item, &osdc->req_linger);
	if (req->r_osd)
		list_add_tail(&req->r_linger_osd,
			      &req->r_osd->o_linger_requests);
}

static void __unregister_linger_request(struct ceph_osd_client *osdc,
					struct ceph_osd_request *req)
{
	dout("__unregister_linger_request %p\n", req);
	list_del_init(&req->r_linger_item);
	if (req->r_osd) {
		list_del_init(&req->r_linger_osd);

		if (list_empty(&req->r_osd->o_requests) &&
		    list_empty(&req->r_osd->o_linger_requests)) {
			dout("moving osd to %p lru\n", req->r_osd);
			__move_osd_to_lru(osdc, req->r_osd);
		}
		if (list_empty(&req->r_osd_item))
			req->r_osd = NULL;
	}
	ceph_osdc_put_request(req);
}

void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
					 struct ceph_osd_request *req)
{
	mutex_lock(&osdc->request_mutex);
	if (req->r_linger) {
		req->r_linger = 0;
		__unregister_linger_request(osdc, req);
	}
	mutex_unlock(&osdc->request_mutex);
}
EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);

void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
				  struct ceph_osd_request *req)
{
	if (!req->r_linger) {
		dout("set_request_linger %p\n", req);
		req->r_linger = 1;
	}
}
EXPORT_SYMBOL(ceph_osdc_set_request_linger);

/*
 * Returns whether a request should be blocked from being sent
 * based on the current osdmap and osd_client settings.
 *
 * Caller should hold map_sem for read.
 */
static bool __req_should_be_paused(struct ceph_osd_client *osdc,
				   struct ceph_osd_request *req)
{
	bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
	bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
		ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);

	return (req->r_flags & CEPH_OSD_FLAG_READ && pauserd) ||
		(req->r_flags & CEPH_OSD_FLAG_WRITE && pausewr);
}
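/*
 * Example (restating __req_should_be_paused() above): with
 * CEPH_OSDMAP_FULL set in the osdmap, writes are paused while reads
 * still go out; with CEPH_OSDMAP_PAUSERD set it is the reads that
 * are held back instead.
 */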

/*
 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
 * (as needed), and set the request r_osd appropriately.  If there is
 * no up osd, set r_osd to NULL.  Move the request to the appropriate list
 * (unsent, homeless) or leave on in-flight lru.
 *
 * Return 0 if unchanged, 1 if changed, or negative on error.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static int __map_request(struct ceph_osd_client *osdc,
			 struct ceph_osd_request *req, int force_resend)
{
	struct ceph_pg pgid;
	int acting[CEPH_PG_MAX_SIZE];
	int o = -1, num = 0;
	int err;
	bool was_paused;

	dout("map_request %p tid %lld\n", req, req->r_tid);
	err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap,
				ceph_file_layout_pg_pool(req->r_file_layout));
	if (err) {
		list_move(&req->r_req_lru_item, &osdc->req_notarget);
		return err;
	}
	req->r_pgid = pgid;

	err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
	if (err > 0) {
		o = acting[0];
		num = err;
	}

	was_paused = req->r_paused;
	req->r_paused = __req_should_be_paused(osdc, req);
	if (was_paused && !req->r_paused)
		force_resend = 1;

	if ((!force_resend &&
	     req->r_osd && req->r_osd->o_osd == o &&
	     req->r_sent >= req->r_osd->o_incarnation &&
	     req->r_num_pg_osds == num &&
	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
	    (req->r_osd == NULL && o == -1) ||
	    req->r_paused)
		return 0;  /* no change */

	dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
	     req->r_tid, pgid.pool, pgid.seed, o,
	     req->r_osd ? req->r_osd->o_osd : -1);

	/* record full pg acting set */
	memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
	req->r_num_pg_osds = num;

	if (req->r_osd) {
		__cancel_request(req);
		list_del_init(&req->r_osd_item);
		req->r_osd = NULL;
	}

	req->r_osd = __lookup_osd(osdc, o);
	if (!req->r_osd && o >= 0) {
		err = -ENOMEM;
		req->r_osd = create_osd(osdc, o);
		if (!req->r_osd) {
			list_move(&req->r_req_lru_item, &osdc->req_notarget);
			goto out;
		}

		dout("map_request osd %p is osd%d\n", req->r_osd, o);
		__insert_osd(osdc, req->r_osd);

		ceph_con_open(&req->r_osd->o_con,
			      CEPH_ENTITY_TYPE_OSD, o,
			      &osdc->osdmap->osd_addr[o]);
	}

	if (req->r_osd) {
		__remove_osd_from_lru(req->r_osd);
		list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
		list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
	} else {
		list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
	}
	err = 1;   /* osd or pg changed */

out:
	return err;
}
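/*
 * Worked example (illustrative osd ids): if a map change moves the
 * pg's acting set from [3, 1] to [4, 1], __map_request() sees o == 4,
 * drops the request from osd3's o_requests list, looks up or creates
 * osd4, queues the request on req_unsent and returns 1.
 */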

/*
 * caller should hold map_sem (for read) and request_mutex
 */
static void __send_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	void *p;

	dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
	     req, req->r_tid, req->r_osd->o_osd, req->r_flags,
	     (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);

	/* fill in message content that changes each time we send it */
	put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
	put_unaligned_le32(req->r_flags, req->r_request_flags);
	put_unaligned_le64(req->r_pgid.pool, req->r_request_pool);
	p = req->r_request_pgid;
	ceph_encode_64(&p, req->r_pgid.pool);
	ceph_encode_32(&p, req->r_pgid.seed);
	put_unaligned_le64(1, req->r_request_attempts);  /* FIXME */
	memcpy(req->r_request_reassert_version, &req->r_reassert_version,
	       sizeof(req->r_reassert_version));

	req->r_stamp = jiffies;
	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);

	ceph_msg_get(req->r_request); /* send consumes a ref */

	req->r_sent = req->r_osd->o_incarnation;

	ceph_con_send(&req->r_osd->o_con, req->r_request);
}

/*
 * Send any requests in the queue (req_unsent).
 */
static void __send_queued(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req, *tmp;

	dout("__send_queued\n");
	list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
		__send_request(osdc, req);
}

/*
 * Timeout callback, called every N seconds when 1 or more osd
 * requests have been active for more than N seconds.  When this
 * happens, we ping all OSDs with requests who have timed out to
 * ensure any communications channel reset is detected.  Reset the
 * request timeouts another N seconds in the future as we go.
 * Reschedule the timeout event another N seconds in future (unless
 * there are no open requests).
 */
static void handle_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client, timeout_work.work);
	struct ceph_osd_request *req;
	struct ceph_osd *osd;
	unsigned long keepalive =
		osdc->client->options->osd_keepalive_timeout * HZ;
	struct list_head slow_osds;

	dout("timeout\n");
	down_read(&osdc->map_sem);

	ceph_monc_request_next_osdmap(&osdc->client->monc);

	mutex_lock(&osdc->request_mutex);

	/*
	 * ping osds that are a bit slow.  this ensures that if there
	 * is a break in the TCP connection we will notice, and reopen
	 * a connection with that osd (from the fault callback).
	 */
	INIT_LIST_HEAD(&slow_osds);
	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
		if (time_before(jiffies, req->r_stamp + keepalive))
			break;

		osd = req->r_osd;
		BUG_ON(!osd);
		dout(" tid %llu is slow, will send keepalive on osd%d\n",
		     req->r_tid, osd->o_osd);
		list_move_tail(&osd->o_keepalive_item, &slow_osds);
	}
	while (!list_empty(&slow_osds)) {
		osd = list_entry(slow_osds.next, struct ceph_osd,
				 o_keepalive_item);
		list_del_init(&osd->o_keepalive_item);
		ceph_con_keepalive(&osd->o_con);
	}

	__schedule_osd_timeout(osdc);
	__send_queued(osdc);
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
}

static void handle_osds_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client,
			     osds_timeout_work.work);
	unsigned long delay =
		osdc->client->options->osd_idle_ttl * HZ >> 2;

	dout("osds timeout\n");
	down_read(&osdc->map_sem);
	remove_old_osds(osdc);
	up_read(&osdc->map_sem);

	schedule_delayed_work(&osdc->osds_timeout_work,
			      round_jiffies_relative(delay));
}

static void complete_request(struct ceph_osd_request *req)
{
	complete_all(&req->r_safe_completion);  /* fsync waiter */
}

/*
 * handle osd op reply.  either call the callback if it is specified,
 * or do the completion to wake up the waiting thread.
 */
static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
			 struct ceph_connection *con)
{
	void *p, *end;
	struct ceph_osd_request *req;
	u64 tid;
	int object_len;
	unsigned int numops;
	int payload_len, flags;
	s32 result;
	s32 retry_attempt;
	struct ceph_pg pg;
	int err;
	u32 reassert_epoch;
	u64 reassert_version;
	u32 osdmap_epoch;
	int already_completed;
	u32 bytes;
	unsigned int i;

	tid = le64_to_cpu(msg->hdr.tid);
	dout("handle_reply %p tid %llu\n", msg, tid);

	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	ceph_decode_need(&p, end, 4, bad);
	object_len = ceph_decode_32(&p);
	ceph_decode_need(&p, end, object_len, bad);
	p += object_len;

	err = ceph_decode_pgid(&p, end, &pg);
	if (err)
		goto bad;

	ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
	flags = ceph_decode_64(&p);
	result = ceph_decode_32(&p);
	reassert_epoch = ceph_decode_32(&p);
	reassert_version = ceph_decode_64(&p);
	osdmap_epoch = ceph_decode_32(&p);

	/* lookup */
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (req == NULL) {
		dout("handle_reply tid %llu dne\n", tid);
		goto bad_mutex;
	}
	ceph_osdc_get_request(req);

	dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
	     req, result);

	ceph_decode_need(&p, end, 4, bad_put);
	numops = ceph_decode_32(&p);
	if (numops > CEPH_OSD_MAX_OP)
		goto bad_put;
	if (numops != req->r_num_ops)
		goto bad_put;
	payload_len = 0;
	ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad_put);
	for (i = 0; i < numops; i++) {
		struct ceph_osd_op *op = p;
		int len;

		len = le32_to_cpu(op->payload_len);
		req->r_reply_op_len[i] = len;
		dout(" op %d has %d bytes\n", i, len);
		payload_len += len;
		p += sizeof(*op);
	}
	bytes = le32_to_cpu(msg->hdr.data_len);
	if (payload_len != bytes) {
		pr_warning("sum of op payload lens %d != data_len %d\n",
			   payload_len, bytes);
		goto bad_put;
	}

	ceph_decode_need(&p, end, 4 + numops * 4, bad_put);
	retry_attempt = ceph_decode_32(&p);
	for (i = 0; i < numops; i++)
		req->r_reply_op_result[i] = ceph_decode_32(&p);

	already_completed = req->r_got_reply;

	if (!req->r_got_reply) {
		req->r_result = result;
		dout("handle_reply result %d bytes %d\n", req->r_result,
		     bytes);
		if (req->r_result == 0)
			req->r_result = bytes;

		/* in case this is a write and we need to replay, */
		req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
		req->r_reassert_version.version = cpu_to_le64(reassert_version);

		req->r_got_reply = 1;
	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
		dout("handle_reply tid %llu dup ack\n", tid);
		mutex_unlock(&osdc->request_mutex);
		goto done;
	}

	dout("handle_reply tid %llu flags %d\n", tid, flags);

	if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
		__register_linger_request(osdc, req);

	/* either this is a read, or we got the safe response */
	if (result < 0 ||
	    (flags & CEPH_OSD_FLAG_ONDISK) ||
	    ((flags & CEPH_OSD_FLAG_WRITE) == 0))
		__unregister_request(osdc, req);

	mutex_unlock(&osdc->request_mutex);

	if (!already_completed) {
		if (req->r_unsafe_callback &&
		    result >= 0 && !(flags & CEPH_OSD_FLAG_ONDISK))
			req->r_unsafe_callback(req, true);
		if (req->r_callback)
			req->r_callback(req, msg);
		else
			complete_all(&req->r_completion);
	}

	if (flags & CEPH_OSD_FLAG_ONDISK) {
		if (req->r_unsafe_callback && already_completed)
			req->r_unsafe_callback(req, false);
		complete_request(req);
	}

done:
	dout("req=%p req->r_linger=%d\n", req, req->r_linger);
	ceph_osdc_put_request(req);
	return;

bad_put:
	ceph_osdc_put_request(req);
bad_mutex:
	mutex_unlock(&osdc->request_mutex);
bad:
	pr_err("corrupt osd_op_reply got %d %d\n",
	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
	ceph_msg_dump(msg);
}
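/*
 * Reply flow sketch (summarizing handle_reply() above): for a write
 * the osd may answer twice, first an in-memory "unsafe" ack and
 * later an ONDISK commit; r_unsafe_callback fires with true on the
 * first reply and false on the second, and complete_request() wakes
 * fsync waiters only once the commit arrives.
 */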

static void reset_changed_osds(struct ceph_osd_client *osdc)
{
	struct rb_node *p, *n;

	dout("%s %p\n", __func__, osdc);
	for (p = rb_first(&osdc->osds); p; p = n) {
		struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);

		n = rb_next(p);
		if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
		    memcmp(&osd->o_con.peer_addr,
			   ceph_osd_addr(osdc->osdmap,
					 osd->o_osd),
			   sizeof(struct ceph_entity_addr)) != 0)
			__reset_osd(osdc, osd);
	}
}

/*
 * Requeue requests whose mapping to an OSD has changed.  If requests map to
 * no osd, request a new map.
 *
 * Caller should hold map_sem for read.
 */
static void kick_requests(struct ceph_osd_client *osdc, bool force_resend,
			  bool force_resend_writes)
{
	struct ceph_osd_request *req, *nreq;
	struct rb_node *p;
	int needmap = 0;
	int err;
	bool force_resend_req;

	dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "",
		force_resend_writes ? " (force resend writes)" : "");
	mutex_lock(&osdc->request_mutex);
	for (p = rb_first(&osdc->requests); p; ) {
		req = rb_entry(p, struct ceph_osd_request, r_node);
		p = rb_next(p);

		/*
		 * For linger requests that have not yet been
		 * registered, move them to the linger list; they'll
		 * be sent to the osd in the loop below.  Unregister
		 * the request before re-registering it as a linger
		 * request to ensure the __map_request() below
		 * will decide it needs to be sent.
		 */
		if (req->r_linger && list_empty(&req->r_linger_item)) {
			dout("%p tid %llu restart on osd%d\n",
			     req, req->r_tid,
			     req->r_osd ? req->r_osd->o_osd : -1);
			ceph_osdc_get_request(req);
			__unregister_request(osdc, req);
			__register_linger_request(osdc, req);
			ceph_osdc_put_request(req);
			continue;
		}

		force_resend_req = force_resend ||
			(force_resend_writes &&
				req->r_flags & CEPH_OSD_FLAG_WRITE);
		err = __map_request(osdc, req, force_resend_req);
		if (err < 0)
			continue;	/* error */
		if (req->r_osd == NULL) {
			dout("%p tid %llu maps to no osd\n", req, req->r_tid);
			needmap++;	/* request a newer map */
		} else if (err > 0) {
			if (!req->r_linger) {
				dout("%p tid %llu requeued on osd%d\n", req,
				     req->r_tid,
				     req->r_osd ? req->r_osd->o_osd : -1);
				req->r_flags |= CEPH_OSD_FLAG_RETRY;
			}
		}
	}

	list_for_each_entry_safe(req, nreq, &osdc->req_linger,
				 r_linger_item) {
		dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);

		err = __map_request(osdc, req,
				    force_resend || force_resend_writes);
		dout("__map_request returned %d\n", err);
		if (err < 0)
			continue;	/* hrm! */
		if (req->r_osd == NULL || err > 0) {
			if (req->r_osd == NULL) {
				dout("lingering %p tid %llu maps to no osd\n",
				     req, req->r_tid);
				/*
				 * A homeless lingering request makes
				 * no sense, as its job is to keep
				 * a particular OSD connection open.
				 * Request a newer map and kick the
				 * request, knowing that it won't be
				 * resent until we actually get a map
				 * that can tell us where to send it.
				 */
				needmap++;
			}

			dout("kicking lingering %p tid %llu osd%d\n", req,
			     req->r_tid, req->r_osd ? req->r_osd->o_osd : -1);
			__register_request(osdc, req);
			__unregister_linger_request(osdc, req);
		}
	}
	reset_changed_osds(osdc);
	mutex_unlock(&osdc->request_mutex);

	if (needmap) {
		dout("%d requests for down osds, need new map\n", needmap);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
	}
}
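/*
 * Example (illustrative): when an incoming osdmap clears the FULL
 * flag, ceph_osdc_handle_map() below calls kick_requests() with
 * force_resend_writes set, so writes that were paused while the
 * cluster was full get remapped and queued even though their pg
 * mapping never changed.
 */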

/*
 * Process updated osd map.
 *
 * The message contains any number of incremental and full maps, normally
 * indicating some sort of topology change in the cluster.  Kick requests
 * off to different OSDs as needed.
 */
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
	void *p, *end, *next;
	u32 nr_maps, maplen;
	u32 epoch;
	struct ceph_osdmap *newmap = NULL, *oldmap;
	int err;
	struct ceph_fsid fsid;
	bool was_full;

	dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	/* verify fsid */
	ceph_decode_need(&p, end, sizeof(fsid), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(osdc->client, &fsid) < 0)
		return;

	down_write(&osdc->map_sem);

	was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);

	/* incremental maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d inc maps\n", nr_maps);
	while (nr_maps > 0) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		next = p + maplen;
		if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
			dout("applying incremental map %u len %d\n",
			     epoch, maplen);
			newmap = osdmap_apply_incremental(&p, next,
							  osdc->osdmap,
							  &osdc->client->msgr);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			if (newmap != osdc->osdmap) {
				ceph_osdmap_destroy(osdc->osdmap);
				osdc->osdmap = newmap;
			}
			was_full = was_full ||
				ceph_osdmap_flag(osdc->osdmap,
						 CEPH_OSDMAP_FULL);
			kick_requests(osdc, 0, was_full);
		} else {
			dout("ignoring incremental map %u len %d\n",
			     epoch, maplen);
		}
		p = next;
		nr_maps--;
	}
	if (newmap)
		goto done;

	/* full maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d full maps\n", nr_maps);
	while (nr_maps) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		if (nr_maps > 1) {
			dout("skipping non-latest full map %u len %d\n",
			     epoch, maplen);
		} else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
			dout("skipping full map %u len %d, "
			     "older than our %u\n", epoch, maplen,
			     osdc->osdmap->epoch);
		} else {
			int skipped_map = 0;

			dout("taking full map %u len %d\n", epoch, maplen);
			newmap = osdmap_decode(&p, p+maplen);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			oldmap = osdc->osdmap;
			osdc->osdmap = newmap;
			if (oldmap) {
				if (oldmap->epoch + 1 < newmap->epoch)
					skipped_map = 1;
				ceph_osdmap_destroy(oldmap);
			}
			was_full = was_full ||
				ceph_osdmap_flag(osdc->osdmap,
						 CEPH_OSDMAP_FULL);
			kick_requests(osdc, skipped_map, was_full);
		}
		p += maplen;
		nr_maps--;
	}

	if (!osdc->osdmap)
		goto bad;
done:
	downgrade_write(&osdc->map_sem);
	ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);

	/*
	 * subscribe to subsequent osdmap updates if full to ensure
	 * we find out when we are no longer full and stop returning
	 * ENOSPC.
	 */
	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
		ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) ||
		ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR))
		ceph_monc_request_next_osdmap(&osdc->client->monc);

	mutex_lock(&osdc->request_mutex);
	__send_queued(osdc);
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
	wake_up_all(&osdc->client->auth_wq);
	return;

bad:
	pr_err("osdc handle_map corrupt msg\n");
	ceph_msg_dump(msg);
	up_write(&osdc->map_sem);
	return;
}

/*
 * watch/notify callback event infrastructure
 *
 * These callbacks are used both for watch and notify operations.
 */
static void __release_event(struct kref *kref)
{
	struct ceph_osd_event *event =
		container_of(kref, struct ceph_osd_event, kref);

	dout("__release_event %p\n", event);
	kfree(event);
}

static void get_event(struct ceph_osd_event *event)
{
	kref_get(&event->kref);
}

void ceph_osdc_put_event(struct ceph_osd_event *event)
{
	kref_put(&event->kref, __release_event);
}
EXPORT_SYMBOL(ceph_osdc_put_event);

static void __insert_event(struct ceph_osd_client *osdc,
			     struct ceph_osd_event *new)
{
	struct rb_node **p = &osdc->event_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd_event *event = NULL;

	while (*p) {
		parent = *p;
		event = rb_entry(parent, struct ceph_osd_event, node);
		if (new->cookie < event->cookie)
			p = &(*p)->rb_left;
		else if (new->cookie > event->cookie)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->node, parent, p);
	rb_insert_color(&new->node, &osdc->event_tree);
}

static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
					     u64 cookie)
{
	struct rb_node **p = &osdc->event_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd_event *event = NULL;

	while (*p) {
		parent = *p;
		event = rb_entry(parent, struct ceph_osd_event, node);
		if (cookie < event->cookie)
			p = &(*p)->rb_left;
		else if (cookie > event->cookie)
			p = &(*p)->rb_right;
		else
			return event;
	}
	return NULL;
}

static void __remove_event(struct ceph_osd_event *event)
{
	struct ceph_osd_client *osdc = event->osdc;

	if (!RB_EMPTY_NODE(&event->node)) {
		dout("__remove_event removed %p\n", event);
		rb_erase(&event->node, &osdc->event_tree);
		ceph_osdc_put_event(event);
	} else {
		dout("__remove_event didn't remove %p\n", event);
	}
}
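/*
 * Usage sketch (illustrative; my_notify_cb is a hypothetical
 * callback, not something defined in this file):
 *
 *	ret = ceph_osdc_create_event(osdc, my_notify_cb, data, &event);
 *	...
 *	ceph_osdc_cancel_event(event);
 *
 * create_event takes an extra ref for the caller; cancel_event drops
 * it after unlinking the cookie from the event rbtree.
 */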
void ceph_osdc_cancel_event(struct ceph_osd_event *event)
{
	struct ceph_osd_client *osdc = event->osdc;

	dout("cancel_event %p\n", event);
	spin_lock(&osdc->event_lock);
	__remove_event(event);
	spin_unlock(&osdc->event_lock);
	ceph_osdc_put_event(event); /* caller's */
}
EXPORT_SYMBOL(ceph_osdc_cancel_event);


static void do_event_work(struct work_struct *work)
{
	struct ceph_osd_event_work *event_work =
		container_of(work, struct ceph_osd_event_work, work);
	struct ceph_osd_event *event = event_work->event;
	u64 ver = event_work->ver;
	u64 notify_id = event_work->notify_id;
	u8 opcode = event_work->opcode;

	dout("do_event_work completing %p\n", event);
	event->cb(ver, notify_id, opcode, event->data);
	dout("do_event_work completed %p\n", event);
	ceph_osdc_put_event(event);
	kfree(event_work);
}


/*
 * Process osd watch notifications
 */
static void handle_watch_notify(struct ceph_osd_client *osdc,
				struct ceph_msg *msg)
{
	void *p, *end;
	u8 proto_ver;
	u64 cookie, ver, notify_id;
	u8 opcode;
	struct ceph_osd_event *event;
	struct ceph_osd_event_work *event_work;

	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	ceph_decode_8_safe(&p, end, proto_ver, bad);
	ceph_decode_8_safe(&p, end, opcode, bad);
	ceph_decode_64_safe(&p, end, cookie, bad);
	ceph_decode_64_safe(&p, end, ver, bad);
	ceph_decode_64_safe(&p, end, notify_id, bad);

	spin_lock(&osdc->event_lock);
	event = __find_event(osdc, cookie);
	if (event) {
		BUG_ON(event->one_shot);
		get_event(event);
	}
	spin_unlock(&osdc->event_lock);
	dout("handle_watch_notify cookie %lld ver %lld event %p\n",
	     cookie, ver, event);
	if (event) {
		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
		if (!event_work) {
			dout("ERROR: could not allocate event_work\n");
			goto done_err;
		}
		INIT_WORK(&event_work->work, do_event_work);
		event_work->event = event;
		event_work->ver = ver;
		event_work->notify_id = notify_id;
		event_work->opcode = opcode;
		if (!queue_work(osdc->notify_wq, &event_work->work)) {
			dout("WARNING: failed to queue notify event work\n");
			goto done_err;
		}
	}

	return;

done_err:
	kfree(event_work);	/* NULL, or allocated but never queued */
	ceph_osdc_put_event(event);
	return;

bad:
	pr_err("osdc handle_watch_notify corrupt msg\n");
	return;
}
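/*
 * For reference (illustrative only -- the fields arrive packed and
 * little-endian on the wire, no such struct exists in the code), the
 * notify payload decoded above is laid out as:
 *
 *	u8  proto_ver;
 *	u8  opcode;
 *	u64 cookie;		matches the cookie assigned in
 *				ceph_osdc_create_event()
 *	u64 ver;
 *	u64 notify_id;
 *
 * The event callback runs from osdc->notify_wq via do_event_work()
 * rather than from the messenger context, so it is allowed to block.
 */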
/*
 * Build a new request and its message.
 */
void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
			     struct ceph_snap_context *snapc, u64 snap_id,
			     struct timespec *mtime)
{
	struct ceph_msg *msg = req->r_request;
	void *p;
	size_t msg_size;
	int flags = req->r_flags;
	u64 data_len;
	unsigned int i;

	req->r_snapid = snap_id;
	req->r_snapc = ceph_get_snap_context(snapc);

	/* encode request */
	msg->hdr.version = cpu_to_le16(4);

	p = msg->front.iov_base;
	ceph_encode_32(&p, 1);   /* client_inc is always 1 */
	req->r_request_osdmap_epoch = p;
	p += 4;
	req->r_request_flags = p;
	p += 4;
	if (req->r_flags & CEPH_OSD_FLAG_WRITE)
		ceph_encode_timespec(p, mtime);
	p += sizeof(struct ceph_timespec);
	req->r_request_reassert_version = p;
	p += sizeof(struct ceph_eversion); /* will get filled in */

	/* oloc */
	ceph_encode_8(&p, 4);
	ceph_encode_8(&p, 4);
	ceph_encode_32(&p, 8 + 4 + 4);
	req->r_request_pool = p;
	p += 8;
	ceph_encode_32(&p, -1);  /* preferred */
	ceph_encode_32(&p, 0);   /* key len */

	ceph_encode_8(&p, 1);
	req->r_request_pgid = p;
	p += 8 + 4;
	ceph_encode_32(&p, -1);  /* preferred */

	/* oid */
	ceph_encode_32(&p, req->r_oid_len);
	memcpy(p, req->r_oid, req->r_oid_len);
	dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
	p += req->r_oid_len;

	/* ops--can imply data */
	ceph_encode_16(&p, (u16)req->r_num_ops);
	data_len = 0;
	for (i = 0; i < req->r_num_ops; i++) {
		data_len += osd_req_encode_op(req, p, i);
		p += sizeof(struct ceph_osd_op);
	}

	/* snaps */
	ceph_encode_64(&p, req->r_snapid);
	ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
	ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
	if (req->r_snapc) {
		for (i = 0; i < req->r_snapc->num_snaps; i++)
			ceph_encode_64(&p, req->r_snapc->snaps[i]);
	}

	req->r_request_attempts = p;
	p += 4;

	/* data */
	if (flags & CEPH_OSD_FLAG_WRITE) {
		u16 data_off;

		/*
		 * The header "data_off" is a hint to the receiver
		 * allowing it to align received data into its
		 * buffers such that there's no need to re-copy
		 * it before writing it to disk (direct I/O).
		 */
		data_off = (u16) (off & 0xffff);
		req->r_request->hdr.data_off = cpu_to_le16(data_off);
	}
	req->r_request->hdr.data_len = cpu_to_le32(data_len);

	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
	msg_size = p - msg->front.iov_base;
	msg->front.iov_len = msg_size;
	msg->hdr.front_len = cpu_to_le32(msg_size);

	dout("build_request msg_size was %d\n", (int)msg_size);
}
EXPORT_SYMBOL(ceph_osdc_build_request);
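/*
 * Sketch of the typical request lifecycle around this function
 * (editorial illustration; "layout", "vino", "pages", "len", "snapc",
 * "snap_id" and "mtime" are placeholders -- see ceph_osdc_readpages()
 * below for a real caller):
 *
 *	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, ...);
 *	osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
 *					 false, false);
 *	ceph_osdc_build_request(req, off, snapc, snap_id, &mtime);
 *	ret = ceph_osdc_start_request(osdc, req, false);
 *	if (!ret)
 *		ret = ceph_osdc_wait_request(osdc, req);
 *	ceph_osdc_put_request(req);
 */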
/*
 * Register request, send initial attempt.
 */
int ceph_osdc_start_request(struct ceph_osd_client *osdc,
			    struct ceph_osd_request *req,
			    bool nofail)
{
	int rc = 0;

	down_read(&osdc->map_sem);
	mutex_lock(&osdc->request_mutex);
	__register_request(osdc, req);
	req->r_sent = 0;
	req->r_got_reply = 0;
	rc = __map_request(osdc, req, 0);
	if (rc < 0) {
		if (nofail) {
			dout("osdc_start_request failed map, "
			     "will retry %lld\n", req->r_tid);
			rc = 0;
		} else {
			__unregister_request(osdc, req);
		}
		goto out_unlock;
	}
	if (req->r_osd == NULL) {
		dout("send_request %p no up osds in pg\n", req);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
	} else {
		__send_queued(osdc);
	}
	rc = 0;
out_unlock:
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_start_request);

/*
 * wait for a request to complete
 */
int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	int rc;

	rc = wait_for_completion_interruptible(&req->r_completion);
	if (rc < 0) {
		mutex_lock(&osdc->request_mutex);
		__cancel_request(req);
		__unregister_request(osdc, req);
		mutex_unlock(&osdc->request_mutex);
		complete_request(req);
		dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
		return rc;
	}

	dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
	return req->r_result;
}
EXPORT_SYMBOL(ceph_osdc_wait_request);

/*
 * sync - wait for all in-flight requests to flush.  avoid starvation.
 */
void ceph_osdc_sync(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req;
	u64 last_tid, next_tid = 0;

	mutex_lock(&osdc->request_mutex);
	last_tid = osdc->last_tid;
	while (1) {
		req = __lookup_request_ge(osdc, next_tid);
		if (!req)
			break;
		if (req->r_tid > last_tid)
			break;

		next_tid = req->r_tid + 1;
		if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
			continue;

		ceph_osdc_get_request(req);
		mutex_unlock(&osdc->request_mutex);
		dout("sync waiting on tid %llu (last is %llu)\n",
		     req->r_tid, last_tid);
		wait_for_completion(&req->r_safe_completion);
		mutex_lock(&osdc->request_mutex);
		ceph_osdc_put_request(req);
	}
	mutex_unlock(&osdc->request_mutex);
	dout("sync done (thru tid %llu)\n", last_tid);
}
EXPORT_SYMBOL(ceph_osdc_sync);

/*
 * Call all pending notify callbacks - for use after a watch is
 * unregistered, to make sure no more callbacks for it will be invoked.
 */
void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
{
	flush_workqueue(osdc->notify_wq);
}
EXPORT_SYMBOL(ceph_osdc_flush_notifies);
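/*
 * Illustrative teardown ordering for a watcher (a sketch, assuming the
 * caller holds "event" from ceph_osdc_create_event() and "req" is the
 * already-started request carrying the unwatch op):
 *
 *	ceph_osdc_wait_request(osdc, req);	wait for the unwatch ack
 *	ceph_osdc_flush_notifies(osdc);		drain queued callbacks
 *	ceph_osdc_cancel_event(event);		drop the event references
 */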
/*
 * init, shutdown
 */
int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
{
	int err;

	dout("init\n");
	osdc->client = client;
	osdc->osdmap = NULL;
	init_rwsem(&osdc->map_sem);
	init_completion(&osdc->map_waiters);
	osdc->last_requested_map = 0;
	mutex_init(&osdc->request_mutex);
	osdc->last_tid = 0;
	osdc->osds = RB_ROOT;
	INIT_LIST_HEAD(&osdc->osd_lru);
	osdc->requests = RB_ROOT;
	INIT_LIST_HEAD(&osdc->req_lru);
	INIT_LIST_HEAD(&osdc->req_unsent);
	INIT_LIST_HEAD(&osdc->req_notarget);
	INIT_LIST_HEAD(&osdc->req_linger);
	osdc->num_requests = 0;
	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
	spin_lock_init(&osdc->event_lock);
	osdc->event_tree = RB_ROOT;
	osdc->event_count = 0;

	schedule_delayed_work(&osdc->osds_timeout_work,
	    round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));

	err = -ENOMEM;
	osdc->req_mempool = mempool_create_kmalloc_pool(10,
					sizeof(struct ceph_osd_request));
	if (!osdc->req_mempool)
		goto out;

	err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
				OSD_OP_FRONT_LEN, 10, true,
				"osd_op");
	if (err < 0)
		goto out_mempool;
	err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
				OSD_OPREPLY_FRONT_LEN, 10, true,
				"osd_op_reply");
	if (err < 0)
		goto out_msgpool;

	err = -ENOMEM;
	osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
	if (!osdc->notify_wq)
		goto out_msgpool_reply;
	return 0;

out_msgpool_reply:
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
out_msgpool:
	ceph_msgpool_destroy(&osdc->msgpool_op);
out_mempool:
	mempool_destroy(osdc->req_mempool);
out:
	return err;
}

void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
	flush_workqueue(osdc->notify_wq);
	destroy_workqueue(osdc->notify_wq);
	cancel_delayed_work_sync(&osdc->timeout_work);
	cancel_delayed_work_sync(&osdc->osds_timeout_work);
	if (osdc->osdmap) {
		ceph_osdmap_destroy(osdc->osdmap);
		osdc->osdmap = NULL;
	}
	remove_all_osds(osdc);
	mempool_destroy(osdc->req_mempool);
	ceph_msgpool_destroy(&osdc->msgpool_op);
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}
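/*
 * Pairing sketch (illustrative): ceph_osdc_init() and ceph_osdc_stop()
 * bracket the life of the containing ceph_client, roughly:
 *
 *	err = ceph_osdc_init(&client->osdc, client);
 *	if (err)
 *		goto fail;
 *	...
 *	ceph_osdc_stop(&client->osdc);
 */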
/*
 * Read some contiguous pages.  If we cross a stripe boundary, shorten
 * *plen.  Return number of bytes read, or error.
 */
int ceph_osdc_readpages(struct ceph_osd_client *osdc,
			struct ceph_vino vino, struct ceph_file_layout *layout,
			u64 off, u64 *plen,
			u32 truncate_seq, u64 truncate_size,
			struct page **pages, int num_pages, int page_align)
{
	struct ceph_osd_request *req;
	int rc = 0;

	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
	     vino.snap, off, *plen);
	req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1,
				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
				    NULL, truncate_seq, truncate_size,
				    false);
	if (IS_ERR(req))
		return PTR_ERR(req);

	/* it may be a short read due to an object boundary */

	osd_req_op_extent_osd_data_pages(req, 0,
				pages, *plen, page_align, false, false);

	dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n",
	     off, *plen, *plen, page_align);

	ceph_osdc_build_request(req, off, NULL, vino.snap, NULL);

	rc = ceph_osdc_start_request(osdc, req, false);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	dout("readpages result %d\n", rc);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_readpages);

/*
 * do a synchronous write on N pages
 */
int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
			 struct ceph_file_layout *layout,
			 struct ceph_snap_context *snapc,
			 u64 off, u64 len,
			 u32 truncate_seq, u64 truncate_size,
			 struct timespec *mtime,
			 struct page **pages, int num_pages)
{
	struct ceph_osd_request *req;
	int rc = 0;
	int page_align = off & ~PAGE_MASK;

	BUG_ON(vino.snap != CEPH_NOSNAP);	/* snapshots aren't writeable */
	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1,
				    CEPH_OSD_OP_WRITE,
				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
				    snapc, truncate_seq, truncate_size,
				    true);
	if (IS_ERR(req))
		return PTR_ERR(req);

	/* it may be a short write due to an object boundary */
	osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
					 false, false);
	dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);

	ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime);

	rc = ceph_osdc_start_request(osdc, req, true);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	if (rc == 0)
		rc = len;
	dout("writepages result %d\n", rc);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_writepages);
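/*
 * Example call (hypothetical values): synchronously write a single
 * page at file offset 0.  On success the return value is the byte
 * count written, here PAGE_SIZE:
 *
 *	u64 len = PAGE_SIZE;
 *	int rc = ceph_osdc_writepages(osdc, vino, layout, snapc,
 *				      0, len, truncate_seq, truncate_size,
 *				      &mtime, &page, 1);
 */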
int ceph_osdc_setup(void)
{
	BUG_ON(ceph_osd_request_cache);
	ceph_osd_request_cache = kmem_cache_create("ceph_osd_request",
					sizeof (struct ceph_osd_request),
					__alignof__(struct ceph_osd_request),
					0, NULL);

	return ceph_osd_request_cache ? 0 : -ENOMEM;
}
EXPORT_SYMBOL(ceph_osdc_setup);

void ceph_osdc_cleanup(void)
{
	BUG_ON(!ceph_osd_request_cache);
	kmem_cache_destroy(ceph_osd_request_cache);
	ceph_osd_request_cache = NULL;
}
EXPORT_SYMBOL(ceph_osdc_cleanup);

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;
	int type = le16_to_cpu(msg->hdr.type);

	if (!osd)
		goto out;
	osdc = osd->o_osdc;

	switch (type) {
	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(osdc, msg);
		break;
	case CEPH_MSG_OSD_OPREPLY:
		handle_reply(osdc, msg, con);
		break;
	case CEPH_MSG_WATCH_NOTIFY:
		handle_watch_notify(osdc, msg);
		break;

	default:
		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
out:
	ceph_msg_put(msg);
}
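/*
 * Reply path sketch: for CEPH_MSG_OSD_OPREPLY the messenger calls the
 * connection's hooks in this order --
 *
 *	alloc_msg()  ->  get_reply() below	look up the request by
 *						hdr->tid and reuse its
 *						preallocated r_reply
 *	dispatch() above  ->  handle_reply()	complete the request
 */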
/*
 * lookup and return message for incoming reply.  set up reply message
 * pages.
 */
static struct ceph_msg *get_reply(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct ceph_msg *m;
	struct ceph_osd_request *req;
	int front = le32_to_cpu(hdr->front_len);
	int data_len = le32_to_cpu(hdr->data_len);
	u64 tid;

	tid = le64_to_cpu(hdr->tid);
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (!req) {
		*skip = 1;
		m = NULL;
		dout("get_reply unknown tid %llu from osd%d\n", tid,
		     osd->o_osd);
		goto out;
	}

	if (req->r_reply->con)
		dout("%s revoking msg %p from old con %p\n", __func__,
		     req->r_reply, req->r_reply->con);
	ceph_msg_revoke_incoming(req->r_reply);

	if (front > req->r_reply->front.iov_len) {
		pr_warning("get_reply front %d > preallocated %d\n",
			   front, (int)req->r_reply->front.iov_len);
		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
		if (!m)
			goto out;
		ceph_msg_put(req->r_reply);
		req->r_reply = m;
	}
	m = ceph_msg_get(req->r_reply);

	if (data_len > 0) {
		struct ceph_osd_data *osd_data;

		/*
		 * XXX This is assuming there is only one op containing
		 * XXX page data.  Probably OK for reads, but this
		 * XXX ought to be done more generally.
		 */
		osd_data = osd_req_op_extent_osd_data(req, 0);
		if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
			if (osd_data->pages &&
			    unlikely(osd_data->length < data_len)) {

				pr_warning("tid %lld reply has %d bytes "
					   "we had only %llu bytes ready\n",
					   tid, data_len, osd_data->length);
				*skip = 1;
				ceph_msg_put(m);
				m = NULL;
				goto out;
			}
		}
	}
	*skip = 0;
	dout("get_reply tid %lld %p\n", tid, m);

out:
	mutex_unlock(&osdc->request_mutex);
	return m;
}

static struct ceph_msg *alloc_msg(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	int type = le16_to_cpu(hdr->type);
	int front = le32_to_cpu(hdr->front_len);

	*skip = 0;
	switch (type) {
	case CEPH_MSG_OSD_MAP:
	case CEPH_MSG_WATCH_NOTIFY:
		return ceph_msg_new(type, front, GFP_NOFS, false);
	case CEPH_MSG_OSD_OPREPLY:
		return get_reply(con, hdr, skip);
	default:
		pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
			osd->o_osd);
		*skip = 1;
		return NULL;
	}
}

/*
 * Wrappers to refcount containing ceph_osd struct
 */
static struct ceph_connection *get_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	if (get_osd(osd))
		return con;
	return NULL;
}

static void put_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	put_osd(osd);
}

/*
 * authentication
 */

/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
						  int *proto, int force_new)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;
	struct ceph_auth_handshake *auth = &o->o_auth;

	if (force_new && auth->authorizer) {
		ceph_auth_destroy_authorizer(ac, auth->authorizer);
		auth->authorizer = NULL;
	}
	if (!auth->authorizer) {
		int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
						      auth);
		if (ret)
			return ERR_PTR(ret);
	} else {
		int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
						      auth);
		if (ret)
			return ERR_PTR(ret);
	}
	*proto = ac->protocol;

	return auth;
}


static int verify_authorizer_reply(struct ceph_connection *con, int len)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;

	return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len);
}

static int invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;

	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
	return ceph_monc_validate_auth(&osdc->client->monc);
}
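/*
 * Authorizer lifecycle sketch (illustrative): for each OSD session the
 * messenger invokes the hooks above roughly as
 *
 *	get_authorizer()		while (re)connecting
 *	verify_authorizer_reply()	when the server answers the
 *					authorize handshake
 *	invalidate_authorizer()		on an auth error, discarding the
 *					ticket and revalidating with the
 *					monitor before reconnecting
 */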
static const struct ceph_connection_operations osd_con_ops = {
	.get = get_osd_con,
	.put = put_osd_con,
	.dispatch = dispatch,
	.get_authorizer = get_authorizer,
	.verify_authorizer_reply = verify_authorizer_reply,
	.invalidate_authorizer = invalidate_authorizer,
	.alloc_msg = alloc_msg,
	.fault = osd_reset,
};