Hi,
One way would look like the following:
1. Create the probe table from Kafka as shown below. You can find more
detailed information in doc [1].

CREATE TABLE myTopic (
  id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3),
  -- processing-time attribute used by the temporal join in step 3
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'csv',
  'scan.startup.mode' = 'earliest-offset'
);
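
The `proctime` computed column is the processing-time attribute that the
temporal join in step 3 refers to. If your topic carries JSON instead of CSV
records, just switch the 'format' option accordingly.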

2. Create the build (dimension) table from MySQL as shown below. You can find
more detailed information in doc [2].

CREATE TABLE MyUserTable (
  id BIGINT,
  sex STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'users'
);
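
If the temporal join in step 3 ends up sending too many point queries to
MySQL, the JDBC connector also has a lookup cache (described in doc [2]). A
minimal variant of the table definition above, where the cache size and TTL
are only illustrative values:

CREATE TABLE MyUserTable (
  id BIGINT,
  sex STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'users',
  -- cache lookup results so that not every probe row hits MySQL
  'lookup.cache.max-rows' = '5000',
  'lookup.cache.ttl' = '10min'
);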

3. Join the tables as shown below. You can find more detailed information in
doc [3].

-- temporal join the JDBC table as a dimension table
SELECT *
FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.id = MyUserTable.id;
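
Applied to the fields you mentioned (id, name, position, experience in the
Kafka topic; id, name, age, sex in the MySQL table), the same pattern could
look roughly like the sketch below. The topic, database, table and sink names
as well as the column types are only placeholders you would need to adjust,
and the 'print' connector is used here only to make the joined rows visible:

CREATE TABLE kafkaTopic (
  id BIGINT,
  name STRING,
  position STRING,
  experience INT,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',                       -- placeholder
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'json',                            -- adjust to your topic's format
  'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE mysqlRef (
  id BIGINT,
  name STRING,
  age INT,
  sex STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',  -- placeholder
  'table-name' = 'reference_table'                   -- placeholder
);

CREATE TABLE printSink (
  id BIGINT,
  name STRING,
  position STRING,
  experience INT,
  age INT,
  sex STRING
) WITH (
  'connector' = 'print'
);

-- LEFT JOIN keeps every Kafka record even if there is no matching MySQL row
INSERT INTO printSink
SELECT k.id, k.name, k.position, k.experience, r.age, r.sex
FROM kafkaTopic AS k
LEFT JOIN mysqlRef FOR SYSTEM_TIME AS OF k.proctime AS r
ON k.id = r.id;

In production you would of course replace the print sink with a real
connector (Kafka, JDBC, filesystem, ...).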

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html

Best,
Guowei


On Wed, Nov 18, 2020 at 3:05 PM tkg_cangkul <yuza.ras...@gmail.com> wrote:

> Hi Guowei Ma,
>
> Thanks for your reply.
> In my case, I have some data in my Kafka topic, and I want to get the
> details of that data from my reference MySQL table.
> For example:
>
> In my Kafka topic I have these fields:
>
> id, name, position, experience
>
> In my reference MySQL table I have these fields:
>
> id, name, age, sex
>
> So I want to do a left join to get the detail data from my reference
> table.
>
> How can I do this with Flink?
> Please advise.
>
> On 17/11/20 07:46, Guowei Ma wrote:
>
> Hi, Youzha
>
> In general, `CoGroup` is for window-based operations; whether it can
> satisfy your requirements depends on your specific scenario. But if you
> want to treat the MySQL table as a dimension table, there are two other
> ways:
> 1. Using the Table/SQL API. You can find a SQL example (temporal join of
> the JDBC table as a dimension table) in the JDBC connector doc [1] and
> more information about joins in [2].
> 2. Using the DataStream API. You could check whether the `AsyncIO` function
> satisfies your requirements; you can find an example in [3].
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html
>
> Best,
> Guowei
>
>
> On Mon, Nov 16, 2020 at 11:20 PM Youzha <yuza.ras...@gmail.com> wrote:
>
>> Hi, I want to join a Kafka stream against a MySQL reference table. How can
>> I do this with Flink streaming? Can the coGroup function handle this, or
>> does anyone have Java sample code for this case? I've read some articles
>> saying that coGroup can do a left outer join, but I'm still confused about
>> how to implement it because I have just started learning Flink streaming.
>>
>>
>> Need advice, please.
>>
>
>
