Hi,
I'm using Java to do this.
I've successfully registered the tables,
and I can select from each table:
Table result1 = tEnv.sqlQuery("select status_code from table_kafka");
Table result2 = tEnv.sqlQuery("select status_code from table_mysql_reff");
But when I try the join query, I get an error message like this:
Caused by: org.apache.flink.table.api.TableException: Generic RAW types must have a common type information.
    at org.apache.flink.table.planner.calcite.FlinkTypeFactory.resolveAllIdenticalTypes(FlinkTypeFactory.scala:381)
Is there something I missed here?
On 23/11/20 08:43, Guowei Ma wrote:
Hi,
One way would look like the following:
1. Create the probe table from Kafka as follows. You can find more
detailed information in doc [1].
CREATE TABLE myTopic (
  id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3),
  proctime AS PROCTIME() -- processing-time attribute used by the join in step 3
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'csv',
  'scan.startup.mode' = 'earliest-offset'
);
2. Create the build table from MySQL as follows. You can find more
detailed information in doc [2].
CREATE TABLE MyUserTable (
  id BIGINT,
  sex STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'users'
);
3. Join the tables as follows. You can find more detailed
information in doc [3].
-- temporal join the JDBC table as a dimension table
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.id = MyUserTable.id;
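To make the `FOR SYSTEM_TIME AS OF myTopic.proctime` semantics concrete, here is a minimal plain-Java model. It uses no Flink classes, and the class and method names are hypothetical. It assumes the dimension table is an in-memory map from `id` to `sex`: each probe record is matched against whatever row exists at the moment it is processed, a missing key yields null (the LEFT JOIN part), and a result already emitted is not revised when the MySQL row changes later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a processing-time lookup LEFT JOIN (no Flink classes involved).
class ProcTimeLookupJoin {

    // Hypothetical in-memory stand-in for MyUserTable: id -> sex.
    private final Map<Long, String> dimension = new HashMap<>();

    // Simulates an insert or update arriving in the MySQL table.
    void upsertDimension(long id, String sex) {
        dimension.put(id, sex);
    }

    // Joins one probe record against the table as it is right now;
    // a missing key yields null for the dimension column (LEFT JOIN).
    String joinNow(long probeId) {
        String sex = dimension.get(probeId);
        return probeId + "," + sex;
    }

    public static void main(String[] args) {
        ProcTimeLookupJoin join = new ProcTimeLookupJoin();
        List<String> emitted = new ArrayList<>();
        join.upsertDimension(1L, "F");
        emitted.add(join.joinNow(1L)); // joined with the row current at this moment
        join.upsertDimension(1L, "M");  // the MySQL row changes afterwards
        emitted.add(join.joinNow(1L)); // a later record sees the new value
        emitted.add(join.joinNow(2L)); // no row for id 2
        // The first result is not revised after the update.
        System.out.println(emitted); // [1,F, 1,M, 2,null]
    }
}
```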
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
Best,
Guowei
On Wed, Nov 18, 2020 at 3:05 PM tkg_cangkul <yuza.ras...@gmail.com> wrote:
Hi Guowei Ma,
Thanks for your reply.
Here is my case:
I have some data in my Kafka topic, and I want to get the details of
that data from my reference MySQL table.
For example:
my Kafka topic has these fields:
id, name, position, experience
my reference MySQL table has these fields:
id, name, age, sex
So I want to do a left join to get the detail data from my
reference table.
How can I do this with Flink?
Please advise.
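A minimal plain-Java sketch of the requested result, not Flink code; the class, helper names, and sample values are hypothetical. Every Kafka record is kept, and `age` and `sex` from the matching reference row are appended, or null when no MySQL row has the same `id`. It keeps the Kafka `name` and drops the duplicate MySQL `name`, which is an assumption about what is wanted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink) of a LEFT JOIN of Kafka records onto a reference table by id.
class KafkaMysqlLeftJoin {

    // Reference row from MySQL (id, name, age, sex); only age and sex are appended below.
    static final class RefRow {
        final int age;
        final String sex;

        RefRow(int age, String sex) {
            this.age = age;
            this.sex = sex;
        }
    }

    // Kafka record columns: id, name, position, experience.
    // The record is always emitted; age and sex become null when the id has no reference row.
    static String enrich(long id, String name, String position, int experience,
                         Map<Long, RefRow> reference) {
        RefRow ref = reference.get(id);
        String tail = (ref == null) ? "null,null" : ref.age + "," + ref.sex;
        return id + "," + name + "," + position + "," + experience + "," + tail;
    }

    public static void main(String[] args) {
        Map<Long, RefRow> reference = new HashMap<>(); // hypothetical sample data
        reference.put(1L, new RefRow(30, "F"));
        List<String> out = new ArrayList<>();
        out.add(enrich(1L, "Ana", "engineer", 5, reference));
        out.add(enrich(2L, "Budi", "analyst", 2, reference));
        System.out.println(out); // [1,Ana,engineer,5,30,F, 2,Budi,analyst,2,null,null]
    }
}
```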
On 17/11/20 07:46, Guowei Ma wrote:
Hi, Youzha
In general, `CoGroup` is for window-based operations, so whether it
could satisfy your requirements depends on your specific
scenario. But if you want to treat the MySQL table as a
dimension table, there are two other ways:
1. Using the Table/SQL API. You can find a SQL example (temporal
join of the JDBC table as a dimension table) in the JDBC table
connector docs [1], and more information about joins in [2].
2. Using the DataStream API. You could check whether the Async I/O
function satisfies your requirements. You can find an
example in [3].
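To illustrate the idea behind option 2, here is a plain-Java sketch using `CompletableFuture`; it is not Flink's Async I/O API. In Flink you would implement an `AsyncFunction` (for example a `RichAsyncFunction`) and attach it with `AsyncDataStream.orderedWait` or `unorderedWait`, as shown in [3]. The `lookupSex` helper and the in-memory table are hypothetical stand-ins for an asynchronous MySQL client: all lookups are issued without blocking, then results are gathered in input order, with null for ids that have no reference row.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Plain-Java illustration of the Async I/O idea; this is NOT Flink's AsyncFunction API.
class AsyncLookupSketch {

    // Hypothetical non-blocking lookup standing in for an async MySQL client:
    // returns a future at once; the value is produced later on the pool.
    static CompletableFuture<String> lookupSex(Map<Long, String> table, long id, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> table.get(id), pool);
    }

    // Fires all lookups without waiting, then gathers results in input order
    // (similar in spirit to ordered mode). A missing id yields null.
    static List<String> enrich(List<Long> ids, Map<Long, String> table) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (long id : ids) {
                pending.add(lookupSex(table, id, pool).thenApply(sex -> id + "," + sex));
            }
            List<String> out = new ArrayList<>();
            for (CompletableFuture<String> f : pending) {
                out.add(f.get(1, TimeUnit.SECONDS)); // wait at most 1 s for each result
            }
            return out;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for lookups", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("lookup failed or timed out", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<Long, String> table = new HashMap<>(); // hypothetical reference data
        table.put(1L, "F");
        table.put(3L, "M");
        List<Long> ids = new ArrayList<>();
        ids.add(1L);
        ids.add(2L);
        ids.add(3L);
        System.out.println(enrich(ids, table)); // [1,F, 2,null, 3,M]
    }
}
```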
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html
Best,
Guowei
On Mon, Nov 16, 2020 at 11:20 PM Youzha <yuza.ras...@gmail.com> wrote:
Hi, I want to join a Kafka stream with a MySQL reference table.
How can I do this with Flink streaming? Can the coGroup function
handle this? Or does anyone have Java sample code for this case?
I've read some articles saying that the coGroup function can do a
left outer join, but I'm still confused about how to implement it
because I've just started learning Flink streaming.
I need advice, please.
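On the coGroup question, here is a minimal plain-Java model of the left-outer logic one might put in a Flink `CoGroupFunction`, whose `coGroup(Iterable, Iterable, Collector)` method receives all elements of both sides for one key in one window. It uses no Flink classes; the names are hypothetical, and a `Consumer` stands in for the `Collector`. In Flink it would be wired up roughly as `kafkaStream.coGroup(mysqlStream).where(...).equalTo(...).window(...).apply(...)`, where both stream names are hypothetical. The sketch also shows the limitation behind the reply above: a Kafka record only sees MySQL rows that arrived in the same window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of left-outer-join logic for ONE key in ONE window (no Flink classes).
class CoGroupLeftOuterSketch {

    // Same shape as CoGroupFunction#coGroup(first, second, collector);
    // java.util.function.Consumer stands in for Flink's Collector.
    static void coGroup(Iterable<String> kafkaSide, Iterable<String> mysqlSide, Consumer<String> out) {
        // Copy the right side once so it can be re-scanned for every left element.
        List<String> rights = new ArrayList<>();
        mysqlSide.forEach(rights::add);
        for (String left : kafkaSide) {
            if (rights.isEmpty()) {
                out.accept(left + "|null"); // no match in this window: keep the Kafka record
            } else {
                for (String right : rights) {
                    out.accept(left + "|" + right); // matched pairs
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        // Key whose MySQL row arrived in the same window.
        coGroup(Arrays.asList("kafka:1"), Arrays.asList("mysql:1"), out::add);
        // Key whose MySQL row did not arrive in this window.
        coGroup(Arrays.asList("kafka:2"), Collections.<String>emptyList(), out::add);
        System.out.println(out); // [kafka:1|mysql:1, kafka:2|null]
    }
}
```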