Hello Aljoscha, thank you very much. I tried to build your example but hit an obstacle with the org.apache.flink.runtime.state.AbstractStateBackend class. Where can I get it? I guess it is stored in your local branch only. Could you please send me patches for the public branch, or share the branch with me?
Best regards,
Roman

2015-11-18 17:24 GMT+04:00 Aljoscha Krettek <aljos...@apache.org>:
> Hi,
> I wrote a little example that could be what you are looking for:
> https://github.com/dataArtisans/query-window-example
>
> It basically implements a window operator with a modifiable window size
> that also allows querying the current accumulated window contents using
> a second input stream.
>
> There is a README file in the github repository, but please let me know
> if you need further explanations.
>
> Cheers,
> Aljoscha
>
> > On 18 Nov 2015, at 12:02, Robert Metzger <rmetz...@apache.org> wrote:
> >
> > Hi Roman,
> >
> > I've updated the documentation; it seems it had gotten out of sync.
> > Thank you for notifying us about this.
> >
> > My colleague Aljoscha has some experimental code that probably does
> > what you are looking for: a standing window (your RT-buffer) that you
> > can query using a secondary stream (your users' queries).
> > He'll post the code soon to this email thread.
> >
> > Regards,
> > Robert
> >
> > On Wed, Nov 11, 2015 at 2:51 PM, rss rss <rssde...@gmail.com> wrote:
> > Hello,
> >
> > thanks, Stephan, but triggers are not what I was looking for. By the
> > way, the documentation is out of date: there is no Count class anymore,
> > I found only CountTrigger.
> >
> > Thanks, Robert, your example may be useful to me, but in a different
> > respect. I meant "union" as an ordinary union of similar data, the
> > same as "union" in the DataStream documentation.
> >
> > The task is very simple. We have an infinite stream of data from
> > sensors, a billing system, etc. It does not matter what it is, only
> > that it is infinite. We have to store the data in persistent storage
> > to be able to run analytical queries later, and there is a stream of
> > users' analytical queries. But the input stream is big, the time
> > needed to save into persistent storage is big too, and we do not have
> > a very fast big-data OLTP storage. So the data extracted from
> > persistent storage for a user's request will probably not contain the
> > most recent records. We therefore need a real-time buffer (RT-Buffer
> > in the schema) holding the actual data, and we have to union it with
> > the results retrieved from persistent storage (I am not talking about
> > data deduplication and ordering now). And of course the users' queries
> > are unpredictable with respect to their filtering conditions.
> >
> > The attached schema is an attempt to understand how this might be
> > implemented with Flink. I tried to imagine how to implement it with
> > Flink's streaming API, but ran into obstacles. This schema is not the
> > first variant; it contains a separate driver program that configures
> > new jobs from users' queries, because I have not found a way to link
> > the stream of users' queries with the subsequent data processing. But
> > it is somewhat close to
> > https://gist.github.com/fhueske/4ea5422edb5820915fa4
> >
> > <flink_streams.png>
> >
> > The main question is how to process each user's query, combining it
> > with the actual data from the real-time buffer and a batch request to
> > persistent storage. Unfortunately I have not found a solution in the
> > Streaming API alone.
> >
> > Regards,
> > Roman
> >
> > 2015-11-11 15:45 GMT+04:00 Robert Metzger <rmetz...@apache.org>:
> > I think what you call "union" is a "connected stream" in Flink. Have a
> > look at this example:
> > https://gist.github.com/fhueske/4ea5422edb5820915fa4
> > It shows how to dynamically update a list of filters by external
> > requests.
> > Maybe that's what you are looking for?
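A minimal sketch of the connected-stream pattern described in the gist
above, written against the current Java DataStream API (the 0.10-era
signatures may differ); the ports and the keyword matching are
hypothetical stand-ins for the real data and query streams:
-----------------------------------------------------
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class DynamicFilterSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical inputs: data records and filter-update requests.
        DataStream<String> records = env.socketTextStream("localhost", 9000);
        DataStream<String> requests = env.socketTextStream("localhost", 9001);

        records.connect(requests)
                .flatMap(new CoFlatMapFunction<String, String, String>() {
                    // Current set of keywords a record must match.
                    private final Set<String> keywords = new HashSet<>();

                    @Override
                    public void flatMap1(String record, Collector<String> out) {
                        // Data path: forward records matching any keyword.
                        for (String kw : keywords) {
                            if (record.contains(kw)) {
                                out.collect(record);
                                return;
                            }
                        }
                    }

                    @Override
                    public void flatMap2(String request, Collector<String> out) {
                        // Control path: each request adds a keyword.
                        keywords.add(request.trim());
                    }
                })
                .print();

        env.execute("Dynamic filter sketch");
    }
}
-----------------------------------------------------
Note that the keyword set lives separately in each parallel instance; a
filter update only reaches the instance that happens to receive it unless
the control stream is broadcast (records.connect(requests.broadcast())).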
> > On Wed, Nov 11, 2015 at 12:15 PM, Stephan Ewen <se...@apache.org> wrote:
> > Hi!
> >
> > I do not really understand what exactly you want to do, especially the
> > "union an infinite real-time data stream with filtered persistent data
> > where the condition of filtering is provided by external requests" part.
> >
> > If you want to work on substreams in general, there are two options:
> >
> > 1) Create the substream in a streaming window. You can "cut" the stream
> > based on special records/events that signal that the subsequence is
> > done. Have a look at the "Trigger" class for windows; it can react to
> > elements and their contents:
> >
> > https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#windows-on-keyed-data-streams
> > (section on Advanced Windowing).
> >
> > 2) You can trigger sequences of batch jobs. The batch job's data source
> > (input format) can decide when to stop consuming the stream, at which
> > point the remaining transformations run and the batch job finishes.
> > You can already run new transformation chains after each call to
> > "env.execute()", once the execution has finished, to implement the
> > sequence of batch jobs.
> >
> > I would try to go for the windowing solution if that works, because it
> > will give you better fault tolerance / high availability. In the
> > repeated-batch-jobs case, you have to worry yourself about what happens
> > when the driver program (the one that calls env.execute()) fails.
> >
> > Hope that helps...
> >
> > Greetings,
> > Stephan
> >
> > On Mon, Nov 9, 2015 at 1:24 PM, rss rss <rssde...@gmail.com> wrote:
> > Hello,
> >
> > thanks for the answer, but windows produce periodic results. I used
> > your example, only with the data source changed to a TCP stream:
> >
> >     DataStream<String> text = env.socketTextStream("localhost", 2015, '\n');
> >     DataStream<Tuple2<String, Integer>> wordCounts =
> >         text
> >             .flatMap(new LineSplitter())
> >             .keyBy(0)
> >             .timeWindow(Time.of(5, TimeUnit.SECONDS))
> >             .sum(1);
> >
> >     wordCounts.print();
> >     env.execute("WordCount Example");
> >
> > I see results printed infinitely instead of a single list.
> >
> > The data source is the following script:
> > -----------------------------------------------------
> > #!/usr/bin/env ruby
> >
> > require 'socket'
> >
> > server = TCPServer.new 2015
> > loop do
> >   Thread.start(server.accept) do |client|
> >     puts Time.now.to_s + ': New client!'
> >     loop do
> >       client.puts "#{Time.now} #{[*('A'..'Z')].sample(3).join}"
> >       sleep rand(1000)/1000.0
> >     end
> >     client.close
> >   end
> > end
> > -----------------------------------------------------
> >
> > My purpose is to union an infinite real-time data stream with filtered
> > persistent data, where the filtering condition is provided by external
> > requests, and only the result of the union is of interest. In this case
> > I guess I need a way to terminate the stream. Maybe I am wrong.
> >
> > Moreover, it should be possible to link the streams to the next request
> > with other filtering criteria, that is, to create a new data
> > transformation chain after env.execute("WordCount Example") has run. Is
> > that possible now? If not, is it possible with minimal changes to the
> > core of Flink?
> >
> > Regards,
> > Roman
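To make Stephan's first option above concrete, here is a sketch of a
trigger that cuts the stream when a special marker record arrives. It is
written against the later Trigger interface (the API of that era differed
slightly), and the "#END" marker is a hypothetical convention:
-----------------------------------------------------
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Cuts the stream into finite subsequences: each marker record fires the
// window and purges its contents, so every firing is one "subset".
public class MarkerTrigger extends Trigger<String, GlobalWindow> {

    @Override
    public TriggerResult onElement(String element, long timestamp,
                                   GlobalWindow window, TriggerContext ctx) {
        // Emit and clear the accumulated subsequence on the marker record.
        return element.startsWith("#END")
                ? TriggerResult.FIRE_AND_PURGE
                : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window,
                                          TriggerContext ctx) {
        return TriggerResult.CONTINUE;  // purely element-driven
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window,
                                     TriggerContext ctx) {
        return TriggerResult.CONTINUE;  // purely element-driven
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) {}
}
-----------------------------------------------------
Attached via stream.keyBy(...).window(GlobalWindows.create())
.trigger(new MarkerTrigger()), each fired window then holds exactly one
finite subsequence of the stream.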
> > 2015-11-09 12:34 GMT+04:00 Stephan Ewen <se...@apache.org>:
> > Hi!
> >
> > If you want to work on subsets of streams, the answer is usually to use
> > windows: "stream.keyBy(...).timeWindow(Time.of(1, MINUTE))".
> >
> > The transformations that you want to make, do they fit into a window
> > function?
> >
> > There are thoughts about introducing something like global time windows
> > across the entire stream, inside which you can work more in a batch
> > style, but that would be quite an extensive change to the core.
> >
> > Greetings,
> > Stephan
> >
> > On Sun, Nov 8, 2015 at 5:15 PM, rss rss <rssde...@gmail.com> wrote:
> > Hello,
> >
> > I need to extract a finite subset, like a data buffer, from an infinite
> > data stream. The best way for me would be to obtain a finite stream
> > with the data accumulated over the preceding minute (as an example),
> > but I have not found any existing technique for this.
> >
> > As possible ways to get something close to a subset of a stream, I see
> > the following cases:
> >
> > - some transformation operation like 'take_while' that produces a new
> >   stream but is able to switch it to the FINISHED state. Unfortunately
> >   I have not found a way to switch the state of a stream from user code
> >   in the transformation functions;
> >
> > - new DataStream or StreamSource constructors that allow connecting a
> >   data-processing chain to the source stream. This could be something
> >   like the mentioned take_while transformation, or a modified
> >   StreamSource.run method fed with data from the source stream.
> >
> > That is, I have two questions.
> >
> > 1) Is there any technique to extract accumulated data from a stream as
> > a stream (to union it with another stream)? This would be a pure buffer
> > mode.
> >
> > 2) If the answer to the first question is negative, is there something
> > like a take_while transformation, or should I think about a custom
> > implementation? Is it possible to implement it without modifying the
> > core of Flink?
> >
> > Regards,
> > Roman
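Regarding Roman's second question: a take_while can be approximated as a
plain user function without modifying the core, but it can only stop
emitting; it cannot switch the stream to FINISHED, which is exactly the
limitation discussed in this thread. A sketch, assuming parallelism 1 and
a hypothetical "#STOP" predicate:
-----------------------------------------------------
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Forwards elements until the predicate first fails, then drops
// everything; the underlying stream itself keeps running.
public class TakeWhileSketch implements FlatMapFunction<String, String> {

    // Flips to false at the first element failing the predicate.
    private boolean taking = true;

    @Override
    public void flatMap(String value, Collector<String> out) {
        if (taking && !value.startsWith("#STOP")) {  // hypothetical predicate
            out.collect(value);
        } else {
            taking = false;  // drop this and all later elements
        }
    }
}
-----------------------------------------------------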