Hi Niels!

Quick question (I am probably overlooking something here): if you simply want to emit each element (Trigger onElement) in addition to a special data stream, can you not simply have two operators that consume the original data stream, the window operator and the additional source?
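The two-operator idea can be modeled in plain Java. This is a sketch, not Flink API: the same input is handed to two independent operators. One buffers elements into a window and the other passes every element through. In the DataStream API, the counterpart is applying two separate transformations to the same `DataStream` variable. `FanOutDemo`, `FanOut`, and the count-based window are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class FanOutDemo {
    // Plain-Java model of a dataflow edge, NOT the Flink API: one input, many consumers.
    static final class FanOut<T> {
        private final List<Consumer<T>> downstream = new ArrayList<>();

        // Register another operator that reads the same input.
        void subscribe(Consumer<T> operator) {
            downstream.add(operator);
        }

        // Each element is handed to every registered operator.
        void emit(T element) {
            for (Consumer<T> operator : downstream) {
                operator.accept(element);
            }
        }
    }

    // Returns [pass-through output, window output] for the given input.
    static List<List<String>> run(List<String> input, int windowSize) {
        List<String> passThrough = new ArrayList<>();
        List<String> windowOutput = new ArrayList<>();
        List<String> buffer = new ArrayList<>();

        FanOut<String> source = new FanOut<>();
        // Operator 1: forwards every element unchanged (the per-element output).
        source.subscribe(passThrough::add);
        // Operator 2: hypothetical count-based window, one result per full window.
        source.subscribe(element -> {
            buffer.add(element);
            if (buffer.size() == windowSize) {
                windowOutput.add(String.join("+", buffer));
                buffer.clear();
            }
        });

        for (String element : input) {
            source.emit(element);
        }
        return List.of(passThrough, windowOutput);
    }

    public static void main(String[] args) {
        // Prints [[a, b, c, d, e], [a+b, c+d]]
        System.out.println(run(List.of("a", "b", "c", "d", "e"), 2));
    }
}
```

Neither branch needs to know about the other. The per-element output is therefore not routed through the window or its trigger.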
If you need each element to pass through the window function anyway, I think it may almost be easier to use the custom state with timeout example I sent you a while back. There you have full flexibility and need not separate trigger state, window state, etc.

Stephan

On Fri, Dec 11, 2015 at 1:59 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi Niels,
> I'm afraid this will not work. (If I understood correctly what you are trying to do.) When the trigger is serialized/deserialized, each parallel instance of the trigger has its own copy of the QueueSource object. Plus, a separate instance of the QueueSource itself will be running in each parallel instance of the source operator. And there is no way for the trigger and the source to communicate, since they might not even run on the same machine in the end.
>
> Cheers,
> Aljoscha
>
> On 11 Dec 2015, at 13:11, Niels Basjes <ni...@basjes.nl> wrote:
> >
> > Hi,
> >
> > Just to let you know: I tried passing a SourceFunction but I haven't been able to get that to work (yet).
> >
> > I passed an instance of this (see code below) into my Trigger and stored it there as:
> > private QueueSource output;
> > and then I called from the onElement something like:
> > output.put("Foo",1234);
> >
> > When I run this from my IDE I get two distinct instances of the queue (effect: the stuff I put in doesn't come out at the other end).
> >
> > Any pointers how (and if) this can be fixed are welcome.
> > Only if this works will I look into making this generic (I got some type-related exceptions when I tried that).
> >
> > Niels
> >
> > (Below has Apache 2.0 License; so copy, adapt and improve if you want to)
> > package nl.basjes.flink.experiments;
> >
> > import org.apache.flink.configuration.Configuration;
> > import org.apache.flink.streaming.api.functions.source.RichEventTimeSourceFunction;
> >
> > import java.util.concurrent.ConcurrentLinkedQueue;
> >
> > public class QueueSource extends RichEventTimeSourceFunction<String> {
> >     private static final long serialVersionUID = 1L;
> >
> >     private volatile boolean isRunning = true;
> >
> >     private final ConcurrentLinkedQueue<QueueElement> queue = new ConcurrentLinkedQueue<>();
> >
> >     @Override
> >     public void open(Configuration parameters) throws Exception {
> >         super.open(parameters);
> >     }
> >
> >     @Override
> >     public void close() throws Exception {
> >         super.close();
> >     }
> >
> >     @Override
> >     public void run(SourceContext<String> ctx) throws Exception {
> >         this.isRunning = true;
> >
> >         while (this.isRunning) {
> >             QueueElement queueElement = queue.poll();
> >             if (queueElement == null) {
> >                 Thread.sleep(1); // Queue is empty: sleep 1 ms before trying to dequeue again
> >                 continue;
> >             }
> >             ctx.collectWithTimestamp(queueElement.element, queueElement.timestamp);
> >         }
> >     }
> >
> >     @Override
> >     public void cancel() {
> >         this.isRunning = false;
> >     }
> >
> >     // Called from the Trigger: enqueue an element for run() to emit.
> >     public void put(String element, long timestamp) {
> >         QueueElement queueElement = new QueueElement();
> >         queueElement.element = element;
> >         queueElement.timestamp = timestamp;
> >         queue.add(queueElement);
> >     }
> > }
> >
> > class QueueElement {
> >     String element;
> >     long timestamp;
> > }
> >
> > On Fri, Dec 11, 2015 at 11:07 AM, Niels Basjes <ni...@basjes.nl> wrote:
> > Thanks.
> >
> > The way I solved it now is by creating a class that persists data into something external (right now HBase and/or Kafka) and using that from the trigger to output the data.
> >
> > I have two follow-up questions:
> > 1) Is it possible to pass an instance of 'SourceFunction' as such a parameter (without breaking Flink)?
> > 2) I want to save resources, so I'm using a single instance of my 'Extra data output class' in the instance of the Trigger, thus reusing the connections to the outside over multiple Window instances. Can I assume that a single instance of Trigger will only be used by a single thread? I.e. can I assume that I do not need locking and synchronization?
> >
> > Niels
> >
> > On Thu, Dec 10, 2015 at 4:14 PM, Stephan Ewen <se...@apache.org> wrote:
> > Hi Niels!
> >
> > I think there is no clean way to emit data from a trigger right now; you can only emit data from the window functions.
> >
> > You can emit two different kinds of data types using an "Either" type. This is built into Scala; in Java we added it in 1.0-SNAPSHOT:
> > https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java
> >
> > Maybe being able to emit different types of elements helps your use case...
> >
> > These types of questions have been coming up quite a bit: people looking to do different actions inside the windows on different triggers (on element, on event time).
> >
> > As per discussion with Aljoscha, one way to make this more flexible is to enhance what you can do with custom state:
> > - State has timeouts (for cleanup)
> > - Functions allow you to schedule event-time progress notifications
> >
> > Stephan
> >
> > On Thu, Dec 10, 2015 at 11:55 AM, Niels Basjes <ni...@basjes.nl> wrote:
> > Hi,
> >
> > I'm working on something that uses the Flink Window feature.
> > I have written a custom Trigger to build the Window I need.
> >
> > I am using the Window feature because I need state and I need to expire (and clean) this state after a timeout (I use the onEventTime to do that).
> >
> > Because I need the data streaming in real time (augmented with the mentioned state), I 'FIRE' after every event. Just before I 'PURGE' the window, I need the fact of this purge (and some of the stats of this Window) as a separate event in a separate 'DataStream'.
> >
> > Now the interfaces of the various classes only support output as a single Java type (very sane choice).
> > So what I do right now is put my events on something 'external' (HBase/Kafka) and read them in via a different Source implementation.
> >
> > My question: Is there a better way to do this?
> > Can I (for example) create a special 'Source' that I can pass as a parameter to my Trigger and then onEventTime just output a 'new event'?
> >
> > What do you recommend?
> >
> > --
> > Best regards / Met vriendelijke groeten,
> >
> > Niels Basjes
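The effect Aljoscha describes, where the trigger and the source each hold their own copy of the QueueSource, can be reproduced outside Flink with plain Java serialization. This is a sketch under one assumption: that shipping the trigger and the source function separately behaves like two independent serialization round trips. That is consistent with Niels seeing two distinct queues even in an IDE run. `QueueHolder` and `TriggerHolder` are hypothetical stand-ins for the QueueSource and the Trigger from the thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ConcurrentLinkedQueue;

class SerializationCopyDemo {
    // Hypothetical stand-in for QueueSource: a Serializable object that owns a queue.
    static class QueueHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
    }

    // Hypothetical stand-in for the Trigger that keeps a reference to the "source".
    static class TriggerHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        final QueueHolder output;

        TriggerHolder(QueueHolder output) {
            this.output = output;
        }
    }

    // Java serialization round trip: the result is a deep copy, not the original object.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T object) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(object);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns {elements seen by the trigger's copy, elements seen by the source's copy}.
    static int[] run() {
        QueueHolder original = new QueueHolder();
        // Trigger and source are serialized separately, so each gets its own queue.
        TriggerHolder triggerSide = roundTrip(new TriggerHolder(original));
        QueueHolder sourceSide = roundTrip(original);

        // Roughly what output.put("Foo", 1234) does when called from onElement.
        triggerSide.output.queue.add("Foo");
        return new int[] {triggerSide.output.queue.size(), sourceSide.queue.size()};
    }

    public static void main(String[] args) {
        int[] sizes = run();
        // Prints "trigger copy: 1, source copy: 0"
        System.out.println("trigger copy: " + sizes[0] + ", source copy: " + sizes[1]);
    }
}
```

The element lands in the trigger's copy, and the source's copy never sees it. This matches "the stuff I put in doesn't come out at the other end". Across several machines there would not even be a shared JVM to fall back on.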
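Stephan's "Either" suggestion amounts to giving the window function a single output type that carries one of two kinds of record. Here the kinds are enriched events and purge statistics, and the stream is split again downstream. The sketch below uses a hypothetical minimal `Either` class defined inline. It is not the Flink class linked in the thread, and the record formats are invented for illustration. In a Flink job, the downstream split would presumably be done with ordinary transformations such as `filter` and `map` on the resulting stream.

```java
import java.util.ArrayList;
import java.util.List;

class EitherDemo {
    // Hypothetical minimal Either, NOT Flink's class: holds exactly one of two types.
    static final class Either<L, R> {
        private final boolean isLeft;
        private final L left;
        private final R right;

        private Either(boolean isLeft, L left, R right) {
            this.isLeft = isLeft;
            this.left = left;
            this.right = right;
        }

        static <L, R> Either<L, R> ofLeft(L value) {
            return new Either<>(true, value, null);
        }

        static <L, R> Either<L, R> ofRight(R value) {
            return new Either<>(false, null, value);
        }

        boolean isLeft() { return isLeft; }
        L getLeft() { return left; }
        R getRight() { return right; }
    }

    // One output type for the window: enriched events (Left) or a purge summary (Right).
    static List<Either<String, Integer>> windowOutput(String key, List<String> events) {
        List<Either<String, Integer>> out = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            // "FIRE" after every event: emit the event augmented with a running count.
            out.add(Either.ofLeft(key + ":" + events.get(i) + "#" + (i + 1)));
        }
        // Just before "PURGE": emit the window's statistics as the other kind of record.
        out.add(Either.ofRight(events.size()));
        return out;
    }

    // Downstream: split the single stream back into its two logical streams.
    static void split(List<Either<String, Integer>> input, List<String> events, List<Integer> purges) {
        for (Either<String, Integer> record : input) {
            if (record.isLeft()) {
                events.add(record.getLeft());
            } else {
                purges.add(record.getRight());
            }
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        List<Integer> purges = new ArrayList<>();
        split(windowOutput("user1", List.of("click", "view")), events, purges);
        System.out.println(events); // [user1:click#1, user1:view#2]
        System.out.println(purges); // [2]
    }
}
```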
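Niels's original requirements are: emit every event augmented with per-key state, expire that state after an event-time timeout, and emit a separate record when it is purged. These are what the "state has timeouts" plus "event-time progress notifications" idea in Stephan's reply is aimed at. The following plain-Java sketch models that behaviour. It is neither Flink API nor the custom-state example Stephan mentions sending earlier. `TimeoutState` is hypothetical, its `onElement`/`onEventTime` methods are only named after the Trigger callbacks discussed in the thread, and the watermark is passed in explicitly.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class TimeoutStateDemo {
    // Hypothetical per-key state with an event-time timeout; NOT a Flink API.
    static final class TimeoutState {
        private static final class KeyState {
            long count;
            long lastSeen = Long.MIN_VALUE; // overwritten by the first element for the key
        }

        private final long timeoutMs;
        private final Map<String, KeyState> state = new HashMap<>();
        final List<String> enriched = new ArrayList<>(); // main output: every event plus its state
        final List<String> expired = new ArrayList<>();  // second output: one record per purge

        TimeoutState(long timeoutMs) {
            this.timeoutMs = timeoutMs;
        }

        // Per element: update the key's state and emit the augmented event right away.
        void onElement(String key, long timestamp) {
            KeyState ks = state.computeIfAbsent(key, k -> new KeyState());
            ks.count++;
            ks.lastSeen = Math.max(ks.lastSeen, timestamp);
            enriched.add(key + "@" + timestamp + " count=" + ks.count);
        }

        // When event time advances: emit a record for, then drop, every idle key.
        void onEventTime(long watermark) {
            Iterator<Map.Entry<String, KeyState>> it = state.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, KeyState> e = it.next();
                if (e.getValue().lastSeen + timeoutMs <= watermark) {
                    expired.add(e.getKey() + " expired after " + e.getValue().count + " events");
                    it.remove(); // the "PURGE": state for this key is cleaned up
                }
            }
        }
    }

    public static void main(String[] args) {
        TimeoutState s = new TimeoutState(1000);
        s.onElement("a", 100);
        s.onElement("b", 500);
        s.onElement("a", 900);
        s.onEventTime(1600); // b idle since 500 -> expired; a last seen at 900 -> kept
        s.onEventTime(2000); // a idle since 900 -> expired
        System.out.println(s.enriched); // [a@100 count=1, b@500 count=1, a@900 count=2]
        System.out.println(s.expired);  // [b expired after 1 events, a expired after 2 events]
    }
}
```

Both outputs come from the same piece of state. That is the property the trigger-plus-external-store workaround in the thread was trying to recover.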