Hi Kumar, for late events, I have seen two approaches:
* Initial compaction every day, then repeated compaction after two days, and again after one week.
* Using something like Delta Lake [1], which is a set of specially structured Parquet files. You usually still compact them after some time (e.g. one week in your case), but you can query them efficiently in the meantime. However, I'm not aware of an out-of-the-box Delta Lake solution for Flink. This might be something we could put on the community agenda if there is general interest.

[1] https://slacker.ro/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log/
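To make the roll-up idea concrete, here is a minimal sketch of one compaction step using the plain parquet-avro API rather than a Flink job: it merges the small Parquet files of one closed event-time partition into a single larger file. The paths are hypothetical placeholders, the Avro schema is taken from the first record read, and error handling and scheduling are left out.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;

public class PartitionRollUp {

    public static void main(String[] args) throws Exception {
        // Hypothetical paths: a "closed" partition (no more late data expected)
        // and the target file for the compacted output.
        Path partition = new Path("s3://bucket/events/dt=2020-09-01");
        Path compacted = new Path("s3://bucket/events-compacted/dt=2020-09-01/part-0.parquet");

        Configuration conf = new Configuration();
        FileSystem fs = partition.getFileSystem(conf);

        ParquetWriter<GenericRecord> writer = null;
        try {
            // Assumes the partition directory contains only Parquet data files.
            for (FileStatus file : fs.listStatus(partition)) {
                try (ParquetReader<GenericRecord> reader =
                         AvroParquetReader.<GenericRecord>builder(file.getPath()).build()) {
                    GenericRecord record;
                    while ((record = reader.read()) != null) {
                        if (writer == null) {
                            // Take the Avro schema from the first record we see.
                            Schema schema = record.getSchema();
                            writer = AvroParquetWriter.<GenericRecord>builder(compacted)
                                    .withSchema(schema)
                                    .build();
                        }
                        writer.write(record);
                    }
                }
            }
        } finally {
            if (writer != null) {
                writer.close();
            }
        }
    }
}

In practice, a step like this (or an equivalent Flink batch job) would be scheduled per partition once no more late records are expected for it, e.g. after the one-week late-data window discussed below.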
On Fri, Sep 11, 2020 at 5:16 PM Senthil Kumar <senthi...@vmware.com> wrote:

> Hello Ayush,
>
> I am interested in knowing about your "really simple" implementation.
>
> So, assuming the streaming Parquet output goes to an S3 bucket, Initial (partitioned by event time):
>
> Do you write another Flink batch application (step 2) which partitions the data from Initial into larger "event time" chunks and writes it out to another S3 bucket?
>
> In our case, we are getting straggling records with event times that might be up to one week old. One approach is to simply run the batch job after one week, but then we lose the ability to query the recent data in an efficient fashion.
>
> I would appreciate any ideas etc.
>
> Cheers
> Kumar
>
> From: Ayush Verma <ayushver...@gmail.com>
> Date: Friday, September 11, 2020 at 8:14 AM
> To: Robert Metzger <rmetz...@apache.org>
> Cc: Marek Maj <marekm...@gmail.com>, user <user@flink.apache.org>
> Subject: Re: Streaming data to parquet
>
> Hi,
>
> Looking at the problem broadly, file size is directly tied to how often you commit. No matter which system you use, this variable will always be there. If you commit frequently, you will be close to real time, but you will have numerous small files. If you commit after long intervals, you will have larger files, but this is as good as a "batch world". We solved this problem at my company by having two systems: one to commit the files at small intervals, thus bringing data into durable storage reliably, and one to roll up these small files. It's actually really simple to implement this if you don't try to do it in a single job.
>
> Best
> Ayush
>
> On Fri, Sep 11, 2020 at 2:22 PM Robert Metzger <rmetz...@apache.org> wrote:
>
> Hi Marek,
>
> what you are describing is a known problem in Flink. There are some thoughts on how to address this in
> https://issues.apache.org/jira/browse/FLINK-11499 and
> https://issues.apache.org/jira/browse/FLINK-17505
> Maybe some of the ideas there already help with your current problem (e.g. using long checkpoint intervals).
>
> A related idea to (2) is to write your data in the Avro format, and then regularly use a batch job to transform your data from Avro to Parquet.
>
> I hope these are some helpful pointers. I don't have a good overview of other potential open source solutions.
>
> Best,
> Robert
>
> On Thu, Sep 10, 2020 at 5:10 PM Marek Maj <marekm...@gmail.com> wrote:
>
> Hello Flink Community,
>
> When designing our data pipelines, we very often encounter the requirement to stream traffic (usually from Kafka) to an external distributed file system (usually HDFS or S3). This data is typically meant to be queried from Hive/Presto or similar tools, and preferably the data sits in a columnar format like Parquet.
>
> Currently, using Flink, it is possible to leverage StreamingFileSink to achieve what we want to some extent. It satisfies our requirements to partition data by event time and to ensure exactly-once semantics and fault tolerance with checkpointing. Unfortunately, when using a bulk writer like ParquetWriter, that comes at the price of producing a large number of files, which degrades query performance.
>
> I believe that many companies struggle with similar use cases, and I know that some of them have already approached this problem. Solutions like Alibaba Hologres or Netflix's Iceberg-based solution described during Flink Forward 2019 have emerged. Given that a full transition to a real-time data warehouse may take a significant amount of time and effort, I would like to focus primarily on solutions for tools like Hive/Presto backed by a distributed file system. Usually those are the systems we are integrating with.
>
> So what options do we have? Maybe I missed some existing open source tool?
>
> Currently, I can come up with two approaches using Flink exclusively:
>
> 1. Cache incoming traffic in Flink state until a trigger fires according to the rolling strategy, probably with some special handling for late events, and then output the data with StreamingFileSink. The solution is not perfect, as it may introduce additional latency and queries will still be less performant compared to fully compacted files (the late-events problem). The biggest issue I am afraid of is the performance drop while releasing data from Flink state, and its peaky character.
>
> 2. Focus on implementing a batch rewrite job that compacts data offline. The source for the job could be either Kafka or the small files produced by another job that uses plain StreamingFileSink. The drawback is that the whole system gets more complex and needs additional maintenance, and, maybe more troubling, we enter the batch world again (how do we know that no more late data will come and that we can safely run the job?).
>
> I would really love to hear the community's thoughts on this.
>
> Kind regards
> Marek

--
Arvid Heise | Senior Java Developer

https://www.ververica.com/

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng