Hi Kant,

1) Yes, it will be stored in the RocksDB state backend.

2) In the old planner, the left and right states have the same structure: `<col1, MapState<Row, Tuple2<count, expiredTime>>>`. It is a two-level map. The join key `col1` is the first-level key of the state. The key of the MapState is the input row, `count` is the number of times this row has occurred, and `expiredTime` indicates when to clean up this row (to avoid unbounded state size). You can find the source code here [1]. In the blink planner, the state structure is more complex and is determined by the meta-information of the upstream. You can see the source code of the blink planner here [2].

3) Currently, the intermediate state is not exposed to users. Usually, users should write the query result to an external system (like MySQL) and query that system. Querying the intermediate state is on the roadmap, but I guess it is not in the 1.11 plan.
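To make the two-level structure from point 2 concrete, here is a minimal plain-Java sketch of one join side's state. It is only a model for illustration and not Flink's actual implementation: the class `JoinSideStateSketch`, its methods, the use of `String` join keys and `List<Object>` rows, and the rule that every insert refreshes the expiry time are all assumptions. In Flink the outer key is the keyed-state key and the inner map is a `MapState`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of one join side: joinKey -> (row -> {count, expiredTime}). */
final class JoinSideStateSketch {
    // First level: join key (col1). Second level: full input row -> {count, expiredTime}.
    private final Map<String, Map<List<Object>, long[]>> state = new HashMap<>();
    private final long retentionMs;

    JoinSideStateSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /** Records one more occurrence of the row and refreshes its expiry (assumed policy). */
    void add(String joinKey, List<Object> row, long nowMs) {
        Map<List<Object>, long[]> rows = state.computeIfAbsent(joinKey, k -> new HashMap<>());
        long[] entry = rows.computeIfAbsent(row, r -> new long[] {0L, 0L});
        entry[0] += 1;                   // count of identical rows
        entry[1] = nowMs + retentionMs;  // expiredTime
    }

    /** Returns how many copies of the row are stored under the join key (0 if none). */
    long count(String joinKey, List<Object> row) {
        Map<List<Object>, long[]> rows = state.get(joinKey);
        if (rows == null) {
            return 0L;
        }
        long[] entry = rows.get(row);
        return entry == null ? 0L : entry[0];
    }

    /** Drops every row under the join key whose expiredTime is at or before nowMs. */
    void cleanup(String joinKey, long nowMs) {
        Map<List<Object>, long[]> rows = state.get(joinKey);
        if (rows == null) {
            return;
        }
        rows.values().removeIf(entry -> entry[1] <= nowMs);
        if (rows.isEmpty()) {
            state.remove(joinKey);
        }
    }

    public static void main(String[] args) {
        JoinSideStateSketch left = new JoinSideStateSketch(1000L);
        List<Object> row = Arrays.<Object>asList("1", "a");
        left.add("1", row, 0L);
        left.add("1", row, 500L);    // same row again: count becomes 2
        System.out.println(left.count("1", row));
        left.cleanup("1", 1500L);    // expiry was 500 + 1000, so the row is dropped
        System.out.println(left.count("1", row));
    }
}
```

Storing a count per distinct row, rather than a list of rows, keeps duplicate rows cheap, and the expiry timestamp is what lets old entries be removed so the state does not grow forever.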
Best,
Jark

[1]: http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
[2]: https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45

> On January 21, 2020, at 18:01, kant kodali <kanth...@gmail.com> wrote:
>
> Hi All,
>
> If I run a query like this
>
> StreamTableEnvironment.sqlQuery("select * from table1 join table2 on table1.col1 = table2.col1")
>
> 1) Where will flink store the intermediate result? Imagine flink-conf.yaml says state.backend = 'rocksdb'
>
> 2) If the intermediate results are stored in rocksdb, then what is the key and value in this case (given the query above)?
>
> 3) What is the best way to query these intermediate results from an external application, both while the job is running and while it is not running?
>
> Thanks!