Hi,
I'd like to mention another approach, which might not be as "flinkish",
but removes the source of issues which arise when writing bulk files.
The actual cause of the issues here is that, when creating bulk output, the
most efficient option is to have a _reversed flow of commit_. That is to
say, contrary to Flink's checkpoint barriers, which flow from sources to
sinks, the best performance in the bulk case comes from letting the sink
commit the source once it finishes the bulk write (with whatever period).
This is currently impossible to achieve with Flink. What works best for me
is to use Flink sinks to write a streaming commit log (e.g. Kafka) and then
have independent processes (Kafka consumers or equivalent) read the output
topics, pack them, and push them to the bulk store. Once the write is
finished, the Kafka topic offsets are committed. This requires deploying an
additional application, but that is low overhead in environments like k8s.
Moreover, this solves the dilemma between quick commits (for real-time
data) and large files, because one can read data from the streaming
(truly real-time) source and union it with the batch data stored in the
bulk store. Both of these techniques are implemented in [1] (disclaimer:
I'm one of the core developers of that platform).
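To make the reversed commit flow concrete, here is a minimal sketch in plain Python. It is not code from [1] and it does not use a real Kafka client: `InMemoryLog`, `BulkStore` and `pack_and_commit` are hypothetical stand-ins for one topic partition, the bulk store, and the packing process. The only point it illustrates is the ordering: the bulk write finishes first, and only then is the source offset committed.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class InMemoryLog:
    """Hypothetical stand-in for one partition of the output topic."""
    records: List[str]
    committed: int = 0  # offset of the first record not yet committed

    def poll(self, position: int, max_records: int) -> List[str]:
        return self.records[position:position + max_records]


@dataclass
class BulkStore:
    """Hypothetical stand-in for S3/HDFS; each entry is one bulk file."""
    files: List[List[str]] = field(default_factory=list)

    def write(self, batch: List[str]) -> None:
        self.files.append(list(batch))


def pack_and_commit(log: InMemoryLog, store: BulkStore, bulk_size: int) -> None:
    """Write full bulks first, then advance the committed offset."""
    position = log.committed
    while True:
        batch = log.poll(position, bulk_size)
        if len(batch) < bulk_size:
            break  # a partial bulk stays uncommitted until more data arrives
        store.write(batch)        # 1. finish the bulk write
        position += len(batch)
        log.committed = position  # 2. only then commit the source
```

If the process dies between steps 1 and 2, the same bulk is written again after a restart, so in a real deployment one would probably derive the file name from the offset range to keep the write idempotent.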
Jan
[1] https://github.com/O2-Czech-Republic/proxima-platform
On 9/14/20 2:03 PM, Arvid Heise wrote:
Hi Kumar,
for late events, I have seen two approaches:
* Initial compaction every day, repeated compaction after two days,
and after 1 week.
* Using something like delta lake [1], which is a set of specially
structured parquet files. Usually you also compact them after some
time (e.g. 1 week in your case), but you can query them efficiently in
the meantime.
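As a rough illustration of the first approach, the sketch below lists which daily partitions a scheduler would (re)compact on a given day. It assumes one reading of the schedule above (each daily partition is compacted at an age of 1 day, again at 2 days, and again at 7 days); the constant `COMPACTION_AGES_DAYS` and the function name are made up for this example.

```python
from datetime import date, timedelta
from typing import List, Sequence

# Ages (in days) at which a daily partition is (re)compacted. This is an
# assumption based on the schedule described above; adjust it to the
# lateness you actually observe.
COMPACTION_AGES_DAYS = (1, 2, 7)


def partitions_to_compact(
    today: date, ages: Sequence[int] = COMPACTION_AGES_DAYS
) -> List[date]:
    """Return the daily partitions that are due for compaction today."""
    return [today - timedelta(days=age) for age in ages]
```

Run once per day, this compacts yesterday's partition for the first time and revisits the partitions that are two and seven days old to pick up late events.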
However, I'm not aware of any out-of-the-box Delta Lake solution for
Flink. This might be something that we could put on the community
agenda if there is general interest.
[1]
https://slacker.ro/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log/
On Fri, Sep 11, 2020 at 5:16 PM Senthil Kumar <senthi...@vmware.com> wrote:
Hello Ayush,
I am interested in knowing about your “really simple” implementation.
So assuming the streaming Parquet output goes to an S3 bucket:
Initial (partitioned by event time)
Do you write another Flink batch application (step 2) which
partitions the data from Initial in larger “event time” chunks
and writes it out to another S3 bucket?
In our case, we are getting straggling records with event times
which might be up to 1 week old.
One approach is to simply run the batch job after 1 week, but
then we lose the ability to query the recent data efficiently.
I would appreciate any ideas etc.
Cheers
Kumar
*From: *Ayush Verma <ayushver...@gmail.com>
*Date: *Friday, September 11, 2020 at 8:14 AM
*To: *Robert Metzger <rmetz...@apache.org>
*Cc: *Marek Maj <marekm...@gmail.com>, user <user@flink.apache.org>
*Subject: *Re: Streaming data to parquet
Hi,
Looking at the problem broadly, file size is directly tied to
how often you commit. No matter which system you use, this
variable will always be there. If you commit frequently, you will
be close to realtime, but you will have numerous small files. If
you commit after long intervals, you will have larger files, but
this is as good as a "batch world". We solved this problem at my
company by having 2 systems. One to commit the files at small
intervals, thus bringing data into durable storage reliably, and
one to roll up these small files. It's actually really simple to
implement this if you don't try to do it in a single job.
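To put rough numbers on that trade-off, here is a back-of-the-envelope estimate in Python. It assumes a Flink StreamingFileSink with a bulk format, where part files are rolled on every checkpoint, and it assumes every sink subtask receives data for every active bucket, so it is closer to an upper bound than a prediction. The function and parameter names are made up for this example.

```python
def files_per_day(
    parallelism: int, active_buckets: int, checkpoint_interval_s: int
) -> int:
    """Estimate part files per day when files roll on every checkpoint."""
    checkpoints_per_day = 86_400 // checkpoint_interval_s
    # Assumption: each subtask writes one part file per active bucket
    # in every checkpoint interval.
    return parallelism * active_buckets * checkpoints_per_day
```

With 8 sink subtasks and 2 active buckets, a 60 second checkpoint interval gives `files_per_day(8, 2, 60)`, which is 23040 files per day, while a 10 minute interval gives `files_per_day(8, 2, 600)`, which is 2304.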
Best
Ayush
On Fri, Sep 11, 2020 at 2:22 PM Robert Metzger <rmetz...@apache.org> wrote:
Hi Marek,
what you are describing is a known problem in Flink. There are
some thoughts on how to address this in
https://issues.apache.org/jira/browse/FLINK-11499
and https://issues.apache.org/jira/browse/FLINK-17505
Maybe some of the ideas there already help with your current
problem (use long checkpoint intervals).
A related idea to (2) is to write your data with the Avro
format, and then regularly use a batch job to transform your
data from Avro to Parquet.
I hope these are some helpful pointers. I don't have a good
overview of other potential open source solutions.
Best,
Robert
On Thu, Sep 10, 2020 at 5:10 PM Marek Maj <marekm...@gmail.com> wrote:
Hello Flink Community,
When designing our data pipelines, we very often encounter
the requirement to stream traffic (usually from Kafka) to an
external distributed file system (usually HDFS or S3).
This data is typically meant to be queried from
Hive/Presto or similar tools. Preferably, the data sits in a
columnar format like Parquet.
Currently, using Flink, it is possible to leverage
StreamingFileSink to achieve what we want to some extent.
It satisfies our requirements to partition data by event
time and to ensure exactly-once semantics and fault tolerance
with checkpointing. Unfortunately, when using a bulk writer
like ParquetWriter, that comes at the price of producing a
large number of files, which degrades the performance of
queries.
I believe that many companies struggle with similar use
cases. I know that some of them have already approached
that problem. Solutions such as Alibaba Hologres, or the Netflix
solution with Iceberg described during FF 2019, have emerged.
Given that full transition to real-time data warehouse may
take a significant amount of time and effort, I would like
to primarily focus on solutions for tools like hive/presto
backed up by a distributed file system. Usually those are
the systems that we are integrating with.
So what options do we have? Maybe I missed some existing
open source tool?
Currently, I can come up with two approaches using Flink
exclusively:
1. Cache incoming traffic in Flink state until a trigger
fires according to the rolling strategy, probably with a
special strategy for late events, and then output the data with
StreamingFileSink. The solution is not perfect, as it may
introduce additional latency, and queries will still be
less performant compared to fully compacted files (late
events problem). The biggest issue I am afraid of is
actually a performance drop while releasing data from
Flink state, because of its bursty nature.
2. Focus on implementing a batch rewrite job that will
compact data offline. The source for the job could be either
Kafka or the small files produced by another job that uses
plain StreamingFileSink. The drawback is that the whole system
gets more complex, additional maintenance is needed and,
maybe more troubling, we enter the batch world
again (how could we know that no more late data will come
and that we can safely run the job?).
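The second approach could be sketched as a planning step like the one below. This is plain Python rather than a Flink job, and all names (`SmallFile`, `plan_compaction`, `allowed_lateness_days`, `target_bytes`) are hypothetical. It assumes that a partition is safe to compact once it is older than a fixed allowed lateness, which is only a heuristic answer to the "no more late data" question, and it groups the small files of each such partition greedily up to a target output size.

```python
from collections import defaultdict
from datetime import date, timedelta
from typing import Dict, Iterable, List, NamedTuple


class SmallFile(NamedTuple):
    path: str
    partition: date  # event-time partition the file belongs to
    size_bytes: int


def plan_compaction(
    files: Iterable[SmallFile],
    today: date,
    allowed_lateness_days: int,
    target_bytes: int,
) -> Dict[date, List[List[str]]]:
    """Group small files of closed partitions into compaction batches."""
    # Assumption: partitions at or before the cutoff receive no more data.
    cutoff = today - timedelta(days=allowed_lateness_days)
    by_partition: Dict[date, List[SmallFile]] = defaultdict(list)
    for f in files:
        if f.partition <= cutoff:
            by_partition[f.partition].append(f)

    plan: Dict[date, List[List[str]]] = {}
    for partition, group in by_partition.items():
        batches: List[List[str]] = []
        current: List[str] = []
        current_size = 0
        for f in sorted(group, key=lambda item: item.path):
            # Start a new output file once the target size would be exceeded.
            if current and current_size + f.size_bytes > target_bytes:
                batches.append(current)
                current, current_size = [], 0
            current.append(f.path)
            current_size += f.size_bytes
        if current:
            batches.append(current)
        plan[partition] = batches
    return plan
```

Each inner list is the set of small files that one rewrite task would merge into a single larger file; partitions newer than the cutoff are left alone so they can still receive late records.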
I would really love to hear what the community thinks about
that.
Kind regards
Marek
--
Arvid Heise | Senior Java Developer