Hi Theo,

Since you are running Flink locally, it would be quite easy to attach a
profiler to Flink to see where most of the CPU cycles are burned (or to
check whether you are perhaps IO bound). That could give us valuable data
for deciding on the next steps.
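
For example, if you run the job from your IDE on JDK 11+, you could wrap the
local execution in a Java Flight Recorder recording. A rough, untested sketch
("env" being your existing local StreamExecutionEnvironment, inside a main
that throws Exception):

    import java.nio.file.Paths;
    import jdk.jfr.Configuration;
    import jdk.jfr.Recording;

    // Record a CPU/allocation profile of the local run with JFR.
    Configuration profile = Configuration.getConfiguration("profile");
    try (Recording recording = new Recording(profile)) {
        recording.start();

        env.execute("enrichment benchmark");  // your existing local job

        recording.stop();
        recording.dump(Paths.get("flink-local-profile.jfr"));
    }
    // Open the .jfr file in JDK Mission Control to see where the time goes.

Attaching async-profiler or VisualVM to the local JVM would work just as well.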

On Tue, May 18, 2021 at 5:26 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi there,
>
> I have the following (probably very common) use case: I have some lookup
> data (100 million records) which changes only slowly (in the range of a few
> thousand records per day). My event stream is in the order of tens of
> billions of events per day, and each event needs to be enriched from the
> 100 million record lookup source. For the JOIN, I don't need any event-time
> semantics; just the newest version at the time of enrichment should be
> taken into account.
>
> As a user used to the DataStream API but unfamiliar with the SQL API, I
> built a small MVP. I used a connected stream and put the enrichment data
> into keyed (heap) state. My RAM is enough to hold all the data in memory
> (once in prod, at least). I first streamed in all 100 million records, then
> I started the performance measurement by streaming in just 3 million events
> to be enriched against the 100 million records. I was a bit stunned that
> the enrichment of all events took about 40 seconds on my local machine. I
> built a similar MVP in Spark where I put the 100 million records into a
> Hive table (pre-partitioned on the JOIN column), the 3 million test events
> into a Parquet file, and then ran an outer join, which also took about 40
> seconds on my local machine (consuming only 16 GB of RAM). I somehow
> expected Flink to be much faster, as it holds the enrichment data already
> in memory (state) and, at least on localhost, there is no real networking
> involved.
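>
> In simplified form, the MVP looks roughly like this (class and field names
> are illustrative, not my real code):
>
>     import org.apache.flink.api.common.state.ValueState;
>     import org.apache.flink.api.common.state.ValueStateDescriptor;
>     import org.apache.flink.configuration.Configuration;
>     import org.apache.flink.streaming.api.datastream.DataStream;
>     import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
>     import org.apache.flink.util.Collector;
>
>     DataStream<LookupRecord> lookup = ...; // the 100 million slowly changing records
>     DataStream<Event> events = ...;        // the events to enrich
>
>     events
>         .connect(lookup)
>         .keyBy(e -> e.joinKey, l -> l.joinKey)
>         .process(new KeyedCoProcessFunction<String, Event, LookupRecord, EnrichedEvent>() {
>             private transient ValueState<LookupRecord> latest;
>
>             @Override
>             public void open(Configuration parameters) {
>                 latest = getRuntimeContext().getState(
>                     new ValueStateDescriptor<>("latest-lookup", LookupRecord.class));
>             }
>
>             // enrich each event with the newest lookup version seen so far
>             // (may be null if no lookup record has arrived for this key yet)
>             @Override
>             public void processElement1(Event event, Context ctx, Collector<EnrichedEvent> out)
>                     throws Exception {
>                 out.collect(new EnrichedEvent(event, latest.value()));
>             }
>
>             // lookup side: simply overwrite the keyed state with the latest version
>             @Override
>             public void processElement2(LookupRecord record, Context ctx, Collector<EnrichedEvent> out)
>                     throws Exception {
>                 latest.update(record);
>             }
>         });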
>
> I then thought about the problems with the DataStream API: my 100 million
> records are read from an uncompressed CSV file which is 25 GB in size.
> Deserialized to Java POJOs, I guess they would take 100 GB of heap space.
> [Actually, I ran the test in Spark with all 100 million records and this
> Flink test with only 20 million records because of the memory consumption,
> so the 100 GB is an estimate from 20 million records taking 20 GB of heap.]
> When I stopped parsing my enrichment data into POJOs and instead extracted
> only the enrichment (join) attribute and kept the remaining part of the
> data as a plain string, the Java heap used was only about 25 GB again for
> all 100 million records. Not only that, my enrichment JOIN now took only 30
> seconds to complete all records. My thought now is: I probably shouldn't
> use the DataStream API with Java POJOs here, but the Flink SQL API with
> "Row" classes? I remember I once read a blog post about how Flink
> internally optimizes its data structures and can reuse certain things when
> the SQL API is used.
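>
> For concreteness, the leaner representation I described above is roughly
> this (a sketch; it assumes the join key is the first CSV column):
>
>     import org.apache.flink.api.common.typeinfo.Types;
>     import org.apache.flink.api.java.tuple.Tuple2;
>
>     // keep only (joinKey, rawLine) instead of parsing the full POJO
>     DataStream<Tuple2<String, String>> lookup = env
>         .readTextFile("lookup.csv")
>         .map(line -> Tuple2.of(line.substring(0, line.indexOf(',')), line))
>         .returns(Types.TUPLE(Types.STRING, Types.STRING));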
>
> Before I go and try out several variants, my question is: what do you think
> is the fastest / most efficient way to enrich slowly changing data with its
> latest version (a processing-time temporal table JOIN), given that memory
> isn't a big problem once deployed to the cluster? Do you recommend using
> the SQL API? With which type of JOIN (processing-time temporal table?),
> and can the enrichment table be held fully in Flink managed memory (can I
> express this via the SQL API?), or do I need to use some external
> "LookupTableSource"? Once I run my application in the cluster, I suspect a
> "LookupTableSource" would introduce some communication overhead compared to
> querying Flink state directly? If you recommend the DataStream API: should
> I read via the SQL connectors and work with "Rows" in state? What kind of
> performance tuning should I take into account here (object reuse,
> disableChaining, ...)?
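>
> (From what I read in the docs, I assume the SQL variant I mean would look
> roughly like the sketch below; table and column names are made up, and I
> haven't verified whether this requires a LookupTableSource or also works
> against a regular table held in state:)
>
>     import org.apache.flink.table.api.EnvironmentSettings;
>     import org.apache.flink.table.api.TableEnvironment;
>
>     TableEnvironment tEnv = TableEnvironment.create(
>         EnvironmentSettings.newInstance().inStreamingMode().build());
>
>     // processing-time temporal join: always take the newest lookup version
>     tEnv.executeSql(
>         "SELECT e.*, l.payload "
>             + "FROM events AS e "
>             + "JOIN lookup_data FOR SYSTEM_TIME AS OF e.proc_time AS l "
>             + "ON e.join_key = l.join_key");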
>
> Best regards
> Theo
>
