Is it a common practice to have a custom state backend? If so, what would be a popular custom backend?
Can I do Elasticsearch as a state backend? Thanks!

On Wed, Jan 22, 2020 at 1:42 AM Jark Wu <imj...@gmail.com> wrote:

> Hi Kant,
>
> 1) A list of rows is also sufficient in this case. Using a MapState is in
> order to retract a row faster, and to save storage size.
>
> 2) The State Processor API is usually used to process savepoints. I'm afraid
> the performance is not good enough to use it for querying.
> On the other hand, AFAIK, the State Processor API requires the uid of the
> operator. However, uids of operators are not set in the Table API & SQL,
> so I'm not sure whether it works or not.
>
> 3) You can have a custom state backend by implementing the
> org.apache.flink.runtime.state.StateBackend interface, and use it via
> `env.setStateBackend(...)`.
>
> Best,
> Jark
>
> On Wed, 22 Jan 2020 at 14:16, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi Jark,
>>
>> 1) Shouldn't it be col1 to a list of rows? Multiple rows can have the
>> same joining key, right?
>>
>> 2) Can I use the State Processor API
>> <https://flink.apache.org/feature/2019/09/13/state-processor-api.html>
>> from an external application to query the intermediate results in near
>> real-time? I thought querying rocksdb state is a widely requested feature.
>> It would be really great to consider this feature for 1.11.
>>
>> 3) Is there any interface where I can implement my own state backend?
>>
>> Thanks!
>>
>>
>> On Tue, Jan 21, 2020 at 6:08 PM Jark Wu <imj...@gmail.com> wrote:
>>
>>> Hi Kant,
>>>
>>> 1) Yes, it will be stored in the rocksdb state backend.
>>> 2) In the old planner, the left state has the same structure as the
>>> right state: both are `<col1, MapState<Row, Tuple2<count, expiredTime>>>`.
>>> It is a 2-level map structure, where `col1` is the join key and the
>>> first-level key of the state. The key of the MapState is the input row,
>>> `count` is the number of occurrences of this row, and `expiredTime`
>>> indicates when to clean up this row (to avoid infinite state size). You
>>> can find the source code here [1].
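[Editor's note: to make the 2-level state structure described above concrete, here is a minimal sketch in plain Java. It models the layout with ordinary HashMaps rather than Flink's MapState, so the class and method names (JoinStateSketch, addRow, retractRow) are illustrative, not Flink's actual implementation. It also shows why a map makes retraction an O(1) lookup instead of a list scan.]

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model of the old planner's join state layout:
//   joinKey (col1) -> { row -> (count, expiredTime) }
// This mirrors <col1, MapState<Row, Tuple2<count, expiredTime>>>
// with plain HashMaps; it is NOT Flink's real implementation.
public class JoinStateSketch {

    // Per-row bookkeeping: how many times the row occurred,
    // and when it should be cleaned up.
    static final class RowMeta {
        long count;
        long expiredTime;
        RowMeta(long count, long expiredTime) {
            this.count = count;
            this.expiredTime = expiredTime;
        }
    }

    private final Map<String, Map<String, RowMeta>> state = new HashMap<>();

    // Accumulate a row under its join key; duplicate rows bump the count.
    public void addRow(String joinKey, String row, long expiredTime) {
        Map<String, RowMeta> rows =
                state.computeIfAbsent(joinKey, k -> new HashMap<>());
        RowMeta meta = rows.get(row);
        if (meta == null) {
            rows.put(row, new RowMeta(1, expiredTime));
        } else {
            meta.count++;
            meta.expiredTime = expiredTime;
        }
    }

    // Retract a row: a single map lookup, which is why MapState retracts
    // faster than scanning a list of rows for a match.
    public void retractRow(String joinKey, String row) {
        Map<String, RowMeta> rows = state.get(joinKey);
        if (rows == null) return;
        RowMeta meta = rows.get(row);
        if (meta == null) return;
        if (--meta.count == 0) {
            rows.remove(row);
        }
    }

    public long countOf(String joinKey, String row) {
        Map<String, RowMeta> rows = state.get(joinKey);
        if (rows == null) return 0;
        RowMeta meta = rows.get(row);
        return meta == null ? 0 : meta.count;
    }

    public static void main(String[] args) {
        JoinStateSketch s = new JoinStateSketch();
        s.addRow("k1", "rowA", 1000L);
        s.addRow("k1", "rowA", 2000L); // same row again -> count = 2
        s.retractRow("k1", "rowA");    // count drops back to 1
        System.out.println(s.countOf("k1", "rowA")); // prints 1
    }
}
```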
>>> In the blink planner, the state structure is more complex and is
>>> determined by the meta-information of the upstream operator. You can see
>>> the source code of the blink planner here [2].
>>> 3) Currently, the intermediate state is not exposed to users. Usually,
>>> users should write the query result to an external system (like MySQL)
>>> and query the external system.
>>> Querying the intermediate state is on the roadmap, but I guess it is
>>> not in the 1.11 plan.
>>>
>>> Best,
>>> Jark
>>>
>>> [1]:
>>> http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
>>> [2]:
>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
>>>
>>> On 21 Jan 2020, at 18:01, kant kodali <kanth...@gmail.com> wrote:
>>>
>>> Hi All,
>>>
>>> If I run a query like this:
>>>
>>> StreamTableEnvironment.sqlQuery("select * from table1 join table2 on
>>> table1.col1 = table2.col1")
>>>
>>> 1) Where will Flink store the intermediate result? Imagine
>>> flink-conf.yaml says state.backend = 'rocksdb'.
>>>
>>> 2) If the intermediate results are stored in rocksdb, then what are the
>>> key and value in this case (given the query above)?
>>>
>>> 3) What is the best way to query these intermediate results from an
>>> external application, both while the job is running and while it is not?
>>>
>>> Thanks!
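[Editor's note: the thread mentions that each row's `expiredTime` is used to clean up state and keep its size bounded. Below is a minimal plain-Java sketch of that idea: rows carry an expiration timestamp and a cleanup pass evicts any row whose time has passed. It is a simplified one-level view (Flink's real cleanup is timer-driven inside the operator); the class name StateCleanupSketch is hypothetical.]

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Sketch of expiredTime-based cleanup: each stored row has an
// expiration timestamp, and cleanup(now) evicts stale entries so the
// state does not grow without bound. Not Flink's actual mechanism.
public class StateCleanupSketch {

    // row -> expiredTime (millis); a simplified one-level view of the state.
    private final Map<String, Long> rowExpiry = new HashMap<>();

    public void put(String row, long expiredTime) {
        rowExpiry.put(row, expiredTime);
    }

    // Remove every row whose expiredTime is at or before `now`;
    // returns how many rows were evicted.
    public int cleanup(long now) {
        int evicted = 0;
        Iterator<Map.Entry<String, Long>> it = rowExpiry.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() <= now) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }

    public int size() {
        return rowExpiry.size();
    }

    public static void main(String[] args) {
        StateCleanupSketch s = new StateCleanupSketch();
        s.put("rowA", 100L);
        s.put("rowB", 500L);
        System.out.println(s.cleanup(200L)); // prints 1 (rowA evicted)
        System.out.println(s.size());        // prints 1 (rowB remains)
    }
}
```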