Hi Shai!

It looks to me like the failures are caused by the Azure file system behaving slightly differently, semantically, from what Flink expects, which may be catching Flink off guard. Here are some thoughts:

- Flink 1.1.x behaves differently than 1.2 when it comes to its expectations of filesystem metadata consistency. 1.2 expects a little less, as a result of some patches to better support S3.

- I tried to write up what expectations we have for filesystems. You can check whether you see any points where the Azure Blob Store diverges in its semantics. The expectations are in the latest master, in the docs of the "FileSystem" class:
  https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java

Below is a quote with the relevant parts highlighted.

Completely aside from that, these failures should not make the checkpoint get stuck. If you can attach a thread dump, we can look into why it is not at least "failing well".

Greetings,
Stephan

=============

 *
 *
 * <h2>Data Persistence Contract</h2>
 *
 * The FileSystem's {@link FSDataOutputStream output streams} are used to persistently store data,
 * both for results of streaming applications and for fault tolerance and recovery. It is therefore
 * crucial that the persistence semantics of these streams are well defined.
 *
 * <h3>Definition of Persistence Guarantees</h3>
 *
 * Data written to an output stream is considered persistent, if two requirements are met:
 *
 * <ol>
 *     <li><b>Visibility Requirement:</b> It must be guaranteed that all other processes, machines,
 *     virtual machines, containers, etc. that are able to access the file see the data consistently
 *     when given the absolute file path. *This requirement is similar to the <i>close-to-open</i>
 *     semantics defined by POSIX, but restricted to the file itself (by its absolute path)*.</li>
 *
 *     <li><b>Durability Requirement:</b> The file system's specific durability/persistence requirements
 *     must be met. These are specific to the particular file system.
 *     For example the {@link LocalFileSystem} does not provide any durability guarantees for crashes
 *     of both hardware and operating system, while replicated distributed file systems (like HDFS)
 *     typically guarantee durability in the presence of at most <i>n</i> concurrent node failures,
 *     where <i>n</i> is the replication factor.</li>
 * </ol>
 *
 * <p>*Updates to the file's parent directory (such that the file shows up when
 * listing the directory contents) are __not__ required to be complete* for the data in the file stream
 * to be considered persistent. This relaxation is important for file systems where updates to
 * directory contents are only eventually consistent.
 *
 * <p>*The {@link FSDataOutputStream} has to guarantee data persistence for the written bytes
 * once the call to {@link FSDataOutputStream#close()} returns.*
 *
 * <h3>Examples</h3>
 *
 * <ul>
 *     <li>For <b>fault-tolerant distributed file systems</b>, data is considered persistent once
 *     it has been received and acknowledged by the file system, typically by having been replicated
 *     to a quorum of machines (<i>durability requirement</i>). In addition the absolute file path
 *     must be visible to all other machines that will potentially access the file (<i>visibility
 *     requirement</i>).
 *
 *     <p>Whether data has hit non-volatile storage on the storage nodes depends on the specific
 *     guarantees of the particular file system.
 *
 *     <p>The metadata updates to the file's parent directory are not required to have reached
 *     a consistent state. It is permissible that some machines see the file when listing the parent
 *     directory's contents while others do not, as long as access to the file by its absolute path
 *     is possible on all nodes.</li>
 *
 *     <li>A <b>local file system</b> must support the POSIX <i>close-to-open</i> semantics.
 *     Because the local file system does not have any fault tolerance guarantees, no further
 *     requirements exist.
 *
 *     <p>The above implies specifically that data may still be in the OS cache when considered
 *     persistent from the local file system's perspective. Crashes that cause the OS cache to lose
 *     data are considered fatal to the local machine and are not covered by the local file system's
 *     guarantees as defined by Flink.
 *
 *     <p>That means that computed results, checkpoints, and savepoints that are written only to
 *     the local filesystem are not guaranteed to be recoverable from the local machine's failure,
 *     making local file systems unsuitable for production setups.</li>
 * </ul>
 *
 * <h2>Updating File Contents</h2>
 *
 * Many file systems either do not support overwriting contents of existing files at all, or do
 * not support consistent visibility of the updated contents in that case. For that reason,
 * Flink's FileSystem does not support appending to existing files, or seeking within output streams
 * so that previously written data could be overwritten.
 *
 * <h2>Overwriting Files</h2>
 *
 * Overwriting files is in general possible. A file is overwritten by deleting it and creating
 * a new file. However, certain filesystems cannot make that change synchronously visible
 * to all parties that have access to the file.
 * For example <a href="https://aws.amazon.com/documentation/s3/">Amazon S3</a> guarantees only
 * <i>eventual consistency</i> in the visibility of the file replacement: Some machines may see
 * the old file, some machines may see the new file.
 *
 * <p>To avoid these consistency issues, *the implementations of failure/recovery mechanisms in
 * Flink strictly avoid writing to the same file path more than once.*
 *
 * <h2>Thread Safety</h2>
 *
 * Implementations of {@code FileSystem} must be thread-safe: The same instance of FileSystem
 * is frequently shared across multiple threads in Flink and must be able to concurrently
 * create input/output streams and list file metadata.
 *
 * <p>The {@link FSDataInputStream} and {@link FSDataOutputStream} implementations are strictly
 * <b>not thread-safe</b>. Instances of the streams should also not be passed between threads
 * in between read or write operations, because there are no guarantees about the visibility of
 * operations across threads (many operations do not create memory fences).

On Thu, Feb 23, 2017 at 9:31 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Shai,
>
> I think we don't have so many users running Flink on Azure. Maybe you are
> the first to put some heavy load onto that infrastructure using Flink.
> I would guess that your problems all have the same root cause, and that
> only the way the job is being cancelled differs a bit depending on what is
> happening.
> Regarding "The specified blob does not exist": are there config options for
> the "NativeAzureFileSystem" that allow you to enable retries?
> Maybe it's an "eventual consistency" issue of the underlying file store.
> (Flink is creating a directory against endpointA, and endpointB has not
> synced yet?)
>
> Can you check the details page for this checkpoint:
> 763 241/241 11:12:50 11:28:38
> 15m 48s
>
> It contains a breakdown of what took so long.
> Was it the alignment time? The async time for the state to be snapshotted?
>
> If you want, you can also contact me privately and we can do a Hangout
> session to look at the Flink UI together.
>
> Regards,
> Robert
>
>
> On Thu, Feb 23, 2017 at 8:59 AM, Shai Kaplan <shai.kap...@microsoft.com>
> wrote:
>
>> And now it's happening again
>>
>> -----Original Message-----
>> From: Shai Kaplan [mailto:shai.kap...@microsoft.com]
>> Sent: Wednesday, February 22, 2017 12:02 PM
>> To: user@flink.apache.org
>> Subject: RE: Flink checkpointing gets stuck
>>
>> I changed the checkpoint interval to 30 minutes, and also switched
>> RocksDB predefined options to FLASH_SSD_OPTIMIZED, as suggested by Vinay.
>> The problem hasn't exactly occurred since yesterday, but perhaps it just
>> takes longer to happen again because the checkpoints are much less
>> frequent now.
>> I'm now pretty sure that the "Checkpoint Coordinator is suspending" is
>> not related to the other issue, because now I had a checkpoint that failed
>> because of that but then the next ones succeeded. I apologize for diverging
>> from the topic, but I think it's worth mentioning too: the exception was
>> "Could not materialize checkpoint" caused by "Could not flush and close the
>> file system output stream" caused by
>> "com.microsoft.azure.storage.StorageException:
>> The specified blob does not exist". What could cause that? A temporary
>> failure in I/O with Azure blobs?
>>
>> Anyway, back to the topic, I didn't have checkpoints timing out, but I
>> did have one checkpoint that took significantly longer. This is what the
>> checkpoint history looks like right now:
>>
>> ID   Status  Acknowledged  Trigger Time  Latest Acknowledgement  End to End Duration  State Size  Buffered During Alignment
>> 764          241/241       11:42:50      11:43:56                1m 6s                3.15 GB     30.4 MB
>> 763          241/241       11:12:50      11:28:38                15m 48s              3.11 GB     13.4 MB
>> 762          241/241       10:42:50      10:43:57                1m 7s                3.08 GB     12.8 MB
>> 761          241/241       10:12:50      10:13:58                1m 8s                3.03 GB     5.52 MB
>> 760          241/241       9:42:50       9:43:55                 1m 5s                2.99 GB     607 KB
>> 759          241/241       9:12:50       9:13:52                 1m 1s                2.94 GB     0 B
>> 758          241/241       8:42:50       8:43:46                 56s                  2.90 GB     5.55 MB
>> 757          241/241       8:12:50       8:14:20                 1m 30s               2.85 GB     0 B
>> 756          121/241       7:41:49       7:42:09                 22s                  280 MB      0 B
>> 755          241/241       7:11:49       7:12:44                 55s                  2.81 GB     30 B
>>
>> The status wasn't copied well, but they all succeeded except for #756,
>> which failed for the reason I mentioned above.
>> As you can see, checkpoint #763 took a lot longer for no apparent reason,
>> so I'm guessing it's the same thing that caused the checkpoints to time out
>> at 30 minutes when they were saved every 10 seconds, only now it's less
>> severe because the load is much lower.
>> Any thoughts on what could cause that?
>>
>> -----Original Message-----
>> From: Ufuk Celebi [mailto:u...@apache.org]
>> Sent: Tuesday, February 21, 2017 4:54 PM
>> To: user@flink.apache.org
>> Subject: Re: Flink checkpointing gets stuck
>>
>> Hey Shai!
>>
>> Thanks for reporting this.
>>
>> It's hard to tell what causes this from your email, but could you check
>> the checkpoint interface
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/checkpoint_monitoring.html)
>> and report how much progress the checkpoints make before timing out?
>>
>> The "Checkpoint Coordinator is suspending" message indicates that the job
>> failed and the checkpoint coordinator is shut down because of that. Can you
>> check the TaskManager and JobManager logs to see if other errors are
>> reported? Feel free to share them. Then I could help with going over them.
>>
>> – Ufuk
>>
>>
>> On Tue, Feb 21, 2017 at 2:47 PM, Shai Kaplan <shai.kap...@microsoft.com>
>> wrote:
>> > Hi.
>> >
>> > I'm running a Flink 1.2 job with a 10-second checkpoint interval.
>> > After some running time (minutes to hours) Flink fails to save
>> > checkpoints, and stops processing records (I'm not sure if the
>> > checkpointing failure is the cause of the problem or just a symptom).
>> >
>> > After several checkpoints that take some seconds each, they start
>> > failing due to the 30-minute timeout.
>> >
>> > When I restart one of the Task Manager services (just to get the job
>> > restarted), the job is recovered from the last successful checkpoint
>> > (the state size continues to grow, so it's probably not the reason for
>> > the failure), advances somewhat, saves some more checkpoints, and then
>> > enters the failing state again.
>> >
>> > One of the times it happened, the first failed checkpoint failed due
>> > to "Checkpoint Coordinator is suspending.", so it might be an
>> > indicator for the cause of the problem, but looking into Flink's code
>> > I can't see how a running job could get to this state.
>> >
>> > I am using RocksDB for state, and the state is saved to Azure Blob
>> > Store, using the NativeAzureFileSystem HDFS connector over the wasbs
>> > protocol.
>> >
>> > Any ideas? Possibly a bug in Flink or RocksDB?
>> >
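
P.S. To make two points of the FileSystem contract quoted further up more concrete (never write to the same file path more than once, and read a file back by its absolute path instead of relying on a directory listing), here is a minimal sketch. It is an illustration under assumptions, not Flink code: it uses plain java.nio against a local directory rather than Flink's FileSystem class, and the class and method names (WriteOnceSketch, writeOnce, readByPath, newTempDir) are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.UUID;

/** Hypothetical helper, not part of Flink: illustrates the write-once pattern on a local directory. */
final class WriteOnceSketch {

    /** Creates a scratch directory to write into. */
    static Path newTempDir() {
        try {
            return Files.createTempDirectory("write-once-sketch");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes the data to a fresh, never-reused path and returns that absolute path. */
    static Path writeOnce(Path dir, String prefix, byte[] data) {
        // A random suffix means two attempts for the same logical file get different paths.
        Path target = dir.resolve(prefix + "-" + UUID.randomUUID()).toAbsolutePath();
        try {
            // CREATE_NEW fails if the path already exists, so nothing is ever overwritten.
            Files.write(target, data, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Files.write has closed the file by the time it returns; only then is the path handed out.
        return target;
    }

    /** Reads the file by its absolute path, without listing the parent directory. */
    static byte[] readByPath(Path absolutePath) {
        try {
            return Files.readAllBytes(absolutePath);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path dir = newTempDir();
        byte[] state = "checkpoint state".getBytes(StandardCharsets.UTF_8);

        // Two attempts at the same logical checkpoint never share a path.
        Path first = writeOnce(dir, "chk-763", state);
        Path second = writeOnce(dir, "chk-763", state);

        System.out.println("distinct paths: " + !first.equals(second));
        System.out.println("readable by path: " + Arrays.equals(state, readByPath(first)));
    }
}
```

On an eventually consistent store, the point of this pattern is that a reader who is handed the returned path never depends on the parent directory listing being up to date, and never sees a stale version of an overwritten file, because no file is ever overwritten.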