Hi Theo,

Since you are running Flink locally, it would be quite easy to attach a profiler to the Flink JVM to see where most of the CPU cycles are burned (or to check whether you are perhaps IO-bound). This could give us valuable data for deciding on the next steps.
On Tue, May 18, 2021 at 5:26 PM Theo Diefenthal < theo.diefent...@scoop-software.de> wrote:

> Hi there,
>
> I have the following (probably very common) use case: I have some lookup
> data (100 million records) which changes only slowly (in the range of a few
> thousand records per day). My event stream is in the order of tens of
> billions of events per day, and each event needs to be enriched from the
> 100 million record lookup source. For the JOIN, I don't need any
> event-time-related semantics; just the newest version at the time of
> enrichment shall be taken into account.
>
> As a user familiar with the DataStream API but unfamiliar with the SQL API,
> I built a small MVP. I used a connected stream and put the enrichment data
> into keyed (heap) state. My RAM is enough to hold all the data in memory
> (at least once in prod). I first streamed in all 100 million records, then
> I started the performance measurement by streaming in just 3 million events
> to be enriched against the 100 million records. I was a bit stunned that
> the enrichment of all events took about 40 seconds on my local machine. I
> built a similar MVP in Spark where I put the 100 million records into a
> Hive table (pre-partitioned on the JOIN column), the 3 million test events
> into a Parquet file, and then ran an outer join, which also took about 40
> seconds on my local machine (consuming only 16 GB of RAM). I somehow
> expected Flink to be much faster, as I hold the enrichment data already in
> memory (state) and, at least on localhost, there is no real networking
> involved.
>
> I then thought about the problems with the DataStream API: my 100 million
> records are read from an uncompressed CSV file which is 25 GB in size.
> Deserialized to Java POJOs, I guess the POJOs would take 100 GB of heap
> space. [Actually, I ran the tests in Spark with all 100 million records and
> this Flink test with only 20 million records due to too much memory being
> used, so the 100 GB is an estimation from 20 million records taking 20 GB
> of heap space.] When I stopped parsing my enrichment data into POJOs and
> instead extracted only the enrichment (join) attribute, keeping the
> remaining part of the record as a plain string, the Java heap used was only
> about 25 GB again for all 100 million records. Not only that, my enrichment
> JOIN now took only 30 seconds to complete for all records. My thought now
> is: I probably shouldn't use the DataStream API with Java POJOs here, but
> rather the Flink SQL API with "Row" classes? I remember I once read a blog
> post about how Flink internally optimizes its data structures and can reuse
> certain things when using the SQL API, and so on.
>
> Before I try out several variants, my question is: what do you think is the
> fastest/most efficient way to enrich slowly changing data with the latest
> version (processing-time temporal table JOIN), given that memory isn't a
> big problem once deployed to the cluster? Do you recommend using the SQL
> API? With which type of JOIN (processing-time temporal table?), and should
> I hold the enrichment table fully in Flink managed memory (can I express
> this via the SQL API?), or do I need to use some external
> "LookupTableSource"? Once I run my application in the cluster, I suspect a
> "LookupTableSource" would introduce some communication overhead vs.
> querying Flink state directly. If you recommend the DataStream API: should
> I read via SQL connectors and work with "Rows" in state? What kind of
> performance tunings should I take into account here (reuseObjects,
> disableChaining, ...)?
>
> Best regards
> Theo
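
For reference, the connected-stream pattern described above boils down to something like the sketch below. This is only an illustration of the shape of the code; LookupRecord, Event, EnrichedEvent and getKey() are made-up placeholder types and are not taken from the actual MVP.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: LookupRecord, Event, EnrichedEvent and getKey() are
// placeholder types, not from the original MVP.
public class EnrichmentFunction
        extends KeyedCoProcessFunction<String, LookupRecord, Event, EnrichedEvent> {

    // Latest lookup record for the current key, kept in keyed (heap) state.
    private transient ValueState<LookupRecord> lookupState;

    @Override
    public void open(Configuration parameters) {
        lookupState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lookup", LookupRecord.class));
    }

    @Override
    public void processElement1(LookupRecord lookup, Context ctx,
                                Collector<EnrichedEvent> out) throws Exception {
        // Enrichment stream: always overwrite with the newest version.
        lookupState.update(lookup);
    }

    @Override
    public void processElement2(Event event, Context ctx,
                                Collector<EnrichedEvent> out) throws Exception {
        // Event stream: enrich with whatever version is currently in state
        // (may be null if no lookup record has arrived for this key yet).
        out.collect(new EnrichedEvent(event, lookupState.value()));
    }
}

// Wiring (key selectors are placeholders as well):
// lookupStream.connect(eventStream)
//     .keyBy(LookupRecord::getKey, Event::getKey)
//     .process(new EnrichmentFunction())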
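
And for the SQL route, a processing-time temporal join could look roughly like the sketch below. Table names, columns and connector options are assumptions and need to be replaced; the right-hand side of this kind of join is typically backed by a connector that provides a LookupTableSource (e.g. JDBC), so it is not held in Flink state the way the DataStream variant is.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TemporalJoinSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Event stream with a processing-time attribute
        // (the connector option is a placeholder to be filled in).
        tEnv.executeSql(
            "CREATE TABLE events (" +
            "  event_id STRING," +
            "  lookup_key STRING," +
            "  proc_time AS PROCTIME()" +
            ") WITH ('connector' = '...')");

        // Slowly changing lookup data, exposed through a connector that
        // provides a LookupTableSource (e.g. JDBC); placeholder options again.
        tEnv.executeSql(
            "CREATE TABLE lookup_data (" +
            "  lookup_key STRING," +
            "  payload STRING" +
            ") WITH ('connector' = '...')");

        // Processing-time temporal join: each event is joined against the
        // version of the lookup row that is current when the event is processed.
        tEnv.executeSql(
            "SELECT e.event_id, l.payload " +
            "FROM events AS e " +
            "JOIN lookup_data FOR SYSTEM_TIME AS OF e.proc_time AS l " +
            "ON e.lookup_key = l.lookup_key");
    }
}

Whether this beats keeping everything in keyed state depends mostly on how the lookup table is backed; a remote LookupTableSource does add a round trip per (uncached) key, as suspected in the original mail.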