Thanks Ying! Looking forward to your contribution.
Kostas

On Wed, Jul 3, 2019 at 6:48 PM Ying Xu <y...@lyft.com.invalid> wrote:

Hi Kostas:

For simplicity, FLINK-13027
(https://issues.apache.org/jira/browse/FLINK-13027) has been assigned to
my current user ID. I will contribute using that ID.

Will circulate with the community once we have initial success with this
new rolling policy!

Thank you again.

-
Ying

On Fri, Jun 28, 2019 at 9:51 AM Ying Xu <y...@lyft.com> wrote:

Hi Kostas:

I'd like to. The account used to file the JIRA does not have contributor
access yet. I have contributed a few Flink JIRAs in the past, using a very
similar but different account. Now I would like to consolidate and use a
common account for Apache project contributions.

Would you mind granting me contributor access for the following account?
This way I can assign the JIRA to myself.

yxu-apache
(https://issues.apache.org/jira/secure/ViewProfile.jspa?name=yxu-apache)

Many thanks!

-
Ying

On Fri, Jun 28, 2019 at 2:23 AM Kostas Kloudas <kklou...@gmail.com> wrote:

Hi Ying,

That sounds great! Looking forward to your PR!

Btw, don't you want to assign the issue to yourself if you are planning
to work on it?

Kostas

On Fri, Jun 28, 2019 at 9:54 AM Ying Xu <y...@lyft.com.invalid> wrote:

Thanks Kostas for confirming!

I've filed an issue, FLINK-13027
(https://issues.apache.org/jira/browse/FLINK-13027). We are actively
working on the interface of such a file rolling policy, and will also
perform benchmarks when it is integrated with a StreamingFileSink. We are
more than happy to contribute if there's no other plan to address this
issue.

Thanks again.

Best,
Ying

On Tue, Jun 25, 2019 at 2:24 AM Kostas Kloudas <kklou...@gmail.com> wrote:

Hi Ying,

You are right! If it is either on checkpoint or on size, then this is
doable even with the current state of things. Could you open a JIRA so
that we can keep track of the progress?

Cheers,
Kostas

On Tue, Jun 25, 2019 at 9:49 AM Ying Xu <y...@lyft.com.invalid> wrote:

Hi Kostas:

Thanks for the prompt reply.

The file rolling policy mentioned previously is meant to roll files
EITHER when a size limit is reached, OR when a checkpoint happens. It
looks like every time a file is rolled, the part file is closed
(https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L217-L218),
during which the file is closed with a committable returned
(https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L239-L240).
I assume it is during closeForCommit() that the Parquet file metadata is
written. At first glance, the code path of file rolling looks very
similar to that inside prepareBucketForCheckpointing()
(https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L275).
Not sure if I missed anything there.

-
Ying

On Mon, Jun 24, 2019 at 2:01 AM Kostas Kloudas <kklou...@gmail.com> wrote:

Hi Ying,

Thanks for using the StreamingFileSink.

The reason why the StreamingFileSink only supports
OnCheckpointRollingPolicy with bulk formats has to do with the fact that
currently Flink relies on the Hadoop writer for Parquet.

Bulk formats keep important details about how they write the actual data
(such as compression schemes, offsets, etc.) in metadata, and they write
this metadata with the file (e.g. Parquet writes it as a footer). The
Hadoop writer gives no access to this metadata. Given this, there is no
way for Flink to checkpoint a part file securely without closing it.

The solution would be to write our own writer and not go through the
Hadoop one, but there are no concrete plans for this, as far as I know.

I hope this explains a bit more why the StreamingFileSink has this
limitation.

Cheers,
Kostas

On Mon, Jun 24, 2019 at 9:19 AM Ying Xu <y...@lyft.com.invalid> wrote:

Dear Flink community:

We have a use case where StreamingFileSink
(https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html)
is used for persisting bulk-encoded data to AWS S3. In our case, the data
sources consist of hybrid types of events, for which each type is
uploaded to an individual S3 prefix location. Because the event size is
highly skewed, the uploaded file sizes may differ dramatically. In order
to have better control over the uploaded file size, we would like to
adopt a rolling policy based on file size (e.g., roll the file every
100 MB). Yet it appears that the bulk-encoding StreamingFileSink only
supports checkpoint-based file rolling:

  IMPORTANT: Bulk-encoding formats can only be combined with the
  `OnCheckpointRollingPolicy`, which rolls the in-progress part file on
  every checkpoint.

Checkpoint-based file rolling appears to have other side effects. For
instance, quite a lot of the heavy lifting (e.g. file part uploading) is
performed at checkpointing time. As a result, checkpointing takes longer
when data volume is high.

Having a customized file rolling policy can be achieved by small
adjustments to the BulkFormatBuilder interface in StreamingFileSink. In
the case of using the S3RecoverableWriter, file rolling triggers data
uploading, and the corresponding S3Committer is also constructed and
stored. Hence, on the surface, adding a simple file-size-based rolling
policy would NOT compromise the established exactly-once guarantee.

Any advice on whether the above idea makes sense? Or perhaps there are
pitfalls one should pay attention to when introducing such a rolling
policy. Thanks a lot!

-
Ying
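For readers following the thread, the "roll EITHER on size OR on checkpoint" behavior being discussed can be sketched in plain Java. This is a minimal, self-contained mimic of the decision logic only, not Flink's actual RollingPolicy interface (the real one is generic over element and bucket types and inspects a PartFileInfo); the class name SizeOrCheckpointRollingPolicy and the parameter maxPartSizeBytes are hypothetical names chosen for illustration.

```java
// Hypothetical sketch of the size-or-checkpoint rolling decision discussed
// in the thread. Not the real Flink API: Flink's RollingPolicy methods take
// a PartFileInfo; here the in-progress part size is passed in directly.
public class SizeOrCheckpointRollingPolicy {

    private final long maxPartSizeBytes;

    public SizeOrCheckpointRollingPolicy(long maxPartSizeBytes) {
        this.maxPartSizeBytes = maxPartSizeBytes;
    }

    // Always roll on checkpoint, exactly as OnCheckpointRollingPolicy does
    // today; this is what keeps the exactly-once reasoning in the thread
    // intact, since the part file is still closed at every checkpoint.
    public boolean shouldRollOnCheckpoint(long currentPartSizeBytes) {
        return true;
    }

    // Additionally roll as soon as the in-progress part reaches the limit,
    // e.g. the 100 MB threshold mentioned in the original email.
    public boolean shouldRollOnEvent(long currentPartSizeBytes) {
        return currentPartSizeBytes >= maxPartSizeBytes;
    }

    public static void main(String[] args) {
        SizeOrCheckpointRollingPolicy policy =
                new SizeOrCheckpointRollingPolicy(100L * 1024 * 1024); // 100 MB

        System.out.println(policy.shouldRollOnEvent(50L * 1024 * 1024));  // false
        System.out.println(policy.shouldRollOnEvent(101L * 1024 * 1024)); // true
        System.out.println(policy.shouldRollOnCheckpoint(1L));            // true
    }
}
```

Wiring such a policy into a bulk-format StreamingFileSink would still require the BulkFormatBuilder adjustment Ying proposes, since at the time of this thread the builder hard-codes OnCheckpointRollingPolicy for bulk-encoded formats.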