JOHN ERVINE created FLINK-26427:
-----------------------------------

             Summary: Streaming File Sink Uploading Smaller Versions Of The Same Part File To S3 (Race Condition)
                 Key: FLINK-26427
                 URL: https://issues.apache.org/jira/browse/FLINK-26427
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.13.1
            Reporter: JOHN ERVINE


I'm seeing unexpected behaviour when writing ORC files to S3 using Flink's 
Streaming File Sink.

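{code:java}
import java.util.Properties;

import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.hadoop.conf.Configuration;

// Set up the streaming execution environment with exactly-once checkpointing.
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(
        PARAMETER_TOOL_CONFIG.getInt("flink.checkpoint.frequency.ms"),
        CheckpointingMode.EXACTLY_ONCE);
env.getConfig().enableObjectReuse();

// ORC writer options: Snappy compression.
Properties writerProperties = new Properties();
writerProperties.put("orc.compress", "SNAPPY");

// Order book sink: bulk-encoded ORC part files, rolled on every checkpoint.
StreamingFileSink<ArmadaRow> orderBookSink = StreamingFileSink
        .forBulkFormat(
                new Path(PARAMETER_TOOL_CONFIG.get("order.book.sink")),
                new OrcBulkWriterFactory<>(
                        new OrderBookRowVectorizer(F_MD_ORDER_BOOK_GLOBEX_SCHEMA),
                        writerProperties,
                        new Configuration()))
        .withBucketAssigner(new OrderBookBucketingAssigner())
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .build();{code}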

While running queries during ingestion, I noticed that my row counts were 
decreasing as the job progressed. Looking in S3, I can see multiple versions of 
the same part file. In the example below, part file 15-7 has two versions: the 
first is 20.7 MB, but the last one committed is smaller, at 5.1 MB. Usually the 
current (latest) version is the larger one, but the screenshot below shows a few 
cases where it is not.

!https://i.stack.imgur.com/soU4b.png|width=2173,height=603!

This looks like a race condition, or a failure to upload commits to S3 
successfully: the log below shows two commits for the same file, with different 
MPU IDs, about 300 ms apart. The last commit is logged at 20:44, but the last 
modified date in S3 is 20:43. I don't see any log entries indicating a failed 
commit. This is currently a blocker for us.

{code:java}
2022-02-28T20:44:03.526+0000 INFO  APP=${sys:AppID} COMP=${sys:CompID} APPNAME=${sys:AppName} S3Committer:64 - Committing staging/marketdata/t_stg_globex_order_book_test2/cyc_dt=2021-11-15/inst_exch_mrkt_id=XNYM/inst_ast_sub_clas=Energy/part-15-7 with MPU ID vVhVRh5XtEDmJNrqBCAp.4vcS34FBGoQQjPsE64kBmhkSJJB8T7ZY9codF994n7FBUquF_ls9oFxwoYPl5ZHfP0rkQgJ7aPmHzlB8omIH2ZFbeFNHbXpYS27U9Gl7LOMcEhlekMog4D2eeYUUjr9oA--

2022-02-28T20:44:03.224+0000 INFO  APP=${sys:AppID} COMP=${sys:CompID} APPNAME=${sys:AppName} S3Committer:64 - Committing staging/marketdata/t_stg_globex_order_book_test2/cyc_dt=2021-11-15/inst_exch_mrkt_id=XNYM/inst_ast_sub_clas=Energy/part-15-7 with MPU ID jPnNvBwHtiRBLdDbH6W7duV2Fx1lxsOsPV4IfskMkPygpuVXF9DWsp4xZGxejI8mEVbcrIqF6hC9Tff9IzciK0lMUkTNrXHfRfG3tgkMwbX35T3chbXRN8Tjl0tsUF.oSBhgrGFpKxRxyi3CjRknxA--{code}
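
To check whether other part files are affected, a small scan of the TaskManager logs can list every object key that was committed under more than one distinct MPU ID. This is a minimal sketch, assuming each log entry sits on a single line and follows the {{S3Committer:64 - Committing <key> with MPU ID <id>}} format shown above; the class and method names are hypothetical and not part of Flink.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DuplicateCommitScanner {

    // Matches "S3Committer:<line> - Committing <key> with MPU ID <id>".
    // Assumption: each log entry is on one line (not wrapped as in an email).
    private static final Pattern COMMIT =
            Pattern.compile("S3Committer:\\d+ - Committing (\\S+)\\s+with MPU ID (\\S+)");

    /** Returns each object key that was committed under two or more distinct MPU IDs. */
    public static Map<String, List<String>> findConflictingCommits(List<String> logLines) {
        // Insertion-ordered so keys are reported in the order they first appear.
        Map<String, List<String>> uploadsByKey = new LinkedHashMap<>();
        for (String line : logLines) {
            Matcher m = COMMIT.matcher(line);
            if (m.find()) {
                List<String> ids = uploadsByKey.computeIfAbsent(m.group(1), k -> new ArrayList<>());
                // A repeated commit with the same MPU ID is recorded only once.
                if (!ids.contains(m.group(2))) {
                    ids.add(m.group(2));
                }
            }
        }
        // Keep only keys targeted by more than one multipart upload.
        uploadsByKey.values().removeIf(ids -> ids.size() < 2);
        return uploadsByKey;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "... S3Committer:64 - Committing staging/x/part-15-7 with MPU ID idA",
                "... S3Committer:64 - Committing staging/x/part-15-7 with MPU ID idB",
                "... S3Committer:64 - Committing staging/x/part-15-8 with MPU ID idC");
        // prints {staging/x/part-15-7=[idA, idB]}
        System.out.println(findConflictingCommits(lines));
    }
}{code}

Run over the two log entries above, this would report the part-15-7 key with both MPU IDs. Counting distinct IDs rather than raw commit lines means a commit logged twice with the same MPU ID is not flagged on its own.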



--
This message was sent by Atlassian Jira
(v8.20.1#820001)