Hi,

One way would look like the following.

1. Create the probe table from Kafka as follows. You can find more details in doc [1].

CREATE TABLE myTopic (
  id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3),
  -- processing-time attribute, needed by the temporal join in step 3
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'csv',
  'scan.startup.mode' = 'earliest-offset'
);

2. Create the build table from MySQL as follows. You can find more details in doc [2].

CREATE TABLE MyUserTable (
  id BIGINT,
  sex STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'users'
);

3. Join the tables as follows. You can find more details in doc [3].

-- temporal join the JDBC table as a dimension table
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.id = MyUserTable.id;

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html

Best,
Guowei

On Wed, Nov 18, 2020 at 3:05 PM tkg_cangkul <yuza.ras...@gmail.com> wrote:

> Hi Guowei Ma,
>
> Thanks for your reply.
> In my case, I have some data in my Kafka topic, and I want to get the
> details for that data from my reference MySQL table.
> For example:
>
> In my Kafka topic I have these fields:
>
> id, name, position, experience
>
> In my reference MySQL table I have these fields:
>
> id, name, age, sex
>
> So I want to do a left join to get the detail data from my reference
> table.
>
> How can I do this with Flink?
> Please advise.
>
> On 17/11/20 07:46, Guowei Ma wrote:
>
> Hi, Youzha
>
> In general, `CoGroup` is for window-based operations. Whether it can
> satisfy your requirements depends on your specific scenario. But you may
> want to treat the MySQL table as a dimension table.
> In that case there might be two other ways:
> 1. Using the Table/SQL SDK. You can find a SQL example (temporal join with the
> JDBC table as a dimension table) in the JDBC table connector docs [1] and more
> join information in [2].
> 2. Using the DataStream SDK. You could check whether the `AsyncIO` function
> can satisfy your requirements. You can find an example in [3].
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html
>
> Best,
> Guowei
>
> On Mon, Nov 16, 2020 at 11:20 PM Youzha <yuza.ras...@gmail.com> wrote:
>
>> Hi, I want to join a Kafka stream with a reference MySQL table.
>> How can I do this with Flink streaming? Can the coGroup function handle
>> this? Or does anyone have Java sample code for this case? I've read some
>> articles saying that the coGroup function can do a left outer join, but I'm
>> still confused about how to implement it because I've just started learning
>> Flink streaming.
>>
>> Any advice, please.
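
P.S. The three steps above use the column names from the docs. Adapted to the fields in the question (Kafka: id, name, position, experience; MySQL: id, name, age, sex), a rough sketch might look like the following. The table names kafka_people and mysql_people, the column types, the topic name, and all connection settings are assumptions for illustration only, so please adjust them to your setup. `position` is quoted with backticks because it is a reserved word in Flink SQL.

```sql
-- Probe side: the Kafka topic. Column names come from the question;
-- the types, topic name, and connection settings are assumed placeholders.
CREATE TABLE kafka_people (
  id BIGINT,
  name STRING,
  `position` STRING,
  experience INT,
  -- processing-time attribute used by the lookup join below
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'people',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'csv',
  'scan.startup.mode' = 'earliest-offset'
);

-- Build side: the MySQL reference table, used as a dimension table.
-- The JDBC URL and the MySQL table name are assumed placeholders.
CREATE TABLE mysql_people (
  id BIGINT,
  name STRING,
  age INT,
  sex STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'users'
);

-- Left join: every Kafka record is kept; age and sex are NULL
-- when no MySQL row with the same id exists.
SELECT k.id, k.name, k.`position`, k.experience, u.age, u.sex
FROM kafka_people AS k
LEFT JOIN mysql_people FOR SYSTEM_TIME AS OF k.proctime AS u
  ON k.id = u.id;
```

With this kind of join, each Kafka record triggers a lookup in MySQL by id at processing time, so changes in the MySQL table are picked up by later records without restarting the job.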