Hi,

I'm using Java for this, and I've successfully registered both tables.

I can also select from each table individually:

Table result1 = tEnv.sqlQuery("select status_code from table_kafka");
Table result2 = tEnv.sqlQuery("select status_code from table_mysql_reff");

But when I try a join query, I get an error message like this:

Caused by: org.apache.flink.table.api.TableException: Generic RAW types must have a common type information.
    at org.apache.flink.table.planner.calcite.FlinkTypeFactory.resolveAllIdenticalTypes(FlinkTypeFactory.scala:381)

Is there something that I missed here?

On 23/11/20 08:43, Guowei Ma wrote:
Hi,
One way would look like the following:
1. Create the probe table from Kafka as follows. You can find more detailed information in the doc [1].
CREATE TABLE myTopic (
  id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3),
  -- processing-time attribute used by the temporal join in step 3
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'csv',
  'scan.startup.mode' = 'earliest-offset'
);
2. Create the build table from MySQL as follows. You can find more detailed information in the doc [2].
CREATE TABLE MyUserTable (
  id BIGINT,
  sex STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'users'
);
3. Join the tables as follows. You can find more detailed information in the doc [3].
-- temporal join the JDBC table as a dimension table
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.id = MyUserTable.id;
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
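
As a rough illustration of what the left temporal join in step 3 computes, here is a plain-Java sketch. It uses no Flink classes: the names LookupJoinSketch, Event, User and leftJoin are hypothetical, an in-memory map stands in for the MySQL table, and it assumes the join key is the id column on both sides.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration of a left lookup join; not Flink code. */
class LookupJoinSketch {

    /** Stands in for one Kafka record (only two columns kept for brevity). */
    static final class Event {
        final long id;
        final String behavior;
        Event(long id, String behavior) { this.id = id; this.behavior = behavior; }
    }

    /** Stands in for one row of the MySQL table; id is the primary key. */
    static final class User {
        final long id;
        final String sex;
        final int age;
        User(long id, String sex, int age) { this.id = id; this.sex = sex; this.age = age; }
    }

    /** LEFT JOIN: every event is emitted; user columns are "null" when no row matches. */
    static List<String> leftJoin(List<Event> events, Map<Long, User> usersById) {
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            // Look up the dimension row at the moment the event is processed.
            User u = usersById.get(e.id);
            String sex = (u == null) ? "null" : u.sex;
            String age = (u == null) ? "null" : String.valueOf(u.age);
            out.add(e.id + "," + e.behavior + "," + sex + "," + age);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Long, User> users = new HashMap<>();
        users.put(1L, new User(1L, "F", 30));
        List<Event> events = List.of(new Event(1L, "click"), new Event(2L, "buy"));
        // Prints:
        // 1,click,F,30
        // 2,buy,null,null
        leftJoin(events, users).forEach(System.out::println);
    }
}
```

The event with id 2 has no matching row, so it is still emitted with empty user columns, which is the behaviour a left join gives you.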

Best,
Guowei


On Wed, Nov 18, 2020 at 3:05 PM tkg_cangkul <yuza.ras...@gmail.com> wrote:

    Hi Guowei Ma,

    Thanks for your reply.
    In my case, I have some data in a Kafka topic, and I want to get
    the details for that data from my MySQL reference table.
    For example:

    My Kafka topic has these fields:

    id, name, position, experience

    My MySQL reference table has these fields:

    id, name, age, sex

    So I want to do a left join to get the detail data from my
    reference table.

    How can I do this with Flink?
    Please advise.

    On 17/11/20 07:46, Guowei Ma wrote:
    Hi, Youzha

    In general, `CoGroup` is for window-based operations. Whether it
    can satisfy your requirements depends on your specific
    scenario. But if you want to treat the MySQL table as a
    dimension table, there are two other ways:
    1. Using the Table/SQL API. You can find a SQL example (temporal
    join with the JDBC table as a dimension table) in the JDBC table
    connector docs [1] and more information on joins in [2].
    2. Using the DataStream API. You could check whether the Async I/O
    function satisfies your requirements. You can find an
    example in [3].


    [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
    [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
    [3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html
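
For option 2, the idea behind an asynchronous lookup can be sketched in plain Java with CompletableFuture. This is only a conceptual sketch and not Flink's Async I/O API (see [3] for that): AsyncLookupSketch, enrich and queryNameById are hypothetical names, and a map stands in for the MySQL table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Conceptual sketch of async enrichment; not Flink code. */
class AsyncLookupSketch {

    /** Hypothetical stand-in for a MySQL query by primary key; returns null if absent. */
    static String queryNameById(Map<Long, String> table, long id) {
        return table.get(id);
    }

    /** Starts all lookups without waiting, then collects results in input order. */
    static List<String> enrich(List<Long> ids, Map<Long, String> table) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (long id : ids) {
                // Each lookup runs on the pool, so slow queries can overlap.
                pending.add(CompletableFuture
                        .supplyAsync(() -> queryNameById(table, id), pool)
                        .thenApply(name -> id + "," + name));
            }
            List<String> out = new ArrayList<>();
            for (CompletableFuture<String> f : pending) {
                out.add(f.join()); // blocks until this lookup has finished
            }
            return out;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Prints [1,alice, 2,null]: id 2 has no match but is still emitted.
        System.out.println(enrich(List.of(1L, 2L), Map.of(1L, "alice")));
    }
}
```

The point of the async approach is that one slow database call does not hold up the records behind it; in Flink the lookup logic would live in an async function used as described in [3].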

    Best,
    Guowei


    On Mon, Nov 16, 2020 at 11:20 PM Youzha <yuza.ras...@gmail.com> wrote:

        Hi, I want to join a Kafka stream against a MySQL reference
        table. How can I do this with Flink streaming? Can the
        coGroup function handle this? Or does anyone have Java sample
        code for this case? I've read some articles saying that the
        coGroup function can do a left outer join, but I'm still
        confused about how to implement it because I've only just
        started learning Flink streaming.

        Need advice, please.


