Hi, Youzha Sorry for the late reply. It seems that the type is mis-type-match. Could you 1. tableA.printSchema to print the schema? 2. KafkaSource.getType() to print the typeinformation?
Best, Guowei On Mon, Nov 23, 2020 at 5:28 PM Youzha <yuza.ras...@gmail.com> wrote: > Hi, this is sample code : > > > Table tableA = tEnv.fromDataStream(KafkaSource,"timestamp, id, status"); > > tEnv.registerTable("tbl_kafka", tableA); > > Table result = tEnv.sqlQuery("select * from tbl_kafka where id = 'E02'"); > > > fyi, i’m using avro format on my KafkaSource > > > Best Regards, > > . > > > > > On Mon, 23 Nov 2020 at 14.44 Guowei Ma <guowei....@gmail.com> wrote: > >> Could you share your code? >> Best, >> Guowei >> >> >> On Mon, Nov 23, 2020 at 12:05 PM tkg_cangkul <yuza.ras...@gmail.com> >> wrote: >> >>> Hi, >>> >>> i'm using java for do this thing. >>> and i've success to register the tables. >>> >>> i've success to select each table. >>> >>> Table result1 = tEnv.sqlQuery("select status_code from table_kafka"); >>> Table result2 = tEnv.sqlQuery("select status_code from >>> table_mysql_reff"); >>> >>> but when i try join query i've some error msg like this : >>> >>> Caused by: org.apache.flink.table.api.TableException: Generic RAW types >>> must have a common type information. at >>> org.apache.flink.table.planner.calcite.FlinkTypeFactory.resolveAllIdenticalTypes(FlinkTypeFactory.scala:381) >>> >>> >>> is there any somethine that i missed here? >>> >>> On 23/11/20 08:43, Guowei Ma wrote: >>> >>> Hi >>> One way would look like as following >>> 1. create the probe table from Kafka as following. You could find more >>> detailed information from doc[1] >>> >>> CREATE TABLE myTopic ( >>> id BIGINT, >>> item_id BIGINT, >>> category_id BIGINT, >>> behavior STRING, >>> ts TIMESTAMP(3)) WITH ( >>> 'connector' = 'kafka', >>> 'topic' = 'user_behavior', >>> 'properties.bootstrap.servers' = 'localhost:9092', >>> 'properties.group.id' = 'testGroup', >>> 'format' = 'csv', >>> 'scan.startup.mode' = 'earliest-offset') >>> >>> 2. create the build table from mysql as following. You could find more >>> detailed information from doc[2] >>> >>> CREATE TABLE MyUserTable ( >>> id BIGINT, >>> sex STRING, >>> age INT, >>> status BOOLEAN, >>> PRIMARY KEY (id) NOT ENFORCED) WITH ( >>> 'connector' = 'jdbc', >>> 'url' = 'jdbc:mysql://localhost:3306/mydatabase', >>> 'table-name' = 'users'); >>> >>> 3. join the tables as following. You could find more detailed >>> information from doc[3] >>> >>> -- temporal join the JDBC table as a dimension tableSELECT * FROM >>> myTopicLEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctimeON >>> myTopic.key = MyUserTable.id; >>> >>> [1] >>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html >>> [2] >>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html >>> [3] >>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html >>> >>> Best, >>> Guowei >>> >>> >>> On Wed, Nov 18, 2020 at 3:05 PM tkg_cangkul <yuza.ras...@gmail.com> >>> wrote: >>> >>>> Hi Guowei Ma, >>>> >>>> Thanks for your reply, >>>> In my case. >>>> I've some data on my kafka topic. and i want to get the detail of the >>>> data from my reference mysql table. >>>> for example : >>>> >>>> in my kafka topic i've this fields : >>>> >>>> id, name, position, experience >>>> >>>> in my reference mysql table i've this fields: >>>> >>>> id, name, age, sex >>>> >>>> So , i want to do left join to get the detail data from my reference >>>> table. >>>> >>>> How can i do this with flink? >>>> Pls advice >>>> >>>> On 17/11/20 07:46, Guowei Ma wrote: >>>> >>>> Hi, Youzha >>>> >>>> In general `CoGroup` is for the window based operation. How it could >>>> satisfy your requirements depends on your specific scenario. But if you >>>> want to look at the mysql table as a dimension table. There might be other >>>> two ways: >>>> 1. Using Table/Sql SDK. You could find a sql example(temporal join the >>>> JDBC table as a dimension table) in the table jdbc connector [1] and more >>>> join information in the [2] >>>> 2. Using DataStream SDK. Maybe you could see whether the `AsycIO` >>>> function could satisfy your requirements. You could find the example in >>>> [3]. >>>> >>>> >>>> [1] >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table >>>> [2] >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html >>>> [3] >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html >>>> >>>> Best, >>>> Guowei >>>> >>>> >>>> On Mon, Nov 16, 2020 at 11:20 PM Youzha <yuza.ras...@gmail.com> wrote: >>>> >>>>> Hi i want to do join reference between kafka with mysql table >>>>> reference. how can i do this thing with flink stream. does coGroup >>>>> function >>>>> can handle this ? or anyone have java sample code with this case? i’ve >>>>> read >>>>> some article that said if cogroup function can do left outer join. but i’m >>>>> still confuse to implement it because i just learned flink stream. >>>>> >>>>> >>>>> need advice pls. >>>>> >>>> >>>> >>>