Hi,

Generally speaking it’s a good question: why do you need to do this? What information do you need from the outer join’s internal state? Can’t you just process the join’s result to obtain the same information in another way?
> Yes, I am looking for this but I am not sure how to do this? Should I use the
> processFunction (like the event-driven applications)?

Another basic question: are you using the DataStream API, the Table API or SQL?

Assuming Table API or SQL, you would have to split your query into three parts:
1. The left side of the join
2. The right side of the join
3. Everything downstream of the join (if any)

Next you would have to write your own DataStream API outer join operator (implement your own, or copy/paste or inherit from the SQL/Table API operator) which has an additional side output [0] for the state changes that you want. To do this, you can probably go with one of two approaches:
a) define a CoProcessFunction
b) define a TwoInputStreamOperator

After that, you have to convert the queries from 1. and 2. into two separate DataStreams [1], connect them [2], and either process [3] them with your CoProcessFunction (a) or transform [4] them with your TwoInputStreamOperator (b), and finally convert the result back from a DataStream to a Table [5].

[0] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
[1] for example StreamTableEnvironment#toRetractStream() or #toAppendStream()
[2] https://training.ververica.com/lessons/connected-streams.html
[3] ConnectedStreams#process()
[4] ConnectedStreams#transform()
[5] StreamTableEnvironment#fromDataStream(org.apache.flink.streaming.api.datastream.DataStream<T>)

However, keep in mind that in [5] there is probably no way to convert a DataStream with retractions/updates back into a Table, so your join operator would have to produce append-only output.

I have pasted a rough, untested sketch of approach (a) at the very bottom of this mail, below the quoted thread.

Piotrek

> On 30 Oct 2019, at 20:45, kant kodali <kanth...@gmail.com> wrote:
>
> "I think you would have to implement your own custom operator that would output changes to its internal state as a side output"
>
> Yes, I am looking for this but I am not sure how to do this? Should I use the processFunction (like the event-driven applications)?
>
> On Wed, Oct 30, 2019 at 8:53 AM Piotr Nowojski <pi...@ververica.com> wrote:
> Hi Kant,
>
> The checkpointing interval is configurable, but I wouldn’t count on it working well with even 10s intervals.
>
> I think what you are asking for is not supported by Flink generically. Maybe the queryable state I mentioned before? But I have never used it.
>
> I think you would have to implement your own custom operator that would output changes to its internal state as a side output.
>
> Piotrek
>
>> On 30 Oct 2019, at 16:14, kant kodali <kanth...@gmail.com> wrote:
>>
>> Hi Piotr,
>>
>> I am talking about the internal state. How often does this state get checkpointed? If it is every few seconds then it may not meet our real-time requirement (sub-second).
>>
>> The question really is: can I read this internal state in a streaming fashion, in an update mode? The State Processor API seems to expose a DataSet but not a DataStream, so I am not sure how to read the internal state in a streaming fashion in an update mode.
>>
>> Thanks!
>>
>> On Wed, Oct 30, 2019 at 7:25 AM Piotr Nowojski <pi...@ververica.com> wrote:
>> Hi,
>>
>> I’m not sure what you are trying to achieve. What do you mean by “state of full outer join”? The result of it? Or its internal state?
>> Also keep in mind that the internal state of the operators in Flink is already snapshotted/written out to external storage by the checkpointing mechanism.
>>
>> The result should be simple: just write it to some sink.
>>
>> For the internal state, it sounds like you are doing something not the way it was intended… Having said that, you can try one of the following options:
>> a) Implement your own outer join operator (might not be as easy if you are using Table API/SQL) and just create a side output for the state changes.
>> b) Use the State Processor API to read the content of a savepoint/checkpoint [1][2]
>> c) Use queryable state [3] (I’m not sure about this, I have never used queryable state)
>>
>> Piotrek
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>> [2] https://flink.apache.org/feature/2019/09/13/state-processor-api.html
>> [3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
>>
>>> On 29 Oct 2019, at 16:42, kant kodali <kanth...@gmail.com> wrote:
>>>
>>> Hi All,
>>>
>>> I want to do a full outer join on two streaming data sources and store the state of the full outer join in some external storage like RocksDB or something else. And then I want to use this intermediate state as a streaming source again, do some transformation, and write it to some external store. Is that possible with Flink 1.9?
>>>
>>> Also, what storage systems support a push mechanism for the intermediate data? For example, in the use case above, does RocksDB support pushing/emitting events in a streaming fashion?
>>>
>>> Thanks!
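P.S. As mentioned above, here is a rough, untested sketch of approach (a), just to illustrate the idea: a CoProcessFunction over two connected keyed streams that buffers both sides in keyed state, emits outer-join results, and mirrors every change of its internal state to a side output [0]. All the names here (OuterJoinWithStateChanges, leftTable, rightTable, tableEnv), the (key, value) String tuple layout and the textual encoding of the state changes are placeholders you would replace with your own; a correct outer join would additionally need retractions of previously emitted unmatched rows and timers for state clean-up, which I have left out.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Sketch only: full outer join over two keyed streams of (key, value) pairs.
// Every modification of the operator's keyed state is also reported on a side output.
public class OuterJoinWithStateChanges
        extends CoProcessFunction<Tuple2<String, String>, Tuple2<String, String>, String> {

    // Side output carrying a textual description of each state change (placeholder encoding).
    public static final OutputTag<String> STATE_CHANGES =
            new OutputTag<String>("state-changes") {};

    private transient ListState<String> leftState;   // buffered left rows for the current key
    private transient ListState<String> rightState;  // buffered right rows for the current key

    @Override
    public void open(Configuration parameters) {
        leftState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("left", String.class));
        rightState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("right", String.class));
    }

    @Override
    public void processElement1(Tuple2<String, String> left, Context ctx,
                                Collector<String> out) throws Exception {
        leftState.add(left.f1);
        ctx.output(STATE_CHANGES, "ADD LEFT " + left);   // <-- the state change you are after

        boolean matched = false;
        for (String right : rightState.get()) {
            matched = true;
            out.collect(left.f0 + "," + left.f1 + "," + right);
        }
        if (!matched) {
            // Outer part of the join; a correct implementation would retract this
            // row once a matching right row arrives later.
            out.collect(left.f0 + "," + left.f1 + ",null");
        }
    }

    @Override
    public void processElement2(Tuple2<String, String> right, Context ctx,
                                Collector<String> out) throws Exception {
        rightState.add(right.f1);
        ctx.output(STATE_CHANGES, "ADD RIGHT " + right);  // <-- the state change you are after

        boolean matched = false;
        for (String left : leftState.get()) {
            matched = true;
            out.collect(right.f0 + "," + left + "," + right.f1);
        }
        if (!matched) {
            out.collect(right.f0 + ",null," + right.f1);
        }
    }
}

And the wiring, assuming the Tables produced by queries 1. and 2. both have a String key column followed by a String value column:

// Assumes: tableEnv is a StreamTableEnvironment, leftTable/rightTable are the
// results of queries 1. and 2. Additional imports: TupleTypeInfo, Types,
// DataStream, SingleOutputStreamOperator, Table.
TupleTypeInfo<Tuple2<String, String>> rowType =
        new TupleTypeInfo<>(Types.STRING, Types.STRING);

DataStream<Tuple2<String, String>> lefts  = tableEnv.toAppendStream(leftTable, rowType);
DataStream<Tuple2<String, String>> rights = tableEnv.toAppendStream(rightTable, rowType);

SingleOutputStreamOperator<String> joined = lefts
        .connect(rights)
        .keyBy(l -> l.f0, r -> r.f0)                      // join key on both sides
        .process(new OuterJoinWithStateChanges());

// The stream of state changes you asked about:
DataStream<String> stateChanges =
        joined.getSideOutput(OuterJoinWithStateChanges.STATE_CHANGES);

// Back to a Table (append-only, see the remark about [5] above):
Table result = tableEnv.fromDataStream(joined);

Whether you go with the CoProcessFunction or the TwoInputStreamOperator variant, the idea is the same: every time the operator touches its join state, it also emits a record describing that change on the side output, which you can then consume like any other stream.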