Hi, Youzha
Sorry for the late reply. It seems there is a type mismatch.
Could you:
1. call tableA.printSchema() to print the schema?
2. call KafkaSource.getType() to print the TypeInformation?
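A small helper sketch for the two checks (the parameter names mirror the variables in your snippet; adapt them to your program):

```java
// Sketch: a helper you could drop into your job to dump both pieces of
// information. `Table` and `DataStream` come from your existing code.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;

public final class DebugTypes {
    public static void dump(Table tableA, DataStream<?> kafkaSource) {
        tableA.printSchema();                      // schema the Table API sees
        System.out.println(kafkaSource.getType()); // TypeInformation of the DataStream
    }
}
```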

Best,
Guowei


On Mon, Nov 23, 2020 at 5:28 PM Youzha <yuza.ras...@gmail.com> wrote:

> Hi, this is sample code :
>
>
> Table tableA = tEnv.fromDataStream(KafkaSource,"timestamp, id, status");
>
> tEnv.registerTable("tbl_kafka", tableA);
>
> Table result = tEnv.sqlQuery("select * from tbl_kafka where id = 'E02'");
>
>
> fyi, i’m using avro format on my KafkaSource
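The "Generic RAW types" error often means the stream only carries a generic/RAW type. One common remedy (a sketch, not your exact code: `StatusEvent` is an assumed generated Avro class, and the topic/addresses are placeholders) is to deserialize into a specific Avro record class so the stream has a concrete TypeInformation:

```java
// Sketch, Flink 1.11 APIs; `StatusEvent` is an assumption (a generated
// Avro specific-record class), topic and broker addresses are placeholders.
import java.util.Properties;

import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class AvroSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "testGroup");

        // A specific-record schema gives the stream a concrete
        // TypeInformation instead of a RAW/generic type.
        FlinkKafkaConsumer<StatusEvent> consumer = new FlinkKafkaConsumer<>(
                "my_topic",
                AvroDeserializationSchema.forSpecific(StatusEvent.class),
                props);

        DataStream<StatusEvent> kafkaSource = env.addSource(consumer);
        kafkaSource.print();
        env.execute("avro-source-sketch");
    }
}
```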
>
>
> Best Regards,
>
>
>
>
>
> On Mon, 23 Nov 2020 at 14.44 Guowei Ma <guowei....@gmail.com> wrote:
>
>> Could you share your code?
>> Best,
>> Guowei
>>
>>
>> On Mon, Nov 23, 2020 at 12:05 PM tkg_cangkul <yuza.ras...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I'm using Java for this, and I've successfully registered the tables.
>>>
>>> I can select from each table individually:
>>>
>>> Table result1 = tEnv.sqlQuery("select status_code from table_kafka");
>>> Table result2 = tEnv.sqlQuery("select status_code from table_mysql_reff");
>>>
>>> but when I try a join query, I get an error message like this:
>>>
>>> Caused by: org.apache.flink.table.api.TableException: Generic RAW types
>>> must have a common type information. at
>>> org.apache.flink.table.planner.calcite.FlinkTypeFactory.resolveAllIdenticalTypes(FlinkTypeFactory.scala:381)
>>>
>>>
>>> Is there something that I missed here?
>>>
>>> On 23/11/20 08:43, Guowei Ma wrote:
>>>
>>> Hi
>>> One way would look like the following:
>>> 1. Create the probe table from Kafka as below. You can find more
>>> detailed information in the docs [1].
>>>
>>> CREATE TABLE myTopic (
>>>   id BIGINT,
>>>   item_id BIGINT,
>>>   category_id BIGINT,
>>>   behavior STRING,
>>>   ts TIMESTAMP(3),
>>>   proctime AS PROCTIME()  -- processing-time attribute for the temporal join
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'topic' = 'user_behavior',
>>>   'properties.bootstrap.servers' = 'localhost:9092',
>>>   'properties.group.id' = 'testGroup',
>>>   'format' = 'csv',
>>>   'scan.startup.mode' = 'earliest-offset');
>>>
>>> 2. Create the build table from MySQL as below. You can find more
>>> detailed information in the docs [2].
>>>
>>> CREATE TABLE MyUserTable (
>>>   id BIGINT,
>>>   sex STRING,
>>>   age INT,
>>>   status BOOLEAN,
>>>   PRIMARY KEY (id) NOT ENFORCED
>>> ) WITH (
>>>   'connector' = 'jdbc',
>>>   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
>>>   'table-name' = 'users');
>>>
>>> 3. Join the tables as below. You can find more detailed
>>> information in the docs [3].
>>>
>>> -- temporal join the JDBC table as a dimension table
>>> SELECT *
>>> FROM myTopic
>>> LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
>>> ON myTopic.id = MyUserTable.id;
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
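The three steps above can be wired together from Java roughly as follows (a sketch against the Flink 1.11 Table API; the DDL bodies are the statements from steps 1 and 2, elided here, and the join assumes myTopic declares a proctime attribute):

```java
// Sketch of executing steps 1-3 from a Java program (Flink 1.11 APIs).
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TemporalJoinJob {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // step 1: Kafka probe table -- paste the full CREATE TABLE myTopic DDL
        tEnv.executeSql(kafkaDdl());
        // step 2: JDBC build table -- paste the full CREATE TABLE MyUserTable DDL
        tEnv.executeSql(jdbcDdl());

        // step 3: temporal join of the dimension table
        // (assumes myTopic has a `proctime AS PROCTIME()` column)
        tEnv.executeSql(
                "SELECT * FROM myTopic "
                + "LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime "
                + "ON myTopic.id = MyUserTable.id").print();
    }

    private static String kafkaDdl() { /* DDL from step 1 */ return "..."; }
    private static String jdbcDdl()  { /* DDL from step 2 */ return "..."; }
}
```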
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Wed, Nov 18, 2020 at 3:05 PM tkg_cangkul <yuza.ras...@gmail.com>
>>> wrote:
>>>
>>>> Hi Guowei Ma,
>>>>
>>>> Thanks for your reply,
>>>> In my case, I have some data in my Kafka topic, and I want to get the
>>>> details for that data from my reference MySQL table.
>>>> For example:
>>>>
>>>> in my kafka topic i've this fields :
>>>>
>>>> id, name, position, experience
>>>>
>>>> in my reference mysql table i've this fields:
>>>>
>>>> id, name, age, sex
>>>>
>>>> So, I want to do a left join to get the detail data from my reference
>>>> table.
>>>>
>>>> How can I do this with Flink?
>>>> Please advise.
>>>>
>>>> On 17/11/20 07:46, Guowei Ma wrote:
>>>>
>>>> Hi, Youzha
>>>>
>>>> In general, `CoGroup` is for window-based operations; whether it can
>>>> satisfy your requirements depends on your specific scenario. But if you
>>>> want to treat the MySQL table as a dimension table, there are two other
>>>> ways:
>>>> 1. Use the Table/SQL API. You can find a SQL example (temporal join of a
>>>> JDBC table as a dimension table) in the table JDBC connector docs [1] and
>>>> more join information in [2].
>>>> 2. Use the DataStream API. You could check whether the `AsyncIO`
>>>> function satisfies your requirements; you can find an example in [3].
>>>>
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
>>>> [3]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html
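A minimal sketch of option 2 (Async I/O) might look like this; the MySQL lookup is stubbed with an in-memory computation, and all names (the `String` element type, `enrich`, the stub result) are assumptions, not your code:

```java
// Hedged sketch of enriching a stream via Flink's Async I/O operator.
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncMysqlLookup extends RichAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        // In a real job this would issue an asynchronous JDBC lookup against
        // the reference table; here the result is stubbed.
        CompletableFuture
                .supplyAsync(() -> key + " -> looked-up details")
                .thenAccept(v -> resultFuture.complete(Collections.singleton(v)));
    }

    // Wiring: at most 100 in-flight lookups, 5 s timeout per element.
    public static DataStream<String> enrich(DataStream<String> in) {
        return AsyncDataStream.unorderedWait(
                in, new AsyncMysqlLookup(), 5, TimeUnit.SECONDS, 100);
    }
}
```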
>>>>
>>>> Best,
>>>> Guowei
>>>>
>>>>
>>>> On Mon, Nov 16, 2020 at 11:20 PM Youzha <yuza.ras...@gmail.com> wrote:
>>>>
>>>>> Hi, I want to join Kafka data against a MySQL reference table. How can I
>>>>> do this with Flink streaming? Can the coGroup function handle this? Or
>>>>> does anyone have Java sample code for this case? I've read some articles
>>>>> saying that the coGroup function can do a left outer join, but I'm still
>>>>> confused about how to implement it because I've just learned Flink
>>>>> streaming.
>>>>>
>>>>>
>>>>> Need advice, please.
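For completeness, the coGroup-based approach asked about here can emulate a windowed left outer join; a hedged sketch (element types, field layout, and window size are all assumptions):

```java
// Sketch: windowed left outer join of two (id, value) streams via coGroup.
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class CoGroupLeftJoin {
    public static DataStream<String> leftJoin(
            DataStream<Tuple2<String, String>> kafka,       // (id, payload)
            DataStream<Tuple2<String, String>> reference) { // (id, details)
        return kafka.coGroup(reference)
                .where(new KeySelector<Tuple2<String, String>, String>() {
                    public String getKey(Tuple2<String, String> t) { return t.f0; }
                })
                .equalTo(new KeySelector<Tuple2<String, String>, String>() {
                    public String getKey(Tuple2<String, String> t) { return t.f0; }
                })
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .apply(new CoGroupFunction<Tuple2<String, String>,
                        Tuple2<String, String>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<String, String>> left,
                                        Iterable<Tuple2<String, String>> right,
                                        Collector<String> out) {
                        // Emit every left element; pair it with matches, or
                        // with a marker when no right-side element exists.
                        for (Tuple2<String, String> l : left) {
                            boolean matched = false;
                            for (Tuple2<String, String> r : right) {
                                matched = true;
                                out.collect(l.f1 + " | " + r.f1);
                            }
                            if (!matched) {
                                out.collect(l.f1 + " | <no match>"); // left outer
                            }
                        }
                    }
                });
    }
}
```

Note the window-based caveat from above: elements only join if they fall into the same window, which is why the temporal-join / Async I/O approaches usually fit dimension-table lookups better.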
>>>>>
>>>>
>>>>
>>>
