Thanks Kurt for your answers.

Summing up, I feel like the option 1 (i.e. join with temporal table
function) requires some coding around a source, that needs to pull data
once a day. But otherwise, bring the following benefits:
* I don't have to put dicts in another store like Hbase. All stays in
Hive + Flink.
* I'll be able to make a true temporal join - event-time based.
* I believe I will be able to build a history reprocessing program based on
the same logic (i.e. same SQL). At least for a particular day - processing
multiple days would be tricky, because I will need to pull multiple
versions of the dictionary.
Plus, looking up dict values will be much faster and resource optimal when
dict is stored in a state instead of uncached Hbase. It's especially
important in a case when we want to reprocess historical, archived stream
with a speed of millions of events/sec.

I understand that option 2 is easier to implement. I may do a PoC of it as
well.
OK, I believe I know enough to get my hands dirty with the code. I can
share later on what I was able to accomplish. And probably more questions
will show up when I finally start the implementation.

Thanks
Krzysztof

pon., 16 gru 2019 o 03:14 Kurt Young <ykt...@gmail.com> napisał(a):

> Hi Krzysztof, thanks for the discussion, you raised lots of good
> questions, I will try to reply them
> one by one.
>
> Re option 1:
>
> > Question 1: do I need to write that Hive source or can I use something
> ready, like Hive catalog integration? Or maybe reuse e.g. HiveTableSource
> class?
>
> I'm not sure if you can reuse the logic of `HiveTableSource`. Currently
> `HiveTableSource` works
> as batch mode, it will read all data at once and stop. But what you need
> is wait until next day after
> finish. What you can try is reuse the logic of `HiveTableInputFormat`, and
> wrap the "monitoring"
> logic outside.
>
> > Question/worry 2:  the state would grow inifinitely if I had infinite
> number of keys, but not only infinite number of versions of all keys.
>
> The temporal table function doesn't support watermark based state clean up
> yet, but what you can
> try is idle state retention [1]. So even if you have infinite number of
> keys, for example say you have
> different join keys every day, the old keys will not be touched in next
> days and become idle and will
> be deleted by framework.
>
> > Question 3: Do you imagine that I could use the same logic for both
> stream processing and reprocessing just by replacing sources and sinks?
>
> Generally speaking, yes I think so. With event time based join, we should
> be able to reuse the logic
> of normal stream processing and reprocessing historical data. Although
> there will definitely exists some
> details should be addressed, like event time and watermarks.
>
> Re option 2:
>
> > maybe implement Hive/JDBC-based LookupableTableSource that  pulls the
> whole dictionary to memory
>
> You can do this manually but I would recommend you go with the first
> choice which loads hive table
> to HBase periodically. It's much more easier and efficient. And this
> approach you mentioned also
> seems a little bit duplicate with the temporal table function solution.
>
> > this option is available only with Blink engine and also only with use
> of Flink SQL, no Table API?
>
> I'm afraid yes, you can only use it with SQL for now.
>
> > do you think it would be possible to use the same logic / SQL for
> reprocessing?
>
> Given the fact this solution is based on processing time, I don't think it
> can cover the use case of
> reprocessing, except if you can accept always joining with latest day even
> during backfilling. But we
> are also aiming to resolve this shortcoming maybe in 1 or 2 releases.
>
> Best,
> Kurt
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
>
> On Sat, Dec 14, 2019 at 3:41 AM Krzysztof Zarzycki <k.zarzy...@gmail.com>
> wrote:
>
>> Very interesting, Kurt! Yes, I also imagined it's rather a very common
>> case. In my company we currently have 3 clients wanting this functionality.
>> I also just realized this slight difference between Temporal Join and
>> Temporal Table Function Join, that there are actually two methods:)
>>
>> Regarding option 1:
>> So I would need to:
>> * write a Datastream API source, that pulls Hive dictionary table every
>> let's say day, assigns event time column to rows and creates a stream of
>> it. It does that and only that.
>> * create a table (from Table API) out of it, assigning one of the columns
>> as an event time column.
>> * then use table.createTemporalTableFunction(<all columns, including time
>> column>)
>> * finally join my main data stream with the temporal table function (let
>> me use short name TTF from now) from my dictionary, using Flink SQL and 
>> LATERAL
>> TABLE (Rates(o.rowtime)) AS r construct.
>> And so I should achieve my temporal event-time based join with versioned
>> dictionaries!
>> Question 1: do I need to write that Hive source or can I use something
>> ready, like Hive catalog integration? Or maybe reuse e.g. HiveTableSource
>> class?
>>
>> Question/worry 2: One thing that worried me is this comment in the docs:
>>
>> *Note: State retention defined in a query configuration
>> <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html>
>>  is
>> not yet implemented for temporal joins. This means that the required state
>> to compute the query result might grow infinitely depending on the number
>> of distinct primary keys for the history table.  *
>>
>> On the other side, I find this comment: *By definition of event
>> time, watermarks
>> <https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html> 
>> allow
>> the join operation to move forward in time and discard versions of the
>> build table that are no longer necessary because no incoming row with lower
>> or equal timestamp is expected.*
>> So I believe that the state would grow inifinitely if I had infinite
>> number of keys, but not only infinite number of versions of all keys. Which
>> is fine. Do you confirm?
>>
>> Question 3: I need to be able to cover also reprocessing or backfilling
>> of historical data. Let's say I would need to join data stream and
>> (versioned/snapshotted) dictionaries stored on HDFS. Do you imagine that I
>> could use the same logic for both stream processing and reprocessing just
>> by replacing sources and sinks? Maybe after some slight modifications?
>>
>>
>> Regarding option 2:
>> Here I understand the current limitation (which will stay for some time )
>> is that the join can happen only on processing time, which means join only
>> with the latest version of dictionaries.
>> Accepting that, I understand I would need to do:
>> a) load Hive table to e.g. HBase and then use HBaseTableSource on it., OR
>> b) maybe implement Hive/JDBC-based LookupableTableSource that  pulls the
>> whole dictionary to memory (or even to Flink state, if it is possible to
>> use it from TableFunction).
>> Then use this table and my Kafka stream table in temporal join expressed
>> with Flink SQL.
>> What do you think, is that feasible?
>> Do I understand correctly, that this option is available only with Blink
>> engine and also only with use of Flink SQL, no Table API?
>>
>> Same question comes up regarding reprocessing: do you think it would be
>> possible to use the same logic / SQL for reprocessing?
>>
>> Thank you for continuing discussion with me. I believe we're here on a
>> subject of a really important design for the community.
>> Krzysztof
>>
>> pt., 13 gru 2019 o 09:39 Kurt Young <ykt...@gmail.com> napisał(a):
>>
>>> Sorry I forgot to paste the reference url.
>>>
>>> Best,
>>> Kurt
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>>>
>>> On Fri, Dec 13, 2019 at 4:37 PM Kurt Young <ykt...@gmail.com> wrote:
>>>
>>>> Hi Krzysztof,
>>>>
>>>> What you raised also interested us a lot to achieve in Flink.
>>>> Unfortunately, there
>>>> is no in place solution in Table/SQL API yet, but you have 2 options
>>>> which are both
>>>> close to this thus need some modifications.
>>>>
>>>> 1. The first one is use temporal table function [1]. It needs you to
>>>> write the logic of
>>>> reading hive tables and do the daily update inside the table function.
>>>> 2. The second choice is to use temporal table join [2], which only
>>>> works with processing
>>>> time now (just like the simple solution you mentioned), and need the
>>>> table source has
>>>> look up capability (like hbase). Currently, hive connector doesn't
>>>> support look up, so to
>>>> make this work, you need to sync the content to other storages which
>>>> support look up,
>>>> like HBase.
>>>>
>>>> Both solutions are not ideal now, and we also aims to improve this
>>>> maybe in the following
>>>> release.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Fri, Dec 13, 2019 at 1:44 AM Krzysztof Zarzycki <
>>>> k.zarzy...@gmail.com> wrote:
>>>>
>>>>> Hello dear Flinkers,
>>>>> If this kind of question was asked on the groups, I'm sorry for a
>>>>> duplicate. Feel free to just point me to the thread.
>>>>> I have to solve a probably pretty common case of joining a datastream
>>>>> to a dataset.
>>>>> Let's say I have the following setup:
>>>>> * I have a high pace stream of events coming in Kafka.
>>>>> * I have some dimension tables stored in Hive. These tables are
>>>>> changed daily. I can keep a snapshot for each day.
>>>>>
>>>>> Now conceptually, I would like to join the stream of incoming events
>>>>> to the dimension tables (simple hash join). we can consider two cases:
>>>>> 1) simpler, where I join the stream with the most recent version of
>>>>> the dictionaries. (So the result is accepted to be nondeterministic if the
>>>>> job is retried).
>>>>> 2) more advanced, where I would like to do temporal join of the stream
>>>>> with dictionaries snapshots that were valid at the time of the event. 
>>>>> (This
>>>>> result should be deterministic).
>>>>>
>>>>> The end goal is to do aggregation of that joined stream, store results
>>>>> in Hive or more real-time analytical store (Druid).
>>>>>
>>>>> Now, could you please help me understand is any of these cases
>>>>> implementable with declarative Table/SQL API? With use of temporal joins,
>>>>> catalogs, Hive integration, JDBC connectors, or whatever beta features
>>>>> there are now. (I've read quite a lot of Flink docs about each of those,
>>>>> but I have a problem to compile this information in the final design.)
>>>>> Could you please help me understand how these components should
>>>>> cooperate?
>>>>> If that is impossible with Table API, can we come up with the easiest
>>>>> implementation using Datastream API ?
>>>>>
>>>>> Thanks a lot for any help!
>>>>> Krzysztof
>>>>>
>>>>>

Reply via email to