Could you share your code?
Best,
Guowei

On Mon, Nov 23, 2020 at 12:05 PM tkg_cangkul <yuza.ras...@gmail.com> wrote:

> Hi,
>
> i'm using java for do this thing.
> and i've success to register the tables.
>
> i've success to select each table.
>
> Table result1 = tEnv.sqlQuery("select status_code from table_kafka");
> Table result2 = tEnv.sqlQuery("select status_code from table_mysql_reff");
>
> but when i try join query i've some error msg like this :
>
> Caused by: org.apache.flink.table.api.TableException: Generic RAW types
> must have a common type information. at
> org.apache.flink.table.planner.calcite.FlinkTypeFactory.resolveAllIdenticalTypes(FlinkTypeFactory.scala:381)
>
>
> is there any somethine that i missed here?
>
> On 23/11/20 08:43, Guowei Ma wrote:
>
> Hi
> One way would look like as following
> 1. create the probe table from Kafka as following. You could find more
> detailed information from doc[1]
>
> CREATE TABLE myTopic (
>  id BIGINT,
>  item_id BIGINT,
>  category_id BIGINT,
>  behavior STRING,
>  ts TIMESTAMP(3)) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'testGroup',
>  'format' = 'csv',
>  'scan.startup.mode' = 'earliest-offset')
>
> 2. create the build table from mysql as following. You could find more
> detailed information from doc[2]
>
> CREATE TABLE MyUserTable (
>   id BIGINT,
>   sex STRING,
>   age INT,
>   status BOOLEAN,
>   PRIMARY KEY (id) NOT ENFORCED) WITH (
>    'connector' = 'jdbc',
>    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
>    'table-name' = 'users');
>
> 3. join the tables as following. You could find more detailed information
> from doc[3]
>
> -- temporal join the JDBC table as a dimension tableSELECT * FROM myTopicLEFT 
> JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctimeON myTopic.key = 
> MyUserTable.id;
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
>
> Best,
> Guowei
>
>
> On Wed, Nov 18, 2020 at 3:05 PM tkg_cangkul <yuza.ras...@gmail.com> wrote:
>
>> Hi Guowei Ma,
>>
>> Thanks for your reply,
>> In my case.
>> I've some data on my kafka topic. and i want to get the detail of the
>> data from my reference mysql table.
>> for example :
>>
>> in my kafka topic i've this fields :
>>
>> id, name, position, experience
>>
>> in my reference mysql table i've this fields:
>>
>> id, name, age, sex
>>
>> So , i want to do left join to get the detail data from my reference
>> table.
>>
>> How can i do this with flink?
>> Pls advice
>>
>> On 17/11/20 07:46, Guowei Ma wrote:
>>
>> Hi, Youzha
>>
>> In general `CoGroup` is for the window based operation. How it could
>> satisfy your requirements depends on  your specific scenario. But if you
>> want to look at the mysql table as a dimension table. There might be other
>> two ways:
>> 1. Using Table/Sql SDK. You could find a sql example(temporal join the
>> JDBC table as a dimension table) in the table jdbc connector [1] and more
>> join information in the [2]
>> 2. Using DataStream SDK. Maybe you could see whether the `AsycIO`
>> function could satisfy your requirements. You could find the example in
>> [3].
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html
>>
>> Best,
>> Guowei
>>
>>
>> On Mon, Nov 16, 2020 at 11:20 PM Youzha <yuza.ras...@gmail.com> wrote:
>>
>>> Hi i want to do join reference between kafka with mysql table reference.
>>> how can i do this thing with flink stream. does coGroup function can handle
>>> this ? or anyone have java sample code with this case? i’ve read some
>>> article that said if cogroup function can do left outer join. but i’m still
>>> confuse to implement it because  i just learned  flink stream.
>>>
>>>
>>> need advice pls.
>>>
>>
>>
>

Reply via email to