Good point, thanks for the clarification.

best regards,
Marek

On Fri, 15 Jan 2021 at 09:32, Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> The LookupTableSource is used when you join based on processing time, as
> described here [1]. Moreover, it supports only equi lookups, so it won't
> work with range queries such as the one in your case.
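>
> A minimal sketch of such a lookup join, assuming the Kafka table declares
> a processing-time attribute (PROC_TIME below is a hypothetical column,
> e.g. defined as PROC_TIME AS PROCTIME(); it is not part of the schema
> shown in this thread). The lookup itself is driven only by the equality
> on the key:
>
> -- PROC_TIME is an assumed processing-time column on the Kafka table
> SELECT T1.FIELD, D1.ENRICH
> FROM `kafka.topic` AS T1
> JOIN dim FOR SYSTEM_TIME AS OF T1.PROC_TIME AS D1
>   ON T1.ENRICH_ID = D1.ID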
>
> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#processing-time-temporal-join
> On 15/01/2021 09:25, Marek Maj wrote:
>
> Hi Dawid,
> thanks for your answers!
>
> I guess CDC and Debezium will be the right choice for that case. Its
> integration with Oracle, however, is still in the incubating phase, as the
> documentation states [1], so we will need to investigate further.
>
> I was hoping there would be some way to incorporate LookupCache [2] in that
> scenario and bypass the problem of finished tasks. JDBCDynamicTableSource,
> which you mentioned, implements both LookupTableSource and ScanTableSource,
> but I could not force the planner to invoke getLookupRuntimeProvider [3].
>
> Anyway, I will take a closer look at the JDBCDynamicTableSource
> implementation and try to find a workaround.
>
> best regards,
> Marek
>
> [1] https://debezium.io/documentation/reference/connectors/oracle.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html#lookup-cache
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/table/connector/source/LookupTableSource.html#getLookupRuntimeProvider-org.apache.flink.table.connector.source.LookupTableSource.LookupContext-
>
>
> On Thu, 14 Jan 2021 at 20:07, Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hi Marek,
>>
>> I am afraid I don't have a good answer to your question. The problem
>> indeed is that the JDBC source can work only as a bounded source. As you
>> correctly pointed out, as of now mixing bounded with unbounded sources does
>> not work with checkpointing, which we want to address in FLIP-147 (which
>> you linked as well).
>>
>> I agree that one solution would be to change the implementation of
>> JDBCDynamicTableSource so that it produces an UNBOUNDED source.
>> Unfortunately, that is not the most straightforward task.
>>
>> Another solution would be to actually use CDC. I think you could use
>> one of the connectors from here [1], which use the embedded Debezium
>> engine, so you would not need to set up any external tools, but just embed
>> the CDC in Flink. That is, of course, if I am not mistaken here, as I
>> haven't tried those connectors myself.
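>>
>> As a rough, untested sketch: assuming the enrichment data lived in MySQL
>> (one of the databases those connectors cover), a CDC-backed version of
>> the table might be declared as below. The table name dim_cdc, the host,
>> port, credentials and database/table names are all placeholders, not
>> values from this thread:
>>
>> CREATE TABLE dim_cdc (
>>   ID BIGINT,
>>   ENRICH STRING,
>>   FROM_DATE TIMESTAMP(6),
>>   TO_DATE TIMESTAMP(6)
>> ) WITH (
>>   'connector' = 'mysql-cdc',
>>   'hostname' = 'localhost',    -- placeholder
>>   'port' = '3306',             -- placeholder
>>   'username' = 'flinkuser',    -- placeholder
>>   'password' = 'flinkpw',      -- placeholder
>>   'database-name' = 'mydb',    -- placeholder
>>   'table-name' = 'dim'         -- placeholder
>> )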
>>
>> Unfortunately, I don't have any other ideas right now. Maybe someone else
>> can chime in? @Timo @Jark
>>
>> Lastly, I think once you solve the problem of a finishing source you
>> could also consider using the temporal join[2] instead of an interval join.
>>
>> Best,
>>
>> Dawid
>>
>> [1] https://github.com/ververica/flink-cdc-connectors
>>
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#temporal-joins
>> On 12/01/2021 16:40, Marek Maj wrote:
>>
>> Hello,
>> I am trying to use the Flink SQL API to join two tables. My stream data
>> source is Kafka (defined through a catalog and schema registry) and my
>> enrichment data is located in a relational database (JDBC connector). I
>> think this setup reflects quite a common use case.
>>
>> The enrichment table definition looks like this:
>> CREATE TABLE dim (
>>   ID BIGINT,
>>   ENRICH STRING,
>>   FROM_DATE TIMESTAMP(6),
>>   TO_DATE TIMESTAMP(6),
>>   WATERMARK FOR TO_DATE AS TO_DATE
>> ) WITH (
>>    'connector' = 'jdbc',
>>    'url' = '…',
>>    'table-name' = '…',
>>    'username' = '…',
>>    'password' = '…'
>> )
>>
>> And this is the join I use against the stream coming from Kafka (a table
>> with a watermark spec); TRUNC is a UDF:
>> SELECT TRUNC(T1.START_TIME,'HH24') as `START_TIME`,
>> D1.ENRICH as `ENRICH`,
>> T1.FIELD as `FIELD`
>> FROM `kafka.topic` T1, dim D1
>> WHERE T1.ENRICH_ID = D1.ID
>> AND T1.START_TIME between D1.TO_DATE - INTERVAL '1' DAY AND D1.TO_DATE
>> AND T1.START_TIME >= D1.FROM_DATE
>>
>> The resulting job graph contains two table source scan operators together
>> with an interval join operator.
>>
>> The problem I am trying to solve is how to change the nature of the
>> enrichment table. Currently, the related operator task reads all the data
>> from the table when the job starts and finishes afterwards. Ideally, I
>> would like to have a continuously updated enrichment table. Is it possible
>> to achieve this without CDC, for example by querying the whole database
>> periodically or by using some kind of cache for keys? We can assume that
>> the enrichment table is append-only: there are no deletes or updates, only
>> inserts for new time intervals.
>>
>> If updates are not possible, how can I deal with the finished task? Due to
>> a known issue [1], all checkpoints are aborted. Maybe I could live with
>> restarting the job to get new enrichment data, as it is not refreshed very
>> frequently, but checkpointing is a must.
>>
>> Flink version 1.12
>>
>> regards
>> Marek
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
>>
>>
