Hi Shai!

It looks to me like the failures are caused by the Azure file system's
semantics differing slightly from what Flink expects, which may catch
Flink off guard.
Here are some thoughts:

  - Flink 1.1.x behaves differently than 1.2 when it comes to expectations
about filesystem metadata consistency. 1.2 expects a little less, as a result
of some patches to better support S3.

  - I tried to write up what expectations we have for filesystems. You can
check if you see any points where the Azure Blob Store diverges in its
semantics.
    The expectations are in the latest master in the docs of the
"FileSystem" class:
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
    Below is a quote with the relevant parts highlighted.


Completely aside from that, these failures should not make the checkpoint
get stuck. If you can attach a thread dump, we can look into why it is not
at least "failing well".

Greetings,
Stephan

=============

 *
 * <h2>Data Persistence Contract</h2>
 *
 * The FileSystem's {@link FSDataOutputStream output streams} are used
to persistently store data,
 * both for results of streaming applications and for fault tolerance
and recovery. It is therefore
 * crucial that the persistence semantics of these streams are well defined.
 *
 * <h3>Definition of Persistence Guarantees</h3>
 *
 * Data written to an output stream is considered persistent, if two
requirements are met:
 *
 * <ol>
 *     <li><b>Visibility Requirement:</b> It must be guaranteed that
all other processes, machines,
 *     virtual machines, containers, etc. that are able to access the
file see the data consistently
 *     when given the absolute file path. *This requirement is similar
to the <i>close-to-open</i>
 *     semantics defined by POSIX, but restricted to the file itself
(by its absolute path)*.</li>
 *
 *     <li><b>Durability Requirement:</b> The file system's specific
durability/persistence requirements
 *     must be met. These are specific to the particular file system.
For example the
 *     {@link LocalFileSystem} does not provide any durability
guarantees for crashes of both
 *     hardware and operating system, while replicated distributed
file systems (like HDFS)
 *     typically guarantee durability in the presence of at most
<i>n</i> concurrent node failures,
 *     where <i>n</i> is the replication factor.</li>
 * </ol>
 *
 * <p>*Updates to the file's parent directory (such that the file shows up when
 * listing the directory contents) are __not__ required to be
complete* for the data in the file stream
 * to be considered persistent. This relaxation is important for file
systems where updates to
 * directory contents are only eventually consistent.
 *
 * <p>*The {@link FSDataOutputStream} has to guarantee data
persistence for the written bytes
 * once the call to {@link FSDataOutputStream#close()} returns.*
 *
 * <h3>Examples</h3>
 *
 * <ul>
 *     <li>For <b>fault-tolerant distributed file systems</b>, data is
considered persistent once
 *     it has been received and acknowledged by the file system,
typically by having been replicated
 *     to a quorum of machines (<i>durability requirement</i>). In
addition the absolute file path
 *     must be visible to all other machines that will potentially
access the file (<i>visibility
 *     requirement</i>).
 *
 *     <p>Whether data has hit non-volatile storage on the storage
nodes depends on the specific
 *     guarantees of the particular file system.
 *
 *     <p>The metadata updates to the file's parent directory are not
required to have reached
 *     a consistent state. It is permissible that some machines see
the file when listing the parent
 *     directory's contents while others do not, as long as access to
the file by its absolute path
 *     is possible on all nodes.</li>
 *
 *     <li>A <b>local file system</b> must support the POSIX
<i>close-to-open</i> semantics.
 *     Because the local file system does not have any fault tolerance
guarantees, no further
 *     requirements exist.
 *
 *     <p>The above implies specifically that data may still be in the
OS cache when considered
 *     persistent from the local file system's perspective. Crashes
that cause the OS cache to lose
 *     data are considered fatal to the local machine and are not
covered by the local file system's
 *     guarantees as defined by Flink.
 *
 *     <p>That means that computed results, checkpoints, and
savepoints that are written only to
 *     the local filesystem are not guaranteed to be recoverable from
the local machine's failure,
 *     making local file systems unsuitable for production setups.</li>
 * </ul>
 *
 * <h2>Updating File Contents</h2>
 *
 * Many file systems either do not support overwriting contents of
existing files at all, or do
 * not support consistent visibility of the updated contents in that
case. For that reason,
 * Flink's FileSystem does not support appending to existing files, or
seeking within output streams
 * so that previously written data could be overwritten.
 *
 * <h2>Overwriting Files</h2>
 *
 * Overwriting files is in general possible. A file is overwritten by
deleting it and creating
 * a new file. However, certain filesystems cannot make that change
synchronously visible
 * to all parties that have access to the file.
 * For example <a href="https://aws.amazon.com/documentation/s3/">Amazon S3</a>
guarantees only
 * <i>eventual consistency</i> in the visibility of the file
replacement: Some machines may see
 * the old file, some machines may see the new file.
 *
 * <p>To avoid these consistency issues, *the implementations of
failure/recovery mechanisms in
 * Flink strictly avoid writing to the same file path more than once.*
 *
 * <h2>Thread Safety</h2>
 *
 * Implementations of {@code FileSystem} must be thread-safe: The same
instance of FileSystem
 * is frequently shared across multiple threads in Flink and must be
able to concurrently
 * create input/output streams and list file metadata.
 *
 * <p>The {@link FSDataInputStream} and {@link FSDataOutputStream}
implementations are strictly
 * <b>not thread-safe</b>. Instances of the streams should also not be
passed between threads
 * in between read or write operations, because there are no
guarantees about the visibility of
 * operations across threads (many operations do not create memory fences).
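
As a rough illustration of the two highlighted points (data is reachable by
its absolute path once close() returns, and no path is ever written twice),
here is a minimal sketch. It is not Flink code: plain java.nio on a local
temp directory stands in for the FileSystem API, and the class and method
names (PersistenceContractSketch, writeOnce, readByPath) are made up for
this example.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

public class PersistenceContractSketch {

    /** Writes data to a fresh, never-reused path and returns that absolute path. */
    static Path writeOnce(Path dir, byte[] data) throws IOException {
        // A new random name per write, so an existing path is never overwritten.
        Path target = dir.resolve("chk-" + UUID.randomUUID()).toAbsolutePath();
        // CREATE_NEW fails if the file already exists instead of replacing it.
        try (OutputStream out = Files.newOutputStream(target, StandardOpenOption.CREATE_NEW)) {
            out.write(data);
        } // the contract ties the persistence guarantee to close() returning
        return target;
    }

    /** Reads the file via its absolute path; the parent directory is never listed. */
    static byte[] readByPath(Path absolutePath) throws IOException {
        return Files.readAllBytes(absolutePath);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("contract-sketch");
        Path first = writeOnce(dir, "state-1".getBytes(StandardCharsets.UTF_8));
        Path second = writeOnce(dir, "state-2".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(readByPath(first), StandardCharsets.UTF_8));
        System.out.println(new String(readByPath(second), StandardCharsets.UTF_8));
    }
}
```

The reader is handed the absolute path rather than discovering the file
through a directory listing, because the listing is exactly the part that
is allowed to lag behind on eventually consistent stores.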

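The thread-safety part of the quote can be sketched in the same way. Again
this is only an assumption-laden stand-in (java.nio instead of Flink's
FileSystem, hypothetical names StreamPerThreadSketch and writeConcurrently):
the shared handle, here just the directory path, is used from several
threads, while every stream is opened, written, and closed on a single
thread and never handed to another one.

```java
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class StreamPerThreadSketch {

    /** Runs the given number of write tasks; each task owns its stream from open to close. */
    static List<Path> writeConcurrently(Path dir, int tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<Path>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                final int id = i;
                futures.add(pool.submit(() -> {
                    // The stream is created inside the task and never leaves this thread.
                    Path target = dir.resolve("part-" + id);
                    try (OutputStream out = Files.newOutputStream(target)) {
                        out.write(("task " + id).getBytes(StandardCharsets.UTF_8));
                    }
                    return target;
                }));
            }
            List<Path> written = new ArrayList<>();
            for (Future<Path> f : futures) {
                written.add(f.get()); // rethrows any failure from the task
            }
            return written;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("thread-sketch");
        for (Path p : writeConcurrently(dir, 8)) {
            System.out.println(p.getFileName() + ": "
                    + new String(Files.readAllBytes(p), StandardCharsets.UTF_8));
        }
    }
}
```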



On Thu, Feb 23, 2017 at 9:31 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Shai,
>
> I think we don't have so many users running Flink on Azure. Maybe you are
> the first to put some heavy load onto that infrastructure using Flink.
> I would guess that your problems are caused by the same root cause, just
> the way the job is being cancelled is a bit different based on what is
> happening.
> For the "The specified blob does not exist" error: are there config options
> for the "NativeAzureFileSystem" that allow you to enable retries?
> Maybe it's an "eventual consistency" issue of the underlying file store.
> (Flink is creating a directory against endpointA, and endpointB has not
> synced yet?).
>
> Can you check the details page for this checkpoint:
> 763             241/241         11:12:50        11:28:38        15m 48s
>
> It contains a breakdown of what took so long.
> Was it the alignment time? The async time for the state to be snapshotted?
>
> If you want, you can also contact me privately and we do a Hangout session
> to look at the Flink UI together.
>
> Regards,
> Robert
>
>
>
>
>
> On Thu, Feb 23, 2017 at 8:59 AM, Shai Kaplan <shai.kap...@microsoft.com>
> wrote:
>
>> And now it's happening again
>>
>> -----Original Message-----
>> From: Shai Kaplan [mailto:shai.kap...@microsoft.com]
>> Sent: Wednesday, February 22, 2017 12:02 PM
>> To: user@flink.apache.org
>> Subject: RE: Flink checkpointing gets stuck
>>
>> I changed the checkpoint interval to 30 minutes, and also switched
>> RocksDB predefined options to FLASH_SSD_OPTIMIZED, as suggested by Vinay.
>> The problem hasn't exactly occurred since yesterday, but perhaps it just
>> takes it longer to happen again because the checkpoints are much less
>> frequent now.
>> I'm now pretty sure that the "Checkpoint Coordinator is suspending" failure
>> is not related to the other issue, because now I had a checkpoint that failed
>> because of that but then the next ones succeeded. I apologize for diverging
>> from the topic, but I think it's worth mentioning too, the exception was
>> "Could not materialize checkpoint" caused by "Could not flush and close the
>> file system output stream" caused by 
>> "com.microsoft.azure.storage.StorageException:
>> The specified blob does not exist". What could cause that? A temporary
>> failure in I/O with Azure blobs?
>>
>> Anyway, back to the topic, I didn't have checkpoints timing out, but I
>> did have one checkpoint that took significantly longer. This is what the
>> checkpoint history looks like right now:
>>
>> ID   Status  Acknowledged  Trigger Time  Latest Acknowledgement  End to End Duration  State Size  Buffered During Alignment
>> 764          241/241       11:42:50      11:43:56                1m 6s                3.15 GB     30.4 MB
>> 763          241/241       11:12:50      11:28:38                15m 48s              3.11 GB     13.4 MB
>> 762          241/241       10:42:50      10:43:57                1m 7s                3.08 GB     12.8 MB
>> 761          241/241       10:12:50      10:13:58                1m 8s                3.03 GB     5.52 MB
>> 760          241/241       9:42:50       9:43:55                 1m 5s                2.99 GB     607 KB
>> 759          241/241       9:12:50       9:13:52                 1m 1s                2.94 GB     0 B
>> 758          241/241       8:42:50       8:43:46                 56s                  2.90 GB     5.55 MB
>> 757          241/241       8:12:50       8:14:20                 1m 30s               2.85 GB     0 B
>> 756          121/241       7:41:49       7:42:09                 22s                  280 MB      0 B
>> 755          241/241       7:11:49       7:12:44                 55s                  2.81 GB     30 B
>>
>> The status wasn't copied well, but they all succeeded except for #756,
>> which failed for the reason I mentioned above.
>> As you can see, checkpoint #763 took a lot longer for no apparent reason,
>> so I'm guessing it's the same thing that caused the checkpoints to time out
>> at 30 minutes when they were saved every 10 seconds, only now it's less
>> severe because the load is much lower.
>> Any thoughts on what could cause that?
>>
>> -----Original Message-----
>> From: Ufuk Celebi [mailto:u...@apache.org]
>> Sent: Tuesday, February 21, 2017 4:54 PM
>> To: user@flink.apache.org
>> Subject: Re: Flink checkpointing gets stuck
>>
>> Hey Shai!
>>
>> Thanks for reporting this.
>>
>> It's hard to tell what causes this from your email, but could you check
>> the checkpoint interface
>> (https://na01.safelinks.protection.outlook.com/?url=https%
>> 3A%2F%2Fci.apache.org%2Fprojects%2Fflink%2Fflink-docs-
>> release-1.3%2Fmonitoring%2Fcheckpoint_monitoring.html&data=
>> 02%7C01%7CShai.Kaplan%40microsoft.com%7C1cdb8bde8ee843676a00
>> 08d45a6984b7%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C63
>> 6232856631405599&sdata=Ukf1%2Bahi00NVb3QVrTqUggYOSldheqvgMkJ
>> LsCzUWVc%3D&reserved=0)
>> and report how much progress the checkpoints make before timing out?
>>
>> The "Checkpoint Coordinator is suspending" message indicates that the job
>> failed and the checkpoint coordinator is shut down because of that. Can you
>> check the TaskManager and JobManager logs if other errors are reported?
>> Feel free to share them. Then I could help with going over them.
>>
>> – Ufuk
>>
>>
>> On Tue, Feb 21, 2017 at 2:47 PM, Shai Kaplan <shai.kap...@microsoft.com>
>> wrote:
>> > Hi.
>> >
>> > I'm running a Flink 1.2 job with a 10-second checkpoint interval.
>> > After some running time (minutes-hours) Flink fails to save
>> > checkpoints, and stops processing records (I'm not sure if the
>> > checkpointing failure is the cause of the problem or just a symptom).
>> >
>> > After several checkpoints that take a few seconds each, they start
>> > failing due to the 30-minute timeout.
>> >
>> > When I restart one of the Task Manager services (just to get the job
>> > restarted), the job is recovered from the last successful checkpoint
>> > (the state size continues to grow, so it's probably not the reason for
>> > the failure), advances somewhat, saves some more checkpoints, and then
>> > enters the failing state again.
>> >
>> > One of the times it happened, the first failed checkpoint failed due
>> > to "Checkpoint Coordinator is suspending.", so it might be an
>> > indicator for the cause of the problem, but looking into Flink's code
>> > I can't see how a running job could get to this state.
>> >
>> > I am using RocksDB for state, and the state is saved to Azure Blob
>> > Store, using the NativeAzureFileSystem HDFS connector over the wasbs
>> protocol.
>> >
>> > Any ideas? Possibly a bug in Flink or RocksDB?
>>
>
>
