Hi Jark,

Thanks for your reply. Insert with a column list is indeed not allowed with the old planner enabled in Flink 1.10; it throws an exception such as "Partial insert is not supported". Never mind that issue. Focusing on the UpsertMode exception, my ES DDL is as follows:

    CREATE TABLE ES6_ZHANGLE_OUTPUT (
      aggId varchar,
      pageId varchar,
      ts varchar,
      expoCnt bigint,
      clkCnt bigint
    ) WITH (
      'connector.type' = 'elasticsearch',
      'connector.version' = '6',
      'connector.hosts' = 'http://168.61.113.171:9092;http://168.61.113.171:9093',
      'connector.index' = 'flink_zhangle_pageview',
      'connector.document-type' = '_doc',
      'update-mode' = 'upsert',
      'connector.key-delimiter' = '$',
      'connector.key-null-literal' = 'n/a',
      'connector.bulk-flush.interval' = '1000',
      'format.type' = 'json'
    )

And the SQL logic is as follows:

    INSERT INTO ES6_ZHANGLE_OUTPUT
    SELECT
      aggId,
      pageId,
      ts_min as ts,
      count(case when eventId = 'exposure' then 1 else null end) as expoCnt,
      count(case when eventId = 'click' then 1 else null end) as clkCnt
    FROM (
      SELECT
        'ZL_001' as aggId,
        pageId,
        eventId,
        recvTime,
        ts2Date(recvTime) as ts_min
      from kafka_zl_etrack_event_stream
      where eventId in ('exposure', 'click')
    ) as t1
    group by aggId, pageId, ts_min

I simply pass the SQL above to StreamTableEnvironment.sqlUpdate() and execute the job. I am not sure what the root cause is.

At 2020-02-14 23:19:14, "Jark Wu" <imj...@gmail.com> wrote:

Hi sunfulin,

Is this the real query you submitted? AFAIK, insert with a column list is not allowed for now, i.e. the `INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)`.

Could you attach the full SQL text, including the DDLs of the ES6_ZHANGLE_OUTPUT table and the kafka_zl_etrack_event_stream table? If you have a minimal program that can reproduce this problem, that would be great.

Best,
Jark

On Fri, 14 Feb 2020 at 22:53, Robert Metzger <rmetz...@apache.org> wrote:

---------- Forwarded message ---------
From: sunfulin <sunfulin0...@163.com>
Date: Fri, Feb 14, 2020 at 2:59 AM
Subject: Re:Flink 1.10 es sink exception
To: user@flink.apache.org <user@flink.apache.org>

Can anyone share some advice on the cause of this exception? When I switched to the old planner, the same SQL ran well.

At 2020-02-13 16:07:18, "sunfulin" <sunfulin0...@163.com> wrote:

Hi guys,

When running the Flink SQL below, I hit the exception "org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated". I am using the latest Flink 1.10 release with the blink planner enabled. The same logic runs well on Flink 1.8.2 with the old planner. Is there a problem with my SQL usage, or might this be a bug?

    INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
    SELECT
      aggId,
      pageId,
      ts_min as ts,
      count(case when eventId = 'exposure' then 1 else null end) as expoCnt,
      count(case when eventId = 'click' then 1 else null end) as clickCnt
    FROM (
      SELECT
        'ZL_001' as aggId,
        pageId,
        eventId,
        recvTime,
        ts2Date(recvTime) as ts_min
      from kafka_zl_etrack_event_stream
      where eventId in ('exposure', 'click')
    ) as t1
    group by aggId, pageId, ts_min