Could you share your code?

Best,
Guowei

On Mon, Nov 23, 2020 at 12:05 PM tkg_cangkul <yuza.ras...@gmail.com> wrote:

> Hi,
>
> I'm using Java for this, and I've successfully registered the tables.
>
> I can select from each table individually:
>
> Table result1 = tEnv.sqlQuery("select status_code from table_kafka");
> Table result2 = tEnv.sqlQuery("select status_code from table_mysql_reff");
>
> But when I try a join query, I get an error message like this:
>
> Caused by: org.apache.flink.table.api.TableException: Generic RAW types
> must have a common type information.
>     at org.apache.flink.table.planner.calcite.FlinkTypeFactory.resolveAllIdenticalTypes(FlinkTypeFactory.scala:381)
>
> Is there something that I missed here?
>
> On 23/11/20 08:43, Guowei Ma wrote:
>
> Hi
> One way would look like the following:
> 1. Create the probe table from Kafka as follows. You can find more
> detailed information in doc [1].
>
> CREATE TABLE myTopic (
>   id BIGINT,
>   item_id BIGINT,
>   category_id BIGINT,
>   behavior STRING,
>   ts TIMESTAMP(3)
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behavior',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'testGroup',
>   'format' = 'csv',
>   'scan.startup.mode' = 'earliest-offset')
>
> 2. Create the build table from MySQL as follows. You can find more
> detailed information in doc [2].
>
> CREATE TABLE MyUserTable (
>   id BIGINT,
>   sex STRING,
>   age INT,
>   status BOOLEAN,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
>   'table-name' = 'users');
>
> 3. Join the tables as follows.
> You can find more detailed information in doc [3].
>
> -- temporal join the JDBC table as a dimension table
> SELECT * FROM myTopic
> LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
> ON myTopic.key = MyUserTable.id;
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
>
> Best,
> Guowei
>
>
> On Wed, Nov 18, 2020 at 3:05 PM tkg_cangkul <yuza.ras...@gmail.com> wrote:
>
>> Hi Guowei Ma,
>>
>> Thanks for your reply.
>> In my case, I have some data in my Kafka topic, and I want to get the
>> details of that data from my reference MySQL table.
>> For example:
>>
>> In my Kafka topic I have these fields:
>>
>> id, name, position, experience
>>
>> In my reference MySQL table I have these fields:
>>
>> id, name, age, sex
>>
>> So I want to do a left join to get the detail data from my reference
>> table.
>>
>> How can I do this with Flink?
>> Please advise.
>>
>> On 17/11/20 07:46, Guowei Ma wrote:
>>
>> Hi, Youzha
>>
>> In general, `CoGroup` is for window-based operations. Whether it can
>> satisfy your requirements depends on your specific scenario. But if you
>> want to treat the MySQL table as a dimension table, there are two other
>> options:
>> 1. Using the Table/SQL SDK. You can find a SQL example (temporal join of
>> the JDBC table as a dimension table) in the table JDBC connector docs [1]
>> and more join information in [2].
>> 2. Using the DataStream SDK. Maybe you could check whether the `AsyncIO`
>> function satisfies your requirements. You can find an example in [3].
>>
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html
>>
>> Best,
>> Guowei
>>
>>
>> On Mon, Nov 16, 2020 at 11:20 PM Youzha <yuza.ras...@gmail.com> wrote:
>>
>>> Hi, I want to join a Kafka stream against a MySQL reference table.
>>> How can I do this with Flink streaming? Can the coGroup function handle
>>> this? Or does anyone have Java sample code for this case? I've read some
>>> articles saying that the coGroup function can do a left outer join, but
>>> I'm still confused about how to implement it because I've only just
>>> started learning Flink streaming.
>>>
>>> Need advice, please.
>>>
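
For readers following the thread, here is a minimal sketch of what the requested left join produces, written in plain Java with no Flink dependency. It is not Flink API code and not the code from the thread. It only illustrates the dimension-table lookup described above: each Kafka record is matched by `id` against the reference table, and unmatched records are kept with empty reference columns. The class names (`LookupLeftJoinSketch`, `KafkaEvent`, `RefRow`, `Joined`) and the sample values are hypothetical; the field names are the ones listed in the thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LookupLeftJoinSketch {

    /** One record from the Kafka topic (hypothetical type; fields as listed in the thread). */
    static final class KafkaEvent {
        final long id;
        final String name;
        final String position;
        final int experience;

        KafkaEvent(long id, String name, String position, int experience) {
            this.id = id;
            this.name = name;
            this.position = position;
            this.experience = experience;
        }
    }

    /** One row of the MySQL reference table (hypothetical type; fields as listed in the thread). */
    static final class RefRow {
        final long id;
        final String name;
        final int age;
        final String sex;

        RefRow(long id, String name, int age, String sex) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.sex = sex;
        }
    }

    /** Join output. The ref field is null when no reference row matched. */
    static final class Joined {
        final KafkaEvent event;
        final RefRow ref;

        Joined(KafkaEvent event, RefRow ref) {
            this.event = event;
            this.ref = ref;
        }
    }

    /** Left join: every event is emitted exactly once, with or without a match. */
    static List<Joined> leftJoin(List<KafkaEvent> events, Map<Long, RefRow> refById) {
        List<Joined> out = new ArrayList<>();
        for (KafkaEvent e : events) {
            // Map.get returns null for a missing id, which gives left-join semantics.
            out.add(new Joined(e, refById.get(e.id)));
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up sample data, standing in for the MySQL table and the Kafka topic.
        Map<Long, RefRow> refById = new HashMap<>();
        refById.put(1L, new RefRow(1L, "alice", 30, "F"));

        List<KafkaEvent> events = Arrays.asList(
                new KafkaEvent(1L, "alice", "engineer", 5),
                new KafkaEvent(2L, "bob", "analyst", 2));

        for (Joined j : leftJoin(events, refById)) {
            String age = (j.ref == null) ? "null" : String.valueOf(j.ref.age);
            String sex = (j.ref == null) ? "null" : j.ref.sex;
            System.out.println(j.event.id + ", " + j.event.name + ", " + j.event.position
                    + ", " + j.event.experience + ", " + age + ", " + sex);
        }
    }
}
```

In this sketch the reference table is held in an in-memory map. In a Flink job, the temporal join against the JDBC table suggested in the thread plays that role, so the lookup happens per incoming record instead of against a preloaded map.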