Hi, sunfulin
I reproduce your case,this should be a bug in extracting unique key from plan 
and I create an issue[1] to trace this.


CC:  jark


[1]https://issues.apache.org/jira/browse/FLINK-16070 
<https://issues.apache.org/jira/browse/FLINK-16070>



> 在 2020年2月14日,23:39,sunfulin <sunfulin0...@163.com> 写道:
> 
> Hi, Jark
> Appreciate for your reply. insert with column list indeed is not allowed with 
> old planner enabled in Flink 1.10 while it will throws exception such as 
> "Partial insert is not supported". 
> Never mind for this issue. Focus on the UpsertMode exception, my es DDL is 
> like the following: 
> 
> CREATE TABLE ES6_ZHANGLE_OUTPUT (
>   aggId varchar ,
>   pageId varchar ,
>   ts varchar ,
>   expoCnt bigint ,
>   clkCnt bigint
> ) WITH (
> 'connector.type' = 'elasticsearch',
> 'connector.version' = '6',
> 'connector.hosts' = 'http://168.61.113.171:9092;http://168.61.113.171:9093',
> 'connector.index' = 'flink_zhangle_pageview',
> 'connector.document-type' = '_doc',
> 'update-mode' = 'upsert',
> 'connector.key-delimiter' = '$',
> 'connector.key-null-literal' = 'n/a',
> 'connector.bulk-flush.interval' = '1000',
> 'format.type' = 'json'
> )
> 
> 
> And the SQL logic is as the following:
> 
> INSERT INTO ES6_ZHANGLE_OUTPUT
>   SELECT aggId, pageId, ts_min as ts,
>   count(case when eventId = 'exposure' then 1 else null end) as expoCnt,
>   count(case when eventId = 'click' then 1 else null end) as clkCnt
>   FROM
>   (
>     SELECT
>         'ZL_001' as aggId,
>         pageId,
>         eventId,
>         recvTime,
>         ts2Date(recvTime) as ts_min
>     from kafka_zl_etrack_event_stream
>     where eventId in ('exposure', 'click')
>   ) as t1
>   group by aggId, pageId, ts_min
> 
> I simply run StreamTableEnvironment.sqlUpdate( the above sql content) and 
> execute the task. Not sure what the root cause is. 
> 
> 
> At 2020-02-14 23:19:14, "Jark Wu" <imj...@gmail.com> wrote:
> 
> Hi sunfulin,
> 
> Is this the real query you submit?  AFAIK, insert with column list is not 
> allowed for now, 
> i.e. the `INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)`.
> 
> Could you attach the full SQL text, including  DDLs of ES6_ZHANGLE_OUTPUT 
> table and kafka_zl_etrack_event_stream table.
> If you have a minimal program that can reproduce this problem, that would be 
> great. 
> 
> Best,
> Jark
> 
> On Fri, 14 Feb 2020 at 22:53, Robert Metzger <rmetz...@apache.org 
> <mailto:rmetz...@apache.org>> wrote:
> 
> 
> ---------- Forwarded message ---------
> From: sunfulin <sunfulin0...@163.com <mailto:sunfulin0...@163.com>>
> Date: Fri, Feb 14, 2020 at 2:59 AM
> Subject: Re:Flink 1.10 es sink exception
> To: user@flink.apache.org <mailto:user@flink.apache.org> 
> <user@flink.apache.org <mailto:user@flink.apache.org>>
> 
> 
> Anyone can share a little advice on the reason of this exception? I changed 
> to use old planner, the same sql runs well. 
> 
> 
> 
> 
> 
> At 2020-02-13 16:07:18, "sunfulin" <sunfulin0...@163.com 
> <mailto:sunfulin0...@163.com>> wrote:
> 
> Hi, guys
> When running the same Flink sql like the following, I met exception like 
> "org.apache.flink.table.api.TableException: UpsertStreamTableSink requires 
> that Table has a full primary keys if it is updated". I am using the latest 
> Flink 1.10 release with blink planner enabled. Because the same logic runs 
> well within Flink 1.8.2 old planner. Does the SQL usage has some problem or 
> may has a bug here ? 
> 
> 
> INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
>   SELECT aggId, pageId, ts_min as ts,
>   count(case when eventId = 'exposure' then 1 else null end) as expoCnt,
>   count(case when eventId = 'click' then 1 else null end) as clickCnt
>   FROM
>   (
>     SELECT
>         'ZL_001' as aggId,
>         pageId,
>         eventId,
>         recvTime,
>         ts2Date(recvTime) as ts_min
>     from kafka_zl_etrack_event_stream
>     where eventId in ('exposure', 'click')
>   ) as t1
>   group by aggId, pageId, ts_min
> 
> 
> 
>  
> 
> 
>  
> 
> 
>  

Reply via email to