Great, looking forward to hearing from you again.

Best,
Kurt


On Mon, Dec 16, 2019 at 10:22 PM Krzysztof Zarzycki <k.zarzy...@gmail.com>
wrote:

> Thanks Kurt for your answers.
>
> Summing up, I feel that option 1 (i.e. join with a temporal table
> function) requires some coding around a source that needs to pull data
> once a day. But otherwise, it brings the following benefits:
> * I don't have to put dicts in another store like HBase. All stays in
> Hive + Flink.
> * I'll be able to make a true temporal join - event-time based.
> * I believe I will be able to build a history reprocessing program based
> on the same logic (i.e. same SQL). At least for a particular day -
> processing multiple days would be tricky, because I will need to pull
> multiple versions of the dictionary.
> Plus, looking up dict values will be much faster and more resource-efficient
> when the dict is stored in state instead of uncached HBase. It's especially
> important when we want to reprocess a historical, archived stream
> at a speed of millions of events/sec.
>
> I understand that option 2 is easier to implement. I may do a PoC of it as
> well.
> OK, I believe I know enough to get my hands dirty with the code. I can
> share later on what I was able to accomplish. And probably more questions
> will show up when I finally start the implementation.
>
> Thanks
> Krzysztof
>
> On Mon, 16 Dec 2019 at 03:14, Kurt Young <ykt...@gmail.com> wrote:
>
>> Hi Krzysztof, thanks for the discussion. You raised lots of good
>> questions; I will try to reply to them one by one.
>>
>> Re option 1:
>>
>> > Question 1: do I need to write that Hive source or can I use something
>> ready, like Hive catalog integration? Or maybe reuse e.g. HiveTableSource
>> class?
>>
>> I'm not sure if you can reuse the logic of `HiveTableSource`. Currently
>> `HiveTableSource` works in batch mode: it reads all the data once and
>> stops. What you need instead is a source that, after finishing a read,
>> waits until the next day and reads again. What you can try is to reuse
>> the logic of `HiveTableInputFormat` and wrap the "monitoring" logic
>> around it.
>>
>> > Question/worry 2: the state would grow infinitely if I had an infinite
>> number of keys, but not if I only had an infinite number of versions of
>> all keys.
>>
>> The temporal table function doesn't support watermark-based state cleanup
>> yet, but what you can try is idle state retention [1]. Even if you have
>> an infinite number of keys, for example different join keys every day,
>> the old keys will not be touched in the following days, so they will
>> become idle and be deleted by the framework.
>>
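
To illustrate the idle state retention idea described above, here is a minimal sketch in plain Python (not Flink code). The class `IdleExpiringState`, its methods and the integer timestamps are hypothetical names made up for this example, and the sketch assumes that both reads and writes refresh a key's idle timer, which may differ from what Flink does internally.

```python
class IdleExpiringState:
    """Hypothetical keyed state that drops keys left untouched for too long."""

    def __init__(self, retention):
        self.retention = retention  # how long a key may stay idle
        self._state = {}            # key -> (value, last_access_time)

    def put(self, key, value, now):
        # Writing a key (re)starts its idle timer.
        self._state[key] = (value, now)

    def get(self, key, now):
        entry = self._state.get(key)
        if entry is None:
            return None
        value, _ = entry
        # Assumption of this sketch: a read also refreshes the idle timer.
        self._state[key] = (value, now)
        return value

    def expire(self, now):
        """Remove and return the keys that have been idle for at least `retention`."""
        idle = [k for k, (_, last) in self._state.items()
                if now - last >= self.retention]
        for k in idle:
            del self._state[k]
        return idle
```

With a retention of one or two days, join keys that only ever appear on a single day would be removed by `expire`, while keys that keep being looked up stay in state.
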
>> > Question 3: Do you imagine that I could use the same logic for both
>> stream processing and reprocessing just by replacing sources and sinks?
>>
>> Generally speaking, yes, I think so. With an event-time based join, we
>> should be able to reuse the logic of normal stream processing for
>> reprocessing historical data, although there will definitely be some
>> details that need to be addressed, like event time and watermarks.
>>
>> Re option 2:
>>
>> > maybe implement Hive/JDBC-based LookupableTableSource that  pulls the
>> whole dictionary to memory
>>
>> You can do this manually, but I would recommend you go with the first
>> choice, which loads the Hive table into HBase periodically. It's much
>> easier and more efficient. The approach you mentioned also seems to
>> overlap somewhat with the temporal table function solution.
>>
>> > this option is available only with Blink engine and also only with use
>> of Flink SQL, no Table API?
>>
>> I'm afraid yes, you can only use it with SQL for now.
>>
>> > do you think it would be possible to use the same logic / SQL for
>> reprocessing?
>>
>> Given that this solution is based on processing time, I don't think
>> it can cover the use case of reprocessing, unless you can accept always
>> joining with the latest day even during backfilling. But we are also
>> aiming to resolve this shortcoming, maybe in 1 or 2 releases.
>>
>> Best,
>> Kurt
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
>>
>>
>> On Sat, Dec 14, 2019 at 3:41 AM Krzysztof Zarzycki <k.zarzy...@gmail.com>
>> wrote:
>>
>>> Very interesting, Kurt! Yes, I also imagined it's rather a very common
>>> case. In my company we currently have 3 clients wanting this functionality.
>>> I also just realized this slight difference between Temporal Join and
>>> Temporal Table Function Join, that there are actually two methods:)
>>>
>>> Regarding option 1:
>>> So I would need to:
>>> * write a DataStream API source that pulls the Hive dictionary table,
>>> let's say, every day, assigns an event time column to the rows and
>>> creates a stream of them. It does that and only that.
>>> * create a table (from Table API) out of it, assigning one of the
>>> columns as an event time column.
>>> * then use table.createTemporalTableFunction(<all columns, including
>>> time column>)
>>> * finally join my main data stream with the temporal table function (let
>>> me use the short name TTF from now on) from my dictionary, using Flink
>>> SQL and the LATERAL TABLE (Rates(o.rowtime)) AS r construct.
>>> And so I should achieve my temporal event-time based join with versioned
>>> dictionaries!
>>> Question 1: do I need to write that Hive source or can I use something
>>> ready, like Hive catalog integration? Or maybe reuse e.g. HiveTableSource
>>> class?
>>>
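
The event-time join outlined in the steps above can be sketched in plain Python (not Flink code) to show the intended semantics. `VersionedDict` and `temporal_join` are hypothetical names invented for this example. The sketch assumes that a dictionary version is valid from its own timestamp until the next version of the same key, and that, as in an inner join, events with no valid version are dropped.

```python
from bisect import bisect_right
from collections import defaultdict


class VersionedDict:
    """Hypothetical dictionary that keeps every version of a key by event time."""

    def __init__(self):
        self._times = defaultdict(list)   # key -> sorted version timestamps
        self._values = defaultdict(list)  # key -> values aligned with _times

    def add_version(self, key, event_time, value):
        # Insert while keeping the per-key timestamps sorted.
        i = bisect_right(self._times[key], event_time)
        self._times[key].insert(i, event_time)
        self._values[key].insert(i, value)

    def lookup(self, key, event_time):
        """Return the value valid at event_time, or None if none existed yet."""
        times = self._times.get(key)
        if not times:
            return None
        # Latest version whose timestamp is <= event_time.
        i = bisect_right(times, event_time)
        return self._values[key][i - 1] if i > 0 else None


def temporal_join(events, dictionary):
    """Join (key, event_time, payload) events with the version valid at event_time."""
    for key, event_time, payload in events:
        value = dictionary.lookup(key, event_time)
        if value is not None:  # inner join: skip events without a valid version
            yield (key, event_time, payload, value)
```

Because the lookup depends only on the event's own timestamp, replaying the same events against the same dictionary versions gives the same result, which is the property that matters for reprocessing.
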
>>> Question/worry 2: One thing that worried me is this comment in the docs:
>>>
>>> *Note: State retention defined in a query configuration
>>> <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html>
>>> is not yet implemented for temporal joins. This means that the required
>>> state to compute the query result might grow infinitely depending on the
>>> number of distinct primary keys for the history table.*
>>>
>>> On the other side, I find this comment: *By definition of event time,
>>> watermarks
>>> <https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html>
>>> allow the join operation to move forward in time and discard versions of
>>> the build table that are no longer necessary because no incoming row with
>>> lower or equal timestamp is expected.*
>>> So I believe that the state would grow infinitely if I had an infinite
>>> number of keys, but not if I only had an infinite number of versions of
>>> all keys. Which is fine. Do you confirm?
>>>
>>> Question 3: I need to be able to cover also reprocessing or backfilling
>>> of historical data. Let's say I would need to join data stream and
>>> (versioned/snapshotted) dictionaries stored on HDFS. Do you imagine that I
>>> could use the same logic for both stream processing and reprocessing just
>>> by replacing sources and sinks? Maybe after some slight modifications?
>>>
>>>
>>> Regarding option 2:
>>> Here I understand the current limitation (which will stay for some time)
>>> is that the join can happen only on processing time, which means joining
>>> only with the latest version of the dictionaries.
>>> Accepting that, I understand I would need to do:
>>> a) load Hive table to e.g. HBase and then use HBaseTableSource on it, OR
>>> b) maybe implement Hive/JDBC-based LookupableTableSource that  pulls the
>>> whole dictionary to memory (or even to Flink state, if it is possible to
>>> use it from TableFunction).
>>> Then use this table and my Kafka stream table in temporal join expressed
>>> with Flink SQL.
>>> What do you think, is that feasible?
>>> Do I understand correctly, that this option is available only with Blink
>>> engine and also only with use of Flink SQL, no Table API?
>>>
>>> Same question comes up regarding reprocessing: do you think it would be
>>> possible to use the same logic / SQL for reprocessing?
>>>
>>> Thank you for continuing discussion with me. I believe we're here on a
>>> subject of a really important design for the community.
>>> Krzysztof
>>>
>>> On Fri, 13 Dec 2019 at 09:39, Kurt Young <ykt...@gmail.com> wrote:
>>>
>>>> Sorry I forgot to paste the reference url.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>>>>
>>>> On Fri, Dec 13, 2019 at 4:37 PM Kurt Young <ykt...@gmail.com> wrote:
>>>>
>>>>> Hi Krzysztof,
>>>>>
>>>>> What you raised is something we are also very interested in achieving
>>>>> in Flink. Unfortunately, there is no ready-made solution in the
>>>>> Table/SQL API yet, but you have 2 options which both come close to
>>>>> this but need some modifications.
>>>>>
>>>>> 1. The first one is to use a temporal table function [1]. It requires
>>>>> you to write the logic of reading the Hive tables and doing the daily
>>>>> update inside the table function.
>>>>> 2. The second choice is to use a temporal table join [2], which only
>>>>> works with processing time now (just like the simple solution you
>>>>> mentioned), and needs the table source to have lookup capability
>>>>> (like HBase). Currently, the Hive connector doesn't support lookup,
>>>>> so to make this work, you need to sync the content to another storage
>>>>> system that supports lookup, like HBase.
>>>>>
>>>>> Neither solution is ideal right now, and we also aim to improve this,
>>>>> maybe in the following release.
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Fri, Dec 13, 2019 at 1:44 AM Krzysztof Zarzycki <
>>>>> k.zarzy...@gmail.com> wrote:
>>>>>
>>>>>> Hello dear Flinkers,
>>>>>> If this kind of question was asked on the groups, I'm sorry for a
>>>>>> duplicate. Feel free to just point me to the thread.
>>>>>> I have to solve a probably pretty common case of joining a datastream
>>>>>> to a dataset.
>>>>>> Let's say I have the following setup:
>>>>>> * I have a high pace stream of events coming in Kafka.
>>>>>> * I have some dimension tables stored in Hive. These tables are
>>>>>> changed daily. I can keep a snapshot for each day.
>>>>>>
>>>>>> Now conceptually, I would like to join the stream of incoming events
>>>>>> to the dimension tables (simple hash join). We can consider two cases:
>>>>>> 1) simpler, where I join the stream with the most recent version of
>>>>>> the dictionaries. (So the result is accepted to be nondeterministic
>>>>>> if the job is retried).
>>>>>> 2) more advanced, where I would like to do a temporal join of the
>>>>>> stream with the dictionary snapshots that were valid at the time of
>>>>>> the event. (This result should be deterministic).
>>>>>>
>>>>>> The end goal is to do aggregation of that joined stream and
>>>>>> store the results in Hive or a more real-time analytical store (Druid).
>>>>>>
>>>>>> Now, could you please help me understand whether any of these cases
>>>>>> is implementable with the declarative Table/SQL API? With use of
>>>>>> temporal joins, catalogs, Hive integration, JDBC connectors, or
>>>>>> whatever beta features there are now. (I've read quite a lot of Flink
>>>>>> docs about each of those, but I have trouble compiling this
>>>>>> information into a final design.)
>>>>>> Could you please help me understand how these components should
>>>>>> cooperate?
>>>>>> If that is impossible with Table API, can we come up with the easiest
>>>>>> implementation using DataStream API?
>>>>>>
>>>>>> Thanks a lot for any help!
>>>>>> Krzysztof
>>>>>>
>>>>>>
