Hi, sunfulin Using constant key in `group by` query is not usual and inefficient, you can get around this bug by bubbling up your constant key in `group by` from now.
BTW,godfrey is ready to resolve issue. > 在 2020年2月17日,10:15,sunfulin <sunfulin0...@163.com> 写道: > > Hi, > WOW,really thankful for the track and debug of this problem. I can see the > constant key issue. Appreciate for your kindly help : ) > > > > > > At 2020-02-15 21:06:58, "Leonard Xu" <xbjt...@gmail.com> wrote: > > > Hi, sunfulin > I reproduce your case,this should be a bug in extracting unique key from plan > and I create an issue[1] to trace this. > > > CC: jark > > > [1]https://issues.apache.org/jira/browse/FLINK-16070 > <https://issues.apache.org/jira/browse/FLINK-16070> > > > >> 在 2020年2月14日,23:39,sunfulin <sunfulin0...@163.com >> <mailto:sunfulin0...@163.com>> 写道: >> >> Hi, Jark >> Appreciate for your reply. insert with column list indeed is not allowed >> with old planner enabled in Flink 1.10 while it will throws exception such >> as "Partial insert is not supported". >> Never mind for this issue. Focus on the UpsertMode exception, my es DDL is >> like the following: >> >> CREATE TABLE ES6_ZHANGLE_OUTPUT ( >> aggId varchar , >> pageId varchar , >> ts varchar , >> expoCnt bigint , >> clkCnt bigint >> ) WITH ( >> 'connector.type' = 'elasticsearch', >> 'connector.version' = '6', >> 'connector.hosts' = 'http://168.61.113.171:9092;http://168.61.113.171:9093 >> <http://168.61.113.171:9092;http://168.61.113.171:9093>', >> 'connector.index' = 'flink_zhangle_pageview', >> 'connector.document-type' = '_doc', >> 'update-mode' = 'upsert', >> 'connector.key-delimiter' = '$', >> 'connector.key-null-literal' = 'n/a', >> 'connector.bulk-flush.interval' = '1000', >> 'format.type' = 'json' >> ) >> >> >> And the SQL logic is as the following: >> >> INSERT INTO ES6_ZHANGLE_OUTPUT >> SELECT aggId, pageId, ts_min as ts, >> count(case when eventId = 'exposure' then 1 else null end) as expoCnt, >> count(case when eventId = 'click' then 1 else null end) as clkCnt >> FROM >> ( >> SELECT >> 'ZL_001' as aggId, >> pageId, >> eventId, >> recvTime, >> ts2Date(recvTime) as ts_min >> from kafka_zl_etrack_event_stream >> where eventId in ('exposure', 'click') >> ) as t1 >> group by aggId, pageId, ts_min >> >> I simply run StreamTableEnvironment.sqlUpdate( the above sql content) and >> execute the task. Not sure what the root cause is. >> >> >> At 2020-02-14 23:19:14, "Jark Wu" <imj...@gmail.com >> <mailto:imj...@gmail.com>> wrote: >> >> Hi sunfulin, >> >> Is this the real query you submit? AFAIK, insert with column list is not >> allowed for now, >> i.e. the `INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, >> clkCnt)`. >> >> Could you attach the full SQL text, including DDLs of ES6_ZHANGLE_OUTPUT >> table and kafka_zl_etrack_event_stream table. >> If you have a minimal program that can reproduce this problem, that would be >> great. >> >> Best, >> Jark >> >> On Fri, 14 Feb 2020 at 22:53, Robert Metzger <rmetz...@apache.org >> <mailto:rmetz...@apache.org>> wrote: >> >> >> ---------- Forwarded message --------- >> From: sunfulin <sunfulin0...@163.com <mailto:sunfulin0...@163.com>> >> Date: Fri, Feb 14, 2020 at 2:59 AM >> Subject: Re:Flink 1.10 es sink exception >> To: user@flink.apache.org <mailto:user@flink.apache.org> >> <user@flink.apache.org <mailto:user@flink.apache.org>> >> >> >> Anyone can share a little advice on the reason of this exception? I changed >> to use old planner, the same sql runs well. >> >> >> >> >> >> At 2020-02-13 16:07:18, "sunfulin" <sunfulin0...@163.com >> <mailto:sunfulin0...@163.com>> wrote: >> >> Hi, guys >> When running the same Flink sql like the following, I met exception like >> "org.apache.flink.table.api.TableException: UpsertStreamTableSink requires >> that Table has a full primary keys if it is updated". I am using the latest >> Flink 1.10 release with blink planner enabled. Because the same logic runs >> well within Flink 1.8.2 old planner. Does the SQL usage has some problem or >> may has a bug here ? >> >> >> INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt) >> SELECT aggId, pageId, ts_min as ts, >> count(case when eventId = 'exposure' then 1 else null end) as expoCnt, >> count(case when eventId = 'click' then 1 else null end) as clickCnt >> FROM >> ( >> SELECT >> 'ZL_001' as aggId, >> pageId, >> eventId, >> recvTime, >> ts2Date(recvTime) as ts_min >> from kafka_zl_etrack_event_stream >> where eventId in ('exposure', 'click') >> ) as t1 >> group by aggId, pageId, ts_min >> >> >> >> >> >> >> >> >> >> > > > >