Ah, yes. I do create a keyed stream, but one that does not partition the data, and I pre-aggregate the key-value pairs inside my operator. However, I cannot rely on the number of keys to trigger the pre-aggregation, because I may never receive a specific number of keys. So time is the primary trigger for pre-aggregating; on top of that, I can aggregate earlier if the number of keys reaches a threshold.
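A minimal plain-Java sketch of the policy described above: time is the primary flush trigger, and reaching a number of distinct keys flushes early. It deliberately avoids Flink APIs. The class name `TimeFirstPreAggregator`, its methods, and passing the current time in explicitly (so the same logic can be driven by whatever timer the operator ends up using) are hypothetical choices, not part of Flink.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical pre-aggregation buffer (not a Flink class).
 * Primary trigger: intervalMillis has elapsed since the last flush.
 * Secondary trigger: maxKeys distinct keys are buffered.
 */
class TimeFirstPreAggregator {
    private final long intervalMillis;
    private final int maxKeys;
    private final Map<String, Long> buffer = new HashMap<>();
    private long windowStart; // time of the last flush

    TimeFirstPreAggregator(long intervalMillis, int maxKeys, long startMillis) {
        this.intervalMillis = intervalMillis;
        this.maxKeys = maxKeys;
        this.windowStart = startMillis;
    }

    /** Sums the value into its key; returns flushed partial aggregates, or an empty map. */
    Map<String, Long> add(String key, long value, long nowMillis) {
        buffer.merge(key, value, Long::sum);
        if (buffer.size() >= maxKeys || nowMillis - windowStart >= intervalMillis) {
            return flush(nowMillis);
        }
        return Collections.emptyMap();
    }

    /** Meant to be called periodically; flushes once the interval has elapsed. */
    Map<String, Long> onTime(long nowMillis) {
        if (nowMillis - windowStart >= intervalMillis) {
            return flush(nowMillis); // may be empty if nothing arrived
        }
        return Collections.emptyMap();
    }

    /** Hands out a copy of the partial aggregates and starts a new interval. */
    private Map<String, Long> flush(long nowMillis) {
        Map<String, Long> out = new HashMap<>(buffer);
        buffer.clear();
        windowStart = nowMillis;
        return out;
    }
}
```

In an operator, `add()` would be called per element and `onTime()` from a periodic callback; the emitted map holds one partial aggregate per key, to be combined downstream.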
--
Felipe Gutierrez
skype: felipe.o.gutierrez
https://felipeogutierrez.blogspot.com

On Tue, Nov 5, 2019 at 10:29 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
> Hi!
> Sorry, I should have given more context around what I was suggesting :)
> What I was suggesting is that maybe you could make your non-keyed stream keyed
> by assigning random/deterministic keys with some logic.
>
> Gyula
>
> On Tue, Nov 5, 2019 at 10:13 AM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>> @Gyula, I am afraid I haven't got your point.
>>
>> On Tue, Nov 5, 2019 at 10:11 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>> You might have to introduce some dummy keys for a more robust solution
>>> that integrates with the fault-tolerance mechanism.
>>>
>>> Gyula
>>>
>>> On Tue, Nov 5, 2019 at 9:57 AM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>>>> Thanks Piotr,
>>>>
>>>> the thing is that I am working on a plain (non-keyed) stream, not a keyed
>>>> stream, so I cannot use the TimerService concept here. I am triggering a
>>>> local window. I ended up using java.util.Timer [1], and it seems to meet my
>>>> requirements.
>>>>
>>>> [1] https://docs.oracle.com/javase/7/docs/api/java/util/Timer.html
>>>>
>>>> Thanks!
>>>>
>>>> On Wed, Oct 30, 2019 at 2:59 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>> Hi,
>>>>>
>>>>> If you want to register a processing/event time trigger in your custom
>>>>> operator, you can look at how other operators do it: by calling
>>>>> AbstractStreamOperator#getInternalTimerService [1].
>>>>> You can look around Flink's code base for usages of this method; there are
>>>>> at least a couple of them (like CepOperator or IntervalJoinOperator).
>>>>>
>>>>> Hope that helps,
>>>>> Piotrek
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.html#getInternalTimerService-java.lang.String-org.apache.flink.api.common.typeutils.TypeSerializer-org.apache.flink.streaming.api.operators.Triggerable-
>>>>>
>>>>> On 28 Oct 2019, at 10:09, Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>>>>>
>>>>> Hi all,
>>>>>
>>>>> I have my own stream operator, which triggers an aggregation based on
>>>>> the number of items received
>>>>> (OneInputStreamOperator#processElement(StreamRecord)). However, the
>>>>> aggregation may never be triggered if my operator does not receive the
>>>>> maximum number of items that has been set. So, I need a timeout trigger.
>>>>>
>>>>> I am unsure whether I need to extend Trigger in
>>>>> MyPreAggregate-AbstractUdfStreamOperator or pass a window as a type
>>>>> parameter of the operator class
>>>>> MyPreAggregate-AbstractUdfStreamOperator<K, V, IN, OUT, W extends Window>.
>>>>> What is the best approach?
>>>>>
>>>>> Thanks,
>>>>> Felipe
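A hedged, plain-Java sketch of the approach settled on in this thread: a count-based trigger backed by a `java.util.Timer` timeout. The class `CountOrTimeoutBatcher` and its parameters are hypothetical, not Flink API. Because the timer runs on its own thread, the buffer is guarded with `synchronized`; in a real operator, emitting from that thread would also have to be coordinated with the task thread, and, as Gyula notes, state buffered this way is outside Flink's fault-tolerance mechanism, which is what the internal timer service Piotr mentions avoids.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.function.Consumer;

/**
 * Hypothetical count-or-timeout batcher: emits when maxItems elements are
 * buffered, or when the periodic java.util.Timer fires with items pending.
 * Buffered items are NOT checkpointed; they are lost on failure.
 */
class CountOrTimeoutBatcher<T> implements AutoCloseable {
    private final int maxItems;
    private final Consumer<List<T>> emitter;
    private final Timer timer = new Timer("pre-aggregate-timeout", true); // daemon thread
    private final List<T> buffer = new ArrayList<>();

    CountOrTimeoutBatcher(int maxItems, long timeoutMillis, Consumer<List<T>> emitter) {
        this.maxItems = maxItems;
        this.emitter = emitter;
        // Timeout path: every timeoutMillis, flush whatever has accumulated.
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                flush();
            }
        }, timeoutMillis, timeoutMillis);
    }

    /** Count path: called per element, e.g. from processElement. */
    synchronized void add(T item) {
        buffer.add(item);
        if (buffer.size() >= maxItems) {
            flush();
        }
    }

    /** Emits a copy of the pending batch; synchronized because the timer thread calls it too. */
    synchronized void flush() {
        if (!buffer.isEmpty()) {
            emitter.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    /** Stops the timer and emits any remaining items. */
    @Override
    public void close() {
        timer.cancel();
        flush();
    }
}
```

The timer fires at a fixed rate rather than being reset after each count-based flush, so a partial batch can wait up to roughly `timeoutMillis` before it is emitted.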