When joining bounded and unbounded sources with CoGroupByKey, my guess is that all data from the bounded source (MySQL in this case) ends up in a single (first) window, because all of its elements have the same timestamp (the minimum timestamp, or "-infinity", used for bounded sources, afaik). So the other windows can only contain data from the unbounded source, and this is expected behaviour. Even if we split the bounded data into windows, it would not make much sense, since we could not group the data of every unbounded input window with all data from the bounded source (which is the initial goal, as far as I can see).
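To make the reasoning above concrete, here is a small plain-Java model (it does not use the Beam API). It assumes fixed 5-second windows and uses `Long.MIN_VALUE / 1000` as a stand-in for the minimum timestamp. The class `WindowJoinSketch`, the `Element` type and both method names are hypothetical and only serve the illustration. A per-window join finds no matches for the stream elements, while a lookup against the whole bounded dataset enriches every window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowJoinSketch {

    // Stand-in for the minimum timestamp given to bounded data (assumption of this model).
    public static final long MIN_TIMESTAMP = Long.MIN_VALUE / 1000;
    // Window size in milliseconds (assumption: fixed, non-overlapping 5 s windows).
    public static final long WINDOW_SIZE_MS = 5_000L;

    /** A keyed, timestamped element (hypothetical helper type). */
    public static final class Element {
        final String key;
        final String value;
        final long timestamp;

        public Element(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Start of the fixed window that contains the given timestamp. */
    public static long windowStart(long timestamp) {
        return Math.floorDiv(timestamp, WINDOW_SIZE_MS) * WINDOW_SIZE_MS;
    }

    /** Per-window inner join: two elements match only if they share a key AND a window. */
    public static Map<Long, List<String>> joinPerWindow(List<Element> stream, List<Element> lookup) {
        Map<Long, List<String>> joined = new TreeMap<>();
        for (Element s : stream) {
            long window = windowStart(s.timestamp);
            List<String> out = joined.computeIfAbsent(window, w -> new ArrayList<>());
            for (Element l : lookup) {
                if (l.key.equals(s.key) && windowStart(l.timestamp) == window) {
                    out.add(s.value + "+" + l.value);
                }
            }
        }
        return joined;
    }

    /** Global lookup: every stream element sees the whole lookup table, whatever its window. */
    public static Map<Long, List<String>> enrichWithGlobalLookup(List<Element> stream, List<Element> lookup) {
        Map<String, String> table = new HashMap<>();
        for (Element l : lookup) {
            table.put(l.key, l.value);
        }
        Map<Long, List<String>> enriched = new TreeMap<>();
        for (Element s : stream) {
            List<String> out = enriched.computeIfAbsent(windowStart(s.timestamp), w -> new ArrayList<>());
            String match = table.get(s.key);
            if (match != null) {
                out.add(s.value + "+" + match);
            }
        }
        return enriched;
    }

    public static void main(String[] args) {
        // Bounded data: every element carries the same minimum timestamp.
        List<Element> lookup = Arrays.asList(
            new Element("FR", "France", MIN_TIMESTAMP),
            new Element("DE", "Germany", MIN_TIMESTAMP));
        // Unbounded data: timestamps keep advancing into new windows.
        List<Element> stream = Arrays.asList(
            new Element("FR", "click-1", 1_000L),
            new Element("DE", "click-2", 7_000L));

        System.out.println("per-window join: " + joinPerWindow(stream, lookup));
        System.out.println("global lookup:   " + enrichWithGlobalLookup(stream, lookup));
    }
}
```

In this model the lookup rows all fall into one window far in the past, so the per-window join returns an empty list for each stream window, whereas the global lookup matches both clicks.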
In the case of a side input created from a bounded source, this should not be a problem: the side input sits in the global window, and the windows of the (unbounded) main input are projected onto that global window. The only issue is having enough memory to hold all of the side input data.

Please correct me if my assumptions are wrong.

> On 15 Jan 2019, at 18:37, Pierre Bailly Ferry <pbai...@talend.com> wrote:
>
> Hello Kenneth,
>
> Thank you so much for your answer.
>
> I think I'm going to implement the side input method.
> However, on the Spark Runner, the MapView is a simple HashMap[1], so I will
> have to put a lot of memory on my different Spark executors.
>
> For the life cycle of the MySQL data, I currently focus on fixed data.
> But if I end up with a solution where I have to call my DB once a week, this
> would be fine.
>
> I am not familiar with the ParDo/state mechanism. I will give it a look; that
> could be a solution that does not involve the MapView.
>
> Again, thank you!
>
> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L322
>
> --
> Pierre Bailly-Ferry
>
> On 14/01/2019 21:13, Kenneth Knowles wrote:
>> Hi Pierre,
>>
>> Joins using the join library are always per-window, so that is not likely to
>> work well for you. Actually, side inputs are the right architecture. What you
>> really want is a map side input (View.asMap()) that does not pull the whole
>> Map into memory. It is up to the runner to implement this. I don't know how
>> the SparkRunner implements a Map side input, but it may not be too hard to
>> implement it so that the performance is how you want it.
>>
>> On the other hand, there is a big question: does the MySQL database ever
>> change? Countries change slowly, but I would guess that you might have other
>> use cases too. The bounded data is read once at the very beginning of the
>> pipeline's lifecycle. If the stream lasts "forever" then you can end up
>> with streaming data where the bounded data is out of date. The simplest way
>> to explain it is that _changing_ data is unbounded. So then you need a
>> streaming side input with some triggers to update it, and logic to find the
>> "current" value for each key.
>>
>> One approach is to just query your MySQL database from the streaming
>> ParDo(DoFn), if that gives you the consistency guarantees you are looking
>> for. You then may want to use state & timers to batch your requests.
>>
>> Kenn
>>
>> On Mon, Jan 14, 2019 at 4:40 AM Pierre Bailly Ferry <pbai...@talend.com> wrote:
>> Hello everyone,
>>
>> I am a developer at Talend for the project Data Streams.
>>
>> I am having some trouble using the Join[1] from an unbounded source on the
>> Spark Runner.
>>
>> I want to achieve the following use case:
>> I have a Kafka source (so, an unbounded source) and I want to enrich this
>> data with data from a database (a bounded source).
>>
>> The Join seems to be the correct solution for my use case, because:
>> 1) I preprocess data from my database, so I cannot just call my database
>> over and over as suggested in the Beam Pattern [2];
>> 2) my database is big, and I want to avoid having my whole database on every
>> single node of my cluster, so I cannot use a simple side input.
>>
>> I tried a basic approach, but I very quickly hit an error in the
>> GroupByKey:
>>
>> java.lang.IllegalStateException: Inputs to Flatten had incompatible window windowFns:
>> org.apache.beam.sdk.transforms.windowing.SlidingWindows@49d96f,
>> org.apache.beam.sdk.transforms.windowing.GlobalWindows
>>
>> I think the correct way to fix this is to transform my GlobalWindow
>> into a SlidingWindow. So, my goal is to implement the following pattern:
>>
>> Sadly, I am not able to create the right window correctly. My bounded input
>> will create one window and will never repeat itself:
>>
>> I tried multiple things for the right window:
>>
>> 1) Using AfterWatermark:
>>
>> PCollection<KV<String, String>> lookupKV = extractKey(databaseInput)
>>     .apply("Sliding Window from Bounded Source",
>>         Window.<KV<String, String>> into(
>>                 SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(5)))
>>             .triggering(
>>                 AfterWatermark.pastEndOfWindow()
>>                     .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>                         .plusDelayOf(Duration.standardSeconds(5)))
>>             )
>>             .withAllowedLateness(Duration.standardSeconds(5))
>>             .accumulatingFiredPanes());
>>
>> 2) Using Repeatedly:
>>
>> PCollection<KV<String, String>> lookupKV = extractKey(databaseInput)
>>     .apply("Sliding Window from Bounded Source",
>>         Window.<KV<String, String>> into(
>>                 SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(5)))
>>             .triggering(
>>                 Repeatedly.forever(
>>                     AfterWatermark.pastEndOfWindow()
>>                         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
>>                 )
>>             )
>>             .withAllowedLateness(Duration.standardSeconds(5))
>>             .accumulatingFiredPanes());
>>
>> 3) Just the sliding window (because why not, after all):
>>
>> PCollection<KV<String, String>> lookupKV = extractKey(databaseInput)
>>     .apply("Sliding Window from Bounded Source",
>>         Window.<KV<String, String>> into(
>>                 SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(5)))
>>             .accumulatingFiredPanes());
>>
>> Nothing works. I always get one microbatch of data, then nothing more.
>>
>> The root of my problem is that I cannot find a trigger that is activated by
>> time alone.
>> I have access to "AfterWatermark" (which requires the start of a window, so a
>> new record being processed), "AfterProcessingTime" (which requires a first
>> element to start the countdown) or "AfterPane" (which requires at least one
>> new element, so I cannot fake it with 0 elements). Nothing is fully time
>> related.
>> Repeatedly cannot be used without a trigger, so it's not the solution either.
>>
>> Do you see any way to achieve the Batch -> Streaming windowing that I want
>> to do?
>> Side question: is there a way to keep the data locality between two windows,
>> so I do not move data from my right input over and over?
>>
>> tl;dr: I am unable to enrich my Kafka input with static data from my
>> database.
>>
>> [1] https://beam.apache.org/releases/javadoc/2.1.0/org/apache/beam/sdk/extensions/joinlibrary/Join.html
>> [2] https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-2
>>
>> Thanks,
>> --
>> Pierre Bailly-Ferry