I wrote a little example that could be what you are looking for: 

It basically implements a window operator with a modifiable window size that 
also allows querying the current accumulated window contents using a second 
input stream.

There is a README file in the github repository, but please let me know if you 
need further explanations.


> On 18 Nov 2015, at 12:02, Robert Metzger <rmetz...@apache.org> wrote:
> Hi Roman,
> I've updated the documentation. It seems that it got out of sync. Thank you 
> for notifying us about this.
> My colleague Aljoscha has some experimental code that is probably doing what 
> you are looking for: A standing window (your RT-buffer) that you can query 
> using a secondary stream (your user's queries).
> He'll post the code soon to this email thread.
> Regards,
> Robert
> On Wed, Nov 11, 2015 at 2:51 PM, rss rss <rssde...@gmail.com> wrote:
> Hello,
>   thanks, Stephan, but triggers are not that I searched. And BTW, the 
> documentation is obsolete. There is no Count class now. I found CountTrigger 
> only.
>   Thanks Robert, your example may be useful for me but in some other point. I 
> mentioned "union" as an ordinary union of similar data. It is the same as 
> "union" in the datastream documentation.
>   The task is very simple. We have an infinite stream of data from sensors, 
> billing system etc. There is no matter what it is but it is infinite. We have 
> to store the data in any persistent storage to be able to make analytical 
> queries later. And there is a stream of user's analytical queries. But the 
> stream of input data is big and time of saving in the persistent storage is 
> big too. And we have not a very fast bigdata OLTP storage. That is the data 
> extracted from the persistent storage by the user's requests probably will 
> not contain actual data. We have to have some real time buffer (RT-Buffer in 
> the schema) with actual data and have to union it with the data processing 
> results from persistent storage (I don't speak about data deduplication and 
> ordering now.). And of course the user's query are unpredictable regarding 
> data filtering conditions.
>   The attached schema is attempt to understand how it may be implemented with 
> Flink. I tried to imagine how to implement it by Flink's streaming API but 
> found obstacles. This schema is not first variant. It contains separated 
> driver program to configure new jobs by user's queries. The reason I not 
> found a way how to link the stream of user's queries with further data 
> processing. But it is some near to 
> https://gist.github.com/fhueske/4ea5422edb5820915fa4
> <flink_streams.png>
>   The main question is how to process each user's query combining it with 
> actual data from the real time buffer and batch request to the persistent 
> storage. Unfortunately I not found a decision in Streaming API only.
> Regards, 
> Roman
> 2015-11-11 15:45 GMT+04:00 Robert Metzger <rmetz...@apache.org>:
> I think what you call "union" is a "connected stream" in Flink. Have a look 
> at this example: https://gist.github.com/fhueske/4ea5422edb5820915fa4
> It shows how to dynamically update a list of filters by external requests.
> Maybe that's what you are looking for?
> On Wed, Nov 11, 2015 at 12:15 PM, Stephan Ewen <se...@apache.org> wrote:
> Hi!
> I don not really understand what exactly you want to do, especially the 
> "union an infinite real time data stream with filtered persistent data where 
> the condition of filtering is provided by external requests".
> If you want to work on substreams in general, there are two options:
> 1) Create the substream in a streaming window. You can "cut" the stream based 
> on special records/events that signal that the subsequence is done. Have a 
> look at the "Trigger" class for windows, it can react to elements and their 
> contents:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#windows-on-keyed-data-streams
>  (secion on Advanced Windowing).
> 2) You can trigger sequences of batch jobs. The batch job data source (input 
> format) can decide when to stop consuming the stream, at which point the 
> remainder of the transformations run, and the batch job finishes. 
> You can already run new transformation chains after each call to 
> "env.execute()", once the execution finished, to implement the sequence of 
> batch jobs.
> I would try and go for the windowing solution if that works, because that 
> will give you better fault tolerance / high availability. In the repeated 
> batch jobs case, you need to worry yourself about what happens when the 
> driver program (that calls env.execute()) fails.
> Hope that helps...
> Greetings,
> Stephan
> On Mon, Nov 9, 2015 at 1:24 PM, rss rss <rssde...@gmail.com> wrote:
> Hello, 
>   thanks for the answer but windows produce periodical results. I used your 
> example but the data source is changed to TCP stream:
>         DataStream<String> text = env.socketTextStream("localhost", 2015, 
> '\n');
>         DataStream<Tuple2<String, Integer>> wordCounts =
>                 text
>                 .flatMap(new LineSplitter())
>                 .keyBy(0)
>                 .timeWindow(Time.of(5, TimeUnit.SECONDS))
>                 .sum(1);
>         wordCounts.print();
>         env.execute("WordCount Example");
>  I see an infinite results printing instead of the only list.
>  The data source is following script:
> -----------------------------------------------------
> #!/usr/bin/env ruby
> require 'socket'
> server = TCPServer.new 2015
> loop do
>   Thread.start(server.accept) do |client|
>     puts Time.now.to_s + ': New client!'
>     loop do
>       client.puts "#{Time.now} #{[*('A'..'Z')].sample(3).join}"
>       sleep rand(1000)/1000.0
>     end
>     client.close
>   end
> end
> -----------------------------------------------------
>   My purpose is to union an infinite real time data stream with filtered 
> persistent data where the condition of filtering is provided by external 
> requests. And the only result of union is interested. In this case I guess I 
> need a way to terminate the stream. May be I wrong.
>   Moreover it should be possible to link the streams by next request with 
> other filtering criteria. That is create new data transformation chain after 
> running of env.execute("WordCount Example"). Is it possible now? If not, is 
> it possible with minimal changes of the core of Flink?
> Regards,
> Roman
> 2015-11-09 12:34 GMT+04:00 Stephan Ewen <se...@apache.org>:
> Hi!
> If you want to work on subsets of streams, the answer is usually to use 
> windows, "stream.keyBy(...).timeWindow(Time.of(1, MINUTE))".
> The transformations that you want to make, do they fit into a window function?
> There are thoughts to introduce something like global time windows across the 
> entire stream, inside which you can work more in a batch-style, but that is 
> quite an extensive change to the core.
> Greetings,
> Stephan
> On Sun, Nov 8, 2015 at 5:15 PM, rss rss <rssde...@gmail.com> wrote:
> Hello,
> I need to extract a finite subset like a data buffer from an infinite data 
> stream. The best way for me is to obtain a finite stream with data 
> accumulated for a 1minute before (as example). But I not found any existing 
> technique to do it.
> As a possible ways how to do something near to a stream’s subset I see 
> following cases:
> -          some transformation operation like ‘take_while’ that produces new 
> stream but able to switch one to FINISHED state. Unfortunately I not found 
> how to switch the state of a stream from a user code of transformation 
> functions;
> -          new DataStream or StreamSource constructors which allow to connect 
> a data processing chain to the source stream. It may be something like 
> mentioned take_while transform function or modified StreamSource.run method 
> with data from the source stream.
> That is I have two questions.
> 1)      Is there any technique to extract accumulated data from a stream as a 
> stream (to union it with another stream)? This is like pure buffer mode.
> 2)      If the answer to first question is negative, is there something like 
> take_while transformation or should I think about custom implementation of 
> it? Is it possible to implement it without modification of the core of Flink?
> Regards,
> Roman

Reply via email to