Hi, I wrote a little example that could be what you are looking for: https://github.com/dataArtisans/query-window-example

It basically implements a window operator with a modifiable window size that also allows querying the current accumulated window contents using a second input stream. There is a README file in the GitHub repository, but please let me know if you need further explanations.

Cheers,
Aljoscha
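For a rough idea of the pattern before diving into the repository: the core trick is a two-input operator where the first input accumulates records and the second input carries queries, each query emitting a snapshot of what has accumulated so far. The sketch below only illustrates that pattern, with made-up String inputs and an unbounded in-memory list; the actual example implements it as a full window operator with proper state handling and a modifiable window size.

-----------------------------------------------------
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

// Sketch only: the first input accumulates records, the second input
// carries queries; every query emits a snapshot of the buffer so far.
public class QueryableBuffer implements CoFlatMapFunction<String, String, List<String>> {

    // Unbounded and not checkpointed; a real implementation would use
    // Flink state and evict old elements, as the repository's code does.
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void flatMap1(String element, Collector<List<String>> out) {
        buffer.add(element);                  // data input: accumulate
    }

    @Override
    public void flatMap2(String query, Collector<List<String>> out) {
        out.collect(new ArrayList<>(buffer)); // query input: emit snapshot
    }
}

// Wired up as: dataStream.connect(queryStream).flatMap(new QueryableBuffer())
-----------------------------------------------------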
> On 18 Nov 2015, at 12:02, Robert Metzger <rmetz...@apache.org> wrote:
>
> Hi Roman,
>
> I've updated the documentation. It seems that it had gotten out of sync. Thank you for notifying us about this.
>
> My colleague Aljoscha has some experimental code that probably does what you are looking for: a standing window (your RT-buffer) that you can query using a secondary stream (your users' queries). He'll post the code soon to this email thread.
>
> Regards,
> Robert
>
>
> On Wed, Nov 11, 2015 at 2:51 PM, rss rss <rssde...@gmail.com> wrote:
> Hello,
>
> thanks, Stephan, but triggers are not what I was searching for. And by the way, the documentation is out of date: there is no Count class anymore, I found only CountTrigger.
>
> Thanks, Robert, your example may be useful to me, but at some other point. I meant "union" as an ordinary union of similar data, the same as "union" in the DataStream documentation.
>
> The task is very simple. We have an infinite stream of data from sensors, a billing system, etc. What the data is does not matter; what matters is that the stream is infinite. We have to store the data in some persistent storage to be able to run analytical queries later, and there is a stream of users' analytical queries. But the input stream is big, saving into the persistent storage takes long, and we do not have a very fast big-data OLTP storage. So the data extracted from the persistent storage for a user's request will probably not include the most recent records. We therefore need a real-time buffer (RT-Buffer in the schema) holding the latest data, and we have to union it with the results coming from the persistent storage (I am not talking about deduplication and ordering now). And of course the users' queries are unpredictable with regard to their filtering conditions.
>
> The attached schema is an attempt to understand how this could be implemented with Flink. I tried to imagine how to build it with Flink's streaming API but ran into obstacles. This schema is not the first variant; it contains a separate driver program that configures new jobs from users' queries, because I have not found a way to link the stream of users' queries with the subsequent data processing. But it is somewhat close to https://gist.github.com/fhueske/4ea5422edb5820915fa4
>
> <flink_streams.png>
>
> The main question is how to process each user's query, combining it with the current data from the real-time buffer and a batch request to the persistent storage. Unfortunately, I have not found a solution using the Streaming API alone.
>
> Regards,
> Roman
>
> 2015-11-11 15:45 GMT+04:00 Robert Metzger <rmetz...@apache.org>:
> I think what you call "union" is a "connected stream" in Flink. Have a look at this example: https://gist.github.com/fhueske/4ea5422edb5820915fa4
> It shows how to dynamically update a list of filters by external requests. Maybe that's what you are looking for?
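In case the gist moves, the pattern it demonstrates is roughly the following: a CoFlatMapFunction whose second input updates a set of filter conditions applied to the first input. The class names, ports, and substring matching below are illustrative assumptions, not taken from the gist; the broadcast() is there because with a non-keyed connected stream each parallel instance would otherwise see only part of the requests.

-----------------------------------------------------
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class DynamicFilterJob {

    // The second input adds keywords; the first input passes through only
    // records matching at least one keyword. Each parallel instance holds
    // its own set, hence the broadcast() on the request stream below.
    public static class DynamicFilter implements CoFlatMapFunction<String, String, String> {
        private final Set<String> keywords = new HashSet<>();

        @Override
        public void flatMap1(String record, Collector<String> out) {
            for (String kw : keywords) {
                if (record.contains(kw)) {
                    out.collect(record);
                    return;
                }
            }
        }

        @Override
        public void flatMap2(String newKeyword, Collector<String> out) {
            keywords.add(newKeyword);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> records  = env.socketTextStream("localhost", 9000); // data
        DataStream<String> requests = env.socketTextStream("localhost", 9001); // filter requests

        records.connect(requests.broadcast())
               .flatMap(new DynamicFilter())
               .print();

        env.execute("Dynamic Filter Example");
    }
}
-----------------------------------------------------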
> On Wed, Nov 11, 2015 at 12:15 PM, Stephan Ewen <se...@apache.org> wrote:
> Hi!
>
> I do not really understand what exactly you want to do, especially the "union an infinite real time data stream with filtered persistent data where the condition of filtering is provided by external requests".
>
> If you want to work on substreams in general, there are two options:
>
> 1) Create the substream in a streaming window. You can "cut" the stream based on special records/events that signal that the subsequence is done. Have a look at the "Trigger" class for windows; it can react to elements and their contents:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#windows-on-keyed-data-streams (section on Advanced Windowing)
>
> 2) You can trigger sequences of batch jobs. The batch job's data source (input format) can decide when to stop consuming the stream, at which point the remaining transformations run and the batch job finishes. You can already run new transformation chains after each call to "env.execute()", once the execution has finished, to implement such a sequence of batch jobs.
>
> I would try to go with the windowing solution if it works, because it will give you better fault tolerance / high availability. In the repeated-batch-jobs case, you have to worry yourself about what happens when the driver program (the one that calls env.execute()) fails.
>
> Hope that helps...
>
> Greetings,
> Stephan
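To make option 1 concrete: a custom trigger can fire and purge its window whenever a special marker record arrives, so that each subsequence between markers comes out as one window result. This is a sketch against the later DataStream API (the Trigger interface has changed across Flink versions), and the "END" marker convention is invented for the example.

-----------------------------------------------------
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Fires and purges the window whenever a marker record arrives, so each
// subsequence between markers is emitted as one window result.
public class MarkerTrigger extends Trigger<String, GlobalWindow> {

    @Override
    public TriggerResult onElement(String element, long timestamp,
                                   GlobalWindow window, TriggerContext ctx) {
        return element.startsWith("END")
                ? TriggerResult.FIRE_AND_PURGE
                : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE; // purely element-driven
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE; // purely element-driven
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) {
        // no timers or custom state to clean up
    }
}

// Used as:
// stream.keyBy(...).window(GlobalWindows.create()).trigger(new MarkerTrigger()).reduce(...)
-----------------------------------------------------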
> On Mon, Nov 9, 2015 at 1:24 PM, rss rss <rssde...@gmail.com> wrote:
> Hello,
>
> thanks for the answer, but windows produce periodic results. I used your example with the data source changed to a TCP stream:
>
>     DataStream<String> text = env.socketTextStream("localhost", 2015, '\n');
>     DataStream<Tuple2<String, Integer>> wordCounts = text
>             .flatMap(new LineSplitter())
>             .keyBy(0)
>             .timeWindow(Time.of(5, TimeUnit.SECONDS))
>             .sum(1);
>
>     wordCounts.print();
>     env.execute("WordCount Example");
>
> I see results printed infinitely instead of a single list.
>
> The data source is the following script:
> -----------------------------------------------------
> #!/usr/bin/env ruby
>
> require 'socket'
>
> server = TCPServer.new 2015
> loop do
>   Thread.start(server.accept) do |client|
>     puts Time.now.to_s + ': New client!'
>     loop do
>       client.puts "#{Time.now} #{[*('A'..'Z')].sample(3).join}"
>       sleep rand(1000) / 1000.0
>     end
>     client.close
>   end
> end
> -----------------------------------------------------
>
> My purpose is to union an infinite real-time data stream with filtered persistent data, where the filtering condition is provided by external requests, and only the result of the union is of interest. In this case I guess I need a way to terminate the stream. Maybe I am wrong.
>
> Moreover, it should be possible to link the streams to the next request with other filtering criteria, that is, to create a new data-transformation chain after env.execute("WordCount Example") has run. Is that possible now? If not, is it possible with minimal changes to the core of Flink?
>
> Regards,
> Roman
>
> 2015-11-09 12:34 GMT+04:00 Stephan Ewen <se...@apache.org>:
> Hi!
>
> If you want to work on subsets of streams, the answer is usually to use windows, e.g. "stream.keyBy(...).timeWindow(Time.of(1, MINUTE))".
>
> The transformations that you want to make, do they fit into a window function?
>
> There are thoughts about introducing something like global time windows across the entire stream, inside which you can work more in a batch style, but that would be quite an extensive change to the core.
>
> Greetings,
> Stephan
>
>
> On Sun, Nov 8, 2015 at 5:15 PM, rss rss <rssde...@gmail.com> wrote:
> Hello,
>
> I need to extract a finite subset, like a data buffer, from an infinite data stream. The best option for me would be to obtain a finite stream with the data accumulated over the last minute (as an example), but I have not found any existing technique to do this.
>
> As possible ways to get something close to a stream subset, I see the following:
>
> - a transformation operation like take_while that produces a new stream but is able to switch it to the FINISHED state. Unfortunately, I have not found a way to switch the state of a stream from user code in transformation functions;
>
> - new DataStream or StreamSource constructors that allow connecting a data-processing chain to the source stream. This could be something like the mentioned take_while transformation, or a modified StreamSource.run method fed with data from the source stream.
>
> That leaves me with two questions:
>
> 1) Is there any technique to extract accumulated data from a stream as a stream (to union it with another stream)? This would be a pure buffer mode.
>
> 2) If the answer to the first question is negative, is there something like a take_while transformation, or should I think about a custom implementation of it? Is it possible to implement without modifying the core of Flink?
>
> Regards,
> Roman
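A note on the take_while question that runs through this thread: a downstream operator cannot finish an infinite DataStream, but the effect can be approximated at the source, in line with Stephan's remark that the data source can decide when to stop consuming. The sketch below is such a source: it stops emitting after a time bound, the stream becomes finite, and the job ends on its own. The one-minute bound and the record format are placeholders.

-----------------------------------------------------
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Emits records until the deadline passes, then returns from run(),
// which finishes the stream and lets the job terminate normally.
public class TimeBoundedSource implements SourceFunction<String> {

    private final long durationMs;
    private volatile boolean running = true;

    public TimeBoundedSource(long durationMs) {
        this.durationMs = durationMs;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        long deadline = System.currentTimeMillis() + durationMs;
        while (running && System.currentTimeMillis() < deadline) {
            ctx.collect("record " + System.currentTimeMillis()); // stand-in for real input
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

// Used as: env.addSource(new TimeBoundedSource(60_000)).union(otherStream)...
-----------------------------------------------------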