Hey,

SyncThread.checkpoint(Checkpoint checkpoint) (which is called periodically
by SyncThread's executor for every flushInterval) ultimately calls
EntryLogger.flushRotatedLogs.

In EntryLogger.flushRotatedLogs, first we set 'logChannelsToFlush' to null
and then we try to flush and close individual file. Now, if IOException
happens while trying to flush/close the logchannel, then exception is
thrown as it is and it get propagates back upto SyncThread.checkpoint. Here
we catch that IOException, log it and return without calling the
checkpointComplete. But by now we lost reference of 'logChannelsToFlush'
(rolled logs which are yet to be closed), because it is set to null before
we try to flush/close individually rolledlogs. The next execution of
'checkpoint' (after flushinterval) wouldn't be knowing about the rolledlogs
it failed to flush/close the previous time and it would flush the newly
rolledlogs. So the failure of flush/close of the previous rolledlogs goes
unnoticed completely.

in EntryLogger.java
        void flushRotatedLogs() throws IOException {
        List<BufferedLogChannel> channels = null;
        long flushedLogId = INVALID_LID;
        synchronized (this) {
            channels = logChannelsToFlush;
            logChannelsToFlush = null;               <--------- here we set
'logChannelsToFlush' to null before it tries to flush/close rolledlogs
        }
        if (null == channels) {
            return;
        }
        for (BufferedLogChannel channel : channels) {
            channel.flush(true);
 <------------IOEXception can happen here or in the following
closeFileChannel call
            // since this channel is only used for writing, after flushing
the channel,
            // we had to close the underlying file channel. Otherwise, we
might end up
            // leaking fds which cause the disk spaces could not be
reclaimed.
            closeFileChannel(channel);
            if (channel.getLogId() > flushedLogId) {
                flushedLogId = channel.getLogId();
            }
            LOG.info("Synced entry logger {} to disk.", channel.getLogId());
        }
        // move the leastUnflushedLogId ptr
        leastUnflushedLogId = flushedLogId + 1;
    }

in SyncThread.java
    public void checkpoint(Checkpoint checkpoint) {
        try {
            checkpoint = ledgerStorage.checkpoint(checkpoint);
        } catch (NoWritableLedgerDirException e) {
            LOG.error("No writeable ledger directories", e);
            dirsListener.allDisksFull();
            return;
        } catch (IOException e) {
            LOG.error("Exception flushing ledgers", e); <-----that IOExc
gets propagated to this method and here it is caught and not dealt
appropriately
            return;
        }

        try {
            checkpointSource.checkpointComplete(checkpoint, true);
        } catch (IOException e) {
            LOG.error("Exception marking checkpoint as complete", e);
            dirsListener.allDisksFull();
        }
    }

Thanks,
Charan

Reply via email to