Hi Kant,

1) A list of rows would also be sufficient in this case. A MapState is used so that a row can be retracted faster, and to reduce the state size (identical rows are stored once, together with a count).
2) The State Processor API is usually used to process savepoints. I’m afraid its performance is not good enough for querying. Also, AFAIK, the State Processor API requires the uid of the operator, but operator uids are not set in Table API & SQL, so I’m not sure whether it would work at all.

3) You can write a custom state backend by implementing the org.apache.flink.runtime.state.StateBackend interface, and use it via `env.setStateBackend(…)`.

Best,
Jark

On Wed, 22 Jan 2020 at 14:16, kant kodali <kanth...@gmail.com> wrote:

> Hi Jark,
>
> 1) Shouldn't it be col1 to a List of rows? Multiple rows can have the same
> join key, right?
>
> 2) Can I use the State Processor API
> <https://flink.apache.org/feature/2019/09/13/state-processor-api.html>
> from an external application to query the intermediate results in near
> real-time? I thought querying rocksdb state was a widely requested feature.
> It would be really great to consider this feature for 1.11.
>
> 3) Is there any interface where I can implement my own state backend?
>
> Thanks!
>
> On Tue, Jan 21, 2020 at 6:08 PM Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Kant,
>>
>> 1) Yes, it will be stored in the rocksdb state backend.
>> 2) In the old planner, the left state is the same as the right state; both are
>> `<col1, MapState<Row, Tuple2<count, expiredTime>>>`.
>> It is a 2-level map structure, where `col1` is the join key and the
>> first-level key of the state. The key of the MapState is the input row,
>> `count` is the number of occurrences of this row, and `expiredTime`
>> indicates when to clean up this row (to avoid infinite state size). You can
>> find the source code here [1].
>> In the blink planner, the state structure is more complex and is
>> determined by the meta-information of the upstream. You can see the source
>> code of the blink planner here [2].
>> 3) Currently, the intermediate state is not exposed to users. Usually,
>> users should write the query result to an external system (like MySQL) and
>> query the external system.
>> Querying the intermediate state is on the roadmap, but I guess it is
>> not in the 1.11 plan.
>>
>> Best,
>> Jark
>>
>> [1]: http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
>> [2]: https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
>>
>> On 21 Jan 2020, at 18:01, kant kodali <kanth...@gmail.com> wrote:
>>
>> Hi All,
>>
>> If I run a query like this
>>
>> StreamTableEnvironment.sqlQuery("select * from table1 join table2 on table1.col1 = table2.col1")
>>
>> 1) Where will Flink store the intermediate result? Imagine
>> flink-conf.yaml says state.backend = 'rocksdb'
>>
>> 2) If the intermediate results are stored in rocksdb, then what are the key
>> and value in this case (given the query above)?
>>
>> 3) What is the best way to query these intermediate results from an
>> external application, both while the job is running and while it is not
>> running?
>>
>> Thanks!
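
For readers of this thread, the two-level layout discussed above (`<col1, MapState<Row, Tuple2<count, expiredTime>>>`) can be modelled roughly in plain Java. This is only a sketch and not Flink code: it uses `HashMap` instead of Flink's keyed state, a `List<String>` stands in for `Row`, and every name in it (`JoinStateSketch`, `Entry`, `add`, `retract`, `count`, `distinctRows`) is hypothetical. Refreshing the expiry time on each add is an assumption of the sketch. It is meant to show why a map keyed by the row makes retraction a single lookup and stores duplicate rows only once.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the two-level join state; not Flink code, all names are hypothetical. */
class JoinStateSketch {

    /** Stands in for Tuple2<count, expiredTime>. */
    static final class Entry {
        long count;
        long expiredTime;

        Entry(long count, long expiredTime) {
            this.count = count;
            this.expiredTime = expiredTime;
        }
    }

    // First level: join key (col1). Second level: input row -> (count, expiredTime).
    private final Map<String, Map<List<String>, Entry>> state = new HashMap<>();

    /** Accumulates one row under its join key; a duplicate row only bumps the counter. */
    void add(String joinKey, List<String> row, long expiredTime) {
        Map<List<String>, Entry> rows = state.computeIfAbsent(joinKey, k -> new HashMap<>());
        Entry entry = rows.get(row);
        if (entry == null) {
            rows.put(row, new Entry(1, expiredTime));
        } else {
            entry.count++;
            entry.expiredTime = expiredTime; // assumption: each add refreshes the expiry
        }
    }

    /** Retracts one copy of a row via a hash lookup instead of scanning a list. */
    boolean retract(String joinKey, List<String> row) {
        Map<List<String>, Entry> rows = state.get(joinKey);
        if (rows == null) {
            return false;
        }
        Entry entry = rows.get(row);
        if (entry == null) {
            return false;
        }
        entry.count--;
        if (entry.count == 0) {
            rows.remove(row);
            if (rows.isEmpty()) {
                state.remove(joinKey);
            }
        }
        return true;
    }

    /** Number of stored copies of the given row under the join key (0 if absent). */
    long count(String joinKey, List<String> row) {
        Map<List<String>, Entry> rows = state.get(joinKey);
        Entry entry = (rows == null) ? null : rows.get(row);
        return (entry == null) ? 0 : entry.count;
    }

    /** Number of distinct rows stored under the join key. */
    int distinctRows(String joinKey) {
        Map<List<String>, Entry> rows = state.get(joinKey);
        return (rows == null) ? 0 : rows.size();
    }

    public static void main(String[] args) {
        JoinStateSketch sketch = new JoinStateSketch();
        List<String> row = Arrays.asList("k1", "a");
        sketch.add("k1", row, 100L);
        sketch.add("k1", row, 200L); // same row again: stored once, count incremented
        sketch.retract("k1", row);
        System.out.println("copies left: " + sketch.count("k1", row));
    }
}
```

With a plain list per join key, the same retraction would need a scan over all rows sharing that key, and each duplicate would occupy its own slot.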