Yes, the default is writing to an external system. Especially if you want SQL, there is currently no way around it.
The drawbacks of writing to external systems are the additional maintenance of another system and higher latency.

On Tue, Jan 28, 2020 at 11:49 AM kant kodali <kanth...@gmail.com> wrote:

> Hi Arvid,
>
> I am trying to understand your statement. I am new to Flink, so excuse me
> if I don't know something I should have known. A ProcessFunction just
> processes the records, right? If so, how is it better than writing to an
> external system? At the end of the day I want to be able to query it (it
> doesn't have to be through Queryable State; in fact I probably don't want
> to use Queryable State because of its limitations). Ideally I want to be
> able to query the intermediate state using SQL, and hopefully the store
> that maintains the intermediate state has some sort of index support so
> that read queries are faster than a full scan.
>
> Also, I hear that querying intermediate state just like one would in a
> database is a widely requested feature, so it is a bit surprising that
> this is not solved yet, but I am hopeful!
>
> Thanks!
>
> On Tue, Jan 28, 2020 at 12:45 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Kant,
>>
>> I just wanted to mention the obvious. If you add a ProcessFunction right
>> after the join, you could maintain a user state with the same result.
>> That will of course blow up the data volume by a factor of 2, but it may
>> still be better than writing to an external system.
>>
>> On Mon, Jan 27, 2020 at 6:09 PM Benoît Paris <
>> benoit.pa...@centraliens-lille.org> wrote:
>>
>>> Dang, what a massive PR: 2,118 files changed, +104,104 −29,161 lines.
>>> Thanks for the details, Jark!
>>>
>>> On Mon, Jan 27, 2020 at 4:07 PM Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> Hi Kant,
>>>> Having a custom state backend is very difficult and is not
>>>> recommended.
>>>>
>>>> Hi Benoît,
>>>> Yes, the "query on the intermediate state is on the roadmap" I
>>>> mentioned refers to integrating the Table API & SQL with Queryable
>>>> State.
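[Editorial note: Arvid's suggestion above can be sketched in plain Java, with no Flink dependency. The class and method names are illustrative only; in a real job this per-key map would be Flink keyed state inside a ProcessFunction placed after the join, exposed via Queryable State or a custom endpoint.]

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for the per-key state a ProcessFunction placed
// right after the join could maintain, so the join result stays queryable.
class JoinedStateCache {
    // join key -> latest joined row (both sides concatenated)
    private final Map<String, String[]> stateByKey = new HashMap<>();

    // Called for each joined record, like ProcessFunction#processElement.
    void processElement(String key, String[] joinedRow) {
        stateByKey.put(key, joinedRow);
    }

    // What an external query (e.g. via Queryable State) would read.
    String[] lookup(String key) {
        return stateByKey.get(key);
    }

    public static void main(String[] args) {
        JoinedStateCache cache = new JoinedStateCache();
        cache.processElement("k1", new String[]{"left-a", "right-b"});
        cache.processElement("k1", new String[]{"left-a", "right-c"}); // overwrite in place
        System.out.println(String.join(",", cache.lookup("k1")));
    }
}
```

As Arvid notes, this roughly doubles the stored data volume, since the joined rows are kept once in the join operator's state and once here.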
>>>> We also have an early issue, FLINK-6968, to track this.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Fri, 24 Jan 2020 at 00:26, Benoît Paris <
>>>> benoit.pa...@centraliens-lille.org> wrote:
>>>>
>>>>> Hi all!
>>>>>
>>>>> @Jark, out of curiosity, would you be so kind as to expand a bit on
>>>>> "Query on the intermediate state is on the roadmap"?
>>>>> Are you referring to working on
>>>>> QueryableStateStream/QueryableStateClient [1], or around "FOR
>>>>> SYSTEM_TIME AS OF" [2], or on other APIs/concepts (is there a FLIP?)?
>>>>>
>>>>> Cheers
>>>>> Ben
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table
>>>>>
>>>>> On Thu, Jan 23, 2020 at 6:40 AM kant kodali <kanth...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Is it common practice to have a custom state backend? If so, what
>>>>>> would be a popular custom backend?
>>>>>>
>>>>>> Can I use Elasticsearch as a state backend?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Wed, Jan 22, 2020 at 1:42 AM Jark Wu <imj...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Kant,
>>>>>>>
>>>>>>> 1) A list of rows would also be sufficient in this case. Using a
>>>>>>> MapState lets us retract a row faster and saves storage.
>>>>>>>
>>>>>>> 2) The State Processor API is usually used to process savepoints.
>>>>>>> I'm afraid the performance is not good enough to use it for
>>>>>>> querying. On the other hand, AFAIK, the State Processor API
>>>>>>> requires the uid of the operator, but operator uids are not set in
>>>>>>> the Table API & SQL, so I'm not sure whether it works or not.
>>>>>>>
>>>>>>> 3) You can have a custom state backend by implementing the
>>>>>>> org.apache.flink.runtime.state.StateBackend interface and using it
>>>>>>> via `env.setStateBackend(...)`.
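[Editorial note: Jark's point (1) can be made concrete with a plain-Java sketch (no Flink dependency; the class name is illustrative). A MapState from row to count retracts a row with a single map update, whereas a list of rows would need a linear scan to find and remove one occurrence:]

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for one join key's MapState<Row, count>:
// each distinct row mapped to how many times it is currently in the state.
class RowCountState {
    private final Map<String, Long> counts = new HashMap<>();

    void add(String row) {
        counts.merge(row, 1L, Long::sum);
    }

    // Retract one occurrence with an O(1) map update;
    // a list of rows would need a scan to find the row first.
    void retract(String row) {
        Long c = counts.get(row);
        if (c == null) return;            // nothing to retract
        if (c == 1) counts.remove(row);   // last copy gone
        else counts.put(row, c - 1);
    }

    long count(String row) {
        return counts.getOrDefault(row, 0L);
    }

    public static void main(String[] args) {
        RowCountState s = new RowCountState();
        s.add("row1");
        s.add("row1");
        s.retract("row1");
        System.out.println(s.count("row1"));
    }
}
```

Storing counts instead of duplicated rows is also why this saves space when the same row appears many times.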
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> On Wed, 22 Jan 2020 at 14:16, kant kodali <kanth...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Jark,
>>>>>>>>
>>>>>>>> 1) Shouldn't it map col1 to a list of rows? Multiple rows can
>>>>>>>> have the same join key, right?
>>>>>>>>
>>>>>>>> 2) Can I use the State Processor API
>>>>>>>> <https://flink.apache.org/feature/2019/09/13/state-processor-api.html>
>>>>>>>> from an external application to query the intermediate results in
>>>>>>>> near real time? I thought querying RocksDB state was a widely
>>>>>>>> requested feature. It would be really great to consider this
>>>>>>>> feature for 1.11.
>>>>>>>>
>>>>>>>> 3) Is there any interface where I can implement my own state
>>>>>>>> backend?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> On Tue, Jan 21, 2020 at 6:08 PM Jark Wu <imj...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Kant,
>>>>>>>>>
>>>>>>>>> 1) Yes, it will be stored in the RocksDB state backend.
>>>>>>>>> 2) In the old planner, the left state has the same structure as
>>>>>>>>> the right state; both are `<col1, MapState<Row, Tuple2<count,
>>>>>>>>> expiredTime>>>`. It is a 2-level map structure: `col1` is the
>>>>>>>>> join key and serves as the first-level key of the state. The key
>>>>>>>>> of the MapState is the input row, `count` is the number of
>>>>>>>>> occurrences of this row, and `expiredTime` indicates when to
>>>>>>>>> clean up this row (to avoid infinite state size). You can find
>>>>>>>>> the source code here [1].
>>>>>>>>> In the Blink planner, the state structure is more complex and
>>>>>>>>> is determined by the meta-information of the upstream operators.
>>>>>>>>> You can see the source code of the Blink planner here [2].
>>>>>>>>> 3) Currently, the intermediate state is not exposed to users.
>>>>>>>>> Usually, users write the query result to an external system
>>>>>>>>> (like MySQL) and query the external system.
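[Editorial note: the 2-level structure Jark describes can be sketched in plain Java (no Flink dependency; rows are simplified to strings and the class name is illustrative). The outer key is the join key `col1`; the inner map goes from the input row to `(count, expiredTime)`:]

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of one side's join state in the old planner:
// joinKey -> (row -> [count, expiredTime]).
class JoinSideState {
    private final Map<String, Map<String, long[]>> state = new HashMap<>();

    void add(String joinKey, String row, long expiredTime) {
        Map<String, long[]> rows = state.computeIfAbsent(joinKey, k -> new HashMap<>());
        long[] entry = rows.get(row);
        if (entry == null) rows.put(row, new long[]{1L, expiredTime});
        else { entry[0]++; entry[1] = expiredTime; }
    }

    // Drop rows whose expiredTime has passed, keeping state size bounded.
    void cleanup(String joinKey, long now) {
        Map<String, long[]> rows = state.get(joinKey);
        if (rows != null) rows.values().removeIf(e -> e[1] <= now);
    }

    long count(String joinKey, String row) {
        Map<String, long[]> rows = state.get(joinKey);
        if (rows == null) return 0L;
        long[] e = rows.get(row);
        return e == null ? 0L : e[0];
    }

    public static void main(String[] args) {
        JoinSideState s = new JoinSideState();
        s.add("k", "rowA", 100L);
        s.add("k", "rowA", 100L);
        System.out.println(s.count("k", "rowA"));
    }
}
```

This also answers kant's earlier question: the map from row to count is what makes a separate "list of rows" unnecessary, since duplicates are represented by the count.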
>>>>>>>>> Query on the intermediate state is on the roadmap, but I guess
>>>>>>>>> it is not in the 1.11 plan.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jark
>>>>>>>>>
>>>>>>>>> [1]:
>>>>>>>>> http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
>>>>>>>>> [2]:
>>>>>>>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
>>>>>>>>>
>>>>>>>>> On Jan 21, 2020 at 18:01, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> If I run a query like this:
>>>>>>>>>
>>>>>>>>> StreamTableEnvironment.sqlQuery("select * from table1 join table2
>>>>>>>>> on table1.col1 = table2.col1")
>>>>>>>>>
>>>>>>>>> 1) Where will Flink store the intermediate result? Imagine
>>>>>>>>> flink-conf.yaml says state.backend = 'rocksdb'.
>>>>>>>>>
>>>>>>>>> 2) If the intermediate results are stored in RocksDB, then what
>>>>>>>>> are the key and the value in this case (given the query above)?
>>>>>>>>>
>>>>>>>>> 3) What is the best way to query these intermediate results from
>>>>>>>>> an external application, both while the job is running and while
>>>>>>>>> it is not?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>
>>>>> --
>>>>> Benoît Paris
>>>>> Ingénieur Machine Learning Explicable
>>>>> Tél : +33 6 60 74 23 00
>>>>> http://benoit.paris
>>>>> http://explicable.ml
>>>>
>>>
>>> --
>>> Benoît Paris
>>> Ingénieur Machine Learning Explicable
>>> Tél : +33 6 60 74 23 00
>>> http://benoit.paris
>>> http://explicable.ml
>>
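[Editorial note: to tie the thread together, here is a plain-Java sketch (no Flink dependency; rows simplified to strings, inner-join semantics only, names illustrative) of how a streaming join uses the two keyed states: each incoming left row is added to the left state and probed against the right state, and vice versa.]

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal streaming inner join on a single key column, mirroring the
// two keyed states the planner keeps (one per input side).
class StreamingInnerJoin {
    private final Map<String, List<String>> leftState = new HashMap<>();
    private final Map<String, List<String>> rightState = new HashMap<>();

    // Store the left row, probe the right state, and return emitted joins.
    List<String> onLeft(String key, String row) {
        leftState.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        List<String> out = new ArrayList<>();
        for (String r : rightState.getOrDefault(key, List.of())) out.add(row + "|" + r);
        return out;
    }

    List<String> onRight(String key, String row) {
        rightState.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        List<String> out = new ArrayList<>();
        for (String l : leftState.getOrDefault(key, List.of())) out.add(l + "|" + row);
        return out;
    }

    public static void main(String[] args) {
        StreamingInnerJoin j = new StreamingInnerJoin();
        j.onLeft("k1", "L1");               // no right rows yet, emits nothing
        System.out.println(j.onRight("k1", "R1")); // now L1 joins with R1
    }
}
```

This is why both sides must be kept in state indefinitely for the query in the thread, and why Jark's `expiredTime` cleanup (or an external sink) matters in practice.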