[ https://issues.apache.org/jira/browse/KAFKA-6767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16436409#comment-16436409 ]
Matthias J. Sax commented on KAFKA-6767: ---------------------------------------- The corresponding task directory is created, when the task is created. Thus, I am not sure atm if I understand the scenario completely. When you crash with ephemeral disks and restart, you lose the whole state and RocksDB should be recreated -- as a precondition, the state directory would be recreated when the task is recreated. The error shows that writing the checkpoint file during commit() fails, thus, if the task has an RocksDB store, the directly must exist -- ie, it seems that the task does not have a store attached? Thus, I am wondering how we could get into this state? Could it be that KAFKA-6499 was not fixed properly and this checkpoint file does not belong to a stateful task? Otherwise, all writes to RocksDB should fail, too, and the whole application should fail (or does it fail? because you report only this WARN level log, and say it repeats, it seems that the application keeps running). A workaround for now would be, to create the missing directory manually -- but we should really understand the overall scenario to fix it properly. I am juts not sure why the directory might have been deleted or how to reproduce the issue atm. Or did I miss anything? > OffsetCheckpoint write assumes parent directory exists > ------------------------------------------------------ > > Key: KAFKA-6767 > URL: https://issues.apache.org/jira/browse/KAFKA-6767 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 1.1.0 > Reporter: Steven Schlansker > Priority: Minor > > We run Kafka Streams with RocksDB state stores on ephemeral disks (i.e. if an > instance dies it is created from scratch, rather than reusing the existing > RocksDB.) > We routinely see: > {code:java} > 2018-04-09T19:14:35.004Z WARN <> > [chat-0319e3c3-d8b2-4c60-bd69-a8484d8d4435-StreamThread-1] > o.a.k.s.p.i.ProcessorStateManager - task [0_11] Failed to write offset > checkpoint file to /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint: {} > java.io.FileNotFoundException: > /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open0(Native Method) > at java.io.FileOutputStream.open(FileOutputStream.java:270) > at java.io.FileOutputStream.<init>(FileOutputStream.java:213) > at java.io.FileOutputStream.<init>(FileOutputStream.java:162) > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78) > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320) > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:314) > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307) > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297) > at > org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347) > at > org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403) > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code} > Inspecting the state store directory, I can indeed see that {{chat/0_11}} > does not exist (although many other partitions do). > > Looking at the OffsetCheckpoint write method, it seems to try to open a new > checkpoint file without first ensuring that the parent directory exists. > > {code:java} > public void write(final Map<TopicPartition, Long> offsets) throws > IOException { > // if there is no offsets, skip writing the file to save disk IOs > if (offsets.isEmpty()) { > return; > } > synchronized (lock) { > // write to temp file and then swap with the existing file > final File temp = new File(file.getAbsolutePath() + ".tmp");{code} > > Either the OffsetCheckpoint class should initialize the directories if > needed, or some precondition of it being called should ensure that is the > case. -- This message was sent by Atlassian JIRA (v7.6.3#76005)