/*-
* Copyright (c) 2003, 2008, 2009 The NetBSD Foundation, Inc.
* All rights reserved.
*
* This code is derived from software contributed to The NetBSD Foundation
* by Wasabi Systems, Inc.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
* ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/*
* This implements file system independent write ahead logging (WAPBL).
*/
static int wapbl_flush_disk_cache = 1;
static int wapbl_verbose_commit = 0;
static int wapbl_allow_dpofua = 0; /* switched off by default for now */
static int wapbl_journal_iobufs = 4;
/*
* This structure holds per-mount log information.
*
* Legend: a = atomic access only
* r = read-only after init
* l = rwlock held
* m = mutex held
* lm = rwlock held writing or mutex held
* u = unlocked access ok
* b = bufcache_lock held
*/
LIST_HEAD(wapbl_ino_head, wapbl_ino);
struct wapbl {
struct vnode *wl_logvp; /* r: log here */
struct vnode *wl_devvp; /* r: log on this device */
struct mount *wl_mount; /* r: mountpoint wl is associated with */
daddr_t wl_logpbn; /* r: Physical block number of start of log */
int wl_log_dev_bshift; /* r: logarithm of device block size of log
device */
int wl_fs_dev_bshift; /* r: logarithm of device block size of
filesystem device */
unsigned wl_lock_count; /* m: Count of transactions in progress */
size_t wl_circ_size; /* r: Number of bytes in buffer of log */
size_t wl_circ_off; /* r: Number of bytes reserved at start */
size_t wl_bufcount_max; /* r: Number of buffers reserved for log */
size_t wl_bufbytes_max; /* r: Number of buf bytes reserved for log */
off_t wl_head; /* l: Byte offset of log head */
off_t wl_tail; /* l: Byte offset of log tail */
/*
* WAPBL log layout, stored on wl_devvp at wl_logpbn:
*
* ___________________ wl_circ_size __________________
* / \
* +---------+---------+-------+--------------+--------+
* [ commit0 | commit1 | CCWCW | EEEEEEEEEEEE | CCCWCW ]
* +---------+---------+-------+--------------+--------+
* wl_circ_off --^ ^-- wl_head ^-- wl_tail
*
* commit0 and commit1 are commit headers. A commit header has
* a generation number, indicating which of the two headers is
* more recent, and an assignment of head and tail pointers.
* The rest is a circular queue of log records, starting at
* the byte offset wl_circ_off.
*
* E marks empty space for records.
* W marks records for block writes issued but waiting.
* C marks completed records.
*
* wapbl_flush writes new records to empty `E' spaces after
* wl_head from the current transaction in memory.
*
* wapbl_truncate advances wl_tail past any completed `C'
* records, freeing them up for use.
*
* head == tail == 0 means log is empty.
* head == tail != 0 means log is full.
*
* See assertions in wapbl_advance() for other boundary
* conditions.
*
* Only wapbl_flush moves the head, except when wapbl_truncate
* sets it to 0 to indicate that the log is empty.
*
* Only wapbl_truncate moves the tail, except when wapbl_flush
* sets it to wl_circ_off to indicate that the log is non-empty.
*/
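/*
 * Worked example with illustrative numbers (not from any particular
 * filesystem): with wl_circ_off = 1024 and wl_circ_size = 65536,
 * an empty log has head = tail = 0.  If wapbl_flush then writes
 * 4096 bytes of records, the first record starts at wl_circ_off,
 * so head becomes 1024 + 4096 = 5120 and tail is set to
 * wl_circ_off = 1024; 4096 bytes of the queue are then in use.
 * Once every record between tail and head has completed and been
 * reclaimed, wapbl_truncate resets head = tail = 0.
 */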
struct wapbl_wc_header *wl_wc_header; /* l */
void *wl_wc_scratch; /* l: scratch space (XXX: why?!?) */
/*
* Must be held while accessing
* wl_count or wl_bufs or head or tail
*/
#ifdef _KERNEL
/*
* Callback called from within the flush routine to flush any extra
* bits. Note that flush may be skipped without calling this if
* there are no outstanding buffers in the transaction.
*/
wapbl_flush_fn_t wl_flush; /* r */
wapbl_flush_fn_t wl_flush_abort;/* r */
/* Event counters */
char wl_ev_group[EVCNT_STRING_MAX]; /* r */
struct evcnt wl_ev_commit; /* l */
struct evcnt wl_ev_journalwrite; /* l */
struct evcnt wl_ev_jbufs_bio_nowait; /* l */
struct evcnt wl_ev_metawrite; /* lm */
struct evcnt wl_ev_cacheflush; /* l */
#endif
size_t wl_bufbytes; /* m: Byte count of pages in wl_bufs */
size_t wl_bufcount; /* m: Count of buffers in wl_bufs */
size_t wl_bcount; /* m: Total bcount of wl_bufs */
TAILQ_HEAD(, buf) wl_bufs; /* m: Buffers in current transaction */
kcondvar_t wl_reclaimable_cv; /* m (obviously) */
size_t wl_reclaimable_bytes; /* m: Amount of space available for
reclamation by truncate */
int wl_error_count; /* m: # of wl_entries with errors */
size_t wl_reserved_bytes; /* never truncate log smaller than this */
static void
wapbl_dkcache_init(struct wapbl *wl)
{
int error;
/* Get disk cache flags */
error = VOP_IOCTL(wl->wl_devvp, DIOCGCACHE, &wl->wl_dkcache,
FWRITE, FSCRED);
if (error) {
/* behave as if there was a write cache */
wl->wl_dkcache = DKCACHE_WRITE;
}
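/*
 * FUA (Force Unit Access) makes the device write the block to
 * stable storage before completing the I/O; DPO (Disable Page Out)
 * hints that the device need not retain the block in its cache.
 * Together they let journal writes bypass the volatile write cache
 * without a separate DIOCCACHESYNC afterwards.
 */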
/* Use FUA instead of cache flush if available */
if (ISSET(wl->wl_dkcache, DKCACHE_FUA))
wl->wl_jwrite_flags |= B_MEDIA_FUA;
/* Use DPO for journal writes if available */
if (ISSET(wl->wl_dkcache, DKCACHE_DPO))
wl->wl_jwrite_flags |= B_MEDIA_DPO;
}
static int
wapbl_start_flush_inodes(struct wapbl *wl, struct wapbl_replay *wr)
{
int error, i;
WAPBL_PRINTF(WAPBL_PRINT_REPLAY,
("wapbl_start: reusing log with %d inodes\n", wr->wr_inodescnt));
/*
* It's only valid to reuse the replay log if it's
* the same as the new log we just opened.
*/
KDASSERT(!wapbl_replay_isopen(wr));
KASSERT(wl->wl_devvp->v_type == VBLK);
KASSERT(wr->wr_devvp->v_type == VBLK);
KASSERT(wl->wl_devvp->v_rdev == wr->wr_devvp->v_rdev);
KASSERT(wl->wl_logpbn == wr->wr_logpbn);
KASSERT(wl->wl_circ_size == wr->wr_circ_size);
KASSERT(wl->wl_circ_off == wr->wr_circ_off);
KASSERT(wl->wl_log_dev_bshift == wr->wr_log_dev_bshift);
KASSERT(wl->wl_fs_dev_bshift == wr->wr_fs_dev_bshift);
for (i = 0; i < wr->wr_inodescnt; i++)
wapbl_register_inode(wl, wr->wr_inodes[i].wr_inumber,
wr->wr_inodes[i].wr_imode);
/* Make sure new transaction won't overwrite old inodes list */
KDASSERT(wapbl_transaction_len(wl) <=
wapbl_space_free(wl->wl_circ_size, wr->wr_inodeshead,
wr->wr_inodestail));
if (log_dev_bshift > fs_dev_bshift) {
WAPBL_PRINTF(WAPBL_PRINT_OPEN,
("wapbl: log device's block size cannot be larger "
"than filesystem's\n"));
/*
* Not currently implemented, although it could be if
* needed someday.
*/
return SET_ERROR(ENOSYS);
}
if (off < 0)
return SET_ERROR(EINVAL);
if (blksize < DEV_BSIZE)
return SET_ERROR(EINVAL);
if (blksize % DEV_BSIZE)
return SET_ERROR(EINVAL);
/* XXXTODO: verify that the full load is writable */
/*
* XXX check for minimum log size
* minimum is governed by minimum amount of space
* to complete a transaction. (probably truncate)
*/
/* XXX for now pick something minimal */
if ((count * blksize) < MAXPHYS) {
return SET_ERROR(ENOSPC);
}
/* Reserve two log device blocks for the commit headers */
wl->wl_circ_off = 2<<wl->wl_log_dev_bshift;
wl->wl_circ_size = ((count * blksize) - wl->wl_circ_off);
/* truncate the log usage to a multiple of the log device block size */
wl->wl_circ_size >>= wl->wl_log_dev_bshift;
wl->wl_circ_size <<= wl->wl_log_dev_bshift;
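/*
 * Example with illustrative numbers: a 128 KiB log on a device with
 * 512-byte blocks (wl_log_dev_bshift = 9) gets
 * wl_circ_off = 2 << 9 = 1024 bytes for the two commit headers,
 * leaving wl_circ_size = 131072 - 1024 = 130048 bytes, which is
 * already a multiple of 512, so the shift pair above leaves it
 * unchanged.
 */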
/*
* wl_bufbytes_max limits the size of the in memory transaction space.
* - Since buffers are allocated and accounted for in units of
* PAGE_SIZE it is required to be a multiple of PAGE_SIZE
* (i.e. 1<<PAGE_SHIFT)
* - Since the log device has to be written in units of
* 1<<wl_log_dev_bshift it is required to be a multiple of
* 1<<wl_log_dev_bshift.
* - Since the filesystem will provide data in units of 1<<wl_fs_dev_bshift,
* it is convenient for it to be a multiple of 1<<wl_fs_dev_bshift.
* Therefore it must be a multiple of the least common multiple of those
* three quantities. Fortunately, all of those quantities are
* guaranteed to be a power of two, and the least common multiple of
* a set of numbers which are all powers of two is simply the maximum
* of those numbers. Finally, the maximum logarithm of a power of two
* is the same as the log of the maximum power of two. So we can do
* the following operations to size wl_bufbytes_max:
*/
/* XXX fix actual number of pages reserved per filesystem. */
wl->wl_bufbytes_max = MIN(wl->wl_circ_size, buf_memcalc() / 2);
/* Round wl_bufbytes_max down to a multiple of the largest power-of-two constraint */
wl->wl_bufbytes_max >>= PAGE_SHIFT;
wl->wl_bufbytes_max <<= PAGE_SHIFT;
wl->wl_bufbytes_max >>= wl->wl_log_dev_bshift;
wl->wl_bufbytes_max <<= wl->wl_log_dev_bshift;
wl->wl_bufbytes_max >>= wl->wl_fs_dev_bshift;
wl->wl_bufbytes_max <<= wl->wl_fs_dev_bshift;
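/*
 * For example (illustrative values): if wl_circ_size is 130048,
 * buf_memcalc()/2 is much larger, PAGE_SHIFT is 12 and both device
 * shifts are 9, then wl_bufbytes_max starts at 130048 and the
 * PAGE_SHIFT rounding truncates it to 126976 (31 pages); the two
 * smaller shifts leave it unchanged, since a multiple of 4096 is
 * already a multiple of 512.
 */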
/* XXX maybe use filesystem fragment size instead of 1024 */
/* XXX fix actual number of buffers reserved per filesystem. */
wl->wl_bufcount_max = (buf_nbuf() / 2) * 1024;
/*
* if there was an existing set of unlinked but
* allocated inodes, preserve it in the new
* log.
*/
if (wr && wr->wr_inodescnt) {
error = wapbl_start_flush_inodes(wl, wr);
if (error)
goto errout;
}
/*
* XXX we may consider using upgrade here
* if we want to call flush from inside a transaction
*/
rw_enter(&wl->wl_rwlock, RW_WRITER);
wl->wl_flush(wl->wl_mount, TAILQ_FIRST(&wl->wl_dealloclist));
/*
* clean buffer list
*/
mutex_enter(&bufcache_lock);
mutex_enter(&wl->wl_mtx);
while ((bp = TAILQ_FIRST(&wl->wl_bufs)) != NULL) {
if (bbusy(bp, 0, 0, &wl->wl_mtx) == 0) {
KASSERT(bp->b_flags & B_LOCKED);
KASSERT(bp->b_oflags & BO_DELWRI);
/*
* Buffer is already on BQ_LOCKED queue.
* The buffer will be unlocked and
* removed from the transaction in brelsel()
*/
mutex_exit(&wl->wl_mtx);
bremfree(bp);
brelsel(bp, BC_INVAL);
mutex_enter(&wl->wl_mtx);
}
}
/*
* Remove references to this wl from wl_entries, free any which
* no longer have buffers, others will be freed in wapbl_biodone()
* when they no longer have any buffers.
*/
while ((we = SIMPLEQ_FIRST(&wl->wl_entries)) != NULL) {
SIMPLEQ_REMOVE_HEAD(&wl->wl_entries, we_entries);
/* XXX should we be accumulating wl_error_count
* and increasing reclaimable bytes ? */
we->we_wapbl = NULL;
if (we->we_bufcount == 0) {
#ifdef WAPBL_DEBUG_BUFBYTES
KASSERT(we->we_unsynced_bufbytes == 0);
#endif
pool_put(&wapbl_entry_pool, we);
}
}
/*
* wapbl_buffered_flush(wl)
*
* Flush any buffered writes from wapbl_buffered_write.
*/
static int
wapbl_buffered_flush(struct wapbl *wl, bool full)
{
int error = 0;
struct buf *bp, *bnext;
bool only_done = true, found = false;
/* if there is an outstanding buffered write, send it now */
if ((bp = TAILQ_FIRST(&wl->wl_iobufs)) && bp->b_resid > 0)
wapbl_buffered_write_async(wl, bp);
/* wait for I/O to complete */
again:
TAILQ_FOREACH_SAFE(bp, &wl->wl_iobufs_busy, b_wapbllist, bnext) {
if (!full && only_done) {
/* skip unfinished */
if (!ISSET(bp->b_oflags, BO_DONE))
continue;
}
if (ISSET(bp->b_oflags, BO_DONE))
wl->wl_ev_jbufs_bio_nowait.ev_count++;
/*
* wapbl_buffered_write(data, len, wl, pbn)
*
* Write len bytes from data to physical block pbn on
* wl->wl_devvp. The write may not complete until
* wapbl_buffered_flush.
*/
static int
wapbl_buffered_write(void *data, size_t len, struct wapbl *wl, daddr_t pbn,
int bflags)
{
size_t resid;
struct buf *bp;
again:
bp = TAILQ_FIRST(&wl->wl_iobufs);
if (bp == NULL) {
/* No more buffers, wait for any previous I/O to finish. */
wapbl_buffered_flush(wl, false);
bp = TAILQ_FIRST(&wl->wl_iobufs);
KASSERT(bp != NULL);
}
/*
* If not adjacent to the buffered data, flush first.  The disk block
* address is always valid for a non-empty buffer.
*/
if (bp->b_resid > 0 && pbn != bp->b_blkno + btodb(bp->b_resid)) {
wapbl_buffered_write_async(wl, bp);
goto again;
}
/*
* If this write goes to an empty buffer we have to
* save the disk block address first.
*/
if (bp->b_blkno == 0) {
bp->b_blkno = pbn;
bp->b_flags |= bflags;
}
/*
* Remaining space so that this buffer ends on a buffer size boundary.
*
* This cannot become less than or equal to zero, because the buffer
* would then have been flushed on the previous call.
*/
resid = bp->b_bufsize - dbtob(bp->b_blkno % btodb(bp->b_bufsize)) -
bp->b_resid;
KASSERT(resid > 0);
KASSERT(dbtob(btodb(resid)) == resid);
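/*
 * Example with illustrative numbers: a 64 KiB buffer
 * (b_bufsize = 65536, i.e. 128 DEV_BSIZE blocks) holding
 * b_resid = 4096 buffered bytes starting at b_blkno = 96 must end
 * at the next 128-block boundary (block 128), so
 * resid = 65536 - dbtob(96 % 128) - 4096 = 65536 - 49152 - 4096
 * = 12288 bytes may still be appended before it has to be written out.
 */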
if (len >= resid) {
/* Just filled the buf, or data did not fit */
wapbl_buffered_write_async(wl, bp);
data = (uint8_t *)data + resid;
len -= resid;
pbn += btodb(resid);
if (len > 0)
goto again;
}
return 0;
}
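/*
 * Typical usage, sketched with hypothetical arguments (this is how
 * wapbl_circ_write and wapbl_write_commit drive the interface):
 * queue one or more writes against the journal device, then force
 * them out with wapbl_buffered_flush before depending on them
 * being on disk.
 *
 *	error = wapbl_buffered_write(data, len, wl, pbn, 0);
 *	if (error == 0)
 *		error = wapbl_buffered_flush(wl, true);
 */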
/*
* wapbl_circ_write(wl, data, len, offp)
*
* Write len bytes from data to the circular queue of wl, starting
* at linear byte offset *offp, and returning the new linear byte
* offset in *offp.
*
* If the starting linear byte offset precedes wl->wl_circ_off,
* the write instead begins at wl->wl_circ_off. XXX WTF? This
* should be a KASSERT, not a conditional.
*
* The write is buffered in wl and must be flushed with
* wapbl_buffered_flush before it will be submitted to the disk.
*/
static int
wapbl_circ_write(struct wapbl *wl, void *data, size_t len, off_t *offp)
{
size_t slen;
off_t off = *offp;
int error;
daddr_t pbn;
/*
* XXX this could be handled more gracefully, perhaps place
* only a partial transaction in the log and allow the
* remaining to flush without the protection of the journal.
*/
KASSERTMSG((wapbl_transaction_len(wl) <=
(wl->wl_circ_size - wl->wl_reserved_bytes)),
"wapbl_end: current transaction too big to flush");
#if 0
/*
* XXX this might be an issue for swapfiles.
* see uvm_swap.c:1702
*
* XXX2 why require it then? leap of semantics?
*/
KASSERT((bp->b_cflags & BC_NOCACHE) == 0);
#endif
#if 0
/*
* XXX this might be an issue for swapfiles.
* see uvm_swap.c:1725
*
* XXXdeux: see above
*/
KASSERT((bp->b_flags & BC_NOCACHE) == 0);
#endif
KASSERT(bp->b_flags & B_LOCKED);
void
wapbl_resize_buf(struct wapbl *wl, struct buf *bp, long oldsz, long oldcnt)
{
KASSERT(bp->b_cflags & BC_BUSY);
/*
* XXX: why does this depend on B_LOCKED? otherwise the buf
* is not for a transaction? if so, why is this called in the
* first place?
*/
if (bp->b_flags & B_LOCKED) {
mutex_enter(&wl->wl_mtx);
wl->wl_bufbytes += bp->b_bufsize - oldsz;
wl->wl_bcount += bp->b_bcount - oldcnt;
mutex_exit(&wl->wl_mtx);
}
}
#endif /* _KERNEL */
/****************************************************************/
/* Some utility inlines */
/*
* wapbl_space_used(avail, head, tail)
*
* Number of bytes used in a circular queue of avail total bytes,
* from tail to head.
*/
static inline size_t
wapbl_space_used(size_t avail, off_t head, off_t tail)
{
/*
* wapbl_space_free(avail, head, tail)
*
* Number of bytes free in a circular queue of avail total bytes,
* in which everything from tail to head is used.
*/
static inline size_t
wapbl_space_free(size_t avail, off_t head, off_t tail)
{
/*
* wapbl_advance_head(size, off, delta, headp, tailp)
*
* In a circular queue of size bytes starting at off, given the
* old head and tail offsets *headp and *tailp, store the new head
* and tail offsets in *headp and *tailp resulting from adding
* delta bytes of data to the head.
*/
static inline void
wapbl_advance_head(size_t size, size_t off, size_t delta, off_t *headp,
off_t *tailp)
{
off_t head = *headp;
off_t tail = *tailp;
KASSERT(delta <= wapbl_space_free(size, head, tail));
head = wapbl_advance(size, off, head, delta);
if (tail == 0 && head != 0)
tail = off;
*headp = head;
*tailp = tail;
}
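/*
 * For example (illustrative numbers): with size = 130048 and
 * off = 1024, advancing the head of an empty queue
 * (head = tail = 0) by delta = 2048 leaves head = 1024 + 2048 = 3072
 * and, because the queue just became non-empty, tail is set to
 * off = 1024.  An advance that runs past off + size wraps the head
 * back around into [off, off + size).
 */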
/*
* wapbl_advance_tail(size, off, delta, headp, tailp)
*
* In a circular queue of size bytes starting at off, given the
* old head and tail offsets *headp and *tailp, store the new head
* and tail offsets in *headp and *tailp resulting from removing
* delta bytes of data from the tail.
*/
static inline void
wapbl_advance_tail(size_t size, size_t off, size_t delta, off_t *headp,
off_t *tailp)
{
off_t head = *headp;
off_t tail = *tailp;
/*
* wapbl_truncate(wl, minfree)
*
* Wait until at least minfree bytes are available in the log.
*
* If it was necessary to wait for writes to complete,
* advance the circular queue tail to reflect the new write
* completions and issue a write commit to the log.
*
* => Caller must hold wl->wl_rwlock writer lock.
*/
static int
wapbl_truncate(struct wapbl *wl, size_t minfree)
{
size_t delta;
size_t avail;
off_t head;
off_t tail;
int error = 0;
/*
* First check to see if we have to do a commit
* at all.
*/
avail = wapbl_space_free(wl->wl_circ_size, wl->wl_head, wl->wl_tail);
if (minfree <= avail) {
mutex_exit(&wl->wl_mtx);
return 0;
}
minfree -= avail;
while (wl->wl_error_count == 0 &&
wl->wl_reclaimable_bytes < minfree) {
WAPBL_PRINTF(WAPBL_PRINT_TRUNCATE,
("wapbl_truncate: sleeping on %p"
" wl=%p bytes=%zd minfree=%zd\n",
&wl->wl_reclaimable_bytes,
wl, wl->wl_reclaimable_bytes, minfree));
cv_wait(&wl->wl_reclaimable_cv, &wl->wl_mtx);
}
if (wl->wl_reclaimable_bytes < minfree) {
KASSERT(wl->wl_error_count);
/* XXX maybe get actual error from buffer instead someday? */
error = SET_ERROR(EIO);
}
head = wl->wl_head;
tail = wl->wl_tail;
delta = wl->wl_reclaimable_bytes;
/* If all of the entries are flushed, then be sure to keep
* the reserved bytes reserved. Watch out for discarded transactions,
* which could leave more bytes reserved than are reclaimable.
*/
if (SIMPLEQ_EMPTY(&wl->wl_entries) && delta >= wl->wl_reserved_bytes) {
delta -= wl->wl_reserved_bytes;
}
wapbl_advance_tail(wl->wl_circ_size, wl->wl_circ_off, delta, &head,
&tail);
KDASSERT(wl->wl_reserved_bytes <=
wapbl_space_used(wl->wl_circ_size, head, tail));
mutex_exit(&wl->wl_mtx);
if (error)
return error;
/*
* This is where head, tail and delta are unprotected
* from races against themselves or against flush. This is OK since
* we only call this routine from inside flush itself.
*
* XXX: how can it race against itself when accessed only
* from behind the write-locked rwlock?
*/
error = wapbl_write_commit(wl, head, tail);
if (error)
return error;
/*
* Handle possible flushing of buffers after the log has been
* decommissioned.
*/
if (!wl) {
KASSERT(we->we_bufcount > 0);
we->we_bufcount--;
#ifdef WAPBL_DEBUG_BUFBYTES
KASSERT(we->we_unsynced_bufbytes >= bufsize);
we->we_unsynced_bufbytes -= bufsize;
#endif
if (bp->b_error) {
/*
* If an error occurs, it would be nice to leave the buffer
* as a delayed write on the LRU queue so that we can retry
* it later. But buffercache(9) can't handle dirty buffer
* reuse, so just mark the log permanently errored out.
*/
mutex_enter(&wl->wl_mtx);
if (wl->wl_error_count == 0) {
wl->wl_error_count++;
cv_broadcast(&wl->wl_reclaimable_cv);
}
mutex_exit(&wl->wl_mtx);
}
/*
* Make sure that the buf doesn't retain the media flags, so that
* e.g. wapbl_allow_dpofua has immediate effect on any following I/O.
* The flags will be set again if needed by another I/O.
*/
bp->b_flags &= ~B_MEDIA_FLAGS;
/*
* Release the buffer here. wapbl_flush() may wait for the
* log to become empty and we better unbusy the buffer before
* wapbl_flush() returns.
*/
brelse(bp, 0);
/*
* If the current transaction can be reclaimed, start
* at the beginning and reclaim any consecutive reclaimable
* transactions. If we successfully reclaim anything,
* then wakeup anyone waiting for the reclaim.
*/
if (we->we_bufcount == 0) {
size_t delta = 0;
int errcnt = 0;
#ifdef WAPBL_DEBUG_BUFBYTES
KDASSERT(we->we_unsynced_bufbytes == 0);
#endif
/*
* clear any posted error, since the buffer it came from
* has been successfully flushed by now
*/
while ((we = SIMPLEQ_FIRST(&wl->wl_entries)) &&
we->we_bufcount == 0) {
delta += we->we_reclaimable_bytes;
if (we->we_error)
errcnt++;
SIMPLEQ_REMOVE_HEAD(&wl->wl_entries, we_entries);
pool_put(&wapbl_entry_pool, we);
}
/*
* wapbl_flush(wl, wait)
*
* Flush pending block writes, deallocations, and inodes from
* the current transaction in memory to the log on disk:
*
* 1. Call the file system's wl_flush callback to flush any
* per-file-system pending updates.
* 2. Wait for enough space in the log for the current transaction.
* 3. Synchronously write the new log records, advancing the
* circular queue head.
* 4. Issue the pending block writes asynchronously, now that they
* are recorded in the log and can be replayed after crash.
* 5. If wait is true, wait for all writes to complete and for the
* log to become empty.
*
* On failure, call the file system's wl_flush_abort callback.
*/
int
wapbl_flush(struct wapbl *wl, int waitfor)
{
struct buf *bp;
struct wapbl_entry *we;
off_t off;
off_t head;
off_t tail;
size_t delta = 0;
size_t flushsize;
size_t reserved;
int error = 0;
/*
* Do a quick check to see if a full flush can be skipped
* This assumes that the flush callback does not need to be called
* unless there are other outstanding bufs.
*/
if (!waitfor) {
size_t nbufs;
mutex_enter(&wl->wl_mtx); /* XXX need mutex here to
protect the KASSERTS */
nbufs = wl->wl_bufcount;
KASSERT((wl->wl_bufcount == 0) == (wl->wl_bufbytes == 0));
KASSERT((wl->wl_bufcount == 0) == (wl->wl_bcount == 0));
mutex_exit(&wl->wl_mtx);
if (nbufs == 0)
return 0;
}
/*
* XXX we may consider using LK_UPGRADE here
* if we want to call flush from inside a transaction
*/
rw_enter(&wl->wl_rwlock, RW_WRITER);
wl->wl_flush(wl->wl_mount, TAILQ_FIRST(&wl->wl_dealloclist));
/*
* Now that we are exclusively locked and the file system has
* issued any deferred block writes for this transaction, check
* whether there are any blocks to write to the log. If not,
* skip waiting for space or writing any log entries.
*
* XXX Shouldn't this also check wl_dealloccnt and
* wl_inohashcnt? Perhaps wl_dealloccnt doesn't matter if the
* file system didn't produce any blocks as a consequence of
* it, but the same does not seem to be so of wl_inohashcnt.
*/
if (wl->wl_bufcount == 0) {
goto wait_out;
}
/* Calculate amount of space needed to flush */
flushsize = wapbl_transaction_len(wl);
if (wapbl_verbose_commit) {
struct timespec ts;
getnanotime(&ts);
printf("%s: %lld.%09ld this transaction = %zu bytes\n",
__func__, (long long)ts.tv_sec,
(long)ts.tv_nsec, flushsize);
}
if (flushsize > (wl->wl_circ_size - wl->wl_reserved_bytes)) {
/*
* XXX this could be handled more gracefully, perhaps place
* only a partial transaction in the log and allow the
* remaining to flush without the protection of the journal.
*/
panic("wapbl_flush: current transaction too big to flush");
}
error = wapbl_truncate(wl, flushsize);
if (error)
goto out;
off = wl->wl_head;
KASSERT(off == 0 || off >= wl->wl_circ_off);
KASSERT(off == 0 || off < wl->wl_circ_off + wl->wl_circ_size);
error = wapbl_write_blocks(wl, &off);
if (error)
goto out;
error = wapbl_write_revocations(wl, &off);
if (error)
goto out;
error = wapbl_write_inodes(wl, &off);
if (error)
goto out;
reserved = 0;
if (wl->wl_inohashcnt)
reserved = wapbl_transaction_inodes_len(wl);
/* Opportunistically move the tail forward if we can */
mutex_enter(&wl->wl_mtx);
delta = wl->wl_reclaimable_bytes;
mutex_exit(&wl->wl_mtx);
wapbl_advance_tail(wl->wl_circ_size, wl->wl_circ_off, delta,
&head, &tail);
error = wapbl_write_commit(wl, head, tail);
if (error)
goto out;
/*
* This flushes bufs in the order they were queued, so the LRU
* order is preserved.
*/
while ((bp = TAILQ_FIRST(&wl->wl_bufs)) != NULL) {
if (bbusy(bp, 0, 0, &wl->wl_mtx)) {
continue;
}
bp->b_iodone = wapbl_biodone;
bp->b_private = we;
/*
* If the waitfor flag is set, don't return until everything is
* fully flushed and the on disk log is empty.
*/
if (waitfor) {
error = wapbl_truncate(wl, wl->wl_circ_size -
wl->wl_reserved_bytes);
}
out:
if (error) {
wl->wl_flush_abort(wl->wl_mount,
TAILQ_FIRST(&wl->wl_dealloclist));
}
int
wapbl_register_deallocation(struct wapbl *wl, daddr_t blk, int len, bool force,
void **cookiep)
{
struct wapbl_dealloc *wd;
int error = 0;
wapbl_jlock_assert(wl);
mutex_enter(&wl->wl_mtx);
if (__predict_false(wl->wl_dealloccnt >= wl->wl_dealloclim)) {
if (!force) {
error = SET_ERROR(EAGAIN);
goto out;
}
/*
* Forced registration can only be used when:
* 1) the caller can't cope with failure
* 2) the path can be triggered only a bounded, small number of
* times per transaction
* If this is not fulfilled, and the path were triggered
* many times, this could overflow the maximum transaction size
* and panic later.
*/
printf("%s: forced dealloc registration over limit:"
" %d >= %d\n",
wl->wl_mount->mnt_stat.f_mntonname,
wl->wl_dealloccnt, wl->wl_dealloclim);
}
/*
* wapbl_transaction_inodes_len(wl)
*
* Calculate the number of bytes required for inode registration
* log records in wl.
*/
static inline size_t
wapbl_transaction_inodes_len(struct wapbl *wl)
{
int blocklen = 1<<wl->wl_log_dev_bshift;
int iph;
/* Calculate number of inodes described in an inodelist header */
iph = (blocklen - offsetof(struct wapbl_wc_inodelist, wc_inodes)) /
sizeof(((struct wapbl_wc_inodelist *)0)->wc_inodes[0]);
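/*
 * For instance, with a 512-byte log block and (hypothetically) a
 * 16-byte wapbl_wc_inodelist header followed by 8-byte inode
 * records, iph would be (512 - 16) / 8 = 62 inodes per block.
 */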
/*
* wapbl_transaction_len(wl)
*
* Calculate number of bytes required for all log records in wl.
*/
static size_t
wapbl_transaction_len(struct wapbl *wl)
{
int blocklen = 1<<wl->wl_log_dev_bshift;
size_t len;
/* Calculate number of blocks described in a blocklist header */
len = wl->wl_bcount;
len += howmany(wl->wl_bufcount, wl->wl_brperjblock) * blocklen;
len += howmany(wl->wl_dealloccnt, wl->wl_brperjblock) * blocklen;
len += wapbl_transaction_inodes_len(wl);
return len;
}
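/*
 * Worked example with hypothetical numbers: if blocklen is 512,
 * wl_brperjblock is 62, the transaction holds 100 buffers totalling
 * wl_bcount = 409600 bytes, there are 10 pending deallocations and
 * the inode list fits in one block, then
 *
 *	len = 409600			(buffer data)
 *	    + howmany(100, 62) * 512	(2 blocklist header blocks)
 *	    + howmany(10, 62) * 512	(1 revocation block)
 *	    + 512			(inode list, assumed one block)
 *	    = 411648 bytes
 */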
/*
* wapbl_cache_sync(wl, msg)
*
* Issue DIOCCACHESYNC to wl->wl_devvp.
*
* If sysctl(vfs.wapbl.verbose_commit) >= 2, print a message
* including msg about the duration of the cache sync.
*/
static int
wapbl_cache_sync(struct wapbl *wl, const char *msg)
{
const bool verbose = wapbl_verbose_commit >= 2;
struct bintime start_time;
int force = 1;
int error;
/* Skip full cache sync if disabled */
if (!wapbl_flush_disk_cache) {
return 0;
}
if (verbose) {
bintime(&start_time);
}
error = VOP_IOCTL(wl->wl_devvp, DIOCCACHESYNC, &force,
FWRITE, FSCRED);
if (error) {
WAPBL_PRINTF(WAPBL_PRINT_ERROR,
("wapbl_cache_sync: DIOCCACHESYNC on dev 0x%jx "
"returned %d\n", (uintmax_t)wl->wl_devvp->v_rdev,
error));
}
if (verbose) {
struct bintime d;
struct timespec ts;
/*
* wapbl_write_commit(wl, head, tail)
*
* Issue a disk cache sync to wait for all pending writes to the
* log to complete, and then synchronously commit the current
* circular queue head and tail to the log, in the next of two
* locations for commit headers on disk.
*
* Increment the generation number. If the generation number
* rolls over to zero, then a subsequent commit would appear to
* have an older generation than this one -- in that case, issue a
* duplicate commit to avoid this.
*
* => Caller must have exclusive access to wl, either by holding
* wl->wl_rwlock for writer or by being wapbl_start before anyone
* else has seen wl.
*/
static int
wapbl_write_commit(struct wapbl *wl, off_t head, off_t tail)
{
struct wapbl_wc_header *wc = wl->wl_wc_header;
struct timespec ts;
int error;
daddr_t pbn;
error = wapbl_buffered_flush(wl, true);
if (error)
return error;
/*
* Flush disk cache to ensure that blocks we've written are actually
* written to the stable storage before the commit header.
* This flushes to disk not only journal blocks, but also all
* metadata blocks, written asynchronously since previous commit.
*
* XXX Calc checksum here, instead we do this for now
*/
wapbl_cache_sync(wl, "1");
/*
* Flush disk cache to ensure that the commit header is actually
* written before metadata blocks.  The commit block is written using
* FUA when enabled; in that case this flush is not needed.
*/
if (!WAPBL_USE_FUA(wl))
wapbl_cache_sync(wl, "2");
/*
* If the generation number was zero, write it out a second time.
* This handles initialization and generation number rollover
*/
if (wc->wc_generation++ == 0) {
error = wapbl_write_commit(wl, head, tail);
/*
* This panic should be able to be removed if we do the
* zeroing mentioned above, and we are certain to roll
* back the generation number on failure.
*/
if (error) {
panic("wapbl_write_commit: error writing duplicate "
"log header: %d", error);
}
}
wl->wl_ev_commit.ev_count++;
return 0;
}
/*
* wapbl_write_blocks(wl, offp)
*
* Write all pending physical blocks in the current transaction
* from wapbl_add_buf to the log on disk, adding to the circular
* queue head at byte offset *offp, and returning the new head's
* byte offset in *offp.
*/
static int
wapbl_write_blocks(struct wapbl *wl, off_t *offp)
{
struct wapbl_wc_blocklist *wc =
(struct wapbl_wc_blocklist *)wl->wl_wc_scratch;
int blocklen = 1<<wl->wl_log_dev_bshift;
struct buf *bp;
off_t off = *offp;
int error;
size_t padding;
KASSERT(rw_write_held(&wl->wl_rwlock));
bp = TAILQ_FIRST(&wl->wl_bufs);
while (bp) {
int cnt;
struct buf *obp = bp;
KASSERT(bp->b_flags & B_LOCKED);
wc->wc_type = WAPBL_WC_BLOCKS;
wc->wc_len = blocklen;
wc->wc_blkcount = 0;
wc->wc_unused = 0;
while (bp && wc->wc_blkcount < wl->wl_brperjblock) {
/*
* Make sure all the physical block numbers are up to
* date. If this is not always true on a given
* filesystem, then VOP_BMAP must be called. We
* could call VOP_BMAP here, or else in the filesystem
* specific flush callback, although neither of those
* solutions allow us to take the vnode lock. If a
* filesystem requires that we must take the vnode lock
* to call VOP_BMAP, then we can probably do it in
* bwrite when the vnode lock should already be held
* by the invoking code.
*/
KASSERT(bp->b_vp->v_type == VBLK ||
bp->b_blkno != bp->b_lblkno);
KASSERT(bp->b_blkno > 0);
/*
* wapbl_write_revocations(wl, offp)
*
* Write all pending deallocations in the current transaction from
* wapbl_register_deallocation to the log on disk, adding to the
* circular queue's head at byte offset *offp, and returning the
* new head's byte offset in *offp.
*/
static int
wapbl_write_revocations(struct wapbl *wl, off_t *offp)
{
struct wapbl_wc_blocklist *wc =
(struct wapbl_wc_blocklist *)wl->wl_wc_scratch;
struct wapbl_dealloc *wd, *lwd;
int blocklen = 1<<wl->wl_log_dev_bshift;
off_t off = *offp;
int error;
wd = TAILQ_NEXT(wd, wd_entries);
}
WAPBL_PRINTF(WAPBL_PRINT_WRITE,
("wapbl_write_revocations: len = %u off = %"PRIdMAX"\n",
wc->wc_len, (intmax_t)off));
error = wapbl_circ_write(wl, wc, blocklen, &off);
if (error)
return error;
/* free all successfully written deallocs */
lwd = wd;
while ((wd = TAILQ_FIRST(&wl->wl_dealloclist)) != NULL) {
if (wd == lwd)
break;
wapbl_deallocation_free(wl, wd, true);
}
}
*offp = off;
return 0;
}
/*
* wapbl_write_inodes(wl, offp)
*
* Write all pending inode allocations in the current transaction
* from wapbl_register_inode to the log on disk, adding to the
* circular queue's head at byte offset *offp and returning the
* new head's byte offset in *offp.
*/
static int
wapbl_write_inodes(struct wapbl *wl, off_t *offp)
{
struct wapbl_wc_inodelist *wc =
(struct wapbl_wc_inodelist *)wl->wl_wc_scratch;
int i;
int blocklen = 1 << wl->wl_log_dev_bshift;
off_t off = *offp;
int error;
struct wapbl_ino_head *wih;
struct wapbl_ino *wi;
int iph;
/*
* wapbl_circ_read(wr, data, len, offp)
*
* Read len bytes into data from the circular queue of wr,
* starting at the linear byte offset *offp, and returning the new
* linear byte offset in *offp.
*
* If the starting linear byte offset precedes wr->wr_circ_off,
* the read instead begins at wr->wr_circ_off. XXX WTF? This
* should be a KASSERT, not a conditional.
*/
static int
wapbl_circ_read(struct wapbl_replay *wr, void *data, size_t len, off_t *offp)
{
size_t slen;
off_t off = *offp;
int error;
daddr_t pbn;
if (off < wr->wr_circ_off)
off = wr->wr_circ_off;
slen = wr->wr_circ_off + wr->wr_circ_size - off;
if (slen < len) {
pbn = wr->wr_logpbn + (off >> wr->wr_log_dev_bshift);
#ifdef _KERNEL
pbn = btodb(pbn << wr->wr_log_dev_bshift);
#endif
error = wapbl_read(data, slen, wr->wr_devvp, pbn);
if (error)
return error;
data = (uint8_t *)data + slen;
len -= slen;
off = wr->wr_circ_off;
}
pbn = wr->wr_logpbn + (off >> wr->wr_log_dev_bshift);
#ifdef _KERNEL
pbn = btodb(pbn << wr->wr_log_dev_bshift);
#endif
error = wapbl_read(data, len, wr->wr_devvp, pbn);
if (error)
return error;
off += len;
if (off >= wr->wr_circ_off + wr->wr_circ_size)
off = wr->wr_circ_off;
*offp = off;
return 0;
}
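/*
 * For example (illustrative numbers): with wr_circ_off = 1024 and
 * wr_circ_size = 65536, a read of len = 2000 bytes starting at
 * *offp = 66000 is split: slen = 1024 + 65536 - 66000 = 560 bytes
 * are read at offset 66000, then the remaining 1440 bytes are read
 * from wr_circ_off, leaving *offp = 1024 + 1440 = 2464.
 */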
/*
* wapbl_circ_advance(wr, len, offp)
*
* Compute the linear byte offset of the circular queue of wr that
* is len bytes past *offp, and store it in *offp.
*
* This is like wapbl_circ_read, but without actually reading
* anything.
*
* If the starting linear byte offset precedes wr->wr_circ_off, it
* is taken to be wr->wr_circ_off instead. XXX WTF? This should
* be a KASSERT, not a conditional.
*/
static void
wapbl_circ_advance(struct wapbl_replay *wr, size_t len, off_t *offp)
{
size_t slen;
off_t off = *offp;
#ifdef _KERNEL
int
wapbl_replay_isopen1(struct wapbl_replay *wr)
{
return wapbl_replay_isopen(wr);
}
#endif
/*
* calculate the disk address for the i'th block in the wc_blocklist
* offset by j blocks of size blen.
*
* wc_daddr is always a kernel disk address in DEV_BSIZE units that
* was written to the journal.
*
* The kernel needs that address plus the offset in DEV_BSIZE units.
*
* Userland needs that address plus the offset in blen units.
*
*/
static daddr_t
wapbl_block_daddr(struct wapbl_wc_blocklist *wc, int i, int j, int blen)
{
daddr_t pbn;
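/*
 * A sketch of the computation described above (illustrative only;
 * the real function may differ in detail): in the kernel the result
 * stays in DEV_BSIZE units, e.g.
 *	pbn = wc->wc_blocks[i].wc_daddr + btodb(j * blen);
 * while in userland it is converted to blen units, e.g.
 *	pbn = dbtob(wc->wc_blocks[i].wc_daddr) / blen + j;
 */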
static void
wapbl_replay_process_blocks(struct wapbl_replay *wr, off_t *offp)
{
struct wapbl_wc_blocklist *wc =
(struct wapbl_wc_blocklist *)wr->wr_scratch;
int fsblklen = 1 << wr->wr_fs_dev_bshift;
int i, j, n;
for (i = 0; i < wc->wc_blkcount; i++) {
/*
* Enter each physical block into the hashtable independently.
*/
n = wc->wc_blocks[i].wc_dlen >> wr->wr_fs_dev_bshift;
for (j = 0; j < n; j++) {
wapbl_blkhash_ins(wr,
wapbl_block_daddr(wc, i, j, fsblklen),
*offp);
wapbl_circ_advance(wr, fsblklen, offp);
}
}
}
static void
wapbl_replay_process_revocations(struct wapbl_replay *wr)
{
struct wapbl_wc_blocklist *wc =
(struct wapbl_wc_blocklist *)wr->wr_scratch;
int fsblklen = 1 << wr->wr_fs_dev_bshift;
int i, j, n;
for (i = 0; i < wc->wc_blkcount; i++) {
/*
* Remove any blocks found from the hashtable.
*/
n = wc->wc_blocks[i].wc_dlen >> wr->wr_fs_dev_bshift;
for (j = 0; j < n; j++) {
wapbl_blkhash_rem(wr, wapbl_block_daddr(wc, i, j,
fsblklen));
}
}
}
/*
* Keep track of where we found this so the location won't be
* overwritten.
*/
if (wc->wc_clear) {
wr->wr_inodestail = oldoff;
wr->wr_inodescnt = 0;
if (wr->wr_inodes != NULL) {
wapbl_free(wr->wr_inodes, oldsize);
wr->wr_inodes = NULL;
}
}
wr->wr_inodeshead = newoff;
if (wc->wc_inocnt == 0)
return;