Thanks Kurt for your answers. Summing up, I feel like option 1 (i.e. the
join with a temporal table function) requires some coding around a source
that needs to pull data once a day. But otherwise it brings the following
benefits:
* I don't have to put the dicts in another store like HBase. Everything
stays in Hive + Flink.
* I'll be able to make a true temporal join - event-time based.
* I believe I will be able to build a history reprocessing program based on
the same logic (i.e. the same SQL). At least for a particular day -
processing multiple days would be tricky, because I would need to pull
multiple versions of the dictionary.
Plus, looking up dict values will be much faster and more resource-efficient
when the dict is stored in state instead of uncached HBase. That's
especially important when we want to reprocess a historical, archived
stream at a speed of millions of events/sec.
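By the way, the "coding around a source" I have in mind is roughly the
skeleton below. It's only a sketch - the class name is made up, imports and
the actual Hive reading are omitted:

    // Emits the whole dictionary snapshot once per day, stamping every
    // row with the snapshot time, so the downstream join can be
    // event-time based.
    public class DailyHiveDictSource
            extends RichSourceFunction<Tuple2<String, String>> {

        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Tuple2<String, String>> ctx)
                throws Exception {
            while (running) {
                long snapshotTime = System.currentTimeMillis();
                synchronized (ctx.getCheckpointLock()) {
                    // loadSnapshot() is a placeholder for reading the
                    // current Hive snapshot, e.g. via HiveTableInputFormat
                    for (Tuple2<String, String> row : loadSnapshot()) {
                        ctx.collectWithTimestamp(row, snapshotTime);
                    }
                    // lets the temporal join advance past this version
                    ctx.emitWatermark(new Watermark(snapshotTime));
                }
                Thread.sleep(24 * 60 * 60 * 1000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        private List<Tuple2<String, String>> loadSnapshot() {
            return Collections.emptyList(); // TODO: pull from Hive
        }
    }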
I understand that option 2 is easier to implement. I may do a PoC of it as
well.

OK, I believe I know enough to get my hands dirty with the code. I can
share later what I was able to accomplish. And probably more questions will
show up when I finally start the implementation.

Thanks
Krzysztof

On Mon, Dec 16, 2019 at 3:14 AM Kurt Young <ykt...@gmail.com> wrote:

> Hi Krzysztof, thanks for the discussion, you raised lots of good
> questions, I will try to reply to them one by one.
>
> Re option 1:
>
> > Question 1: do I need to write that Hive source or can I use something
> > ready, like the Hive catalog integration? Or maybe reuse e.g. the
> > HiveTableSource class?
>
> I'm not sure if you can reuse the logic of `HiveTableSource`. Currently
> `HiveTableSource` works in batch mode: it will read all data at once and
> stop, but what you need is to wait until the next day after it finishes.
> What you can try is to reuse the logic of `HiveTableInputFormat` and wrap
> the "monitoring" logic outside it.
>
> > Question/worry 2: the state would grow infinitely if I had an infinite
> > number of keys, but not if I only had an infinite number of versions
> > of all keys.
>
> The temporal table function doesn't support watermark-based state cleanup
> yet, but what you can try is idle state retention [1]. So even if you
> have an infinite number of keys - for example, say you have different
> join keys every day - the old keys will not be touched in the following
> days; they become idle and will be deleted by the framework.
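>
> Per the docs in [1], the configuration would look roughly like this (a
> sketch against the 1.9 API; the retention times are just examples):
>
>     StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>     // obtain the query configuration from the TableEnvironment
>     StreamQueryConfig qConfig = tableEnv.queryConfig();
>     // keys idle for 2 days become eligible for cleanup; their state is
>     // dropped at the latest after 3 days without access
>     qConfig.withIdleStateRetentionTime(Time.days(2), Time.days(3));
>     // pass qConfig when emitting the result, e.g.:
>     // tableEnv.toAppendStream(result, Row.class, qConfig);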
>
> > Question 3: Do you imagine that I could use the same logic for both
> > stream processing and reprocessing just by replacing sources and sinks?
>
> Generally speaking, yes, I think so. With an event-time based join, we
> should be able to reuse the logic between normal stream processing and
> reprocessing of historical data, although there will definitely be some
> details that should be addressed, like event time and watermarks.
>
> Re option 2:
>
> > maybe implement a Hive/JDBC-based LookupableTableSource that pulls the
> > whole dictionary to memory
>
> You can do this manually, but I would recommend you go with the first
> choice, which loads the hive table to HBase periodically. It's much
> easier and more efficient. And the approach you mentioned also seems to
> duplicate the temporal table function solution a little bit.
>
> > this option is available only with the Blink engine and also only with
> > use of Flink SQL, no Table API?
>
> I'm afraid yes, you can only use it with SQL for now.
>
> > do you think it would be possible to use the same logic / SQL for
> > reprocessing?
>
> Given the fact that this solution is based on processing time, I don't
> think it can cover the use case of reprocessing, except if you can accept
> always joining with the latest day even during backfilling. But we are
> also aiming to resolve this shortcoming, maybe in 1 or 2 releases.
>
> Best,
> Kurt
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
>
> On Sat, Dec 14, 2019 at 3:41 AM Krzysztof Zarzycki <k.zarzy...@gmail.com>
> wrote:
>
>> Very interesting, Kurt! Yes, I also imagined it's rather a very common
>> case. In my company we currently have 3 clients wanting this
>> functionality.
>>
>> I also just realized the slight difference between a Temporal Join and a
>> Temporal Table Function Join - that there are actually two methods :)
>>
>> Regarding option 1:
>> So I would need to:
>> * write a DataStream API source that pulls the Hive dictionary table
>> every, let's say, day, assigns an event time column to the rows and
>> creates a stream of them. It does that and only that.
>> * create a table (from the Table API) out of it, assigning one of the
>> columns as an event time column.
>> * then use table.createTemporalTableFunction(<all columns, including
>> time column>)
>> * finally join my main data stream with the temporal table function (let
>> me use the short name TTF from now on) from my dictionary, using Flink
>> SQL and the LATERAL TABLE (Rates(o.rowtime)) AS r construct.
>> And so I should achieve my temporal event-time based join with versioned
>> dictionaries!
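>>
>> In code, I imagine the wiring roughly like this (just a sketch; all the
>> names are made up and imports are omitted; it assumes an "events" table
>> was registered earlier with event_time as its rowtime attribute):
>>
>>     StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>>
>>     // the custom daily-pull source from step 1 (hypothetical class);
>>     // it emits (dict_key, dict_value) pairs with record timestamps and
>>     // watermarks set to the snapshot day
>>     DataStream<Tuple2<String, String>> dictStream =
>>         env.addSource(new DailyHiveDictSource());
>>
>>     // append the record timestamps as the rowtime column update_time
>>     Table dictHistory = tEnv.fromDataStream(
>>         dictStream, "dict_key, dict_value, update_time.rowtime");
>>
>>     // temporal table function versioned by update_time, keyed by
>>     // dict_key
>>     TemporalTableFunction dictFn = dictHistory
>>         .createTemporalTableFunction("update_time", "dict_key");
>>     tEnv.registerFunction("Dict", dictFn);
>>
>>     // the event-time temporal join itself
>>     Table enriched = tEnv.sqlQuery(
>>         "SELECT e.event_id, e.dict_key, d.dict_value " +
>>         "FROM events AS e, LATERAL TABLE (Dict(e.event_time)) AS d " +
>>         "WHERE e.dict_key = d.dict_key");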
>>
>> Question 1: do I need to write that Hive source or can I use something
>> ready, like the Hive catalog integration? Or maybe reuse e.g. the
>> HiveTableSource class?
>>
>> Question/worry 2: One thing that worried me is this comment in the docs:
>>
>> *Note: State retention defined in a query configuration
>> <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html>
>> is not yet implemented for temporal joins. This means that the required
>> state to compute the query result might grow infinitely depending on the
>> number of distinct primary keys for the history table.*
>>
>> On the other side, I find this comment: *By definition of event time,
>> watermarks
>> <https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html>
>> allow the join operation to move forward in time and discard versions of
>> the build table that are no longer necessary because no incoming row
>> with a lower or equal timestamp is expected.*
>> So I believe that the state would grow infinitely if I had an infinite
>> number of keys, but not if I only had an infinite number of versions of
>> all keys. Which is fine. Do you confirm?
>>
>> Question 3: I need to be able to cover also reprocessing or backfilling
>> of historical data. Let's say I would need to join a data stream with
>> (versioned/snapshotted) dictionaries stored on HDFS. Do you imagine that
>> I could use the same logic for both stream processing and reprocessing
>> just by replacing sources and sinks? Maybe after some slight
>> modifications?
>>
>> Regarding option 2:
>> Here I understand the current limitation (which will stay for some time)
>> is that the join can happen only on processing time, which means joining
>> only with the latest version of the dictionaries.
>> Accepting that, I understand I would need to:
>> a) load the Hive table to e.g. HBase and then use HBaseTableSource on
>> it, OR
>> b) maybe implement a Hive/JDBC-based LookupableTableSource that pulls
>> the whole dictionary to memory (or even into Flink state, if it is
>> possible to use it from a TableFunction).
>> Then use this table and my Kafka stream table in a temporal join
>> expressed with Flink SQL.
>> What do you think, is that feasible?
>> Do I understand correctly that this option is available only with the
>> Blink engine and also only with use of Flink SQL, no Table API?
>>
>> The same question comes up regarding reprocessing: do you think it would
>> be possible to use the same logic / SQL for reprocessing?
>>
>> Thank you for continuing the discussion with me. I believe we're here on
>> the subject of a really important design for the community.
>> Krzysztof
>>
>> On Fri, Dec 13, 2019 at 9:39 AM Kurt Young <ykt...@gmail.com> wrote:
>>
>>> Sorry, I forgot to paste the reference URLs.
>>>
>>> Best,
>>> Kurt
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>>>
>>> On Fri, Dec 13, 2019 at 4:37 PM Kurt Young <ykt...@gmail.com> wrote:
>>>
>>>> Hi Krzysztof,
>>>>
>>>> What you raised is also something we are very interested in achieving
>>>> in Flink. Unfortunately, there is no in-place solution in the
>>>> Table/SQL API yet, but you have 2 options which are both close to this
>>>> and thus need some modifications.
>>>>
>>>> 1. The first one is to use a temporal table function [1]. It needs you
>>>> to write the logic of reading hive tables and doing the daily update
>>>> inside the table function.
>>>> 2. The second choice is to use a temporal table join [2], which only
>>>> works with processing time now (just like the simple solution you
>>>> mentioned), and needs the table source to have lookup capability (like
>>>> HBase). Currently, the hive connector doesn't support lookup, so to
>>>> make this work, you need to sync the content to other storage which
>>>> supports lookup, like HBase.
>>>>
>>>> Both solutions are not ideal now, and we also aim to improve this,
>>>> maybe in the following releases.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Fri, Dec 13, 2019 at 1:44 AM Krzysztof Zarzycki <
>>>> k.zarzy...@gmail.com> wrote:
>>>>
>>>>> Hello dear Flinkers,
>>>>> If this kind of question was already asked on the groups, I'm sorry
>>>>> for the duplicate. Feel free to just point me to the thread.
>>>>> I have to solve a probably pretty common case of joining a datastream
>>>>> to a dataset.
>>>>> Let's say I have the following setup:
>>>>> * I have a high-pace stream of events coming in through Kafka.
>>>>> * I have some dimension tables stored in Hive. These tables are
>>>>> changed daily. I can keep a snapshot for each day.
>>>>>
>>>>> Now conceptually, I would like to join the stream of incoming events
>>>>> to the dimension tables (a simple hash join). We can consider two
>>>>> cases:
>>>>> 1) a simpler one, where I join the stream with the most recent
>>>>> version of the dictionaries. (So the result is accepted to be
>>>>> nondeterministic if the job is retried.)
>>>>> 2) a more advanced one, where I would like to do a temporal join of
>>>>> the stream with the dictionary snapshots that were valid at the time
>>>>> of the event. (This result should be deterministic.)
>>>>>
>>>>> The end goal is to do an aggregation of that joined stream and store
>>>>> the results in Hive or a more real-time analytical store (Druid).
>>>>>
>>>>> Now, could you please help me understand: is any of these cases
>>>>> implementable with the declarative Table/SQL API? With use of
>>>>> temporal joins, catalogs, the Hive integration, JDBC connectors, or
>>>>> whatever beta features there are now. (I've read quite a lot of Flink
>>>>> docs about each of those, but I have a problem compiling this
>>>>> information into the final design.)
>>>>> Could you please help me understand how these components should
>>>>> cooperate?
>>>>> If that is impossible with the Table API, can we come up with the
>>>>> easiest implementation using the DataStream API?
>>>>>
>>>>> Thanks a lot for any help!
>>>>> Krzysztof
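>>>>>
>>>>> P.S. To make case 1) concrete: from the docs, I guess it might look
>>>>> something like the sketch below, though I'm not sure all the pieces
>>>>> (the Blink planner, a lookup-capable source) line up yet. All the
>>>>> names are made up:
>>>>>
>>>>>     // Hypothetical processing-time temporal join against the latest
>>>>>     // dictionary version; assumes "events" has a processing-time
>>>>>     // attribute "proc_time" and "dict" supports lookups
>>>>>     Table latest = tEnv.sqlQuery(
>>>>>         "SELECT e.event_id, d.dict_value " +
>>>>>         "FROM events AS e JOIN dict " +
>>>>>         "FOR SYSTEM_TIME AS OF e.proc_time AS d " +
>>>>>         "ON e.dict_key = d.dict_key");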