--- zzzz-none-000/linux-3.10.107/fs/ceph/mds_client.c 2017-06-27 09:49:32.000000000 +0000 +++ scorpion-7490-727/linux-3.10.107/fs/ceph/mds_client.c 2021-02-04 17:41:59.000000000 +0000 @@ -3,9 +3,12 @@ #include #include #include +#include #include #include #include +#include +#include #include "super.h" #include "mds_client.h" @@ -43,6 +46,7 @@ */ struct ceph_reconnect_state { + int nr_caps; struct ceph_pagelist *pagelist; bool flock; }; @@ -62,7 +66,7 @@ */ static int parse_reply_info_in(void **p, void *end, struct ceph_mds_reply_info_in *info, - int features) + u64 features) { int err = -EIO; @@ -86,6 +90,16 @@ ceph_decode_need(p, end, info->xattr_len, bad); info->xattr_data = *p; *p += info->xattr_len; + + if (features & CEPH_FEATURE_MDS_INLINE_DATA) { + ceph_decode_64_safe(p, end, info->inline_version, bad); + ceph_decode_32_safe(p, end, info->inline_len, bad); + ceph_decode_need(p, end, info->inline_len, bad); + info->inline_data = *p; + *p += info->inline_len; + } else + info->inline_version = CEPH_INLINE_NONE; + return 0; bad: return err; @@ -97,7 +111,7 @@ */ static int parse_reply_info_trace(void **p, void *end, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { int err; @@ -144,7 +158,7 @@ */ static int parse_reply_info_dir(void **p, void *end, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { u32 num, i = 0; int err; @@ -164,21 +178,18 @@ if (num == 0) goto done; - /* alloc large array */ - info->dir_nr = num; - info->dir_in = kcalloc(num, sizeof(*info->dir_in) + - sizeof(*info->dir_dname) + - sizeof(*info->dir_dname_len) + - sizeof(*info->dir_dlease), - GFP_NOFS); - if (info->dir_in == NULL) { - err = -ENOMEM; - goto out_bad; - } + BUG_ON(!info->dir_in); info->dir_dname = (void *)(info->dir_in + num); info->dir_dname_len = (void *)(info->dir_dname + num); info->dir_dlease = (void *)(info->dir_dname_len + num); + if ((unsigned long)(info->dir_dlease + num) > + (unsigned long)info->dir_in + info->dir_buf_size) { + pr_err("dir contents are larger than expected\n"); + WARN_ON(1); + goto bad; + } + info->dir_nr = num; while (num) { /* dentry */ ceph_decode_need(p, end, sizeof(u32)*2, bad); @@ -216,7 +227,7 @@ */ static int parse_reply_info_filelock(void **p, void *end, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { if (*p + sizeof(*info->filelock_reply) > end) goto bad; @@ -237,7 +248,7 @@ */ static int parse_reply_info_create(void **p, void *end, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { if (features & CEPH_FEATURE_REPLY_CREATE_INODE) { if (*p == end) { @@ -261,14 +272,15 @@ */ static int parse_reply_info_extra(void **p, void *end, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { - if (info->head->op == CEPH_MDS_OP_GETFILELOCK) + u32 op = le32_to_cpu(info->head->op); + + if (op == CEPH_MDS_OP_GETFILELOCK) return parse_reply_info_filelock(p, end, info, features); - else if (info->head->op == CEPH_MDS_OP_READDIR || - info->head->op == CEPH_MDS_OP_LSSNAP) + else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP) return parse_reply_info_dir(p, end, info, features); - else if (info->head->op == CEPH_MDS_OP_CREATE) + else if (op == CEPH_MDS_OP_CREATE) return parse_reply_info_create(p, end, info, features); else return -EIO; @@ -279,7 +291,7 @@ */ static int parse_reply_info(struct ceph_msg *msg, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { void *p, *end; u32 len; @@ -326,14 +338,16 @@ static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) { - kfree(info->dir_in); + if (!info->dir_in) + return; + free_pages((unsigned long)info->dir_in, get_order(info->dir_buf_size)); } /* * sessions */ -static const char *session_state_name(int s) +const char *ceph_session_state_name(int s) { switch (s) { case CEPH_MDS_SESSION_NEW: return "new"; @@ -443,9 +457,9 @@ INIT_LIST_HEAD(&s->s_waiting); INIT_LIST_HEAD(&s->s_unsafe); s->s_num_cap_releases = 0; + s->s_cap_reconnect = 0; s->s_cap_iterator = NULL; INIT_LIST_HEAD(&s->s_cap_releases); - INIT_LIST_HEAD(&s->s_cap_releases_done); INIT_LIST_HEAD(&s->s_cap_flushing); INIT_LIST_HEAD(&s->s_cap_snaps_flushing); @@ -467,6 +481,7 @@ mdsc->max_sessions = newmax; } mdsc->sessions[mds] = s; + atomic_inc(&mdsc->num_sessions); atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */ ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, @@ -490,6 +505,7 @@ mdsc->sessions[s->s_mds] = NULL; ceph_con_close(&s->s_con); ceph_put_mds_session(s); + atomic_dec(&mdsc->num_sessions); } /* @@ -510,23 +526,23 @@ struct ceph_mds_request *req = container_of(kref, struct ceph_mds_request, r_kref); + destroy_reply_info(&req->r_reply_info); if (req->r_request) ceph_msg_put(req->r_request); - if (req->r_reply) { + if (req->r_reply) ceph_msg_put(req->r_reply); - destroy_reply_info(&req->r_reply_info); - } if (req->r_inode) { ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); iput(req->r_inode); } if (req->r_locked_dir) ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); - if (req->r_target_inode) - iput(req->r_target_inode); + iput(req->r_target_inode); if (req->r_dentry) dput(req->r_dentry); - if (req->r_old_dentry) { + if (req->r_old_dentry) + dput(req->r_old_dentry); + if (req->r_old_dentry_dir) { /* * track (and drop pins for) r_old_dentry_dir * separately, since r_old_dentry's d_parent may have @@ -535,11 +551,12 @@ */ ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir), CEPH_CAP_PIN); - dput(req->r_old_dentry); iput(req->r_old_dentry_dir); } kfree(req->r_path1); kfree(req->r_path2); + if (req->r_pagelist) + ceph_pagelist_release(req->r_pagelist); put_request_session(req); ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation); kfree(req); @@ -613,14 +630,12 @@ req->r_uid = current_fsuid(); req->r_gid = current_fsgid(); - if (dir) { - struct ceph_inode_info *ci = ceph_inode(dir); + if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) + mdsc->oldest_tid = req->r_tid; + if (dir) { ihold(dir); - spin_lock(&ci->i_unsafe_lock); req->r_unsafe_dir = dir; - list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); - spin_unlock(&ci->i_unsafe_lock); } } @@ -628,16 +643,41 @@ struct ceph_mds_request *req) { dout("__unregister_request %p tid %lld\n", req, req->r_tid); + + /* Never leave an unregistered request on an unsafe list! */ + list_del_init(&req->r_unsafe_item); + + if (req->r_tid == mdsc->oldest_tid) { + struct rb_node *p = rb_next(&req->r_node); + mdsc->oldest_tid = 0; + while (p) { + struct ceph_mds_request *next_req = + rb_entry(p, struct ceph_mds_request, r_node); + if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { + mdsc->oldest_tid = next_req->r_tid; + break; + } + p = rb_next(p); + } + } + rb_erase(&req->r_node, &mdsc->request_tree); RB_CLEAR_NODE(&req->r_node); - if (req->r_unsafe_dir) { + if (req->r_unsafe_dir && req->r_got_unsafe) { struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); - spin_lock(&ci->i_unsafe_lock); list_del_init(&req->r_unsafe_dir_item); spin_unlock(&ci->i_unsafe_lock); + } + if (req->r_target_inode && req->r_got_unsafe) { + struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); + spin_lock(&ci->i_unsafe_lock); + list_del_init(&req->r_unsafe_target_item); + spin_unlock(&ci->i_unsafe_lock); + } + if (req->r_unsafe_dir) { iput(req->r_unsafe_dir); req->r_unsafe_dir = NULL; } @@ -663,7 +703,7 @@ * except to resplice to another snapdir, and either the old or new * result is a valid result. */ - while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP) + while (!IS_ROOT(dentry) && ceph_snap(d_inode(dentry)) != CEPH_NOSNAP) dentry = dentry->d_parent; return dentry; } @@ -700,25 +740,26 @@ } else if (req->r_dentry) { /* ignore race with rename; old or new d_parent is okay */ struct dentry *parent = req->r_dentry->d_parent; - struct inode *dir = parent->d_inode; + struct inode *dir = d_inode(parent); if (dir->i_sb != mdsc->fsc->sb) { /* not this fs! */ - inode = req->r_dentry->d_inode; + inode = d_inode(req->r_dentry); } else if (ceph_snap(dir) != CEPH_NOSNAP) { /* direct snapped/virtual snapdir requests * based on parent dir inode */ struct dentry *dn = get_nonsnap_parent(parent); - inode = dn->d_inode; + inode = d_inode(dn); dout("__choose_mds using nonsnap parent %p\n", inode); - } else if (req->r_dentry->d_inode) { - /* dentry target */ - inode = req->r_dentry->d_inode; } else { - /* dir + name */ - inode = dir; - hash = ceph_dentry_hash(dir, req->r_dentry); - is_hash = true; + /* dentry target */ + inode = d_inode(req->r_dentry); + if (!inode || mode == USE_AUTH_MDS) { + /* dir + name */ + inode = dir; + hash = ceph_dentry_hash(dir, req->r_dentry); + is_hash = true; + } } } @@ -809,6 +850,78 @@ h = msg->front.iov_base; h->op = cpu_to_le32(op); h->seq = cpu_to_le64(seq); + + return msg; +} + +/* + * session message, specialization for CEPH_SESSION_REQUEST_OPEN + * to include additional client metadata fields. + */ +static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq) +{ + struct ceph_msg *msg; + struct ceph_mds_session_head *h; + int i = -1; + int metadata_bytes = 0; + int metadata_key_count = 0; + struct ceph_options *opt = mdsc->fsc->client->options; + void *p; + + const char* metadata[][2] = { + {"hostname", utsname()->nodename}, + {"kernel_version", utsname()->release}, + {"entity_id", opt->name ? opt->name : ""}, + {NULL, NULL} + }; + + /* Calculate serialized length of metadata */ + metadata_bytes = 4; /* map length */ + for (i = 0; metadata[i][0] != NULL; ++i) { + metadata_bytes += 8 + strlen(metadata[i][0]) + + strlen(metadata[i][1]); + metadata_key_count++; + } + + /* Allocate the message */ + msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + metadata_bytes, + GFP_NOFS, false); + if (!msg) { + pr_err("create_session_msg ENOMEM creating msg\n"); + return NULL; + } + h = msg->front.iov_base; + h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN); + h->seq = cpu_to_le64(seq); + + /* + * Serialize client metadata into waiting buffer space, using + * the format that userspace expects for map + * + * ClientSession messages with metadata are v2 + */ + msg->hdr.version = cpu_to_le16(2); + msg->hdr.compat_version = cpu_to_le16(1); + + /* The write pointer, following the session_head structure */ + p = msg->front.iov_base + sizeof(*h); + + /* Number of entries in the map */ + ceph_encode_32(&p, metadata_key_count); + + /* Two length-prefixed strings for each entry in the map */ + for (i = 0; metadata[i][0] != NULL; ++i) { + size_t const key_len = strlen(metadata[i][0]); + size_t const val_len = strlen(metadata[i][1]); + + ceph_encode_32(&p, key_len); + memcpy(p, metadata[i][0], key_len); + p += key_len; + ceph_encode_32(&p, val_len); + memcpy(p, metadata[i][1], val_len); + p += val_len; + } + return msg; } @@ -832,7 +945,7 @@ session->s_renew_requested = jiffies; /* send connect message */ - msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq); + msg = create_session_open_msg(mdsc, session->s_seq); if (!msg) return -ENOMEM; ceph_con_send(&session->s_con, msg); @@ -844,35 +957,56 @@ * * called under mdsc->mutex */ +static struct ceph_mds_session * +__open_export_target_session(struct ceph_mds_client *mdsc, int target) +{ + struct ceph_mds_session *session; + + session = __ceph_lookup_mds_session(mdsc, target); + if (!session) { + session = register_session(mdsc, target); + if (IS_ERR(session)) + return session; + } + if (session->s_state == CEPH_MDS_SESSION_NEW || + session->s_state == CEPH_MDS_SESSION_CLOSING) + __open_session(mdsc, session); + + return session; +} + +struct ceph_mds_session * +ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) +{ + struct ceph_mds_session *session; + + dout("open_export_target_session to mds%d\n", target); + + mutex_lock(&mdsc->mutex); + session = __open_export_target_session(mdsc, target); + mutex_unlock(&mdsc->mutex); + + return session; +} + static void __open_export_target_sessions(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { struct ceph_mds_info *mi; struct ceph_mds_session *ts; int i, mds = session->s_mds; - int target; if (mds >= mdsc->mdsmap->m_max_mds) return; + mi = &mdsc->mdsmap->m_info[mds]; dout("open_export_target_sessions for mds%d (%d targets)\n", session->s_mds, mi->num_export_targets); for (i = 0; i < mi->num_export_targets; i++) { - target = mi->export_targets[i]; - ts = __ceph_lookup_mds_session(mdsc, target); - if (!ts) { - ts = register_session(mdsc, target); - if (IS_ERR(ts)) - return; - } - if (session->s_state == CEPH_MDS_SESSION_NEW || - session->s_state == CEPH_MDS_SESSION_CLOSING) - __open_session(mdsc, session); - else - dout(" mds%d target mds%d %p is %s\n", session->s_mds, - i, ts, session_state_name(ts->s_state)); - ceph_put_mds_session(ts); + ts = __open_export_target_session(mdsc, mi->export_targets[i]); + if (!IS_ERR(ts)) + ceph_put_mds_session(ts); } } @@ -888,27 +1022,52 @@ * session caps */ -/* - * Free preallocated cap messages assigned to this session - */ -static void cleanup_cap_releases(struct ceph_mds_session *session) +/* caller holds s_cap_lock, we drop it */ +static void cleanup_cap_releases(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) + __releases(session->s_cap_lock) { - struct ceph_msg *msg; + LIST_HEAD(tmp_list); + list_splice_init(&session->s_cap_releases, &tmp_list); + session->s_num_cap_releases = 0; + spin_unlock(&session->s_cap_lock); - spin_lock(&session->s_cap_lock); - while (!list_empty(&session->s_cap_releases)) { - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, list_head); - list_del_init(&msg->list_head); - ceph_msg_put(msg); - } - while (!list_empty(&session->s_cap_releases_done)) { - msg = list_first_entry(&session->s_cap_releases_done, - struct ceph_msg, list_head); - list_del_init(&msg->list_head); - ceph_msg_put(msg); + dout("cleanup_cap_releases mds%d\n", session->s_mds); + while (!list_empty(&tmp_list)) { + struct ceph_cap *cap; + /* zero out the in-progress message */ + cap = list_first_entry(&tmp_list, + struct ceph_cap, session_caps); + list_del(&cap->session_caps); + ceph_put_cap(mdsc, cap); } - spin_unlock(&session->s_cap_lock); +} + +static void cleanup_session_requests(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) +{ + struct ceph_mds_request *req; + struct rb_node *p; + + dout("cleanup_session_requests mds%d\n", session->s_mds); + mutex_lock(&mdsc->mutex); + while (!list_empty(&session->s_unsafe)) { + req = list_first_entry(&session->s_unsafe, + struct ceph_mds_request, r_unsafe_item); + pr_warn_ratelimited(" dropping unsafe request %llu\n", + req->r_tid); + __unregister_request(mdsc, req); + } + /* zero r_attempts, so kick_requests() will re-send requests */ + p = rb_first(&mdsc->request_tree); + while (p) { + req = rb_entry(p, struct ceph_mds_request, r_node); + p = rb_next(p); + if (req->r_session && + req->r_session->s_mds == session->s_mds) + req->r_attempts = 0; + } + mutex_unlock(&mdsc->mutex); } /* @@ -958,10 +1117,16 @@ dout("iterate_session_caps finishing cap %p removal\n", cap); BUG_ON(cap->session != session); + cap->session = NULL; list_del_init(&cap->session_caps); session->s_nr_caps--; - cap->session = NULL; - old_cap = cap; /* put_cap it w/o locks held */ + if (cap->queue_release) { + list_add_tail(&cap->session_caps, + &session->s_cap_releases); + session->s_num_cap_releases++; + } else { + old_cap = cap; /* put_cap it w/o locks held */ + } } if (ret < 0) goto out; @@ -971,8 +1136,7 @@ session->s_cap_iterator = NULL; spin_unlock(&session->s_cap_lock); - if (last_inode) - iput(last_inode); + iput(last_inode); if (old_cap) ceph_put_cap(session->s_mdsc, old_cap); @@ -983,19 +1147,35 @@ void *arg) { struct ceph_inode_info *ci = ceph_inode(inode); + LIST_HEAD(to_remove); int drop = 0; dout("removing cap %p, ci is %p, inode is %p\n", cap, ci, &ci->vfs_inode); spin_lock(&ci->i_ceph_lock); - __ceph_remove_cap(cap); - if (!__ceph_is_any_real_caps(ci)) { + __ceph_remove_cap(cap, false); + if (!ci->i_auth_cap) { + struct ceph_cap_flush *cf; struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; + while (true) { + struct rb_node *n = rb_first(&ci->i_cap_flush_tree); + if (!n) + break; + cf = rb_entry(n, struct ceph_cap_flush, i_node); + rb_erase(&cf->i_node, &ci->i_cap_flush_tree); + list_add(&cf->list, &to_remove); + } + spin_lock(&mdsc->cap_dirty_lock); + + list_for_each_entry(cf, &to_remove, list) + rb_erase(&cf->g_node, &mdsc->cap_flush_tree); + if (!list_empty(&ci->i_dirty_item)) { - pr_info(" dropping dirty %s state for %p %lld\n", + pr_warn_ratelimited( + " dropping dirty %s state for %p %lld\n", ceph_cap_string(ci->i_dirty_caps), inode, ceph_ino(inode)); ci->i_dirty_caps = 0; @@ -1003,7 +1183,8 @@ drop = 1; } if (!list_empty(&ci->i_flushing_item)) { - pr_info(" dropping dirty+flushing %s state for %p %lld\n", + pr_warn_ratelimited( + " dropping dirty+flushing %s state for %p %lld\n", ceph_cap_string(ci->i_flushing_caps), inode, ceph_ino(inode)); ci->i_flushing_caps = 0; @@ -1011,16 +1192,21 @@ mdsc->num_cap_flushing--; drop = 1; } - if (drop && ci->i_wrbuffer_ref) { - pr_info(" dropping dirty data for %p %lld\n", - inode, ceph_ino(inode)); - ci->i_wrbuffer_ref = 0; - ci->i_wrbuffer_ref_head = 0; - drop++; - } spin_unlock(&mdsc->cap_dirty_lock); + + if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) { + list_add(&ci->i_prealloc_cap_flush->list, &to_remove); + ci->i_prealloc_cap_flush = NULL; + } } spin_unlock(&ci->i_ceph_lock); + while (!list_empty(&to_remove)) { + struct ceph_cap_flush *cf; + cf = list_first_entry(&to_remove, + struct ceph_cap_flush, list); + list_del(&cf->list); + ceph_free_cap_flush(cf); + } while (drop--) iput(inode); return 0; @@ -1033,9 +1219,41 @@ { dout("remove_session_caps on %p\n", session); iterate_session_caps(session, remove_session_caps_cb, NULL); + + spin_lock(&session->s_cap_lock); + if (session->s_nr_caps > 0) { + struct super_block *sb = session->s_mdsc->fsc->sb; + struct inode *inode; + struct ceph_cap *cap, *prev = NULL; + struct ceph_vino vino; + /* + * iterate_session_caps() skips inodes that are being + * deleted, we need to wait until deletions are complete. + * __wait_on_freeing_inode() is designed for the job, + * but it is not exported, so use lookup inode function + * to access it. + */ + while (!list_empty(&session->s_caps)) { + cap = list_entry(session->s_caps.next, + struct ceph_cap, session_caps); + if (cap == prev) + break; + prev = cap; + vino = cap->ci->i_vino; + spin_unlock(&session->s_cap_lock); + + inode = ceph_find_inode(sb, vino); + iput(inode); + + spin_lock(&session->s_cap_lock); + } + } + + // drop cap expires and unlock s_cap_lock + cleanup_cap_releases(session->s_mdsc, session); + BUG_ON(session->s_nr_caps > 0); BUG_ON(!list_empty(&session->s_cap_flushing)); - cleanup_cap_releases(session); } /* @@ -1103,6 +1321,21 @@ return 0; } +static int send_flushmsg_ack(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session, u64 seq) +{ + struct ceph_msg *msg; + + dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n", + session->s_mds, ceph_session_state_name(session->s_state), seq); + msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); + if (!msg) + return -ENOMEM; + ceph_con_send(&session->s_con, msg); + return 0; +} + + /* * Note new cap ttl, and any transition from stale -> not stale (fresh?). * @@ -1146,7 +1379,7 @@ struct ceph_msg *msg; dout("request_close_session mds%d state %s seq %lld\n", - session->s_mds, session_state_name(session->s_state), + session->s_mds, ceph_session_state_name(session->s_state), session->s_seq); msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq); if (!msg) @@ -1181,7 +1414,7 @@ { struct ceph_mds_session *session = arg; struct ceph_inode_info *ci = ceph_inode(inode); - int used, oissued, mine; + int used, wanted, oissued, mine; if (session->s_trim_caps <= 0) return -1; @@ -1189,24 +1422,35 @@ spin_lock(&ci->i_ceph_lock); mine = cap->issued | cap->implemented; used = __ceph_caps_used(ci); + wanted = __ceph_caps_file_wanted(ci); oissued = __ceph_caps_issued_other(ci, cap); - dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n", + dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n", inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), - ceph_cap_string(used)); - if (ci->i_dirty_caps) - goto out; /* dirty caps */ - if ((used & ~oissued) & mine) + ceph_cap_string(used), ceph_cap_string(wanted)); + if (cap == ci->i_auth_cap) { + if (ci->i_dirty_caps || ci->i_flushing_caps || + !list_empty(&ci->i_cap_snaps)) + goto out; + if ((used | wanted) & CEPH_CAP_ANY_WR) + goto out; + } + /* The inode has cached pages, but it's no longer used. + * we can safely drop it */ + if (wanted == 0 && used == CEPH_CAP_FILE_CACHE && + !(oissued & CEPH_CAP_FILE_CACHE)) { + used = 0; + oissued = 0; + } + if ((used | wanted) & ~oissued & mine) goto out; /* we need these caps */ session->s_trim_caps--; if (oissued) { /* we aren't the only cap.. just remove us */ - __queue_cap_release(session, ceph_ino(inode), cap->cap_id, - cap->mseq, cap->issue_seq); - __ceph_remove_cap(cap); + __ceph_remove_cap(cap, true); } else { - /* try to drop referring dentries */ + /* try dropping referring dentries */ spin_unlock(&ci->i_ceph_lock); d_prune_aliases(inode); dout("trim_caps_cb %p cap %p pruned, count now %d\n", @@ -1238,122 +1482,104 @@ trim_caps - session->s_trim_caps); session->s_trim_caps = 0; } + + ceph_send_cap_releases(mdsc, session); return 0; } -/* - * Allocate cap_release messages. If there is a partially full message - * in the queue, try to allocate enough to cover it's remainder, so that - * we can send it immediately. - * - * Called under s_mutex. - */ -int ceph_add_cap_releases(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session) +static int check_capsnap_flush(struct ceph_inode_info *ci, + u64 want_snap_seq) { - struct ceph_msg *msg, *partial = NULL; - struct ceph_mds_cap_release *head; - int err = -ENOMEM; - int extra = mdsc->fsc->mount_options->cap_release_safety; - int num; - - dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds, - extra); - - spin_lock(&session->s_cap_lock); - - if (!list_empty(&session->s_cap_releases)) { - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, - list_head); - head = msg->front.iov_base; - num = le32_to_cpu(head->num); - if (num) { - dout(" partial %p with (%d/%d)\n", msg, num, - (int)CEPH_CAPS_PER_RELEASE); - extra += CEPH_CAPS_PER_RELEASE - num; - partial = msg; - } - } - while (session->s_num_cap_releases < session->s_nr_caps + extra) { - spin_unlock(&session->s_cap_lock); - msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE, - GFP_NOFS, false); - if (!msg) - goto out_unlocked; - dout("add_cap_releases %p msg %p now %d\n", session, msg, - (int)msg->front.iov_len); - head = msg->front.iov_base; - head->num = cpu_to_le32(0); - msg->front.iov_len = sizeof(*head); - spin_lock(&session->s_cap_lock); - list_add(&msg->list_head, &session->s_cap_releases); - session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE; + int ret = 1; + spin_lock(&ci->i_ceph_lock); + if (want_snap_seq > 0 && !list_empty(&ci->i_cap_snaps)) { + struct ceph_cap_snap *capsnap = + list_first_entry(&ci->i_cap_snaps, + struct ceph_cap_snap, ci_item); + ret = capsnap->follows >= want_snap_seq; } + spin_unlock(&ci->i_ceph_lock); + return ret; +} - if (partial) { - head = partial->front.iov_base; - num = le32_to_cpu(head->num); - dout(" queueing partial %p with %d/%d\n", partial, num, - (int)CEPH_CAPS_PER_RELEASE); - list_move_tail(&partial->list_head, - &session->s_cap_releases_done); - session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num; +static int check_caps_flush(struct ceph_mds_client *mdsc, + u64 want_flush_tid) +{ + struct rb_node *n; + struct ceph_cap_flush *cf; + int ret = 1; + + spin_lock(&mdsc->cap_dirty_lock); + n = rb_first(&mdsc->cap_flush_tree); + cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL; + if (cf && cf->tid <= want_flush_tid) { + dout("check_caps_flush still flushing tid %llu <= %llu\n", + cf->tid, want_flush_tid); + ret = 0; } - err = 0; - spin_unlock(&session->s_cap_lock); -out_unlocked: - return err; + spin_unlock(&mdsc->cap_dirty_lock); + return ret; } /* * flush all dirty inode data to disk. * - * returns true if we've flushed through want_flush_seq + * returns true if we've flushed through want_flush_tid */ -static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) +static void wait_caps_flush(struct ceph_mds_client *mdsc, + u64 want_flush_tid, u64 want_snap_seq) { - int mds, ret = 1; + int mds; - dout("check_cap_flush want %lld\n", want_flush_seq); + dout("check_caps_flush want %llu snap want %llu\n", + want_flush_tid, want_snap_seq); mutex_lock(&mdsc->mutex); - for (mds = 0; ret && mds < mdsc->max_sessions; mds++) { + for (mds = 0; mds < mdsc->max_sessions; ) { struct ceph_mds_session *session = mdsc->sessions[mds]; + struct inode *inode = NULL; - if (!session) + if (!session) { + mds++; continue; + } get_session(session); mutex_unlock(&mdsc->mutex); mutex_lock(&session->s_mutex); - if (!list_empty(&session->s_cap_flushing)) { - struct ceph_inode_info *ci = - list_entry(session->s_cap_flushing.next, - struct ceph_inode_info, - i_flushing_item); - struct inode *inode = &ci->vfs_inode; - - spin_lock(&ci->i_ceph_lock); - if (ci->i_cap_flush_seq <= want_flush_seq) { - dout("check_cap_flush still flushing %p " - "seq %lld <= %lld to mds%d\n", inode, - ci->i_cap_flush_seq, want_flush_seq, - session->s_mds); - ret = 0; + if (!list_empty(&session->s_cap_snaps_flushing)) { + struct ceph_cap_snap *capsnap = + list_first_entry(&session->s_cap_snaps_flushing, + struct ceph_cap_snap, + flushing_item); + struct ceph_inode_info *ci = capsnap->ci; + if (!check_capsnap_flush(ci, want_snap_seq)) { + dout("check_cap_flush still flushing snap %p " + "follows %lld <= %lld to mds%d\n", + &ci->vfs_inode, capsnap->follows, + want_snap_seq, mds); + inode = igrab(&ci->vfs_inode); } - spin_unlock(&ci->i_ceph_lock); } mutex_unlock(&session->s_mutex); ceph_put_mds_session(session); - if (!ret) - return ret; + if (inode) { + wait_event(mdsc->cap_flushing_wq, + check_capsnap_flush(ceph_inode(inode), + want_snap_seq)); + iput(inode); + } else { + mds++; + } + mutex_lock(&mdsc->mutex); } - mutex_unlock(&mdsc->mutex); - dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq); - return ret; + + wait_event(mdsc->cap_flushing_wq, + check_caps_flush(mdsc, want_flush_tid)); + + dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid); } /* @@ -1362,58 +1588,73 @@ void ceph_send_cap_releases(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { - struct ceph_msg *msg; + struct ceph_msg *msg = NULL; + struct ceph_mds_cap_release *head; + struct ceph_mds_cap_item *item; + struct ceph_cap *cap; + LIST_HEAD(tmp_list); + int num_cap_releases; - dout("send_cap_releases mds%d\n", session->s_mds); spin_lock(&session->s_cap_lock); - while (!list_empty(&session->s_cap_releases_done)) { - msg = list_first_entry(&session->s_cap_releases_done, - struct ceph_msg, list_head); - list_del_init(&msg->list_head); - spin_unlock(&session->s_cap_lock); - msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); - dout("send_cap_releases mds%d %p\n", session->s_mds, msg); - ceph_con_send(&session->s_con, msg); - spin_lock(&session->s_cap_lock); - } +again: + list_splice_init(&session->s_cap_releases, &tmp_list); + num_cap_releases = session->s_num_cap_releases; + session->s_num_cap_releases = 0; spin_unlock(&session->s_cap_lock); -} -static void discard_cap_releases(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session) -{ - struct ceph_msg *msg; - struct ceph_mds_cap_release *head; - unsigned num; + while (!list_empty(&tmp_list)) { + if (!msg) { + msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, + PAGE_CACHE_SIZE, GFP_NOFS, false); + if (!msg) + goto out_err; + head = msg->front.iov_base; + head->num = cpu_to_le32(0); + msg->front.iov_len = sizeof(*head); + } + cap = list_first_entry(&tmp_list, struct ceph_cap, + session_caps); + list_del(&cap->session_caps); + num_cap_releases--; - dout("discard_cap_releases mds%d\n", session->s_mds); - spin_lock(&session->s_cap_lock); + head = msg->front.iov_base; + le32_add_cpu(&head->num, 1); + item = msg->front.iov_base + msg->front.iov_len; + item->ino = cpu_to_le64(cap->cap_ino); + item->cap_id = cpu_to_le64(cap->cap_id); + item->migrate_seq = cpu_to_le32(cap->mseq); + item->seq = cpu_to_le32(cap->issue_seq); + msg->front.iov_len += sizeof(*item); - /* zero out the in-progress message */ - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, list_head); - head = msg->front.iov_base; - num = le32_to_cpu(head->num); - dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num); - head->num = cpu_to_le32(0); - session->s_num_cap_releases += num; - - /* requeue completed messages */ - while (!list_empty(&session->s_cap_releases_done)) { - msg = list_first_entry(&session->s_cap_releases_done, - struct ceph_msg, list_head); - list_del_init(&msg->list_head); + ceph_put_cap(mdsc, cap); - head = msg->front.iov_base; - num = le32_to_cpu(head->num); - dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, - num); - session->s_num_cap_releases += num; - head->num = cpu_to_le32(0); - msg->front.iov_len = sizeof(*head); - list_add(&msg->list_head, &session->s_cap_releases); + if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { + msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); + dout("send_cap_releases mds%d %p\n", session->s_mds, msg); + ceph_con_send(&session->s_con, msg); + msg = NULL; + } } + BUG_ON(num_cap_releases != 0); + + spin_lock(&session->s_cap_lock); + if (!list_empty(&session->s_cap_releases)) + goto again; + spin_unlock(&session->s_cap_lock); + + if (msg) { + msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); + dout("send_cap_releases mds%d %p\n", session->s_mds, msg); + ceph_con_send(&session->s_con, msg); + } + return; +out_err: + pr_err("send_cap_releases mds%d, failed to allocate message\n", + session->s_mds); + spin_lock(&session->s_cap_lock); + list_splice(&tmp_list, &session->s_cap_releases); + session->s_num_cap_releases += num_cap_releases; spin_unlock(&session->s_cap_lock); } @@ -1421,6 +1662,44 @@ * requests */ +int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, + struct inode *dir) +{ + struct ceph_inode_info *ci = ceph_inode(dir); + struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; + struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; + size_t size = sizeof(*rinfo->dir_in) + sizeof(*rinfo->dir_dname_len) + + sizeof(*rinfo->dir_dname) + sizeof(*rinfo->dir_dlease); + int order, num_entries; + + spin_lock(&ci->i_ceph_lock); + num_entries = ci->i_files + ci->i_subdirs; + spin_unlock(&ci->i_ceph_lock); + num_entries = max(num_entries, 1); + num_entries = min(num_entries, opt->max_readdir); + + order = get_order(size * num_entries); + while (order >= 0) { + rinfo->dir_in = (void*)__get_free_pages(GFP_KERNEL | + __GFP_NOWARN, + order); + if (rinfo->dir_in) + break; + order--; + } + if (!rinfo->dir_in) + return -ENOMEM; + + num_entries = (PAGE_SIZE << order) / size; + num_entries = min(num_entries, opt->max_readdir); + + rinfo->dir_buf_size = PAGE_SIZE << order; + req->r_num_caps = num_entries + 1; + req->r_args.readdir.max_entries = cpu_to_le32(num_entries); + req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); + return 0; +} + /* * Create an mds request. */ @@ -1437,6 +1716,7 @@ req->r_started = jiffies; req->r_resend_mds = -1; INIT_LIST_HEAD(&req->r_unsafe_dir_item); + INIT_LIST_HEAD(&req->r_unsafe_target_item); req->r_fmode = -1; kref_init(&req->r_kref); INIT_LIST_HEAD(&req->r_wait); @@ -1444,6 +1724,8 @@ init_completion(&req->r_safe_completion); INIT_LIST_HEAD(&req->r_unsafe_item); + req->r_stamp = CURRENT_TIME; + req->r_op = op; req->r_direct_mode = mode; return req; @@ -1462,13 +1744,9 @@ struct ceph_mds_request, r_node); } -static u64 __get_oldest_tid(struct ceph_mds_client *mdsc) +static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) { - struct ceph_mds_request *req = __get_oldest_req(mdsc); - - if (req) - return req->r_tid; - return 0; + return mdsc->oldest_tid; } /* @@ -1497,7 +1775,7 @@ seq = read_seqbegin(&rename_lock); rcu_read_lock(); for (temp = dentry; !IS_ROOT(temp);) { - struct inode *inode = temp->d_inode; + struct inode *inode = d_inode(temp); if (inode && ceph_snap(inode) == CEPH_SNAPDIR) len++; /* slash only */ else if (stop_on_nosnap && inode && @@ -1521,7 +1799,7 @@ struct inode *inode; spin_lock(&temp->d_lock); - inode = temp->d_inode; + inode = d_inode(temp); if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { dout("build_path path+%d: %p SNAPDIR\n", pos, temp); @@ -1555,10 +1833,10 @@ goto retry; } - *base = ceph_ino(temp->d_inode); + *base = ceph_ino(d_inode(temp)); *plen = len; dout("build_path on %p %d built %llx '%.*s'\n", - dentry, dentry->d_count, *base, len, path); + dentry, d_count(dentry), *base, len, path); return path; } @@ -1568,8 +1846,8 @@ { char *path; - if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) { - *pino = ceph_ino(dentry->d_parent->d_inode); + if (ceph_snap(d_inode(dentry->d_parent)) == CEPH_NOSNAP) { + *pino = ceph_ino(d_inode(dentry->d_parent)); *ppath = dentry->d_name.name; *ppathlen = dentry->d_name.len; return 0; @@ -1638,7 +1916,7 @@ */ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, struct ceph_mds_request *req, - int mds) + int mds, bool drop_cap_releases) { struct ceph_msg *msg; struct ceph_mds_request_head *head; @@ -1669,7 +1947,8 @@ } len = sizeof(*head) + - pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)); + pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) + + sizeof(struct ceph_timespec); /* calculate (max) length for cap releases */ len += sizeof(struct ceph_mds_request_release) * @@ -1686,6 +1965,7 @@ goto out_free2; } + msg->hdr.version = cpu_to_le16(2); msg->hdr.tid = cpu_to_le64(req->r_tid); head = msg->front.iov_base; @@ -1708,7 +1988,7 @@ releases = 0; if (req->r_inode_drop) releases += ceph_encode_inode_release(&p, - req->r_inode ? req->r_inode : req->r_dentry->d_inode, + req->r_inode ? req->r_inode : d_inode(req->r_dentry), mds, req->r_inode_drop, req->r_inode_unless, 0); if (req->r_dentry_drop) releases += ceph_encode_dentry_release(&p, req->r_dentry, @@ -1718,21 +1998,36 @@ mds, req->r_old_dentry_drop, req->r_old_dentry_unless); if (req->r_old_inode_drop) releases += ceph_encode_inode_release(&p, - req->r_old_dentry->d_inode, + d_inode(req->r_old_dentry), mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); + + if (drop_cap_releases) { + releases = 0; + p = msg->front.iov_base + req->r_request_release_offset; + } + head->num_releases = cpu_to_le16(releases); + /* time stamp */ + { + struct ceph_timespec ts; + ceph_encode_timespec(&ts, &req->r_stamp); + ceph_encode_copy(&p, &ts, sizeof(ts)); + } + BUG_ON(p > end); msg->front.iov_len = p - msg->front.iov_base; msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); - if (req->r_data_len) { - /* outbound data set only by ceph_sync_setxattr() */ - BUG_ON(!req->r_pages); - ceph_msg_data_add_pages(msg, req->r_pages, req->r_data_len, 0); + if (req->r_pagelist) { + struct ceph_pagelist *pagelist = req->r_pagelist; + atomic_inc(&pagelist->refcnt); + ceph_msg_data_add_pagelist(msg, pagelist); + msg->hdr.data_len = cpu_to_le32(pagelist->length); + } else { + msg->hdr.data_len = 0; } - msg->hdr.data_len = cpu_to_le32(req->r_data_len); msg->hdr.data_off = cpu_to_le16(0); out_free2: @@ -1763,7 +2058,7 @@ */ static int __prepare_send_request(struct ceph_mds_client *mdsc, struct ceph_mds_request *req, - int mds) + int mds, bool drop_cap_releases) { struct ceph_mds_request_head *rhead; struct ceph_msg *msg; @@ -1783,6 +2078,7 @@ req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); if (req->r_got_unsafe) { + void *p; /* * Replay. Do not regenerate message (and rebuild * paths, etc.); just use the original message. @@ -1803,8 +2099,17 @@ /* remove cap/dentry releases from message */ rhead->num_releases = 0; - msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset); - msg->front.iov_len = req->r_request_release_offset; + + /* time stamp */ + p = msg->front.iov_base + req->r_request_release_offset; + { + struct ceph_timespec ts; + ceph_encode_timespec(&ts, &req->r_stamp); + ceph_encode_copy(&p, &ts, sizeof(ts)); + } + + msg->front.iov_len = p - msg->front.iov_base; + msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); return 0; } @@ -1812,10 +2117,9 @@ ceph_msg_put(req->r_request); req->r_request = NULL; } - msg = create_request_message(mdsc, req, mds); + msg = create_request_message(mdsc, req, mds, drop_cap_releases); if (IS_ERR(msg)) { req->r_err = PTR_ERR(msg); - complete_request(mdsc, req); return PTR_ERR(msg); } req->r_request = msg; @@ -1843,7 +2147,7 @@ { struct ceph_mds_session *session = NULL; int mds = -1; - int err = -EAGAIN; + int err = 0; if (req->r_err || req->r_got_result) { if (req->r_aborted) @@ -1857,6 +2161,11 @@ err = -EIO; goto finish; } + if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { + dout("do_request forced umount\n"); + err = -EIO; + goto finish; + } put_request_session(req); @@ -1880,7 +2189,7 @@ req->r_session = get_session(session); dout("do_request mds%d session %p state %s\n", mds, session, - session_state_name(session->s_state)); + ceph_session_state_name(session->s_state)); if (session->s_state != CEPH_MDS_SESSION_OPEN && session->s_state != CEPH_MDS_SESSION_HUNG) { if (session->s_state == CEPH_MDS_SESSION_NEW || @@ -1896,7 +2205,7 @@ if (req->r_request_started == 0) /* note request start time */ req->r_request_started = jiffies; - err = __prepare_send_request(mdsc, req, mds); + err = __prepare_send_request(mdsc, req, mds, false); if (!err) { ceph_msg_get(req->r_request); ceph_con_send(&session->s_con, req->r_request); @@ -1904,13 +2213,15 @@ out_session: ceph_put_mds_session(session); +finish: + if (err) { + dout("__do_request early error %d\n", err); + req->r_err = err; + complete_request(mdsc, req); + __unregister_request(mdsc, req); + } out: return err; - -finish: - req->r_err = err; - complete_request(mdsc, req); - goto out; } /* @@ -1940,16 +2251,20 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds) { struct ceph_mds_request *req; - struct rb_node *p; + struct rb_node *p = rb_first(&mdsc->request_tree); dout("kick_requests mds%d\n", mds); - for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) { + while (p) { req = rb_entry(p, struct ceph_mds_request, r_node); + p = rb_next(p); if (req->r_got_unsafe) continue; + if (req->r_attempts > 0) + continue; /* only new requests */ if (req->r_session && req->r_session->s_mds == mds) { dout(" kicking tid %llu\n", req->r_tid); + list_del_init(&req->r_wait); __do_request(mdsc, req); } } @@ -1982,7 +2297,7 @@ ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); if (req->r_locked_dir) ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); - if (req->r_old_dentry) + if (req->r_old_dentry_dir) ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), CEPH_CAP_PIN); @@ -1993,21 +2308,24 @@ if (req->r_err) { err = req->r_err; - __unregister_request(mdsc, req); - dout("do_request early error %d\n", err); goto out; } /* wait */ mutex_unlock(&mdsc->mutex); dout("do_request waiting\n"); - if (req->r_timeout) { - err = (long)wait_for_completion_killable_timeout( - &req->r_completion, req->r_timeout); - if (err == 0) - err = -EIO; + if (!req->r_timeout && req->r_wait_for_completion) { + err = req->r_wait_for_completion(mdsc, req); } else { - err = wait_for_completion_killable(&req->r_completion); + long timeleft = wait_for_completion_killable_timeout( + &req->r_completion, + ceph_timeout_jiffies(req->r_timeout)); + if (timeleft > 0) + err = 0; + else if (!timeleft) + err = -EIO; /* timed out */ + else + err = timeleft; /* killed */ } dout("do_request waited, got %d\n", err); mutex_lock(&mdsc->mutex); @@ -2071,6 +2389,7 @@ struct ceph_mds_request *req; struct ceph_mds_reply_head *head = msg->front.iov_base; struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ + struct ceph_snap_realm *realm; u64 tid; int err, result; int mds = session->s_mds; @@ -2104,13 +2423,13 @@ /* dup? */ if ((req->r_got_unsafe && !head->safe) || (req->r_got_safe && head->safe)) { - pr_warning("got a dup %s reply on %llu from mds%d\n", + pr_warn("got a dup %s reply on %llu from mds%d\n", head->safe ? "safe" : "unsafe", tid, mds); mutex_unlock(&mdsc->mutex); goto out; } - if (req->r_got_safe && !head->safe) { - pr_warning("got unsafe after safe on %llu from mds%d\n", + if (req->r_got_safe) { + pr_warn("got unsafe after safe on %llu from mds%d\n", tid, mds); mutex_unlock(&mdsc->mutex); goto out; @@ -2127,26 +2446,17 @@ */ if (result == -ESTALE) { dout("got ESTALE on request %llu", req->r_tid); - if (!req->r_inode) { - /* do nothing; not an authority problem */ - } else if (req->r_direct_mode != USE_AUTH_MDS) { + req->r_resend_mds = -1; + if (req->r_direct_mode != USE_AUTH_MDS) { dout("not using auth, setting for that now"); req->r_direct_mode = USE_AUTH_MDS; __do_request(mdsc, req); mutex_unlock(&mdsc->mutex); goto out; } else { - struct ceph_inode_info *ci = ceph_inode(req->r_inode); - struct ceph_cap *cap = NULL; - - if (req->r_session) - cap = ceph_get_cap_for_mds(ci, - req->r_session->s_mds); - - dout("already using auth"); - if ((!cap || cap != ci->i_auth_cap) || - (cap->mseq != req->r_sent_on_mseq)) { - dout("but cap changed, so resending"); + int mds = __choose_mds(mdsc, req); + if (mds >= 0 && mds != req->r_session->s_mds) { + dout("but auth changed, so resending"); __do_request(mdsc, req); mutex_unlock(&mdsc->mutex); goto out; @@ -2169,7 +2479,6 @@ * useful we could do with a revised return value. */ dout("got safe reply %llu, mds%d\n", tid, mds); - list_del_init(&req->r_unsafe_item); /* last unsafe request during umount? */ if (mdsc->stopping && !__get_oldest_req(mdsc)) @@ -2180,6 +2489,14 @@ } else { req->r_got_unsafe = true; list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); + if (req->r_unsafe_dir) { + struct ceph_inode_info *ci = + ceph_inode(req->r_unsafe_dir); + spin_lock(&ci->i_unsafe_lock); + list_add_tail(&req->r_unsafe_dir_item, + &ci->i_unsafe_dirops); + spin_unlock(&ci->i_unsafe_lock); + } } dout("handle_reply tid %lld result %d\n", tid, result); @@ -2195,11 +2512,13 @@ } /* snap trace */ + realm = NULL; if (rinfo->snapblob_len) { down_write(&mdsc->snap_rwsem); ceph_update_snap_trace(mdsc, rinfo->snapblob, - rinfo->snapblob + rinfo->snapblob_len, - le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP); + rinfo->snapblob + rinfo->snapblob_len, + le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, + &realm); downgrade_write(&mdsc->snap_rwsem); } else { down_read(&mdsc->snap_rwsem); @@ -2210,22 +2529,29 @@ err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session); if (err == 0) { if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || - req->r_op == CEPH_MDS_OP_LSSNAP) && - rinfo->dir_nr) + req->r_op == CEPH_MDS_OP_LSSNAP)) ceph_readdir_prepopulate(req, req->r_session); ceph_unreserve_caps(mdsc, &req->r_caps_reservation); } mutex_unlock(&req->r_fill_mutex); up_read(&mdsc->snap_rwsem); + if (realm) + ceph_put_snap_realm(mdsc, realm); + + if (err == 0 && req->r_got_unsafe && req->r_target_inode) { + struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); + spin_lock(&ci->i_unsafe_lock); + list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops); + spin_unlock(&ci->i_unsafe_lock); + } out_err: mutex_lock(&mdsc->mutex); if (!req->r_aborted) { if (err) { req->r_err = err; } else { - req->r_reply = msg; - ceph_msg_get(msg); + req->r_reply = ceph_msg_get(msg); req->r_got_result = true; } } else { @@ -2233,7 +2559,6 @@ } mutex_unlock(&mdsc->mutex); - ceph_add_cap_releases(mdsc, req->r_session); mutex_unlock(&session->s_mutex); /* kick calling process */ @@ -2282,6 +2607,7 @@ dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); BUG_ON(req->r_err); BUG_ON(req->r_got_result); + req->r_attempts = 0; req->r_num_fwd = fwd_seq; req->r_resend_mds = next_mds; put_request_session(req); @@ -2326,7 +2652,7 @@ dout("handle_session mds%d %s %p state %s seq %llu\n", mds, ceph_session_op_name(op), session, - session_state_name(session->s_state), seq); + ceph_session_state_name(session->s_state), seq); if (session->s_state == CEPH_MDS_SESSION_HUNG) { session->s_state = CEPH_MDS_SESSION_OPEN; @@ -2352,10 +2678,10 @@ case CEPH_SESSION_CLOSE: if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) pr_info("mds%d reconnect denied\n", session->s_mds); + cleanup_session_requests(mdsc, session); remove_session_caps(session); - wake = 1; /* for good measure */ + wake = 2; /* for good measure */ wake_up_all(&mdsc->session_close_wq); - kick_requests(mdsc, mds); break; case CEPH_SESSION_STALE: @@ -2372,6 +2698,18 @@ trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); break; + case CEPH_SESSION_FLUSHMSG: + send_flushmsg_ack(mdsc, session, seq); + break; + + case CEPH_SESSION_FORCE_RO: + dout("force_session_readonly %p\n", session); + spin_lock(&session->s_cap_lock); + session->s_readonly = true; + spin_unlock(&session->s_cap_lock); + wake_up_session_caps(session, 0); + break; + default: pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); WARN_ON(1); @@ -2381,6 +2719,8 @@ if (wake) { mutex_lock(&mdsc->mutex); __wake_requests(mdsc, &session->s_waiting); + if (wake == 2) + kick_requests(mdsc, mds); mutex_unlock(&mdsc->mutex); } return; @@ -2400,18 +2740,42 @@ struct ceph_mds_session *session) { struct ceph_mds_request *req, *nreq; + struct rb_node *p; int err; dout("replay_unsafe_requests mds%d\n", session->s_mds); mutex_lock(&mdsc->mutex); list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) { - err = __prepare_send_request(mdsc, req, session->s_mds); + err = __prepare_send_request(mdsc, req, session->s_mds, true); if (!err) { ceph_msg_get(req->r_request); ceph_con_send(&session->s_con, req->r_request); } } + + /* + * also re-send old requests when MDS enters reconnect stage. So that MDS + * can process completed request in clientreplay stage. + */ + p = rb_first(&mdsc->request_tree); + while (p) { + req = rb_entry(p, struct ceph_mds_request, r_node); + p = rb_next(p); + if (req->r_got_unsafe) + continue; + if (req->r_attempts == 0) + continue; /* only old requests */ + if (req->r_session && + req->r_session->s_mds == session->s_mds) { + err = __prepare_send_request(mdsc, req, + session->s_mds, true); + if (!err) { + ceph_msg_get(req->r_request); + ceph_con_send(&session->s_con, req->r_request); + } + } + } mutex_unlock(&mdsc->mutex); } @@ -2461,6 +2825,8 @@ spin_lock(&ci->i_ceph_lock); cap->seq = 0; /* reset cap seq */ cap->issue_seq = 0; /* and issue_seq */ + cap->mseq = 0; /* and migrate_seq */ + cap->cap_gen = cap->session->s_cap_gen; if (recon_state->flock) { rec.v2.cap_id = cpu_to_le64(cap->cap_id); @@ -2488,20 +2854,16 @@ struct ceph_filelock *flocks; encode_again: - lock_flocks(); ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); - unlock_flocks(); flocks = kmalloc((num_fcntl_locks+num_flock_locks) * sizeof(struct ceph_filelock), GFP_NOFS); if (!flocks) { err = -ENOMEM; goto out_free; } - lock_flocks(); err = ceph_encode_locks_to_buffer(inode, flocks, num_fcntl_locks, num_flock_locks); - unlock_flocks(); if (err) { kfree(flocks); if (err == -ENOSPC) @@ -2523,6 +2885,8 @@ } else { err = ceph_pagelist_append(pagelist, &rec, reclen); } + + recon_state->nr_caps++; out_free: kfree(path); out_dput: @@ -2550,6 +2914,7 @@ struct rb_node *p; int mds = session->s_mds; int err = -ENOMEM; + int s_nr_caps; struct ceph_pagelist *pagelist; struct ceph_reconnect_state recon_state; @@ -2568,6 +2933,29 @@ session->s_state = CEPH_MDS_SESSION_RECONNECTING; session->s_seq = 0; + dout("session %p state %s\n", session, + ceph_session_state_name(session->s_state)); + + spin_lock(&session->s_gen_ttl_lock); + session->s_cap_gen++; + spin_unlock(&session->s_gen_ttl_lock); + + spin_lock(&session->s_cap_lock); + /* don't know if session is readonly */ + session->s_readonly = 0; + /* + * notify __ceph_remove_cap() that we are composing cap reconnect. + * If a cap get released before being added to the cap reconnect, + * __ceph_remove_cap() should skip queuing cap release. + */ + session->s_cap_reconnect = 1; + /* drop old cap expires; we're about to reestablish that state */ + cleanup_cap_releases(mdsc, session); + + /* trim unused caps to reduce MDS's cache rejoin time */ + if (mdsc->fsc->sb->s_root) + shrink_dcache_parent(mdsc->fsc->sb->s_root); + ceph_con_close(&session->s_con); ceph_con_open(&session->s_con, CEPH_ENTITY_TYPE_MDS, mds, @@ -2578,23 +2966,23 @@ down_read(&mdsc->snap_rwsem); - dout("session %p state %s\n", session, - session_state_name(session->s_state)); - - /* drop old cap expires; we're about to reestablish that state */ - discard_cap_releases(mdsc, session); - /* traverse this session's caps */ - err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps); + s_nr_caps = session->s_nr_caps; + err = ceph_pagelist_encode_32(pagelist, s_nr_caps); if (err) goto fail; + recon_state.nr_caps = 0; recon_state.pagelist = pagelist; recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK; err = iterate_session_caps(session, encode_caps_cb, &recon_state); if (err < 0) goto fail; + spin_lock(&session->s_cap_lock); + session->s_cap_reconnect = 0; + spin_unlock(&session->s_cap_lock); + /* * snaprealms. we provide mds with the ino, seq (version), and * parent for all of our realms. If the mds has any newer info, @@ -2617,11 +3005,21 @@ if (recon_state.flock) reply->hdr.version = cpu_to_le16(2); - if (pagelist->length) { - /* set up outbound data if we have any */ - reply->hdr.data_len = cpu_to_le32(pagelist->length); - ceph_msg_data_add_pagelist(reply, pagelist); + + /* raced with cap release? */ + if (s_nr_caps != recon_state.nr_caps) { + struct page *page = list_first_entry(&pagelist->head, + struct page, lru); + __le32 *addr = kmap_atomic(page); + *addr = cpu_to_le32(recon_state.nr_caps); + kunmap_atomic(addr); } + + reply->hdr.data_len = cpu_to_le32(pagelist->length); + ceph_msg_data_add_pagelist(reply, pagelist); + + ceph_early_kick_flushing_caps(mdsc, session); + ceph_con_send(&session->s_con, reply); mutex_unlock(&session->s_mutex); @@ -2639,7 +3037,6 @@ mutex_unlock(&session->s_mutex); fail_nomsg: ceph_pagelist_release(pagelist); - kfree(pagelist); fail_nopagelist: pr_err("error %d preparing reconnect for mds%d\n", err, mds); return; @@ -2675,7 +3072,7 @@ ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", ceph_mds_state_name(newstate), ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", - session_state_name(s->s_state)); + ceph_session_state_name(s->s_state)); if (i >= newmap->m_max_mds || memcmp(ceph_mdsmap_get_addr(oldmap, i), @@ -2695,9 +3092,6 @@ mutex_unlock(&s->s_mutex); s->s_state = CEPH_MDS_SESSION_RESTARTING; } - - /* kick any requests waiting on the recovering mds */ - kick_requests(mdsc, i); } else if (oldstate == newstate) { continue; /* nothing new with this mds */ } @@ -2787,14 +3181,15 @@ if (dname.len != get_unaligned_le32(h+1)) goto bad; - mutex_lock(&session->s_mutex); - session->s_seq++; - /* lookup inode */ inode = ceph_find_inode(sb, vino); dout("handle_lease %s, ino %llx %p %.*s\n", ceph_lease_op_name(h->action), vino.ino, inode, dname.len, dname.name); + + mutex_lock(&session->s_mutex); + session->s_seq++; + if (inode == NULL) { dout("handle_lease no inode %llx\n", vino.ino); goto release; @@ -2831,7 +3226,7 @@ di->lease_renew_from && di->lease_renew_after == 0) { unsigned long duration = - le32_to_cpu(h->duration_ms) * HZ / 1000; + msecs_to_jiffies(le32_to_cpu(h->duration_ms)); di->lease_seq = seq; dentry->d_time = di->lease_renew_from + duration; @@ -3021,7 +3416,6 @@ send_renew_caps(mdsc, s); else ceph_con_keepalive(&s->s_con); - ceph_add_cap_releases(mdsc, s); if (s->s_state == CEPH_MDS_SESSION_OPEN || s->s_state == CEPH_MDS_SESSION_HUNG) ceph_send_cap_releases(mdsc, s); @@ -3056,13 +3450,16 @@ init_waitqueue_head(&mdsc->session_close_wq); INIT_LIST_HEAD(&mdsc->waiting_for_map); mdsc->sessions = NULL; + atomic_set(&mdsc->num_sessions, 0); mdsc->max_sessions = 0; mdsc->stopping = 0; + mdsc->last_snap_seq = 0; init_rwsem(&mdsc->snap_rwsem); mdsc->snap_realms = RB_ROOT; INIT_LIST_HEAD(&mdsc->snap_empty); spin_lock_init(&mdsc->snap_empty_lock); mdsc->last_tid = 0; + mdsc->oldest_tid = 0; mdsc->request_tree = RB_ROOT; INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); mdsc->last_renew_caps = jiffies; @@ -3070,7 +3467,8 @@ spin_lock_init(&mdsc->cap_delay_lock); INIT_LIST_HEAD(&mdsc->snap_flush_list); spin_lock_init(&mdsc->snap_flush_lock); - mdsc->cap_flush_seq = 0; + mdsc->last_cap_flush_tid = 1; + mdsc->cap_flush_tree = RB_ROOT; INIT_LIST_HEAD(&mdsc->cap_dirty); INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); mdsc->num_cap_flushing = 0; @@ -3082,6 +3480,9 @@ ceph_caps_init(mdsc); ceph_adjust_min_caps(mdsc, fsc->min_caps); + init_rwsem(&mdsc->pool_perm_rwsem); + mdsc->pool_perm_tree = RB_ROOT; + return 0; } @@ -3091,8 +3492,8 @@ */ static void wait_requests(struct ceph_mds_client *mdsc) { + struct ceph_options *opts = mdsc->fsc->client->options; struct ceph_mds_request *req; - struct ceph_fs_client *fsc = mdsc->fsc; mutex_lock(&mdsc->mutex); if (__get_oldest_req(mdsc)) { @@ -3100,7 +3501,7 @@ dout("wait_requests waiting for requests\n"); wait_for_completion_timeout(&mdsc->safe_umount_waiters, - fsc->client->options->mount_timeout * HZ); + ceph_timeout_jiffies(opts->mount_timeout)); /* tear down remaining requests */ mutex_lock(&mdsc->mutex); @@ -3153,7 +3554,8 @@ nextreq = rb_entry(n, struct ceph_mds_request, r_node); else nextreq = NULL; - if ((req->r_op & CEPH_MDS_OP_WRITE)) { + if (req->r_op != CEPH_MDS_OP_SETFILELOCK && + (req->r_op & CEPH_MDS_OP_WRITE)) { /* write op */ ceph_mdsc_get_request(req); if (nextreq) @@ -3181,22 +3583,30 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc) { - u64 want_tid, want_flush; + u64 want_tid, want_flush, want_snap; - if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN) + if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) return; dout("sync\n"); mutex_lock(&mdsc->mutex); want_tid = mdsc->last_tid; - want_flush = mdsc->cap_flush_seq; mutex_unlock(&mdsc->mutex); - dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush); ceph_flush_dirty_caps(mdsc); + spin_lock(&mdsc->cap_dirty_lock); + want_flush = mdsc->last_cap_flush_tid; + spin_unlock(&mdsc->cap_dirty_lock); + + down_read(&mdsc->snap_rwsem); + want_snap = mdsc->last_snap_seq; + up_read(&mdsc->snap_rwsem); + + dout("sync want tid %lld flush_seq %lld snap_seq %lld\n", + want_tid, want_flush, want_snap); wait_unsafe_requests(mdsc, want_tid); - wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush)); + wait_caps_flush(mdsc, want_flush, want_snap); } /* @@ -3204,17 +3614,9 @@ */ static bool done_closing_sessions(struct ceph_mds_client *mdsc) { - int i, n = 0; - - if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN) + if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) return true; - - mutex_lock(&mdsc->mutex); - for (i = 0; i < mdsc->max_sessions; i++) - if (mdsc->sessions[i]) - n++; - mutex_unlock(&mdsc->mutex); - return n == 0; + return atomic_read(&mdsc->num_sessions) == 0; } /* @@ -3222,10 +3624,9 @@ */ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) { + struct ceph_options *opts = mdsc->fsc->client->options; struct ceph_mds_session *session; int i; - struct ceph_fs_client *fsc = mdsc->fsc; - unsigned long timeout = fsc->client->options->mount_timeout * HZ; dout("close_sessions\n"); @@ -3246,7 +3647,7 @@ dout("waiting for sessions to close\n"); wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc), - timeout); + ceph_timeout_jiffies(opts->mount_timeout)); /* tear down remaining sessions */ mutex_lock(&mdsc->mutex); @@ -3272,6 +3673,34 @@ dout("stopped\n"); } +void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) +{ + struct ceph_mds_session *session; + int mds; + + dout("force umount\n"); + + mutex_lock(&mdsc->mutex); + for (mds = 0; mds < mdsc->max_sessions; mds++) { + session = __ceph_lookup_mds_session(mdsc, mds); + if (!session) + continue; + mutex_unlock(&mdsc->mutex); + mutex_lock(&session->s_mutex); + __close_session(mdsc, session); + if (session->s_state == CEPH_MDS_SESSION_CLOSING) { + cleanup_session_requests(mdsc, session); + remove_session_caps(session); + } + mutex_unlock(&session->s_mutex); + ceph_put_mds_session(session); + mutex_lock(&mdsc->mutex); + kick_requests(mdsc, mds); + } + __wake_requests(mdsc, &mdsc->waiting_for_map); + mutex_unlock(&mdsc->mutex); +} + static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) { dout("stop\n"); @@ -3280,6 +3709,7 @@ ceph_mdsmap_destroy(mdsc->mdsmap); kfree(mdsc->sessions); ceph_caps_finalize(mdsc); + ceph_pool_perm_destroy(mdsc); } void ceph_mdsc_destroy(struct ceph_fs_client *fsc) @@ -3388,7 +3818,7 @@ struct ceph_mds_session *s = con->private; struct ceph_mds_client *mdsc = s->s_mdsc; - pr_warning("mds%d closed our session\n", s->s_mds); + pr_warn("mds%d closed our session\n", s->s_mds); send_mds_reconnect(mdsc, s); } @@ -3514,6 +3944,22 @@ return msg; } +static int mds_sign_message(struct ceph_msg *msg) +{ + struct ceph_mds_session *s = msg->con->private; + struct ceph_auth_handshake *auth = &s->s_auth; + + return ceph_auth_sign_message(auth, msg); +} + +static int mds_check_message_signature(struct ceph_msg *msg) +{ + struct ceph_mds_session *s = msg->con->private; + struct ceph_auth_handshake *auth = &s->s_auth; + + return ceph_auth_check_message_signature(auth, msg); +} + static const struct ceph_connection_operations mds_con_ops = { .get = con_get, .put = con_put, @@ -3523,6 +3969,8 @@ .invalidate_authorizer = invalidate_authorizer, .peer_reset = peer_reset, .alloc_msg = mds_alloc_msg, + .sign_message = mds_sign_message, + .check_message_signature = mds_check_message_signature, }; /* eof */