Blink planner support lazy translation for multiple SQLs, and the common nodes will be reused in a single job. The only thing you need note here is the unified TableEnvironmentImpl do not support conversions between Table(s) and Stream(s). U must use pure SQL api (DDL/DML by sqlUpdate, DQL by sqlQuery).
*Best Regards,* *Zhenghua Gao* On Fri, Aug 9, 2019 at 12:38 PM Tony Wei <tony19920...@gmail.com> wrote: > forgot to send to user mailing list. > > Tony Wei <tony19920...@gmail.com> 於 2019年8月9日 週五 下午12:36寫道: > >> Hi Zhenghua, >> >> I didn't get your point. It seems that `isEagerOperationTranslation` is >> always return false. Is that >> means even I used Blink planner, the sql translation is still in a lazy >> manner? >> >> Or do you mean Blink planner will recognize and link two SQLs to the same >> kafka source, if >> they both use the same kafka table, even if the translation is lazy? >> >> I'm not familiar with the details of translation process, but I guessed >> the translating eagerly is not >> be an only solution. If the translation of the second SQL can reuse the >> operators from the first SQL, >> then it is possible to link them to the same kafka source operator. >> >> Best, >> Tony Wei >> >> Zhenghua Gao <doc...@gmail.com> 於 2019年8月9日 週五 上午11:57寫道: >> >>> This needs EagerOperationTranslation[1] >>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java#L413> >>> support. you can try in Blink planner in 1.9.0. >>> >>> [1] >>> https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java#L413 >>> >>> *Best Regards,* >>> *Zhenghua Gao* >>> >>> >>> On Fri, Aug 9, 2019 at 10:37 AM Tony Wei <tony19920...@gmail.com> wrote: >>> >>>> Hi, >>>> >>>> I used `flinkTableEnv.connect(new Kafka()...).registerTableSource(...)` >>>> to register my kafka table. >>>> However, I found that because SQL is a lazy operation, it will convert >>>> to DataStream under some >>>> criteria. For example, `Table#toRetractStream`. >>>> >>>> So, when I used two SQLs in one application job, the same kafka table >>>> will be constructed twice. It >>>> is not a problem from flink side, because two operators held their own >>>> state for offsets. But from >>>> kafka side, they will have the same group_id. >>>> >>>> I want to make sure that only one kafka source will commit group_id's >>>> offsets back to kafka. A >>>> workaround might be registering the same kafka topic twice with >>>> different name, group_id for >>>> two SQLs. But I would still like to know if there is any way to make >>>> two SQLs just read from the >>>> same KafkaTableSource? Thanks in advance. >>>> >>>> Best, >>>> Tony Wei >>>> >>>