Yes, the default approach is writing to an external system. Especially if
you want SQL, there is currently no way around it.

The drawbacks of writing to external systems are: additional maintenance of
another system and higher latency.

On Tue, Jan 28, 2020 at 11:49 AM kant kodali <kanth...@gmail.com> wrote:

> Hi Arvid,
>
> I am trying to understand your statement. I am new to Flink, so excuse me
> if I don't know something I should have known. A ProcessFunction just
> processes the records, right? If so, how is it better than writing to an
> external system? At the end of the day I want to be able to query it (not
> necessarily through Queryable State; in fact I probably don't want to use
> Queryable State because of its limitations). Ideally I want to be able to
> query the intermediate state using SQL, and hopefully the store that
> maintains the intermediate state has some sort of index support so that
> read queries are faster than a full scan.
>
> Also, I hear that querying intermediate state, just like one would in a
> database, is a widely requested feature, so it's a bit surprising that
> this is not solved yet. But I am hopeful!
>
> Thanks!
>
>
>
> On Tue, Jan 28, 2020 at 12:45 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Kant,
>>
>> just wanted to mention the obvious. If you add a ProcessFunction right
>> after the join, you could maintain a user state with the same result. That
>> will of course blow up the data volume by a factor of 2, but may still be
>> better than writing to an external system.
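To make the suggestion above concrete, here is a minimal sketch in plain Java of what such a ProcessFunction would do conceptually. This is *not* actual Flink API: the class and method names are hypothetical stand-ins, and the plain `HashMap` stands in for the Flink `MapState` a real `KeyedProcessFunction` would use.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative stand-in for keyed state maintained by a ProcessFunction
 * placed right after a join. Each joined record is upserted under its
 * join key, duplicating the join's own state (the factor-of-2 blow-up
 * mentioned above) but making the current result directly queryable.
 */
public class JoinResultMirror {
    // joinKey -> latest joined row for that key (simplified: one row per key)
    private final Map<String, String> state = new HashMap<>();

    // Called once per joined record, as processElement() would be in Flink.
    public void processElement(String joinKey, String joinedRow) {
        state.put(joinKey, joinedRow);
    }

    // A query-side lookup against the mirrored state.
    public String query(String joinKey) {
        return state.get(joinKey);
    }

    public static void main(String[] args) {
        JoinResultMirror mirror = new JoinResultMirror();
        mirror.processElement("k1", "a|x");
        mirror.processElement("k2", "b|y");
        mirror.processElement("k1", "a|z"); // update overwrites previous row
        System.out.println(mirror.query("k1")); // prints a|z
        System.out.println(mirror.query("k2")); // prints b|y
    }
}
```

In a real job the lookup side would go through Queryable State or a sink, since operator state is not directly reachable from outside the cluster.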
>>
>> On Mon, Jan 27, 2020 at 6:09 PM Benoît Paris <
>> benoit.pa...@centraliens-lille.org> wrote:
>>
>>> Dang, what a massive PR: 2,118 files changed, +104,104 −29,161 lines.
>>> Thanks for the details, Jark!
>>>
>>> On Mon, Jan 27, 2020 at 4:07 PM Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> Hi Kant,
>>>> Having a custom state backend is very difficult and is not recommended.
>>>>
>>>> Hi Benoît,
>>>> Yes, the "Query on the intermediate state is on the roadmap" I
>>>> mentioned refers to integrating Table API & SQL with Queryable State.
>>>> There is also an early issue, FLINK-6968, that tracks this.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>>
>>>> On Fri, 24 Jan 2020 at 00:26, Benoît Paris <
>>>> benoit.pa...@centraliens-lille.org> wrote:
>>>>
>>>>> Hi all!
>>>>>
>>>>> @Jark, out of curiosity, would you be so kind as to expand a bit on "Query
>>>>> on the intermediate state is on the roadmap"?
>>>>> Are you referring to working on
>>>>> QueryableStateStream/QueryableStateClient [1], or around "FOR SYSTEM_TIME
>>>>> AS OF" [2], or on other APIs/concepts (is there a FLIP?)?
>>>>>
>>>>> Cheers
>>>>> Ben
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table
>>>>>
>>>>>
>>>>> On Thu, Jan 23, 2020 at 6:40 AM kant kodali <kanth...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Is it common practice to have a custom state backend? If so, what
>>>>>> would be a popular one?
>>>>>>
>>>>>> Can I use Elasticsearch as a state backend?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Wed, Jan 22, 2020 at 1:42 AM Jark Wu <imj...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Kant,
>>>>>>>
>>>>>>> 1) A list of rows would also be sufficient in this case. A MapState
>>>>>>> is used in order to retract a row faster and to reduce storage size.
>>>>>>>
>>>>>>> 2) The State Processor API is usually used to process savepoints.
>>>>>>> I'm afraid its performance is not good enough for querying.
>>>>>>>     Besides, AFAIK, the State Processor API requires the uid of an
>>>>>>> operator, but operator uids are not set in Table API & SQL,
>>>>>>>     so I'm not sure whether it would work.
>>>>>>>
>>>>>>> 3) You can have a custom state backend by implementing the
>>>>>>> org.apache.flink.runtime.state.StateBackend interface and using it
>>>>>>> via `env.setStateBackend(…)`.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> On Wed, 22 Jan 2020 at 14:16, kant kodali <kanth...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Jark,
>>>>>>>>
>>>>>>>> 1) Shouldn't it be col1 to a list of rows? Multiple rows can have
>>>>>>>> the same join key, right?
>>>>>>>>
>>>>>>>> 2) Can I use the State Processor API
>>>>>>>> <https://flink.apache.org/feature/2019/09/13/state-processor-api.html>
>>>>>>>> from an external application to query the intermediate results in
>>>>>>>> near real-time? I thought querying RocksDB state was a widely
>>>>>>>> requested feature. It would be really great to consider it for 1.11.
>>>>>>>>
>>>>>>>> 3) Is there any interface where I can implement my own state
>>>>>>>> backend?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jan 21, 2020 at 6:08 PM Jark Wu <imj...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Kant,
>>>>>>>>>
>>>>>>>>> 1) Yes, it will be stored in the RocksDB state backend.
>>>>>>>>> 2) In the old planner, the left state has the same structure as the
>>>>>>>>> right state; both are `<col1, MapState<Row, Tuple2<count, expiredTime>>>`.
>>>>>>>>>     It is a 2-level map structure: `col1` is the join key and the
>>>>>>>>> first-level key of the state; the key of the MapState is the input
>>>>>>>>> row;
>>>>>>>>>     `count` is the number of occurrences of this row; and `expiredTime`
>>>>>>>>> indicates when to clean up this row (to avoid unbounded state size).
>>>>>>>>> You can find the source code here [1].
>>>>>>>>>     In the Blink planner, the state structure is more complex and is
>>>>>>>>> determined by the meta-information of the upstream operators. You can
>>>>>>>>> see the source code of the Blink planner here [2].
>>>>>>>>> 3) Currently, the intermediate state is not exposed to users.
>>>>>>>>> Usually, users write the query result to an external system (like
>>>>>>>>> MySQL) and query that external system.
>>>>>>>>>     Querying the intermediate state is on the roadmap, but I guess
>>>>>>>>> it is not in the 1.11 plan.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jark
>>>>>>>>>
>>>>>>>>> [1]:
>>>>>>>>> http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
>>>>>>>>> [2]:
>>>>>>>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
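The two-level layout Jark describes, `<col1, MapState<Row, Tuple2<count, expiredTime>>>`, can be sketched with plain Java maps. This is only an illustration of the data structure, assuming string keys and rows; it is not Flink's actual state implementation, and the `expiredTime` cleanup timer is omitted for brevity.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Plain-Java sketch of the join state layout described above:
 * joinKey -> (row -> count). The real implementation additionally stores
 * an expiredTime per row so stale rows can be cleaned up.
 */
public class JoinStateSketch {
    private final Map<String, Map<String, Integer>> state = new HashMap<>();

    // An incoming row increments its count under its join key.
    public void addRow(String joinKey, String row) {
        state.computeIfAbsent(joinKey, k -> new HashMap<>())
             .merge(row, 1, Integer::sum);
    }

    // A retraction decrements the count; the map key makes this an O(1)
    // lookup, which is why MapState beats a list of rows for retraction.
    public void retractRow(String joinKey, String row) {
        Map<String, Integer> rows = state.get(joinKey);
        if (rows == null) return;
        rows.computeIfPresent(row, (r, c) -> c > 1 ? c - 1 : null);
        if (rows.isEmpty()) state.remove(joinKey);
    }

    public int count(String joinKey, String row) {
        return state.getOrDefault(joinKey, Map.of()).getOrDefault(row, 0);
    }
}
```

Storing a per-row count rather than duplicate entries both shrinks the state and lets a retraction remove exactly one occurrence without scanning.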
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Jan 21, 2020, at 18:01, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> If I run a query like this
>>>>>>>>>
>>>>>>>>> StreamTableEnvironment.sqlQuery("select * from table1 join table2
>>>>>>>>> on table1.col1 = table2.col1")
>>>>>>>>>
>>>>>>>>> 1) Where will Flink store the intermediate result? Imagine
>>>>>>>>> flink-conf.yaml says state.backend = 'rocksdb'.
>>>>>>>>>
>>>>>>>>> 2) If the intermediate results are stored in RocksDB, then what are
>>>>>>>>> the key and value in this case (given the query above)?
>>>>>>>>>
>>>>>>>>> 3) What is the best way to query these intermediate results from
>>>>>>>>> an external application, both while the job is running and while
>>>>>>>>> it is not?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>
>>>>> --
>>>>> Benoît Paris
>>>>> Ingénieur Machine Learning Explicable
>>>>> Tél : +33 6 60 74 23 00
>>>>> http://benoit.paris
>>>>> http://explicable.ml
>>>>>
>>>>
>>>
>>>
>>
