Hi, I'm afraid I don't understand your use case yet. In your example you want to preserve only the elements where the string value contains a "3"? This can be done using a filter, as in:

source.filter(value -> value.f1.contains("3"))
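In case it helps, here is a minimal, self-contained sketch of that idea. It is only an illustration: the hard-coded elements stand in for your real source, and the contains("3") check stands in for the Lucene query.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilterSketch {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Hard-coded sample data standing in for the real (Long, String) source.
    DataStream<Tuple2<Long, String>> source = env.fromElements(
        Tuple2.of(0L, "This is 0"),
        Tuple2.of(3L, "This is 3"),
        Tuple2.of(13L, "This is 13"));

    // Keep only the tuples whose string value contains "3"; in the real job
    // the predicate would run the Lucene query instead of contains().
    source
        .filter(value -> value.f1.contains("3"))
        .print();

    env.execute();
  }
}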
This is probably too easy, though, and I'm misunderstanding the problem.

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 18:26 Kostya Kulagin <kkula...@gmail.com> wrote:

> Thanks for the reply.
>
> Maybe I need some advice in this case. My situation: we have a stream of
> data, generally speaking <Long, String> tuples where the long is a unique
> key (i.e. there are no tuples with the same key).
>
> I need to filter out all tuples that do not match a certain Lucene query.
>
> Creating a Lucene index on one entry is too expensive, and I cannot guess
> what the load in terms of number of entries per second will be. The idea
> was to group entries by count, create an index, filter, and stream the
> remaining tuples on for further processing.
>
> As a sample application, if we replace the Lucene indexing with something
> like String's 'contains' method, the source would look like this:
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> DataStreamSource<Tuple2<Long, String>> source = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
>
>   @Override
>   public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
>     LongStream.range(0, 30).forEach(l -> {
>       ctx.collect(Tuple2.of(l, "This is " + l));
>     });
>   }
>
>   @Override
>   public void cancel() {
>   }
> });
>
> And I need, let's say, to window the tuples and preserve only those whose
> value contains "3".
> There is no grouping by key since basically all keys are different. I
> might not know everything about Flink yet, but for this particular
> example - does what you were saying make sense?
>
> Thanks!
> Kostya
>
> On Thu, Apr 21, 2016 at 11:02 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi,
>> if you are doing the windows not for their actual semantics I would
>> suggest not using count based windows and also not using the *All
>> windows. The *All windows are all non-parallel, i.e. you always only get
>> one parallel instance of your window operator even if you have a huge
>> cluster.
>>
>> Also, in most cases it is better not to use a plain WindowFunction with
>> apply(), because all elements have to be buffered so that they can be
>> passed in as an Iterable - Iterable<Long> in your example. If you can, I
>> would suggest using a ReduceFunction or FoldFunction, or an apply() with
>> an incremental aggregation function: apply(ReduceFunction, WindowFunction)
>> or apply(FoldFunction, WindowFunction). These allow incremental
>> aggregation of the result as elements arrive and don't require buffering
>> all elements until the window fires.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 21 Apr 2016 at 16:53 Kostya Kulagin <kkula...@gmail.com> wrote:
>>
>>> Maybe, if this is not the first time it has come up, it is worth
>>> considering adding this as an option? ;-)
>>>
>>> My use case - I have a pretty big amount of data, basically for ETL. It
>>> is finite but it is big. I see it more as a stream than as a dataset.
>>> Also, I would re-use the same code for an infinite stream later...
>>> And I do not much care about the exact window size - I only create the
>>> windows for performance reasons.
>>>
>>> Anyway - thank you for the responses!
>>>
>>> On Thu, Apr 21, 2016 at 8:54 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>>> People have wondered about that a few times, yes. My opinion is that a
>>>> stream is potentially infinite and processing only stops for anomalous
>>>> reasons: when the job crashes, or when stopping a job to later
>>>> redeploy it.
>>>> In those cases you would not want to flush out your data but keep it
>>>> and restart from the same state when the job is restarted.
>>>>
>>>> You can implement the behavior by writing a custom Trigger that behaves
>>>> like the count trigger but also fires when receiving a Long.MAX_VALUE
>>>> watermark. A watermark of Long.MAX_VALUE signifies that a source has
>>>> stopped processing for natural reasons.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Thu, 21 Apr 2016 at 14:42 Kostya Kulagin <kkula...@gmail.com> wrote:
>>>>
>>>>> Thanks,
>>>>>
>>>>> I wonder, wouldn't it be good to have such functionality built in? At
>>>>> least when the incoming stream is finished - flush the remaining
>>>>> elements.
>>>>>
>>>>> On Thu, Apr 21, 2016 at 4:47 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>> yes, you can achieve this by writing a custom Trigger that can fire
>>>>>> both on the count or after a long-enough timeout. It would be a
>>>>>> combination of CountTrigger and EventTimeTrigger (or
>>>>>> ProcessingTimeTrigger), so you could look at those to get started.
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>
>>>>>> On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin <kkula...@gmail.com> wrote:
>>>>>>
>>>>>>> I have a pretty big but finite stream and I need to be able to
>>>>>>> window it by number of elements.
>>>>>>> In this case, from my observations, Flink can 'skip' the last chunk
>>>>>>> of data if it has fewer elements than the window size:
>>>>>>>
>>>>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>> DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>>>>>>>
>>>>>>>   @Override
>>>>>>>   public void run(SourceContext<Long> ctx) throws Exception {
>>>>>>>     LongStream.range(0, 35).forEach(ctx::collect);
>>>>>>>   }
>>>>>>>
>>>>>>>   @Override
>>>>>>>   public void cancel() {
>>>>>>>   }
>>>>>>> });
>>>>>>>
>>>>>>> source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>>>>>>>   @Override
>>>>>>>   public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
>>>>>>>     System.out.println(Joiner.on(',').join(values));
>>>>>>>   }
>>>>>>> }).print();
>>>>>>>
>>>>>>> env.execute("yoyoyo");
>>>>>>>
>>>>>>> Output:
>>>>>>> 0,1,2,3,4,5,6,7,8,9
>>>>>>> 10,11,12,13,14,15,16,17,18,19
>>>>>>> 20,21,22,23,24,25,26,27,28,29
>>>>>>>
>>>>>>> I.e. elements 30 to 34 are not being processed.
>>>>>>>
>>>>>>> Does it make sense to have a count OR timeout window, which would
>>>>>>> emit a window when the number of elements reaches a threshold OR a
>>>>>>> collection timeout occurs?
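For reference, here is a rough sketch of the kind of custom Trigger described above: it behaves like a count trigger but also fires on the final Long.MAX_VALUE watermark, so the last, partially filled window is flushed when a finite source ends. The class and its name are only illustrative; the exact Trigger interface varies between Flink versions (newer versions also require a clear() method), and for brevity the count lives in a plain field instead of the checkpointed state that Flink's own CountTrigger uses.

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Illustrative only: fires every maxCount elements and on the final watermark.
public class CountOrEndOfStreamTrigger<T> extends Trigger<T, GlobalWindow> {

  private final long maxCount;

  // Plain field for readability; not fault tolerant. Flink's CountTrigger
  // keeps this count in partitioned state so it survives checkpoints.
  private long count;

  public CountOrEndOfStreamTrigger(long maxCount) {
    this.maxCount = maxCount;
  }

  @Override
  public TriggerResult onElement(T element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
    // Ask for a callback at the "end of time": the timer fires when the final
    // Long.MAX_VALUE watermark arrives, i.e. when the source has finished.
    ctx.registerEventTimeTimer(Long.MAX_VALUE);
    if (++count >= maxCount) {
      count = 0;
      return TriggerResult.FIRE_AND_PURGE;
    }
    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
    // Only the Long.MAX_VALUE timer registered above can bring us here:
    // flush whatever is still buffered in the window.
    return time == Long.MAX_VALUE ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
    return TriggerResult.CONTINUE;
  }
}

It could then be plugged in with something like

source
    .windowAll(GlobalWindows.create())
    .trigger(new CountOrEndOfStreamTrigger<Long>(10))
    .apply(...)

in place of countWindowAll(10), keeping the same AllWindowFunction. Whether the final watermark actually reaches the trigger can depend on the time characteristic and Flink version, so treat this as a starting point rather than a drop-in solution.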