Hi Niels,

I'm afraid this will not work (if I understood correctly what you are trying to do). When the trigger is serialized and deserialized, each parallel instance of the trigger gets its own copy of the QueueSource object. In addition, a separate instance of the QueueSource itself will be running in each parallel instance of the source operator. And there is no way for the trigger and the source to communicate, since in the end they might not even run on the same machine.
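The effect Aljoscha describes can be reproduced outside Flink with plain Java serialization. The sketch below is illustrative only (none of these classes are Flink's): shipping a function to the cluster Java-serializes it, so every task works on its own deserialized copy, and anything put into the original's queue afterwards is invisible to the copies.

```java
import java.io.*;
import java.util.concurrent.ConcurrentLinkedQueue;

public class SerializedCopyDemo {

    // Stand-in for a function object holding a queue (like the QueueSource below).
    static class QueueHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
    }

    // Serialize and immediately deserialize: this is roughly what happens
    // when a function object is shipped to each parallel task.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (T) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        QueueHolder original = new QueueHolder();
        QueueHolder taskCopy = roundTrip(original); // what a parallel task sees

        original.queue.add("Foo"); // the trigger writes into ITS copy...
        // ...but the copy running as the source never sees it:
        System.out.println(taskCopy.queue.isEmpty()); // prints "true"
    }
}
```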
Cheers,
Aljoscha

> On 11 Dec 2015, at 13:11, Niels Basjes <ni...@basjes.nl> wrote:
>
> Hi,
>
> Just to let you know: I tried passing a SourceFunction, but I haven't been
> able to get that to work (yet).
>
> I passed an instance of this (see code below) into my Trigger and stored it
> there as:
>     private QueueSource output;
> and then, from onElement, I called something like:
>     output.put("Foo", 1234);
>
> When I run this from my IDE I get two distinct instances of the queue
> (effect: the stuff I put in doesn't come out at the other end).
>
> Any pointers on how (and if) this can be fixed are welcome.
> Only if this works will I look into making this generic (I got some
> type-related exceptions when I tried that).
>
> Niels
>
> (The code below is under the Apache 2.0 license, so copy, adapt and improve it if you want.)
>
> package nl.basjes.flink.experiments;
>
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.source.RichEventTimeSourceFunction;
>
> import java.util.concurrent.ConcurrentLinkedQueue;
>
> public class QueueSource extends RichEventTimeSourceFunction<String> {
>     private static final long serialVersionUID = 1L;
>
>     private volatile boolean isRunning = true;
>
>     private final ConcurrentLinkedQueue<QueueElement> queue = new ConcurrentLinkedQueue<>();
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>     }
>
>     @Override
>     public void close() throws Exception {
>         super.close();
>     }
>
>     @Override
>     public void run(SourceContext<String> ctx) throws Exception {
>         this.isRunning = true;
>
>         while (this.isRunning) {
>             if (queue.isEmpty()) {
>                 Thread.sleep(1); // Sleep 1 ms before trying to dequeue again
>                 continue;
>             }
>             QueueElement queueElement = queue.poll();
>             ctx.collectWithTimestamp(queueElement.element, queueElement.timestamp);
>         }
>     }
>
>     @Override
>     public void cancel() {
>         this.isRunning = false;
>     }
>
>     public void put(String element, long timestamp) {
>         QueueElement queueElement = new QueueElement();
>         queueElement.element = element;
>         queueElement.timestamp = timestamp;
>         queue.add(queueElement);
>     }
> }
>
> class QueueElement {
>     String element;
>     long timestamp;
> }
>
> On Fri, Dec 11, 2015 at 11:07 AM, Niels Basjes <ni...@basjes.nl> wrote:
>> Thanks.
>>
>> The way I solved it for now is by creating a class that persists data into
>> something external (right now HBase and/or Kafka) and using that from the
>> trigger to output the data.
>>
>> I have two follow-up questions:
>> 1) Is it possible to pass an instance of SourceFunction as such a
>>    parameter (without breaking Flink)?
>> 2) I want to save resources, so I'm using a single instance of my 'extra
>>    data output' class in the instance of the Trigger, thus reusing the
>>    connections to the outside world over multiple Window instances. Can I
>>    assume that a single instance of Trigger will only be used by a single
>>    thread? I.e. can I assume that I do not need locking and synchronization?
>>
>> Niels
>>
>> On Thu, Dec 10, 2015 at 4:14 PM, Stephan Ewen <se...@apache.org> wrote:
>>> Hi Niels!
>>>
>>> I think there is no clean way to emit data from a trigger right now; you can
>>> only emit data from the window functions.
>>>
>>> You can emit two different kinds of data types using an "Either" type. This is
>>> built into Scala; in Java we added it in 1.0-SNAPSHOT:
>>> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java
>>>
>>> Maybe being able to emit different types of elements helps your use case...
>>>
>>> These types of questions have been coming up quite a bit: people looking to
>>> do different actions inside the windows on different triggers (on element, on
>>> event time).
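The Either approach Stephan links to can be sketched with a minimal stand-in. The class below is NOT Flink's Either (see the linked file for the real API); it only shows the shape of the idea: the window function emits one stream of a two-variant type, and downstream consumers split it back into the "normal result" and "purge stats" streams.

```java
public class EitherDemo {

    // Two-variant container: Left = normal window result, Right = purge stats.
    static final class Either<L, R> {
        private final L leftValue;
        private final R rightValue;
        private final boolean isLeft;

        private Either(L left, R right, boolean isLeft) {
            this.leftValue = left;
            this.rightValue = right;
            this.isLeft = isLeft;
        }
        static <L, R> Either<L, R> left(L value)  { return new Either<>(value, null, true); }
        static <L, R> Either<L, R> right(R value) { return new Either<>(null, value, false); }
        boolean isLeft() { return isLeft; }
        L left()  { return leftValue; }
        R right() { return rightValue; }
    }

    public static void main(String[] args) {
        // A window function would emit a single stream of Either<String, Long>;
        // downstream, branch on isLeft() to recover the two logical streams.
        Either<String, Long> event = Either.left("click");
        Either<String, Long> purgeStats = Either.right(42L);

        System.out.println(event.isLeft());     // prints "true"
        System.out.println(purgeStats.right()); // prints "42"
    }
}
```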
>>> As per a discussion with Aljoscha, one way to make this more flexible is to
>>> enhance what you can do with custom state:
>>>   - State gets timeouts (for cleanup)
>>>   - Functions allow you to schedule event-time progress notifications
>>>
>>> Stephan
>>>
>>> On Thu, Dec 10, 2015 at 11:55 AM, Niels Basjes <ni...@basjes.nl> wrote:
>>>> Hi,
>>>>
>>>> I'm working on something that uses the Flink Window feature.
>>>> I have written a custom Trigger to build the Window I need.
>>>>
>>>> I am using the Window feature because I need state, and I need to expire
>>>> (and clean up) this state after a timeout (I use onEventTime to do that).
>>>> Because I need the data streaming in real time (augmented with the
>>>> mentioned state), I FIRE after every event. Just before I PURGE the
>>>> window, I need the fact of this purge (and some of the stats of this
>>>> Window) as a separate event in a separate DataStream.
>>>>
>>>> Now the interfaces of the various classes only support output as a single
>>>> Java type (a very sane choice).
>>>> So what I do right now is put my events on something external (HBase/Kafka)
>>>> and read them back in via a different Source implementation.
>>>>
>>>> My question: Is there a better way to do this?
>>>> Can I (for example) create a special Source that I can pass as a parameter
>>>> to my Trigger and then, in onEventTime, just output a new event?
>>>>
>>>> What do you recommend?
>>>>
>>>> --
>>>> Best regards / Met vriendelijke groeten,
>>>>
>>>> Niels Basjes
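The workaround Niels describes (a serializable trigger that reuses one connection to an external system across windows) can be sketched as below. ExternalClient is hypothetical, standing in for an HBase/Kafka client, and the no-locking assumption only holds if the trigger's callbacks are invoked from a single task thread. The key points: the client field is transient (a live connection cannot be serialized and shipped), and it is opened lazily on the worker and then reused.

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

public class LazyClientDemo {

    static final AtomicInteger OPENED = new AtomicInteger(); // counts opened connections

    // Hypothetical stand-in for an HBase/Kafka client.
    static final class ExternalClient {
        ExternalClient() { OPENED.incrementAndGet(); } // imagine: open the connection here
        void put(String event, long timestamp) { /* write to the external system */ }
    }

    static final class SideOutputTrigger implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: never serialized; each parallel instance opens its own client.
        private transient ExternalClient client;

        private ExternalClient client() {
            if (client == null) {              // no locking, ASSUMING the trigger's
                client = new ExternalClient(); // callbacks run on one task thread
            }
            return client;
        }

        // Called when a window is purged; the same client serves every window.
        void onPurge(String stats, long timestamp) {
            client().put(stats, timestamp);
        }
    }

    public static void main(String[] args) {
        SideOutputTrigger trigger = new SideOutputTrigger();
        trigger.onPurge("window-stats", 1234L);
        trigger.onPurge("window-stats", 5678L);
        System.out.println(OPENED.get()); // prints "1": the client is reused across windows
    }
}
```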