Hi, sunfulin I reproduce your case,this should be a bug in extracting unique key from plan and I create an issue[1] to trace this.
CC: jark [1]https://issues.apache.org/jira/browse/FLINK-16070 <https://issues.apache.org/jira/browse/FLINK-16070> > 在 2020年2月14日,23:39,sunfulin <sunfulin0...@163.com> 写道: > > Hi, Jark > Appreciate for your reply. insert with column list indeed is not allowed with > old planner enabled in Flink 1.10 while it will throws exception such as > "Partial insert is not supported". > Never mind for this issue. Focus on the UpsertMode exception, my es DDL is > like the following: > > CREATE TABLE ES6_ZHANGLE_OUTPUT ( > aggId varchar , > pageId varchar , > ts varchar , > expoCnt bigint , > clkCnt bigint > ) WITH ( > 'connector.type' = 'elasticsearch', > 'connector.version' = '6', > 'connector.hosts' = 'http://168.61.113.171:9092;http://168.61.113.171:9093', > 'connector.index' = 'flink_zhangle_pageview', > 'connector.document-type' = '_doc', > 'update-mode' = 'upsert', > 'connector.key-delimiter' = '$', > 'connector.key-null-literal' = 'n/a', > 'connector.bulk-flush.interval' = '1000', > 'format.type' = 'json' > ) > > > And the SQL logic is as the following: > > INSERT INTO ES6_ZHANGLE_OUTPUT > SELECT aggId, pageId, ts_min as ts, > count(case when eventId = 'exposure' then 1 else null end) as expoCnt, > count(case when eventId = 'click' then 1 else null end) as clkCnt > FROM > ( > SELECT > 'ZL_001' as aggId, > pageId, > eventId, > recvTime, > ts2Date(recvTime) as ts_min > from kafka_zl_etrack_event_stream > where eventId in ('exposure', 'click') > ) as t1 > group by aggId, pageId, ts_min > > I simply run StreamTableEnvironment.sqlUpdate( the above sql content) and > execute the task. Not sure what the root cause is. > > > At 2020-02-14 23:19:14, "Jark Wu" <imj...@gmail.com> wrote: > > Hi sunfulin, > > Is this the real query you submit? AFAIK, insert with column list is not > allowed for now, > i.e. the `INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)`. > > Could you attach the full SQL text, including DDLs of ES6_ZHANGLE_OUTPUT > table and kafka_zl_etrack_event_stream table. > If you have a minimal program that can reproduce this problem, that would be > great. > > Best, > Jark > > On Fri, 14 Feb 2020 at 22:53, Robert Metzger <rmetz...@apache.org > <mailto:rmetz...@apache.org>> wrote: > > > ---------- Forwarded message --------- > From: sunfulin <sunfulin0...@163.com <mailto:sunfulin0...@163.com>> > Date: Fri, Feb 14, 2020 at 2:59 AM > Subject: Re:Flink 1.10 es sink exception > To: user@flink.apache.org <mailto:user@flink.apache.org> > <user@flink.apache.org <mailto:user@flink.apache.org>> > > > Anyone can share a little advice on the reason of this exception? I changed > to use old planner, the same sql runs well. > > > > > > At 2020-02-13 16:07:18, "sunfulin" <sunfulin0...@163.com > <mailto:sunfulin0...@163.com>> wrote: > > Hi, guys > When running the same Flink sql like the following, I met exception like > "org.apache.flink.table.api.TableException: UpsertStreamTableSink requires > that Table has a full primary keys if it is updated". I am using the latest > Flink 1.10 release with blink planner enabled. Because the same logic runs > well within Flink 1.8.2 old planner. Does the SQL usage has some problem or > may has a bug here ? > > > INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt) > SELECT aggId, pageId, ts_min as ts, > count(case when eventId = 'exposure' then 1 else null end) as expoCnt, > count(case when eventId = 'click' then 1 else null end) as clickCnt > FROM > ( > SELECT > 'ZL_001' as aggId, > pageId, > eventId, > recvTime, > ts2Date(recvTime) as ts_min > from kafka_zl_etrack_event_stream > where eventId in ('exposure', 'click') > ) as t1 > group by aggId, pageId, ts_min > > > > > > > > > >