Hi, sunfulin

Using a constant key in a `group by` query is unusual and inefficient; for now, you 
can work around this bug by bubbling the constant key up out of the `group by`, 
i.e. selecting the constant after aggregation instead of grouping by it.
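For example, here is a minimal sketch of that rewrite against your query below 
(untested; it assumes the same source table, sink table, and ts2Date UDF):

INSERT INTO ES6_ZHANGLE_OUTPUT
  SELECT 'ZL_001' AS aggId, pageId, ts_min AS ts,  -- constant selected here, not grouped
    COUNT(CASE WHEN eventId = 'exposure' THEN 1 ELSE NULL END) AS expoCnt,
    COUNT(CASE WHEN eventId = 'click' THEN 1 ELSE NULL END) AS clkCnt
  FROM (
    SELECT pageId, eventId, ts2Date(recvTime) AS ts_min
    FROM kafka_zl_etrack_event_stream
    WHERE eventId IN ('exposure', 'click')
  ) AS t1
  GROUP BY pageId, ts_min  -- no constant key in the grouping

With the constant gone from the grouping keys, the planner should be able to derive 
(pageId, ts_min) as the unique key that the upsert sink needs.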

BTW, godfrey is ready to resolve the issue.


> On Feb 17, 2020, at 10:15, sunfulin <sunfulin0...@163.com> wrote:
> 
> Hi,
> Wow, really thankful for tracking down and debugging this problem. I can see the 
> constant key issue. I appreciate your kind help :)
> 
> At 2020-02-15 21:06:58, "Leonard Xu" <xbjt...@gmail.com> wrote:
> 
> 
> Hi, sunfulin
> I reproduced your case; this should be a bug in extracting the unique key from the 
> plan, and I created an issue [1] to track it.
> 
> 
> CC: jark
> 
> 
> [1] https://issues.apache.org/jira/browse/FLINK-16070
> 
>> On Feb 14, 2020, at 23:39, sunfulin <sunfulin0...@163.com> wrote:
>> 
>> Hi, Jark
>> I appreciate your reply. Insert with a column list is indeed not allowed with the 
>> old planner enabled in Flink 1.10; it throws an exception such as "Partial insert 
>> is not supported". 
>> Never mind that issue. Focusing on the UpsertMode exception, my ES DDL is like 
>> the following: 
>> 
>> CREATE TABLE ES6_ZHANGLE_OUTPUT (
>>   aggId varchar,
>>   pageId varchar,
>>   ts varchar,
>>   expoCnt bigint,
>>   clkCnt bigint
>> ) WITH (
>>   'connector.type' = 'elasticsearch',
>>   'connector.version' = '6',
>>   'connector.hosts' = 'http://168.61.113.171:9092;http://168.61.113.171:9093',
>>   'connector.index' = 'flink_zhangle_pageview',
>>   'connector.document-type' = '_doc',
>>   'update-mode' = 'upsert',
>>   'connector.key-delimiter' = '$',
>>   'connector.key-null-literal' = 'n/a',
>>   'connector.bulk-flush.interval' = '1000',
>>   'format.type' = 'json'
>> )
>> 
>> 
>> And the SQL logic is as follows:
>> 
>> INSERT INTO ES6_ZHANGLE_OUTPUT
>>   SELECT aggId, pageId, ts_min AS ts,
>>     COUNT(CASE WHEN eventId = 'exposure' THEN 1 ELSE NULL END) AS expoCnt,
>>     COUNT(CASE WHEN eventId = 'click' THEN 1 ELSE NULL END) AS clkCnt
>>   FROM
>>   (
>>     SELECT
>>         'ZL_001' AS aggId,
>>         pageId,
>>         eventId,
>>         recvTime,
>>         ts2Date(recvTime) AS ts_min
>>     FROM kafka_zl_etrack_event_stream
>>     WHERE eventId IN ('exposure', 'click')
>>   ) AS t1
>>   GROUP BY aggId, pageId, ts_min
>> 
>> I simply run StreamTableEnvironment.sqlUpdate() with the above SQL content and 
>> then execute the task. Not sure what the root cause is. 
>> 
>> 
>> At 2020-02-14 23:19:14, "Jark Wu" <imj...@gmail.com> wrote:
>> 
>> Hi sunfulin,
>> 
>> Is this the real query you submitted? AFAIK, insert with a column list is not 
>> allowed for now, i.e. the `INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, 
>> expoCnt, clkCnt)`.
>> 
>> Could you attach the full SQL text, including the DDLs of the ES6_ZHANGLE_OUTPUT 
>> table and the kafka_zl_etrack_event_stream table?
>> If you have a minimal program that can reproduce this problem, that would be 
>> great. 
>> 
>> Best,
>> Jark
>> 
>> On Fri, 14 Feb 2020 at 22:53, Robert Metzger <rmetz...@apache.org> wrote:
>> 
>> 
>> ---------- Forwarded message ---------
>> From: sunfulin <sunfulin0...@163.com>
>> Date: Fri, Feb 14, 2020 at 2:59 AM
>> Subject: Re:Flink 1.10 es sink exception
>> To: user@flink.apache.org
>> 
>> 
>> Can anyone share a little advice on the reason for this exception? When I changed 
>> to the old planner, the same SQL ran well. 
>> 
>> At 2020-02-13 16:07:18, "sunfulin" <sunfulin0...@163.com> wrote:
>> 
>> Hi, guys
>> When running the Flink SQL below, I met an exception like 
>> "org.apache.flink.table.api.TableException: UpsertStreamTableSink requires 
>> that Table has a full primary keys if it is updated". I am using the latest 
>> Flink 1.10 release with the blink planner enabled, and the same logic runs well 
>> on the Flink 1.8.2 old planner. Does the SQL usage have some problem, or may 
>> there be a bug here? 
>> 
>> 
>> INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
>>   SELECT aggId, pageId, ts_min AS ts,
>>     COUNT(CASE WHEN eventId = 'exposure' THEN 1 ELSE NULL END) AS expoCnt,
>>     COUNT(CASE WHEN eventId = 'click' THEN 1 ELSE NULL END) AS clickCnt
>>   FROM
>>   (
>>     SELECT
>>         'ZL_001' AS aggId,
>>         pageId,
>>         eventId,
>>         recvTime,
>>         ts2Date(recvTime) AS ts_min
>>     FROM kafka_zl_etrack_event_stream
>>     WHERE eventId IN ('exposure', 'click')
>>   ) AS t1
>>   GROUP BY aggId, pageId, ts_min
>> 
