Hi Jark,

1) shouldn't it be a col1 to List of row? multiple rows can have the same
joining key right?

2) Can I use state processor API
<https://flink.apache.org/feature/2019/09/13/state-processor-api.html> from
an external application to query the intermediate results in near
real-time? I thought querying rocksdb state is a widely requested feature.
It would be really great to consider this feature for 1.11

3) Is there any interface where I can implement my own state backend?

Thanks!


On Tue, Jan 21, 2020 at 6:08 PM Jark Wu <imj...@gmail.com> wrote:

> Hi Kant,
>
> 1) Yes, it will be stored in rocksdb statebackend.
> 2) In old planner, the left state is the same with right state which are
> both `<col1, MapState<Row, Tuple2<count, expiredTime>>>`.
>     It is a 2-level map structure, where the `col1` is the join key, it is
> the first-level key of the state. The key of the MapState is the input row,
>     and the `count` is the number of this row, the expiredTime indicates
> when to cleanup this row (avoid infinite state size). You can find the
> source code here[1].
>     In blink planner, the state structure will be more complex which is
> determined by the meta-information of upstream. You can see the source code
> of blink planner here [2].
> 3) Currently, the intermediate state is not exposed to users. Usually,
> users should write the query result to an external system (like Mysql) and
> query the external system.
>     Query on the intermediate state is on the roadmap, but I guess it is
> not in 1.11 plan.
>
> Best,
> Jark
>
> [1]:
> http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
> [2]:
> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
>
>
> 2020年1月21日 18:01,kant kodali <kanth...@gmail.com> 写道:
>
> Hi All,
>
> If I run a query like this
>
> StreamTableEnvironment.sqlQuery("select * from table1 join table2 on
> table1.col1 = table2.col1")
>
> 1) Where will flink store the intermediate result? Imagine flink-conf.yaml
> says state.backend = 'rocksdb'
>
> 2) If the intermediate results are stored in rockdb then what is the key
> and value in this case(given the query above)?
>
> 3) What is the best way to query these intermediate results from an
> external application? while the job is running and while the job is not
> running?
>
> Thanks!
>
>
>

Reply via email to