Great, looking forward to hearing from you again.

Best,
Kurt
On Mon, Dec 16, 2019 at 10:22 PM Krzysztof Zarzycki <k.zarzy...@gmail.com> wrote:

> Thanks Kurt for your answers.
>
> Summing up, I feel like option 1 (i.e. the join with a temporal table
> function) requires some coding around a source that needs to pull data
> once a day. But otherwise it brings the following benefits:
> * I don't have to put the dicts in another store like HBase. Everything
> stays in Hive + Flink.
> * I'll be able to make a true temporal join, i.e. event-time based.
> * I believe I will be able to build a history reprocessing program based
> on the same logic (i.e. the same SQL). At least for a particular day;
> processing multiple days would be tricky, because I would need to pull
> multiple versions of the dictionary.
> Plus, looking up dict values will be much faster and more
> resource-efficient when the dict is stored in state instead of uncached
> HBase. That's especially important when we want to reprocess a
> historical, archived stream at a speed of millions of events/sec.
>
> I understand that option 2 is easier to implement. I may do a PoC of it
> as well.
> OK, I believe I know enough to get my hands dirty with the code. I can
> share later what I was able to accomplish, and probably more questions
> will show up when I finally start the implementation.
>
> Thanks,
> Krzysztof
>
> On Mon, Dec 16, 2019 at 03:14 Kurt Young <ykt...@gmail.com> wrote:
>
>> Hi Krzysztof, thanks for the discussion. You raised lots of good
>> questions; I will try to reply to them one by one.
>>
>> Re option 1:
>>
>> > Question 1: do I need to write that Hive source or can I use
>> > something ready, like the Hive catalog integration? Or maybe reuse
>> > e.g. the HiveTableSource class?
>>
>> I'm not sure if you can reuse the logic of `HiveTableSource`. Currently
>> `HiveTableSource` works in batch mode: it reads all data at once and
>> then stops. But what you need is to wait until the next day after each
>> read finishes. What you can try is to reuse the logic of
>> `HiveTableInputFormat` and wrap the "monitoring" logic around it
>> yourself.
>>
>> > Question/worry 2: the state would grow infinitely if I had an
>> > infinite number of keys, but not only an infinite number of versions
>> > of all keys.
>>
>> The temporal table function doesn't support watermark-based state
>> clean-up yet, but what you can try is idle state retention [1]. So even
>> if you have an infinite number of keys (for example, say you have
>> different join keys every day), the old keys will not be touched in the
>> following days; they become idle and will be deleted by the framework.
>>
>> > Question 3: Do you imagine that I could use the same logic for both
>> > stream processing and reprocessing just by replacing sources and
>> > sinks?
>>
>> Generally speaking, yes, I think so. With an event-time based join, we
>> should be able to reuse the logic of normal stream processing for
>> reprocessing historical data, although there will definitely be some
>> details to address, like event time and watermarks.
>>
>> Re option 2:
>>
>> > maybe implement a Hive/JDBC-based LookupableTableSource that pulls
>> > the whole dictionary into memory
>>
>> You can do this manually, but I would recommend you go with the first
>> choice, which loads the Hive table into HBase periodically. It's much
>> easier and more efficient. And the approach you mentioned also seems to
>> duplicate the temporal table function solution a little.
>>
>> > this option is available only with the Blink engine and also only
>> > with use of Flink SQL, no Table API?
>>
>> I'm afraid yes, you can only use it with SQL for now.
>>
>> > do you think it would be possible to use the same logic / SQL for
>> > reprocessing?
>>
>> Given the fact that this solution is based on processing time, I don't
>> think it can cover the reprocessing use case, except if you can accept
>> always joining with the latest day even during backfilling. But we are
>> also aiming to resolve this shortcoming, maybe within 1 or 2 releases.
>>
>> Best,
>> Kurt
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
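As a rough illustration of the approach Kurt describes, a daily-refresh
source might look like the following sketch. It re-reads the whole
dictionary once per day and stamps each row with the snapshot time, so
the stream can later drive an event-time temporal table function. The
loadSnapshot() helper is a placeholder for the actual Hive read (e.g.
wrapping HiveTableInputFormat), not an existing Flink API:

    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.types.Row;

    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.TimeUnit;

    // Sketch: emits the full dictionary once per day as a versioned stream.
    public class DailyDictionarySource extends RichSourceFunction<Row> {

        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Row> ctx) throws Exception {
            while (running) {
                long snapshotTime = System.currentTimeMillis();
                // Placeholder: read the current snapshot from Hive, e.g. by
                // wrapping HiveTableInputFormat.
                List<Row> snapshot = loadSnapshot();
                synchronized (ctx.getCheckpointLock()) {
                    for (Row row : snapshot) {
                        // Stamp every row with the snapshot time so a temporal
                        // table function can version the dictionary by event
                        // time.
                        ctx.collectWithTimestamp(row, snapshotTime);
                    }
                    // Advancing the watermark lets an event-time temporal join
                    // discard dictionary versions that are no longer needed.
                    ctx.emitWatermark(new Watermark(snapshotTime));
                }
                TimeUnit.DAYS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        private List<Row> loadSnapshot() {
            return Collections.emptyList(); // wire the Hive read in here
        }
    }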
>> On Sat, Dec 14, 2019 at 3:41 AM Krzysztof Zarzycki
>> <k.zarzy...@gmail.com> wrote:
>>
>>> Very interesting, Kurt! Yes, I also imagined it's rather a very common
>>> case. In my company we currently have 3 clients wanting this
>>> functionality.
>>> I also just realized the slight difference between a temporal join and
>>> a temporal table function join, i.e. that there are actually two
>>> methods :)
>>>
>>> Regarding option 1:
>>> So I would need to:
>>> * write a DataStream API source that pulls the Hive dictionary table
>>> every, let's say, day, assigns an event time column to the rows and
>>> creates a stream of them. It does that and only that.
>>> * create a table (from the Table API) out of it, assigning one of the
>>> columns as the event time column.
>>> * then use table.createTemporalTableFunction(<all columns, including
>>> the time column>)
>>> * finally join my main data stream with the temporal table function
>>> (let me use the short name TTF from now on) of my dictionary, using
>>> Flink SQL and the LATERAL TABLE (Rates(o.rowtime)) AS r construct.
>>> And so I should achieve my temporal, event-time based join with
>>> versioned dictionaries!
>>> Question 1: do I need to write that Hive source or can I use something
>>> ready, like the Hive catalog integration? Or maybe reuse e.g. the
>>> HiveTableSource class?
>>>
>>> Question/worry 2: One thing that worried me is this comment in the
>>> docs:
>>> *Note: State retention defined in a query configuration
>>> <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html>
>>> is not yet implemented for temporal joins. This means that the
>>> required state to compute the query result might grow infinitely
>>> depending on the number of distinct primary keys for the history
>>> table.*
>>> On the other side, I find this comment: *By definition of event time,
>>> watermarks
>>> <https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html>
>>> allow the join operation to move forward in time and discard versions
>>> of the build table that are no longer necessary because no incoming
>>> row with a lower or equal timestamp is expected.*
>>> So I believe that the state would grow infinitely only if I had an
>>> infinite number of keys, but not merely with an infinite number of
>>> versions of all keys. Which is fine. Do you confirm?
>>>
>>> Question 3: I need to be able to also cover reprocessing or
>>> backfilling of historical data. Let's say I would need to join a data
>>> stream with (versioned/snapshotted) dictionaries stored on HDFS. Do
>>> you imagine that I could use the same logic for both stream processing
>>> and reprocessing just by replacing sources and sinks? Maybe after some
>>> slight modifications?
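To make the steps Krzysztof outlines above concrete: assuming a
StreamTableEnvironment tableEnv, the dictionary stream from the earlier
sketch registered as Dict (with event-time column d_rowtime and key
d_key), and the main stream registered analogously as Events, the
registration and event-time join might look like this (all names are
illustrative, not from the thread):

    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.functions.TemporalTableFunction;

    // Register the dictionary stream; the ".rowtime" suffix marks d_rowtime
    // as the event-time attribute of the resulting table.
    tableEnv.registerDataStream(
        "Dict", dictStream, "d_key, d_value, d_rowtime.rowtime");

    // Turn the versioned dictionary into a temporal table function keyed
    // by d_key and versioned by d_rowtime.
    TemporalTableFunction dictVersion = tableEnv
        .scan("Dict")
        .createTemporalTableFunction("d_rowtime", "d_key");
    tableEnv.registerFunction("DictVersion", dictVersion);

    // Event-time temporal join: each event picks the dictionary version
    // that was valid at the event's timestamp.
    Table joined = tableEnv.sqlQuery(
        "SELECT e.*, d.d_value " +
        "FROM Events AS e, " +
        "     LATERAL TABLE (DictVersion(e.e_rowtime)) AS d " +
        "WHERE e.e_key = d.d_key");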
>>>
>>> Regarding option 2:
>>> Here I understand the current limitation (which will stay for some
>>> time) is that the join can happen only on processing time, which means
>>> joining only with the latest version of the dictionaries.
>>> Accepting that, I understand I would need to:
>>> a) load the Hive table into e.g. HBase and then use HBaseTableSource
>>> on it, OR
>>> b) maybe implement a Hive/JDBC-based LookupableTableSource that pulls
>>> the whole dictionary into memory (or even into Flink state, if it is
>>> possible to use it from a TableFunction).
>>> Then use this table and my Kafka stream table in a temporal join
>>> expressed in Flink SQL.
>>> What do you think, is that feasible?
>>> Do I understand correctly that this option is available only with the
>>> Blink engine and also only with use of Flink SQL, no Table API?
>>>
>>> The same question comes up regarding reprocessing: do you think it
>>> would be possible to use the same logic / SQL for reprocessing?
>>>
>>> Thank you for continuing this discussion with me. I believe we're here
>>> on the subject of a really important design for the community.
>>> Krzysztof
>>>
>>> On Fri, Dec 13, 2019 at 09:39 Kurt Young <ykt...@gmail.com> wrote:
>>>
>>>> Sorry, I forgot to paste the reference URLs.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>>>>
>>>> On Fri, Dec 13, 2019 at 4:37 PM Kurt Young <ykt...@gmail.com> wrote:
>>>>
>>>>> Hi Krzysztof,
>>>>>
>>>>> What you raised is also something we are very interested in
>>>>> achieving in Flink. Unfortunately, there is no out-of-the-box
>>>>> solution in the Table/SQL API yet, but you have 2 options which both
>>>>> come close and thus need some modifications.
>>>>>
>>>>> 1. The first one is to use a temporal table function [1]. It needs
>>>>> you to write the logic of reading the Hive tables and doing the
>>>>> daily update inside the table function.
>>>>> 2. The second choice is to use a temporal table join [2], which only
>>>>> works with processing time now (just like the simple solution you
>>>>> mentioned), and needs the table source to have lookup capability
>>>>> (like HBase). Currently, the Hive connector doesn't support lookups,
>>>>> so to make this work you would need to sync the content to another
>>>>> storage which supports lookups, like HBase.
>>>>>
>>>>> Neither solution is ideal right now, and we also aim to improve
>>>>> this, maybe in the following release.
>>>>>
>>>>> Best,
>>>>> Kurt
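For comparison, the processing-time temporal table join from [2] is a
single SQL statement on the blink planner. A sketch, assuming a
lookup-capable table LatestDict (e.g. backed by HBase) and a proctime
attribute on Events, with the same illustrative names as before:

    // Processing-time temporal table join: every incoming event is joined
    // against the latest dictionary version via point lookups into the
    // lookup-capable LatestDict table.
    Table joined = tableEnv.sqlQuery(
        "SELECT e.*, d.d_value " +
        "FROM Events AS e " +
        "JOIN LatestDict FOR SYSTEM_TIME AS OF e.proctime AS d " +
        "ON e.e_key = d.d_key");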
>>>>>
>>>>> On Fri, Dec 13, 2019 at 1:44 AM Krzysztof Zarzycki
>>>>> <k.zarzy...@gmail.com> wrote:
>>>>>
>>>>>> Hello dear Flinkers,
>>>>>> If this kind of question was asked on the groups before, I'm sorry
>>>>>> for the duplicate; feel free to just point me to the thread.
>>>>>> I have to solve a probably pretty common case of joining a
>>>>>> datastream to a dataset.
>>>>>> Let's say I have the following setup:
>>>>>> * I have a high-pace stream of events coming in from Kafka.
>>>>>> * I have some dimension tables stored in Hive. These tables are
>>>>>> changed daily. I can keep a snapshot for each day.
>>>>>>
>>>>>> Now conceptually, I would like to join the stream of incoming
>>>>>> events to the dimension tables (a simple hash join). We can
>>>>>> consider two cases:
>>>>>> 1) simpler, where I join the stream with the most recent version of
>>>>>> the dictionaries (so the result is accepted to be nondeterministic
>>>>>> if the job is retried);
>>>>>> 2) more advanced, where I would like to do a temporal join of the
>>>>>> stream with the dictionary snapshots that were valid at the time of
>>>>>> the event (this result should be deterministic).
>>>>>>
>>>>>> The end goal is to do an aggregation of that joined stream and
>>>>>> store the results in Hive or a more real-time analytical store
>>>>>> (Druid).
>>>>>>
>>>>>> Now, could you please help me understand: is any of these cases
>>>>>> implementable with the declarative Table/SQL API? With use of
>>>>>> temporal joins, catalogs, the Hive integration, JDBC connectors, or
>>>>>> whatever beta features there are now. (I've read quite a lot of the
>>>>>> Flink docs about each of those, but I have trouble compiling this
>>>>>> information into a final design.) Could you please help me
>>>>>> understand how these components should cooperate?
>>>>>> If that is impossible with the Table API, can we come up with the
>>>>>> easiest implementation using the DataStream API?
>>>>>>
>>>>>> Thanks a lot for any help!
>>>>>> Krzysztof
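As for the end goal mentioned in the original question, the aggregation
could sit on top of the joined result. For example, assuming the join
output is registered as JoinedEvents and keeping the illustrative names
from the earlier sketches, an hourly tumbling-window count might look
like this:

    // Hourly tumbling-window aggregation over the joined stream; the result
    // can then be emitted to Hive or Druid through an appropriate sink.
    Table aggregated = tableEnv.sqlQuery(
        "SELECT d_value, " +
        "       COUNT(*) AS cnt, " +
        "       TUMBLE_END(e_rowtime, INTERVAL '1' HOUR) AS window_end " +
        "FROM JoinedEvents " +
        "GROUP BY d_value, TUMBLE(e_rowtime, INTERVAL '1' HOUR)");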