--- zzzz-none-000/linux-3.10.107/fs/ocfs2/journal.c 2017-06-27 09:49:32.000000000 +0000 +++ scorpion-7490-727/linux-3.10.107/fs/ocfs2/journal.c 2021-02-04 17:41:59.000000000 +0000 @@ -30,6 +30,7 @@ #include #include #include +#include #include @@ -49,6 +50,8 @@ #include "sysfile.h" #include "uptodate.h" #include "quota.h" +#include "file.h" +#include "namei.h" #include "buffer_head_io.h" #include "ocfs2_trace.h" @@ -68,13 +71,15 @@ static int ocfs2_trylock_journal(struct ocfs2_super *osb, int slot_num); static int ocfs2_recover_orphans(struct ocfs2_super *osb, - int slot); + int slot, + enum ocfs2_orphan_reco_type orphan_reco_type); static int ocfs2_commit_thread(void *arg); static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal, int slot_num, struct ocfs2_dinode *la_dinode, struct ocfs2_dinode *tl_dinode, - struct ocfs2_quota_recovery *qrec); + struct ocfs2_quota_recovery *qrec, + enum ocfs2_orphan_reco_type orphan_reco_type); static inline int ocfs2_wait_on_mount(struct ocfs2_super *osb) { @@ -103,7 +108,7 @@ unsigned char rm_replay_slots[0]; }; -void ocfs2_replay_map_set_state(struct ocfs2_super *osb, int state) +static void ocfs2_replay_map_set_state(struct ocfs2_super *osb, int state) { if (!osb->replay_map) return; @@ -148,7 +153,8 @@ return 0; } -void ocfs2_queue_replay_slots(struct ocfs2_super *osb) +static void ocfs2_queue_replay_slots(struct ocfs2_super *osb, + enum ocfs2_orphan_reco_type orphan_reco_type) { struct ocfs2_replay_map *replay_map = osb->replay_map; int i; @@ -162,11 +168,12 @@ for (i = 0; i < replay_map->rm_slots; i++) if (replay_map->rm_replay_slots[i]) ocfs2_queue_recovery_completion(osb->journal, i, NULL, - NULL, NULL); + NULL, NULL, + orphan_reco_type); replay_map->rm_state = REPLAY_DONE; } -void ocfs2_free_replay_slots(struct ocfs2_super *osb) +static void ocfs2_free_replay_slots(struct ocfs2_super *osb) { struct ocfs2_replay_map *replay_map = osb->replay_map; @@ -367,7 +374,7 @@ mlog_errno(PTR_ERR(handle)); if (is_journal_aborted(journal)) { - ocfs2_abort(osb->sb, "Detected aborted journal"); + ocfs2_abort(osb->sb, "Detected aborted journal\n"); handle = ERR_PTR(-EROFS); } } else { @@ -455,6 +462,41 @@ return status; } +/* + * If we have fewer than thresh credits, extend by OCFS2_MAX_TRANS_DATA. + * If that fails, restart the transaction & regain write access for the + * buffer head which is used for metadata modifications. + * Taken from Ext4: extend_or_restart_transaction() + */ +int ocfs2_allocate_extend_trans(handle_t *handle, int thresh) +{ + int status, old_nblks; + + BUG_ON(!handle); + + old_nblks = handle->h_buffer_credits; + trace_ocfs2_allocate_extend_trans(old_nblks, thresh); + + if (old_nblks < thresh) + return 0; + + status = jbd2_journal_extend(handle, OCFS2_MAX_TRANS_DATA); + if (status < 0) { + mlog_errno(status); + goto bail; + } + + if (status > 0) { + status = jbd2_journal_restart(handle, OCFS2_MAX_TRANS_DATA); + if (status < 0) + mlog_errno(status); + } + +bail: + return status; +} + + struct ocfs2_triggers { struct jbd2_buffer_trigger_type ot_triggers; int ot_offset; @@ -529,9 +571,7 @@ (unsigned long)bh, (unsigned long long)bh->b_blocknr); - /* We aren't guaranteed to have the superblock here - but if we - * don't, it'll just crash. */ - ocfs2_error(bh->b_assoc_map->host->i_sb, + ocfs2_error(bh->b_bdev->bd_super, "JBD2 has aborted our journal, ocfs2 cannot continue\n"); } @@ -628,7 +668,23 @@ mlog(ML_ERROR, "giving me a buffer that's not uptodate!\n"); mlog(ML_ERROR, "b_blocknr=%llu\n", (unsigned long long)bh->b_blocknr); - BUG(); + + lock_buffer(bh); + /* + * A previous attempt to write this buffer head failed. + * Nothing we can do but to retry the write and hope for + * the best. + */ + if (buffer_write_io_error(bh) && !buffer_uptodate(bh)) { + clear_buffer_write_io_error(bh); + set_buffer_uptodate(bh); + } + + if (!buffer_uptodate(bh)) { + unlock_buffer(bh); + return -EIO; + } + unlock_buffer(bh); } /* Set the current transaction information on the ci so @@ -733,7 +789,20 @@ trace_ocfs2_journal_dirty((unsigned long long)bh->b_blocknr); status = jbd2_journal_dirty_metadata(handle, bh); - BUG_ON(status); + if (status) { + mlog_errno(status); + if (!is_handle_aborted(handle)) { + journal_t *journal = handle->h_transaction->t_journal; + struct super_block *sb = bh->b_bdev->bd_super; + + mlog(ML_ERROR, "jbd2_journal_dirty_metadata failed. " + "Aborting transaction and journal.\n"); + handle->h_err = status; + jbd2_journal_abort_handle(handle); + jbd2_journal_abort(journal, status); + ocfs2_abort(sb, "Journal already aborted.\n"); + } + } } #define OCFS2_DEFAULT_COMMIT_INTERVAL (HZ * JBD2_DEFAULT_MAX_COMMIT_AGE) @@ -801,14 +870,14 @@ inode_lock = 1; di = (struct ocfs2_dinode *)bh->b_data; - if (inode->i_size < OCFS2_MIN_JOURNAL_SIZE) { + if (i_size_read(inode) < OCFS2_MIN_JOURNAL_SIZE) { mlog(ML_ERROR, "Journal file size (%lld) is too small!\n", - inode->i_size); + i_size_read(inode)); status = -EINVAL; goto done; } - trace_ocfs2_journal_init(inode->i_size, + trace_ocfs2_journal_init(i_size_read(inode), (unsigned long long)inode->i_blocks, OCFS2_I(inode)->ip_clusters); @@ -1021,7 +1090,7 @@ /* Launch the commit thread */ if (!local) { osb->commit_task = kthread_run(ocfs2_commit_thread, osb, - "ocfs2cmt"); + "ocfs2cmt-%s", osb->uuid_str); if (IS_ERR(osb->commit_task)) { status = PTR_ERR(osb->commit_task); osb->commit_task = NULL; @@ -1096,7 +1165,7 @@ memset(bhs, 0, sizeof(struct buffer_head *) * CONCURRENT_JOURNAL_FILL); - num_blocks = ocfs2_blocks_for_bytes(inode->i_sb, inode->i_size); + num_blocks = ocfs2_blocks_for_bytes(inode->i_sb, i_size_read(inode)); v_blkno = 0; while (v_blkno < num_blocks) { status = ocfs2_extent_map_get_blocks(inode, v_blkno, @@ -1138,6 +1207,7 @@ struct ocfs2_dinode *lri_la_dinode; struct ocfs2_dinode *lri_tl_dinode; struct ocfs2_quota_recovery *lri_qrec; + enum ocfs2_orphan_reco_type lri_orphan_reco_type; }; /* Does the second half of the recovery process. By this point, the @@ -1159,6 +1229,7 @@ struct ocfs2_dinode *la_dinode, *tl_dinode; struct ocfs2_la_recovery_item *item, *n; struct ocfs2_quota_recovery *qrec; + enum ocfs2_orphan_reco_type orphan_reco_type; LIST_HEAD(tmp_la_list); trace_ocfs2_complete_recovery( @@ -1176,6 +1247,7 @@ la_dinode = item->lri_la_dinode; tl_dinode = item->lri_tl_dinode; qrec = item->lri_qrec; + orphan_reco_type = item->lri_orphan_reco_type; trace_ocfs2_complete_recovery_slot(item->lri_slot, la_dinode ? le64_to_cpu(la_dinode->i_blkno) : 0, @@ -1200,7 +1272,8 @@ kfree(tl_dinode); } - ret = ocfs2_recover_orphans(osb, item->lri_slot); + ret = ocfs2_recover_orphans(osb, item->lri_slot, + orphan_reco_type); if (ret < 0) mlog_errno(ret); @@ -1225,7 +1298,8 @@ int slot_num, struct ocfs2_dinode *la_dinode, struct ocfs2_dinode *tl_dinode, - struct ocfs2_quota_recovery *qrec) + struct ocfs2_quota_recovery *qrec, + enum ocfs2_orphan_reco_type orphan_reco_type) { struct ocfs2_la_recovery_item *item; @@ -1249,6 +1323,7 @@ item->lri_slot = slot_num; item->lri_tl_dinode = tl_dinode; item->lri_qrec = qrec; + item->lri_orphan_reco_type = orphan_reco_type; spin_lock(&journal->j_lock); list_add_tail(&item->lri_list, &journal->j_la_cleanups); @@ -1268,7 +1343,8 @@ /* No need to queue up our truncate_log as regular cleanup will catch * that */ ocfs2_queue_recovery_completion(journal, osb->slot_num, - osb->local_alloc_copy, NULL, NULL); + osb->local_alloc_copy, NULL, NULL, + ORPHAN_NEED_TRUNCATE); ocfs2_schedule_truncate_log_flush(osb, 0); osb->local_alloc_copy = NULL; @@ -1276,7 +1352,7 @@ /* queue to recover orphan slots for all offline slots */ ocfs2_replay_map_set_state(osb, REPLAY_NEEDED); - ocfs2_queue_replay_slots(osb); + ocfs2_queue_replay_slots(osb, ORPHAN_NEED_TRUNCATE); ocfs2_free_replay_slots(osb); } @@ -1287,7 +1363,8 @@ osb->slot_num, NULL, NULL, - osb->quota_rec); + osb->quota_rec, + ORPHAN_NEED_TRUNCATE); osb->quota_rec = NULL; } } @@ -1324,7 +1401,7 @@ /* queue recovery for our own slot */ ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL, - NULL, NULL); + NULL, NULL, ORPHAN_NO_NEED_TRUNCATE); spin_lock(&osb->osb_lock); while (rm->rm_used) { @@ -1383,13 +1460,14 @@ continue; } ocfs2_queue_recovery_completion(osb->journal, rm_quota[i], - NULL, NULL, qrec); + NULL, NULL, qrec, + ORPHAN_NEED_TRUNCATE); } ocfs2_super_unlock(osb, 1); /* queue recovery for offline slots */ - ocfs2_queue_replay_slots(osb); + ocfs2_queue_replay_slots(osb, ORPHAN_NEED_TRUNCATE); bail: mutex_lock(&osb->recovery_lock); @@ -1411,7 +1489,6 @@ * requires that we call do_exit(). And it isn't exported, but * complete_and_exit() seems to be a minimal wrapper around it. */ complete_and_exit(NULL, status); - return status; } void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num) @@ -1430,7 +1507,7 @@ goto out; osb->recovery_thread_task = kthread_run(__ocfs2_recovery_thread, osb, - "ocfs2rec"); + "ocfs2rec-%s", osb->uuid_str); if (IS_ERR(osb->recovery_thread_task)) { mlog_errno((int)PTR_ERR(osb->recovery_thread_task)); osb->recovery_thread_task = NULL; @@ -1676,7 +1753,7 @@ /* This will kfree the memory pointed to by la_copy and tl_copy */ ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy, - tl_copy, NULL); + tl_copy, NULL, ORPHAN_NEED_TRUNCATE); status = 0; done: @@ -1834,7 +1911,7 @@ * hasn't happened. The node queues a scan and increments the * sequence number in the LVB. */ -void ocfs2_queue_orphan_scan(struct ocfs2_super *osb) +static void ocfs2_queue_orphan_scan(struct ocfs2_super *osb) { struct ocfs2_orphan_scan *os; int status, i; @@ -1866,7 +1943,7 @@ for (i = 0; i < osb->max_slots; i++) ocfs2_queue_recovery_completion(osb->journal, i, NULL, NULL, - NULL); + NULL, ORPHAN_NO_NEED_TRUNCATE); /* * We queued a recovery on orphan slots, increment the sequence * number and update LVB so other node will skip the scan for a while @@ -1883,7 +1960,7 @@ } /* Worker task that gets fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT millsec */ -void ocfs2_orphan_scan_work(struct work_struct *work) +static void ocfs2_orphan_scan_work(struct work_struct *work) { struct ocfs2_orphan_scan *os; struct ocfs2_super *osb; @@ -1941,14 +2018,18 @@ } struct ocfs2_orphan_filldir_priv { + struct dir_context ctx; struct inode *head; struct ocfs2_super *osb; + enum ocfs2_orphan_reco_type orphan_reco_type; }; -static int ocfs2_orphan_filldir(void *priv, const char *name, int name_len, - loff_t pos, u64 ino, unsigned type) +static int ocfs2_orphan_filldir(struct dir_context *ctx, const char *name, + int name_len, loff_t pos, u64 ino, + unsigned type) { - struct ocfs2_orphan_filldir_priv *p = priv; + struct ocfs2_orphan_filldir_priv *p = + container_of(ctx, struct ocfs2_orphan_filldir_priv, ctx); struct inode *iter; if (name_len == 1 && !strncmp(".", name, 1)) @@ -1956,12 +2037,29 @@ if (name_len == 2 && !strncmp("..", name, 2)) return 0; + /* do not include dio entry in case of orphan scan */ + if ((p->orphan_reco_type == ORPHAN_NO_NEED_TRUNCATE) && + (!strncmp(name, OCFS2_DIO_ORPHAN_PREFIX, + OCFS2_DIO_ORPHAN_PREFIX_LEN))) + return 0; + /* Skip bad inodes so that recovery can continue */ iter = ocfs2_iget(p->osb, ino, OCFS2_FI_FLAG_ORPHAN_RECOVERY, 0); if (IS_ERR(iter)) return 0; + if (!strncmp(name, OCFS2_DIO_ORPHAN_PREFIX, + OCFS2_DIO_ORPHAN_PREFIX_LEN)) + OCFS2_I(iter)->ip_flags |= OCFS2_INODE_DIO_ORPHAN_ENTRY; + + /* Skip inodes which are already added to recover list, since dio may + * happen concurrently with unlink/rename */ + if (OCFS2_I(iter)->ip_next_orphan) { + iput(iter); + return 0; + } + trace_ocfs2_orphan_filldir((unsigned long long)OCFS2_I(iter)->ip_blkno); /* No locking is required for the next_orphan queue as there * is only ever a single process doing orphan recovery. */ @@ -1973,15 +2071,17 @@ static int ocfs2_queue_orphans(struct ocfs2_super *osb, int slot, - struct inode **head) + struct inode **head, + enum ocfs2_orphan_reco_type orphan_reco_type) { int status; struct inode *orphan_dir_inode = NULL; - struct ocfs2_orphan_filldir_priv priv; - loff_t pos = 0; - - priv.osb = osb; - priv.head = *head; + struct ocfs2_orphan_filldir_priv priv = { + .ctx.actor = ocfs2_orphan_filldir, + .osb = osb, + .head = *head, + .orphan_reco_type = orphan_reco_type + }; orphan_dir_inode = ocfs2_get_system_file_inode(osb, ORPHAN_DIR_SYSTEM_INODE, @@ -1999,8 +2099,7 @@ goto out; } - status = ocfs2_dir_foreach(orphan_dir_inode, &pos, &priv, - ocfs2_orphan_filldir); + status = ocfs2_dir_foreach(orphan_dir_inode, &priv.ctx); if (status) { mlog_errno(status); goto out_cluster; @@ -2071,17 +2170,20 @@ * advertising our state to ocfs2_delete_inode(). */ static int ocfs2_recover_orphans(struct ocfs2_super *osb, - int slot) + int slot, + enum ocfs2_orphan_reco_type orphan_reco_type) { int ret = 0; struct inode *inode = NULL; struct inode *iter; struct ocfs2_inode_info *oi; + struct buffer_head *di_bh = NULL; + struct ocfs2_dinode *di = NULL; trace_ocfs2_recover_orphans(slot); ocfs2_mark_recovering_orphan_dir(osb, slot); - ret = ocfs2_queue_orphans(osb, slot, &inode); + ret = ocfs2_queue_orphans(osb, slot, &inode, orphan_reco_type); ocfs2_clear_recovering_orphan_dir(osb, slot); /* Error here should be noted, but we want to continue with as @@ -2095,21 +2197,61 @@ (unsigned long long)oi->ip_blkno); iter = oi->ip_next_orphan; + oi->ip_next_orphan = NULL; - spin_lock(&oi->ip_lock); - /* The remote delete code may have set these on the - * assumption that the other node would wipe them - * successfully. If they are still in the node's - * orphan dir, we need to reset that state. */ - oi->ip_flags &= ~(OCFS2_INODE_DELETED|OCFS2_INODE_SKIP_DELETE); - - /* Set the proper information to get us going into - * ocfs2_delete_inode. */ - oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED; - spin_unlock(&oi->ip_lock); + if (oi->ip_flags & OCFS2_INODE_DIO_ORPHAN_ENTRY) { + mutex_lock(&inode->i_mutex); + ret = ocfs2_rw_lock(inode, 1); + if (ret < 0) { + mlog_errno(ret); + goto unlock_mutex; + } + /* + * We need to take and drop the inode lock to + * force read inode from disk. + */ + ret = ocfs2_inode_lock(inode, &di_bh, 1); + if (ret) { + mlog_errno(ret); + goto unlock_rw; + } - iput(inode); + di = (struct ocfs2_dinode *)di_bh->b_data; + + if (di->i_flags & cpu_to_le32(OCFS2_DIO_ORPHANED_FL)) { + ret = ocfs2_truncate_file(inode, di_bh, + i_size_read(inode)); + if (ret < 0) { + if (ret != -ENOSPC) + mlog_errno(ret); + goto unlock_inode; + } + + ret = ocfs2_del_inode_from_orphan(osb, inode, + di_bh, 0, 0); + if (ret) + mlog_errno(ret); + } +unlock_inode: + ocfs2_inode_unlock(inode, 1); + brelse(di_bh); + di_bh = NULL; +unlock_rw: + ocfs2_rw_unlock(inode, 1); +unlock_mutex: + mutex_unlock(&inode->i_mutex); + + /* clear dio flag in ocfs2_inode_info */ + oi->ip_flags &= ~OCFS2_INODE_DIO_ORPHAN_ENTRY; + } else { + spin_lock(&oi->ip_lock); + /* Set the proper information to get us going into + * ocfs2_delete_inode. */ + oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED; + spin_unlock(&oi->ip_lock); + } + iput(inode); inode = iter; } @@ -2156,8 +2298,20 @@ || kthread_should_stop()); status = ocfs2_commit_cache(osb); - if (status < 0) - mlog_errno(status); + if (status < 0) { + static unsigned long abort_warn_time; + + /* Warn about this once per minute */ + if (printk_timed_ratelimit(&abort_warn_time, 60*HZ)) + mlog(ML_ERROR, "status = %d, journal is " + "already aborted.\n", status); + /* + * After ocfs2_commit_cache() fails, j_num_trans has a + * non-zero value. Sleep here to avoid a busy-wait + * loop. + */ + msleep_interruptible(1000); + } if (kthread_should_stop() && atomic_read(&journal->j_num_trans)){ mlog(ML_KTHREAD,