Hi Kostas: Thanks for the prompt reply.
The file rolling policy mentioned previously is meant to roll files EITHER when a size limit is reached, OR when a checkpoint happens. It looks like every time a file is rolled, the part file is closed <https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L217-L218>, during which the file is closed and a committable is returned <https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L239-L240>. I assume it is during closeForCommit() that the Parquet file metadata is written. At first glance, the code path of file rolling looks very similar to the one inside prepareBucketForCheckpointing() <https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L275>. Not sure if I missed anything there.

- Ying

On Mon, Jun 24, 2019 at 2:01 AM Kostas Kloudas <kklou...@gmail.com> wrote:

> Hi Ying,
>
> Thanks for using the StreamingFileSink.
>
> The reason why the StreamingFileSink only supports OnCheckpointRollingPolicy
> with bulk formats has to do with the fact that currently Flink relies on the
> Hadoop writer for Parquet.
>
> Bulk formats keep important details about how they write the actual data
> (such as compression schemes, offsets, etc.) in metadata, and they write
> this metadata with the file (e.g. Parquet writes it as a footer). The Hadoop
> writer gives no access to this metadata. Given this, there is no way for
> Flink to checkpoint a part file securely without closing it.
>
> The solution would be to write our own writer and not go through the Hadoop
> one, but there are no concrete plans for this, as far as I know.
> I hope this explains a bit more why the StreamingFileSink has this
> limitation.
>
> Cheers,
> Kostas
>
> On Mon, Jun 24, 2019 at 9:19 AM Ying Xu <y...@lyft.com.invalid> wrote:
>
> > Dear Flink community:
> >
> > We have a use case where StreamingFileSink
> > <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html>
> > is used for persisting bulk-encoded data to AWS S3. In our case, the data
> > sources consist of hybrid types of events, each of which is uploaded to
> > an individual S3 prefix location. Because event sizes are highly skewed,
> > the uploaded file sizes may differ dramatically. In order to have better
> > control over the uploaded file size, we would like to adopt a rolling
> > policy based on file size (e.g., roll the file every 100 MB). Yet it
> > appears that the bulk-encoding StreamingFileSink only supports
> > checkpoint-based file rolling:
> >
> > IMPORTANT: Bulk-encoding formats can only be combined with the
> > `OnCheckpointRollingPolicy`, which rolls the in-progress part file on
> > every checkpoint.
> >
> > Checkpoint-based file rolling appears to have other side effects. For
> > instance, quite a lot of the heavy lifting (e.g. file-part uploading) is
> > performed at checkpointing time. As a result, checkpointing takes longer
> > when data volume is high.
> >
> > A customized file rolling policy can be achieved with small adjustments
> > to the BulkFormatBuilder interface in StreamingFileSink. In the case of
> > the S3RecoverableWriter, file rolling triggers data uploading, and the
> > corresponding S3Committer is also constructed and stored. Hence, on the
> > surface, adding a simple file-size-based rolling policy would NOT
> > compromise the established exactly-once guarantee.
> >
> > Any advice on whether the above idea makes sense? Or perhaps there are
> > pitfalls one should pay attention to when introducing such a rolling
> > policy. Thanks a lot!
> >
> > -
> > Ying
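To make the proposed policy concrete, here is a minimal, self-contained sketch of the decision logic a size-or-checkpoint rolling policy would implement. The class and method names below (PartFileInfo, SizeOrCheckpointRollingPolicy, shouldRollOnEvent, shouldRollOnCheckpoint) are hypothetical stand-ins modeled on the behavior discussed in the thread, not Flink's actual RollingPolicy API:

```java
// Hypothetical stand-in for the state of an in-progress part file
// (not Flink's actual API).
class PartFileInfo {
    final long sizeInBytes; // bytes written to the part file so far

    PartFileInfo(long sizeInBytes) {
        this.sizeInBytes = sizeInBytes;
    }
}

// Sketch of a policy that rolls EITHER when a size limit is reached
// OR at every checkpoint, as proposed in the thread.
class SizeOrCheckpointRollingPolicy {
    private final long maxPartSizeBytes;

    SizeOrCheckpointRollingPolicy(long maxPartSizeBytes) {
        this.maxPartSizeBytes = maxPartSizeBytes;
    }

    // Checked on every incoming event: roll once the part file
    // reaches the configured size limit.
    boolean shouldRollOnEvent(PartFileInfo partFile) {
        return partFile.sizeInBytes >= maxPartSizeBytes;
    }

    // Bulk formats must still roll on every checkpoint, because the
    // part file has to be closed before it can be committed.
    boolean shouldRollOnCheckpoint(PartFileInfo partFile) {
        return true;
    }
}
```

Note this is only the decision logic; wiring such a policy into StreamingFileSink would still require the BulkFormatBuilder changes discussed above.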
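Kostas's point about footer-based bulk formats can be illustrated with a toy model. The class below is purely illustrative (it is not the Parquet or Hadoop writer): it shows why a file whose read-metadata is only written as a footer at close time is not a valid, readable file until it is closed, which is why the sink must close the part file at every checkpoint:

```java
// Toy model of a footer-based bulk writer (hypothetical, for
// illustration only). Records are streamed out, but the metadata
// needed to read the file back is only appended as a footer when
// the file is closed for commit.
class FooterBasedWriter {
    private final StringBuilder out = new StringBuilder();
    private long recordCount = 0;
    private boolean closed = false;

    void write(String record) {
        out.append(record).append('\n');
        recordCount++;
    }

    // Analogous to closeForCommit(): write the footer, after which
    // the file contents are final and readable.
    String closeForCommit() {
        out.append("FOOTER{records=").append(recordCount).append('}');
        closed = true;
        return out.toString();
    }

    // Before the footer exists, the bytes written so far are not a
    // valid file of this format.
    boolean isReadable() {
        return closed;
    }
}
```

In this model, checkpointing the half-written bytes would persist a file that can never be read back, mirroring why Flink cannot checkpoint an open Parquet part file.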