Hi Dawid,
thanks for your answers!

I guess CDC and Debezium will be the right choice for that case. Iit's
integration with oracle however, is in incubating phase as documentation
states [1], we will need to investigate further.

I was hoping there will be some way to incorporate LookupCache [2] in that
scenario and bypass the problem of finished tasks. JDBCDynamicTableSource,
that you mentioned, implements both LookupTableSource and ScanTableSource,
but.I could not force the planner to invoke getLookupRuntimeProvider [3]

Anyway, I will take a closer look at JDBCDynamicTableSource implementation
and try to find a workaround

best regards,
Marek

[1] https://debezium.io/documentation/reference/connectors/oracle.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html#lookup-cache
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/table/connector/source/LookupTableSource.html#getLookupRuntimeProvider-org.apache.flink.table.connector.source.LookupTableSource.LookupContext-


czw., 14 sty 2021 o 20:07 Dawid Wysakowicz <dwysakow...@apache.org>
napisał(a):

> Hi Marek,
>
> I am afraid I don't have a good answer for your question. The problem
> indeed is that the JDBC source can work only as a bounded source. As you
> correctly pointed out, as of now mixing bounded with unbounded sources does
> not work with checkpointing, which we want to address in the FLIP-147 (that
> you linked as well).
>
> I agree one solution would be to change the implementation of
> JDBCDynamicTableSource so that it produces an UNBOUNDED source.
> Unfortunately it is not the most straightforward task.
>
> Another solution would be to actually use a CDC. I think you could use one
> of the connectors from here[1], which use the embedded Debezium engine,
> therefore you would not need to setup any external tools, but just embed
> the CDC in FLINK. Ofc, if I am not mistaken here, as I haven't tried those
> connectors myself.
>
> Unfortunately I don't have any other ideas right now. Maybe someone else
> can chime in @Timo @Jark
>
> Lastly, I think once you solve the problem of a finishing source you could
> also consider using the temporal join[2] instead of an interval join.
>
> Best,
>
> Dawid
>
> [1] https://github.com/ververica/flink-cdc-connectors
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#temporal-joins
> On 12/01/2021 16:40, Marek Maj wrote:
>
> Hello,
> I am trying to use Flink SQL api to join two tables. My stream data source
> is kafka (defined through catalog and schema registry) and my enrichment
> data is located in relational database (JDBC connector). I think this setup
> reflects quite common use case
>
> Enrichment table definition looks like this:
> CREATE TABLE dim (
>   ID BIGINT,
>   ENRICH STRING,
>   FROM_DATE TIMESTAMP(6),
>   TO_DATE TIMESTAMP(6),
>   WATERMARK FOR TO_DATE AS TO_DATE
> ) WITH (
>    'connector' = 'jdbc',
>    'url' = ‘…’,
>    'table-name' = ‘…’,
>    'username' = ‘…’,
>    'password' = ‘…’
> )
>
> And this is join I use against stream coming from kafka (table with
> watermark spec), trunc is udf:
> SELECT TRUNC(T1.START_TIME,'HH24') as `START_TIME`,
> D1.ENRICH as `ENRICH`,
> T1.FIELD as `FIELD`,
> FROM `kafka.topic` T1, dim D1
> WHERE T1.ENRICH_ID = D1.ID
> AND T1.START_TIME between D1.TO_DATE - INTERVAL ‘1’ DAY AND D1.TO_DATE
> AND T1.START_TIME >= D1.FROM_DATE
>
> Result job graph contains two table source scan operators together with
> interval join operator.
>
> The problem I am trying to solve is how to change the character of
> enrichment table. Currently, related operator task reads whole data from
> table when the job start and finishes afterwards. Ideally, I would like to
> have continuously updated enrichment table. Is it possible to achieve
> without CDC for example by querying whole database periodically or use some
> kind of cache for keys? We can assume that enrichment table is append only,
> there are no deletes or updates, only inserts for new time intervals
>
> If updates are not possible, how can I deal with finished task? Due to a
> known issue [1], all checkpoints are aborted . Maybe I could live with
> restarting job to get new enrichment data as it is not refreshed so
> frequently, but checkpointing is a must.
>
> flink version 1.12
>
> regards
> Marek
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
>
>

Reply via email to