Hi Kant, just wanted to mention the obvious. If you add a ProcessFunction right after the join, you could maintain user state holding the same result. That will of course roughly double the state volume, but it may still be better than writing to an external system.
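The idea above can be modeled outside Flink. Below is a plain-Python toy (not Flink's actual ProcessFunction API; the class and method names are invented for illustration) of a keyed operator placed after the join that mirrors every joined row into per-key state, so the latest results can be served without an external system:

```python
# Toy model (NOT Flink code) of "ProcessFunction after the join":
# every joined row is copied into per-key state, doubling the data
# volume but making the join result directly queryable.

class MirrorJoinResult:
    """Keeps the joined rows per key, duplicating the join's output."""

    def __init__(self):
        # key -> list of joined rows (the mirrored copy of the join result)
        self.state = {}

    def process_element(self, key, joined_row):
        # Called once per row emitted by the upstream join.
        self.state.setdefault(key, []).append(joined_row)

    def lookup(self, key):
        # Serving path: read the mirrored state instead of an external store.
        return self.state.get(key, [])


fn = MirrorJoinResult()
fn.process_element("user-1", ("user-1", "a", "x"))
fn.process_element("user-1", ("user-1", "a", "y"))
print(fn.lookup("user-1"))  # both joined rows for user-1
```

In real Flink code the dict would be keyed state (e.g. a ListState) scoped by the keyBy key, and lookups would go through Queryable State rather than a direct method call.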
On Mon, Jan 27, 2020 at 6:09 PM Benoît Paris <benoit.pa...@centraliens-lille.org> wrote:

> Dang, what a massive PR: files changed: 2,118; +104,104 −29,161 lines changed.
> Thanks for the details, Jark!
>
> On Mon, Jan 27, 2020 at 4:07 PM Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Kant,
>> Having a custom state backend is very difficult and is not recommended.
>>
>> Hi Benoît,
>> Yes, the "Query on the intermediate state is on the roadmap" I mentioned is referring to integrating Table API & SQL with Queryable State.
>> We also have an early issue, FLINK-6968, to track this.
>>
>> Best,
>> Jark
>>
>> On Fri, 24 Jan 2020 at 00:26, Benoît Paris <benoit.pa...@centraliens-lille.org> wrote:
>>
>>> Hi all!
>>>
>>> @Jark, out of curiosity, would you be so kind as to expand a bit on "Query on the intermediate state is on the roadmap"?
>>> Are you referring to working on QueryableStateStream/QueryableStateClient [1], or around "FOR SYSTEM_TIME AS OF" [2], or on other APIs/concepts (is there a FLIP?)?
>>>
>>> Cheers
>>> Ben
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table
>>>
>>> On Thu, Jan 23, 2020 at 6:40 AM kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Is it a common practice to have a custom state backend? If so, what would be a popular custom backend?
>>>>
>>>> Can I use Elasticsearch as a state backend?
>>>>
>>>> Thanks!
>>>>
>>>> On Wed, Jan 22, 2020 at 1:42 AM Jark Wu <imj...@gmail.com> wrote:
>>>>
>>>>> Hi Kant,
>>>>>
>>>>> 1) A list of rows would also be sufficient in this case. Using a MapState makes it faster to retract a row and saves storage.
>>>>>
>>>>> 2) The State Processor API is usually used to process savepoints. I'm afraid the performance is not good enough to use it for querying.
>>>>> On the other side, AFAIK, the State Processor API requires the uid of the operator. However, uids of operators are not set in Table API & SQL, so I'm not sure whether it works or not.
>>>>>
>>>>> 3) You can have a custom state backend by implementing the org.apache.flink.runtime.state.StateBackend interface, and use it via `env.setStateBackend(…)`.
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Wed, 22 Jan 2020 at 14:16, kant kodali <kanth...@gmail.com> wrote:
>>>>>
>>>>>> Hi Jark,
>>>>>>
>>>>>> 1) Shouldn't it be col1 to a list of rows? Multiple rows can have the same joining key, right?
>>>>>>
>>>>>> 2) Can I use the State Processor API <https://flink.apache.org/feature/2019/09/13/state-processor-api.html> from an external application to query the intermediate results in near real-time? I thought querying RocksDB state is a widely requested feature. It would be really great to consider this feature for 1.11.
>>>>>>
>>>>>> 3) Is there any interface where I can implement my own state backend?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Tue, Jan 21, 2020 at 6:08 PM Jark Wu <imj...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Kant,
>>>>>>>
>>>>>>> 1) Yes, it will be stored in the RocksDB state backend.
>>>>>>> 2) In the old planner, the left state is the same as the right state; both are `<col1, MapState<Row, Tuple2<count, expiredTime>>>`.
>>>>>>> It is a 2-level map structure: `col1` is the join key and the first-level key of the state. The key of the MapState is the input row, the `count` is the number of occurrences of this row, and the `expiredTime` indicates when to clean up this row (to avoid infinite state size). You can find the source code here [1].
>>>>>>> In the blink planner, the state structure is more complex and is determined by the meta-information of the upstream. You can see the source code of the blink planner here [2].
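The 2-level state structure Jark describes can be sketched as a plain-Python model (illustrative only, not Flink code; the TTL value is a hypothetical example):

```python
# Toy model of the old planner's join state:
#   joinKey (col1) -> { row: (count, expiredTime) }
# The count lets a retraction remove one occurrence of a duplicate row
# without scanning; expiredTime bounds the state size.

STATE = {}   # col1 (join key) -> {row: (count, expired_time)}
TTL = 3600   # hypothetical retention, in seconds

def add_row(join_key, row, now):
    entries = STATE.setdefault(join_key, {})
    count, _ = entries.get(row, (0, 0.0))
    # Bump the occurrence count and refresh the expiration time.
    entries[row] = (count + 1, now + TTL)

def retract_row(join_key, row):
    entries = STATE.get(join_key, {})
    if row in entries:
        count, exp = entries[row]
        if count > 1:
            entries[row] = (count - 1, exp)
        else:
            del entries[row]  # last copy gone: drop the map entry entirely

add_row("k1", ("k1", "a"), now=0.0)
add_row("k1", ("k1", "a"), now=0.0)   # same row twice -> count 2
retract_row("k1", ("k1", "a"))        # count drops back to 1
```

This also answers the "shouldn't it be a list of rows?" question below: the MapState plays the role of the list, with duplicates collapsed into a count.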
>>>>>>> 3) Currently, the intermediate state is not exposed to users. Usually, users should write the query result to an external system (like MySQL) and query the external system.
>>>>>>> Query on the intermediate state is on the roadmap, but I guess it is not in the 1.11 plan.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> [1]: http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
>>>>>>> [2]: https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
>>>>>>>
>>>>>>> On 21 Jan 2020, at 18:01, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> If I run a query like this:
>>>>>>>
>>>>>>> StreamTableEnvironment.sqlQuery("select * from table1 join table2 on table1.col1 = table2.col1")
>>>>>>>
>>>>>>> 1) Where will Flink store the intermediate result? Imagine flink-conf.yaml says state.backend = 'rocksdb'.
>>>>>>>
>>>>>>> 2) If the intermediate results are stored in RocksDB, then what are the key and value in this case (given the query above)?
>>>>>>>
>>>>>>> 3) What is the best way to query these intermediate results from an external application, while the job is running and while the job is not running?
>>>>>>>
>>>>>>> Thanks!
>>>
>>> --
>>> Benoît Paris
>>> Ingénieur Machine Learning Explicable
>>> Tél : +33 6 60 74 23 00
>>> http://benoit.paris
>>> http://explicable.ml
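For reference, the `state.backend = 'rocksdb'` assumption in the original question corresponds to a flink-conf.yaml fragment along these lines (the checkpoint directory is a hypothetical example):

```yaml
state.backend: rocksdb
state.checkpoints.dir: file:///tmp/flink-checkpoints   # hypothetical path
state.backend.incremental: true                        # optional: incremental RocksDB checkpoints
```

With this configuration, the join's intermediate state (the MapState structure described above) lives in the embedded RocksDB instances on the TaskManagers and is snapshotted to the checkpoint directory.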