I'll have another look at the example code you sent me. Thanks.

On Fri, Dec 11, 2015 at 3:00 PM, Stephan Ewen <se...@apache.org> wrote:
> Hi Niels!
>
> Quick question (I am probably overlooking something here) - if you simply
> want to emit each element (Trigger onElement) in addition to a special data
> stream, can you not simply have two operators that consume the original
> data stream: the window operator and the additional source?
>
> If you need each element to pass through the window function anyway, I
> think it may almost be easier to use the custom state with timeout example
> I sent you a while back. There you have full flexibility and need not
> separate between trigger state, window state, etc.
>
> Stephan
>
>
> On Fri, Dec 11, 2015 at 1:59 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi Niels,
>> I'm afraid this will not work. (If I understood correctly what you are
>> trying to do.) When the trigger is serialized and deserialized, each
>> parallel instance of the trigger has its own copy of the QueueSource
>> object. In addition, a separate instance of the QueueSource itself will be
>> running in each parallel instance of the source operator. And there is no
>> way for the trigger and the source to communicate, since they might not
>> even run on the same machine in the end.
>>
>> Cheers,
>> Aljoscha
>>
>> > On 11 Dec 2015, at 13:11, Niels Basjes <ni...@basjes.nl> wrote:
>> >
>> > Hi,
>> >
>> > Just to let you know: I tried passing a SourceFunction but I haven't
>> > been able to get that to work (yet).
>> >
>> > I passed an instance of this (see the code below) into my Trigger and
>> > stored it there as:
>> >     private QueueSource output;
>> > and then from onElement I called something like:
>> >     output.put("Foo", 1234);
>> >
>> > When I run this from my IDE I get two distinct instances of the queue
>> > (effect: the stuff I put in doesn't come out at the other end).
>> >
>> > Any pointers on how (and if) this can be fixed are welcome.
>> > Only if this works will I look into making this generic (I got some
>> > type-related exceptions when I tried that).
>> >
>> > Niels
>> >
>> > (Below has Apache 2.0 License; so copy, adapt and improve if you want to)
>> >
>> > package nl.basjes.flink.experiments;
>> >
>> > import org.apache.flink.configuration.Configuration;
>> > import org.apache.flink.streaming.api.functions.source.RichEventTimeSourceFunction;
>> >
>> > import java.util.concurrent.ConcurrentLinkedQueue;
>> >
>> > public class QueueSource extends RichEventTimeSourceFunction<String> {
>> >     private static final long serialVersionUID = 1L;
>> >
>> >     private volatile boolean isRunning = true;
>> >
>> >     private final ConcurrentLinkedQueue<QueueElement> queue = new ConcurrentLinkedQueue<>();
>> >
>> >     @Override
>> >     public void open(Configuration parameters) throws Exception {
>> >         super.open(parameters);
>> >     }
>> >
>> >     @Override
>> >     public void close() throws Exception {
>> >         super.close();
>> >     }
>> >
>> >     @Override
>> >     public void run(SourceContext<String> ctx) throws Exception {
>> >         this.isRunning = true;
>> >
>> >         while (this.isRunning) {
>> >             if (queue.isEmpty()) {
>> >                 Thread.sleep(1); // Sleep 1 ms before trying to dequeue again
>> >                 continue;
>> >             }
>> >             QueueElement queueElement = queue.poll();
>> >             ctx.collectWithTimestamp(queueElement.element, queueElement.timestamp);
>> >         }
>> >     }
>> >
>> >     @Override
>> >     public void cancel() {
>> >         this.isRunning = false;
>> >     }
>> >
>> >     public void put(String element, long timestamp) {
>> >         QueueElement queueElement = new QueueElement();
>> >         queueElement.element = element;
>> >         queueElement.timestamp = timestamp;
>> >         queue.add(queueElement);
>> >     }
>> > }
>> >
>> > class QueueElement {
>> >     String element;
>> >     long timestamp;
>> > }
>> >
>> >
>> > On Fri, Dec 11, 2015 at 11:07 AM, Niels Basjes <ni...@basjes.nl> wrote:
>> > Thanks.
>> >
>> > The way I solved it now is by creating a class that persists data into
>> > something external (right now HBase and/or Kafka) and using that from the
>> > trigger to output the data.
>> >
>> > I have two follow-up questions:
>> > 1) Is it possible to pass an instance of 'SourceFunction' as such a
>> > parameter (without breaking Flink)?
>> > 2) I want to save resources, so I'm using a single instance of my 'extra
>> > data output class' in the instance of the Trigger, thus reusing the
>> > connections to the outside over multiple Window instances. Can I assume
>> > that a single instance of Trigger will only be used by a single thread?
>> > I.e. can I assume that I do not need locking and synchronization?
>> >
>> > Niels
>> >
>> > On Thu, Dec 10, 2015 at 4:14 PM, Stephan Ewen <se...@apache.org> wrote:
>> > Hi Niels!
>> >
>> > I think there is no clean way to emit data from a trigger right now;
>> > you can only emit data from the window functions.
>> >
>> > You can emit two different kinds of data types using an "Either" type.
>> > This is built in in Scala; in Java we added it in 1.0-SNAPSHOT:
>> > https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java
>> >
>> > Maybe being able to emit different types of elements helps your use case...
>> >
>> >
>> > These types of questions have been coming up quite a bit: people
>> > looking to do different actions inside the windows on different triggers
>> > (on element, on event time).
>> >
>> > As per a discussion with Aljoscha, one way to make this more flexible is
>> > to enhance what you can do with custom state:
>> > - State has timeouts (for cleanup)
>> > - Functions allow you to schedule event-time progress notifications
>> >
>> > Stephan
>> >
>> >
>> > On Thu, Dec 10, 2015 at 11:55 AM, Niels Basjes <ni...@basjes.nl> wrote:
>> > Hi,
>> >
>> > I'm working on something that uses the Flink Window feature.
>> > I have written a custom Trigger to build the Window I need.
>> >
>> > I am using the Window feature because I need state and I need to expire
>> > (and clean) this state after a timeout (I use onEventTime to do that).
>> > Because I need the data streamed in real time (augmented with the
>> > mentioned state) I 'FIRE' after every event. Just before I 'PURGE' the
>> > window I need the fact of this purge (and some of the stats of this
>> > Window) as a separate event in a separate 'DataStream'.
>> >
>> > Now the interfaces of the various classes only support output as a
>> > single Java type (a very sane choice).
>> > So what I do right now is put my events on something 'external'
>> > (HBase/Kafka) and read them in via a different Source implementation.
>> >
>> > My question: Is there a better way to do this?
>> > Can I (for example) create a special 'Source' that I can pass as a
>> > parameter to my Trigger and then in onEventTime just output a 'new event'?
>> >
>> > What do you recommend?
>> >
>> > --
>> > Best regards / Met vriendelijke groeten,
>> >
>> > Niels Basjes

--
Best regards / Met vriendelijke groeten,

Niels Basjes
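Stephan's "Either" suggestion from this thread can be sketched outside Flink as follows. Note that the `Either` class below is a minimal stand-in written for illustration; its names and methods are simplified assumptions, not the actual API of `org.apache.flink.api.java.typeutils.Either`. The sketch shows the idea: one output stream carries both the regular window results and the purge/stats events, and a downstream consumer splits them apart again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Minimal stand-in for an Either type (illustration only; Flink's real class
// is org.apache.flink.api.java.typeutils.Either as of 1.0-SNAPSHOT).
public class EitherSketch {

    static abstract class Either<L, R> {
        static <L, R> Either<L, R> left(L value)  { return new Left<>(value); }
        static <L, R> Either<L, R> right(R value) { return new Right<>(value); }
        abstract boolean isLeft();
        abstract L left();
        abstract R right();
    }

    static final class Left<L, R> extends Either<L, R> {
        private final L value;
        Left(L value) { this.value = value; }
        boolean isLeft() { return true; }
        L left() { return value; }
        R right() { throw new IllegalStateException("not a Right"); }
    }

    static final class Right<L, R> extends Either<L, R> {
        private final R value;
        Right(R value) { this.value = value; }
        boolean isLeft() { return false; }
        L left() { throw new IllegalStateException("not a Left"); }
        R right() { return value; }
    }

    public static void main(String[] args) {
        // One "stream" mixing both kinds of events: regular window results
        // on the Left, a purge/stats event on the Right.
        List<Either<String, Long>> stream = Arrays.asList(
            Either.left("element-1"),
            Either.left("element-2"),
            Either.right(1234L)        // e.g. the purge event with a stat
        );

        // Downstream, split the mixed stream back into two.
        List<String> elements = new ArrayList<>();
        List<Long> purges = new ArrayList<>();
        for (Either<String, Long> e : stream) {
            if (e.isLeft()) { elements.add(e.left()); } else { purges.add(e.right()); }
        }
        System.out.println(elements.size() + " elements, " + purges.size() + " purge events");
        // prints "2 elements, 1 purge events"
    }
}
```

This only merges the two event kinds into one typed stream; the window function still has to emit both, which is why it does not by itself solve emitting from the trigger.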
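Aljoscha's point about serialization can be demonstrated with plain Java: the handoff through QueueSource's internal queue only works while the trigger and the source hold the very same queue instance. The sketch below (class name and scenario are illustrative, not taken from the thread) simulates what shipping the trigger and the source to the cluster does: each side ends up with an independent copy, and elements put in on one side never come out on the other.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Why the QueueSource approach breaks: after (de)serialization the trigger
// and the source no longer share one queue instance.
public class QueueHandoffSketch {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> shared = new ConcurrentLinkedQueue<>();

        // Same instance on both sides: the handoff works.
        shared.add("event-1");
        System.out.println(shared.poll()); // prints "event-1"

        // Simulate what serialization does: an independent copy of the queue.
        ConcurrentLinkedQueue<String> copy = new ConcurrentLinkedQueue<>(shared);
        shared.add("event-2");
        System.out.println(copy.poll()); // prints "null" - the copy never sees event-2
    }
}
```

This is exactly the "two distinct instances of the queue" effect Niels observed in his IDE, and it is why an external system (HBase/Kafka) was needed as the meeting point.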