Re: Question about runtime filter

2020-03-04 Thread Jingsong Li
Great exploration, and thanks for the information. I believe you have a deep understanding of Flink's internal mechanisms. Best, Jingsong Lee On Thu, Mar 5, 2020 at 12:09 PM faaron zheng wrote: > I finally got through the runtimefilter in 1.10, the reason why it didn't call commit method is i

Re: Question about runtime filter

2020-03-04 Thread faaron zheng
I finally got the runtime filter working in 1.10. The reason it didn't call the commit method is in OperatorCodeGenerator: generateOneInputStreamOperator should call the endInput() method correctly. The complete runtime filter process is: 1. Add the insert and remove rules to the batch rules. 2. Build side c
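The effect described above (a build side that adds to its filter but never commits it because endInput() is not invoked) can be illustrated with a minimal, framework-free sketch. Everything in it is an assumption made for illustration: BoundedInput, BloomFilter, BuildSideOperator and probe are hypothetical names, not Flink or Blink classes, and the hashing is deliberately simplistic. It only shows why a missing endInput() call leaves the probe side with no filter, so the query returns the same rows without skipping any.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

public class RuntimeFilterSketch {

    /** Hypothetical stand-in for an "input is finished" callback such as endInput(). */
    interface BoundedInput {
        void endInput();
    }

    /** Tiny Bloom filter with two hash probes; illustration only, not tuned. */
    static final class BloomFilter {
        private final BitSet bits;
        private final int size;

        BloomFilter(int size) {
            this.size = size;
            this.bits = new BitSet(size);
        }

        void add(long key) {
            bits.set(index(key, 0x9E3779B97F4A7C15L));
            bits.set(index(key, 0xC2B2AE3D27D4EB4FL));
        }

        boolean mightContain(long key) {
            return bits.get(index(key, 0x9E3779B97F4A7C15L))
                && bits.get(index(key, 0xC2B2AE3D27D4EB4FL));
        }

        private int index(long key, long seed) {
            long h = key * seed;
            h ^= (h >>> 32);
            return (int) Math.floorMod(h, (long) size);
        }
    }

    /** Hypothetical build-side operator: collects join keys, publishes the filter at end of input. */
    static final class BuildSideOperator implements BoundedInput {
        private final BloomFilter local = new BloomFilter(1 << 12);
        private BloomFilter committed; // stays null until endInput() runs

        void processElement(long joinKey) {
            local.add(joinKey); // analogous to the "add" step seen in the thread
        }

        @Override
        public void endInput() {
            committed = local; // analogous to the "commit" step that was never reached
        }

        BloomFilter committedFilter() {
            return committed;
        }
    }

    /** Probe side: with no committed filter, every key passes through unfiltered. */
    static List<Long> probe(BloomFilter filter, long[] probeKeys) {
        List<Long> out = new ArrayList<>();
        for (long k : probeKeys) {
            if (filter == null || filter.mightContain(k)) {
                out.add(k);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        BuildSideOperator build = new BuildSideOperator();
        for (long k : new long[] {1L, 2L, 3L}) {
            build.processElement(k);
        }
        long[] probeKeys = {1L, 2L, 3L, 100L, 200L};

        // endInput() not called yet: no filter is visible, nothing is skipped.
        System.out.println("before commit: " + probe(build.committedFilter(), probeKeys).size());

        build.endInput();
        // After the commit, keys absent from the build side can be dropped early
        // (a Bloom filter may still let some through as false positives).
        System.out.println("after commit:  " + probe(build.committedFilter(), probeKeys).size());
    }
}
```

In this sketch the join result would be identical in both cases, since a Bloom filter never drops a key that was added; only the amount of probe-side work differs.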

Re: Question about runtime filter

2020-03-02 Thread faaron zheng
I set sql.exec.runtime-filter.wait to true. HiveTableSource takes much longer but returns the same result. I think the reason is that the preAggregateAccumulator is never committed, but I don't know why that happens. JingsongLee wrote on Mon, Mar 2, 2020 at 3:22 PM: > Hi, > > Does runtime filter probe side wait for building runtime

Re: Question about runtime filter

2020-03-02 Thread faaron zheng
Thanks for replying, Lee. I followed your method to debug the code and found that the build side only calls addPreAggregatedAccumulator and never calls the commit method. Furthermore, I added a breakpoint at future.handleAsync in the asyncGetBroadcastBloomFilter method. But when the program stops at if(e==null && accumulat

Re: Question about runtime filter

2020-03-01 Thread JingsongLee
Hi, Does the probe side of the runtime filter wait for the build side to finish building the filter? Can you check the start times of the build side and the probe side? Best, Jingsong Lee -- From: faaron zheng Send Time: Mar 2, 2020 (Monday) 14:55 To: user Subject: Question abou

Question about runtime filter

2020-03-01 Thread faaron zheng
Hi everyone, These days I am trying to implement the runtime filter in Flink 1.10 with flink-sql-benchmark, following Blink. I mainly changed three parts of the Flink code: added the runtime filter rule; modified the code generation and the bloom filter; and added some aggregated-accumulator methods modeled on the existing accumulator. Now,