Hi,
I'm afraid I don't understand your use case yet. In you example you want to
preserve only the elements where the string value contains a "3"? This can
be done using a filter, as in

source.filter( value -> value.f1.contains("3") )

This is probably too easy, though, and I'm misunderstanding the problem.

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 18:26 Kostya Kulagin <kkula...@gmail.com> wrote:

> Thanks for reply.
>
> Maybe I would need some advise in this case. My situation: we have a
> stream of data, generally speaking <Long;String> tuples where long is a
> unique key (ie there are no tuples with the same key)
>
> I need to filter out all tuples that do not match certain lucene query.
>
> Creating lucene index on one entry is too expensive and I cannot guess
> what load in terms of number of entries per second would be. Idea was to
> group entries by count, create index, filter and stream remaining tuples
> for further processing.
>
> As a sample application if we replace lucene indexing with something like
> String's 'contains' method source would look like this:
>
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStreamSource<Tuple2<Long, String>> source = env.addSource(new 
> SourceFunction<Tuple2<Long, String>>() {
>   @Override
>   public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
>     LongStream.range(0, 30).forEach(l -> {
>       ctx.collect(Tuple2.of(l, "This is " + l));
>
>
>     });
>   }
>
>   @Override
>   public void cancel() {
>
>   }
> });
>
> And I need lets say to window tuples and preserve only those which
> value.contains("3").
> There are no grouping by key since basically all keys are different. I
> might not know everything about flink yet but for this particular example -
> does what you were saying make sense?
>
>
> Thanks!
> Kostya
>
>
>
>
>
>
> On Thu, Apr 21, 2016 at 11:02 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>> if you are doing the windows not for their actual semantics I would
>> suggest not using count based windows and also not using the *All windows.
>> The *All windows are all non-parallel, i.e. you always only get one
>> parallel instance of your window operator even if you have a huge cluster.
>>
>> Also, in most cases it is better to not use a plain WindowFunction with
>> apply because all elements have to be buffered so that they can be passed
>> as an Iterable, Iterable<Long> in your example. If you can, I would suggest
>> to use a ReduceFunction or FoldFunction or an apply() with an incremental
>> aggregation function: apply(ReduceFunction, WindowFunction) or
>> apply(FoldFunction, WindowFunction). These allow incremental aggregation of
>> the result as elements arrive and don't require buffering of all elements
>> until the window fires.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 21 Apr 2016 at 16:53 Kostya Kulagin <kkula...@gmail.com> wrote:
>>
>>> Maybe if it is not the first time it worth considering adding this thing
>>> as an option? ;-)
>>>
>>> My usecase - I have a pretty big amount of data basically for ETL. It is
>>> finite but it is big. I see it more as a stream not as a dataset. Also I
>>> would re-use the same code for infinite stream later...
>>> And I do not much care about exact window size - it is just for
>>> performance reasons I create a windows.
>>>
>>> Anyways - that you for the responses!
>>>
>>>
>>> On Thu, Apr 21, 2016 at 8:54 AM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> People have wondered about that a few times, yes. My opinion is that a
>>>> stream is potentially infinite and processing only stops for anomalous
>>>> reasons: when the job crashes, when stopping a job to later redeploy it. In
>>>> those cases you would not want to flush out your data but keep them and
>>>> restart from the same state when the job is restarted.
>>>>
>>>> You can implement the behavior by writing a custom Trigger that behaves
>>>> like the count trigger but also fires when receiving a Long.MAX_VALUE
>>>> watermark. A watermark of Long.MAX_VALUE signifies that a source has
>>>> stopped processing for natural reasons.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Thu, 21 Apr 2016 at 14:42 Kostya Kulagin <kkula...@gmail.com> wrote:
>>>>
>>>>> Thanks,
>>>>>
>>>>> I wonder wouldn't it be good to have a built-in such functionality. At
>>>>> least when incoming stream is finished - flush remaining elements.
>>>>>
>>>>> On Thu, Apr 21, 2016 at 4:47 AM, Aljoscha Krettek <aljos...@apache.org
>>>>> > wrote:
>>>>>
>>>>>> Hi,
>>>>>> yes, you can achieve this by writing a custom Trigger that can
>>>>>> trigger both on the count or after a long-enough timeout. It would be a
>>>>>> combination of CountTrigger and EventTimeTrigger (or 
>>>>>> ProcessingTimeTrigger)
>>>>>> so you could look to those to get started.
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>
>>>>>> On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin <kkula...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I have a pretty big but final stream and I need to be able to window
>>>>>>> it by number of elements.
>>>>>>> In this case from my observations flink can 'skip' the latest chunk
>>>>>>> of data if it has lower amount of elements than window size:
>>>>>>>
>>>>>>>     StreamExecutionEnvironment env = 
>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>     DataStreamSource<Long> source = env.addSource(new 
>>>>>>> SourceFunction<Long>() {
>>>>>>>
>>>>>>>       @Override
>>>>>>>       public void run(SourceContext<Long> ctx) throws Exception {
>>>>>>>         LongStream.range(0, 35).forEach(ctx::collect);
>>>>>>>       }
>>>>>>>
>>>>>>>       @Override
>>>>>>>       public void cancel() {
>>>>>>>
>>>>>>>       }
>>>>>>>     });
>>>>>>>
>>>>>>>     source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, 
>>>>>>> GlobalWindow>() {
>>>>>>>       @Override
>>>>>>>       public void apply(GlobalWindow window, Iterable<Long> values, 
>>>>>>> Collector<Long> out) throws Exception {
>>>>>>>         System.out.println(Joiner.on(',').join(values));
>>>>>>>       }
>>>>>>>     }).print();
>>>>>>>
>>>>>>>     env.execute("yoyoyo");
>>>>>>>
>>>>>>>
>>>>>>> Output:
>>>>>>> 0,1,2,3,4,5,6,7,8,9
>>>>>>> 10,11,12,13,14,15,16,17,18,19
>>>>>>> 20,21,22,23,24,25,26,27,28,29
>>>>>>>
>>>>>>> I.e. elements from 10 to 35 are not being processed.
>>>>>>>
>>>>>>> Does it make sense to have: count OR timeout window which will evict
>>>>>>> new window when number of elements reach a threshold OR collecting 
>>>>>>> timeout
>>>>>>> occurs?
>>>>>>>
>>>>>>
>>>>>
>>>
>

Reply via email to