You can strip out the parts that accumulate the data for writing the files; what remains is a very slim example to build on...
On Fri, Dec 11, 2015 at 3:20 PM, Niels Basjes <ni...@basjes.nl> wrote:
> I'll have another look at the example code you sent me.
> Thanks.
>
> On Fri, Dec 11, 2015 at 3:00 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi Niels!
>>
>> Quick question (I am probably overlooking something here): if you simply want to emit each element (Trigger onElement) in addition to a special data stream, can you not simply have two operators that consume the original data stream: the window operator and the additional source?
>>
>> If you need each element to pass through the window function anyway, I think it may almost be easier to use the custom state with timeout example I sent you a while back. There you have full flexibility and need not separate between trigger state, window state, etc.
>>
>> Stephan
>>
>> On Fri, Dec 11, 2015 at 1:59 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Hi Niels,
>>> I’m afraid this will not work (if I understood correctly what you are trying to do). When the trigger is serialized/deserialized, each parallel instance of the trigger gets its own copy of the QueueSource object. Plus, a separate instance of the QueueSource itself will be running in each parallel instance of the source operator. And there is no way for the trigger and the source to communicate, since they might not even run on the same machine in the end.
>>>
>>> Cheers,
>>> Aljoscha
>>> > On 11 Dec 2015, at 13:11, Niels Basjes <ni...@basjes.nl> wrote:
>>> >
>>> > Hi,
>>> >
>>> > Just to let you know: I tried passing a SourceFunction, but I haven't been able to get that to work (yet).
>>> >
>>> > I passed an instance of this (see code below) into my Trigger and stored it there as:
>>> >     private QueueSource output;
>>> > and then I called something like this from onElement:
>>> >     output.put("Foo", 1234);
>>> >
>>> > When I run this from my IDE I get two distinct instances of the queue (effect: the stuff I put in doesn't come out at the other end).
>>> >
>>> > Any pointers on how (and if) this can be fixed are welcome.
>>> > Only if this works will I look into making this generic (I got some type-related exceptions when I tried that).
>>> >
>>> > Niels
>>> >
>>> >
>>> > (The code below is under the Apache 2.0 License, so copy, adapt, and improve it if you want to.)
>>> > package nl.basjes.flink.experiments;
>>> >
>>> > import org.apache.flink.configuration.Configuration;
>>> > import org.apache.flink.streaming.api.functions.source.RichEventTimeSourceFunction;
>>> >
>>> > import java.util.concurrent.ConcurrentLinkedQueue;
>>> >
>>> > public class QueueSource extends RichEventTimeSourceFunction<String> {
>>> >     private static final long serialVersionUID = 1L;
>>> >
>>> >     private volatile boolean isRunning = true;
>>> >
>>> >     private ConcurrentLinkedQueue<QueueElement> queue = new ConcurrentLinkedQueue<>();
>>> >
>>> >     @Override
>>> >     public void open(Configuration parameters) throws Exception {
>>> >         super.open(parameters);
>>> >     }
>>> >
>>> >     @Override
>>> >     public void close() throws Exception {
>>> >         super.close();
>>> >     }
>>> >
>>> >     @Override
>>> >     public void run(SourceContext<String> ctx) throws Exception {
>>> >         this.isRunning = true;
>>> >
>>> >         while (this.isRunning) {
>>> >             if (queue.isEmpty()) {
>>> >                 Thread.sleep(1); // Sleep 1 ms before trying to dequeue again
>>> >                 continue;
>>> >             }
>>> >             QueueElement queueElement = queue.poll();
>>> >             ctx.collectWithTimestamp(queueElement.element, queueElement.timestamp);
>>> >         }
>>> >     }
>>> >
>>> >     @Override
>>> >     public void cancel() {
>>> >         this.isRunning = false;
>>> >     }
>>> >
>>> >     public void put(String element, long timestamp) {
>>> >         QueueElement queueElement = new QueueElement();
>>> >         queueElement.element = element;
>>> >         queueElement.timestamp = timestamp;
>>> >         queue.add(queueElement);
>>> >     }
>>> > }
>>> >
>>> > class QueueElement {
>>> >     String element;
>>> >     long timestamp;
>>> > }
>>> >
>>> >
>>> > On Fri, Dec 11, 2015 at 11:07 AM, Niels Basjes <ni...@basjes.nl> wrote:
>>> > Thanks.
>>> >
>>> > The way I solved it now is by creating a class that persists data into something external (right now HBase and/or Kafka) and using that from the trigger to output the data.
>>> >
>>> > I have two follow-up questions:
>>> > 1) Is it possible to pass an instance of 'SourceFunction' as such a parameter (without breaking Flink)?
>>> > 2) I want to save resources, so I'm using a single instance of my 'Extra data output class' in the instance of the Trigger, thus reusing the connections to the outside over multiple Window instances. Can I assume that a single instance of Trigger will only be used by a single thread? I.e., can I assume that I do not need locking and synchronization?
>>> >
>>> > Niels
>>> >
>>> > On Thu, Dec 10, 2015 at 4:14 PM, Stephan Ewen <se...@apache.org> wrote:
>>> > Hi Niels!
>>> >
>>> > I think there is no clean way to emit data from a trigger right now; you can only emit data from the window functions.
>>> >
>>> > You can emit two different kinds of data types using an "Either" type. This is built into Scala; in Java we added it in 1.0-SNAPSHOT:
>>> > https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java
>>> >
>>> > Maybe being able to emit different types of elements helps your use case...
>>> >
>>> > These types of questions have been coming up quite a bit: people looking to do different actions inside the windows on different triggers (on element, on event time).
>>> >
>>> > As per discussion with Aljoscha, one way to make this more flexible is to enhance what you can do with custom state:
>>> > - State has timeouts (for cleanup)
>>> > - Functions allow you to schedule event-time progress notifications
>>> >
>>> > Stephan
>>> >
>>> > On Thu, Dec 10, 2015 at 11:55 AM, Niels Basjes <ni...@basjes.nl> wrote:
>>> > Hi,
>>> >
>>> > I'm working on something that uses the Flink Window feature.
>>> > I have written a custom Trigger to build the Window I need.
>>> >
>>> > I am using the Window feature because I need state, and I need to expire (and clean) this state after a timeout (I use onEventTime to do that).
>>> > Because I need the data streaming in real time (augmented with the mentioned state), I 'FIRE' after every event. Just before I 'PURGE' the window, I need the fact of this purge (and some of the stats of this Window) as a separate event in a separate 'DataStream'.
>>> >
>>> > Now the interfaces of the various classes only support output as a single Java type (a very sane choice).
>>> > So what I do right now is put my events on something 'external' (HBase/Kafka) and read them in via a different Source implementation.
>>> >
>>> > My question: Is there a better way to do this?
>>> > Can I (for example) create a special 'Source' that I can pass as a parameter to my Trigger and then, in onEventTime, just output a 'new event'?
>>> >
>>> > What do you recommend?
>>> >
>>> > --
>>> > Best regards / Met vriendelijke groeten,
>>> >
>>> > Niels Basjes
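Aljoscha's explanation of why the QueueSource trick fails can be reproduced without Flink. The plain-Java sketch below is an illustration, not Flink code. It assumes that handing a function object to a parallel task behaves like a Java serialization round trip, which is how Aljoscha describes it. `QueueHolder` and `copyViaSerialization` are hypothetical names. Two round trips of the same object yield two independent queues, so an element put into one copy never appears in the other. This is consistent with the "two distinct instances of the queue" Niels saw even when running from his IDE.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for QueueSource: only the queue, without any Flink types.
class QueueHolder implements Serializable {
    private static final long serialVersionUID = 1L;
    final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
}

class SerializationCopyDemo {
    // Serializes and deserializes an object, producing an independent deep copy.
    // Assumption: this approximates how a function is handed to each parallel task.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copyViaSerialization(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        QueueHolder original = new QueueHolder();
        QueueHolder heldByTrigger = copyViaSerialization(original); // copy inside the trigger
        QueueHolder runBySource = copyViaSerialization(original);   // copy run by the source task
        heldByTrigger.queue.add("Foo");
        System.out.println(heldByTrigger == runBySource);  // prints false
        System.out.println(runBySource.queue.isEmpty());   // prints true: "Foo" never arrives
    }
}
```

This also suggests why the external-system workaround (HBase/Kafka) behaves differently: both sides then talk to one shared system instead of to private copies of an in-memory queue.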
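Stephan's "Either" suggestion amounts to tagging every output record with which of two types it carries. That lets one output type hold both the real-time records and the purge notifications. The sketch below uses a minimal hypothetical `Either` written only for illustration; it is not Flink's class linked above, which shares only the idea. The window function is modeled as a hypothetical `emitForWindow` method that returns a list: one `Left` per element, plus one `Right` carrying a made-up statistic (the element count) that stands in for the purge event. Downstream, the mixed output is split back by checking `isLeft()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal Either for illustration only (not Flink's Either class).
final class Either<L, R> {
    private final L left;
    private final R right;
    private final boolean isLeft;

    private Either(L left, R right, boolean isLeft) {
        this.left = left;
        this.right = right;
        this.isLeft = isLeft;
    }

    static <A, B> Either<A, B> ofLeft(A value) { return new Either<>(value, null, true); }

    static <A, B> Either<A, B> ofRight(B value) { return new Either<>(null, value, false); }

    boolean isLeft() { return isLeft; }

    L getLeft() {
        if (!isLeft) throw new IllegalStateException("not a Left");
        return left;
    }

    R getRight() {
        if (isLeft) throw new IllegalStateException("not a Right");
        return right;
    }
}

class EitherOutputSketch {
    // Models what one window would emit: an augmented record per element (Left),
    // then a single purge notification carrying a statistic (Right).
    static List<Either<String, Integer>> emitForWindow(List<String> elements) {
        List<Either<String, Integer>> out = new ArrayList<>();
        for (String element : elements) {
            out.add(Either.ofLeft("augmented:" + element));
        }
        out.add(Either.ofRight(elements.size())); // hypothetical purge stats: element count
        return out;
    }

    // Downstream split: keep only the regular records.
    static List<String> regularRecords(List<Either<String, Integer>> mixed) {
        List<String> result = new ArrayList<>();
        for (Either<String, Integer> e : mixed) {
            if (e.isLeft()) result.add(e.getLeft());
        }
        return result;
    }

    // Downstream split: keep only the purge notifications.
    static List<Integer> purgeEvents(List<Either<String, Integer>> mixed) {
        List<Integer> result = new ArrayList<>();
        for (Either<String, Integer> e : mixed) {
            if (!e.isLeft()) result.add(e.getRight());
        }
        return result;
    }
}
```

The design trade-off is that both kinds of records travel through one output type, and every consumer has to branch on the tag. In exchange, no side channel such as an external queue is needed.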
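The "custom state with timeout" example Stephan refers to is not included in this thread. As a rough, framework-free illustration of his two bullet points applied to Niels's use case, the sketch below is not Stephan's example and not a Flink API. `TimeoutStateModel`, `onElement`, `onWatermark`, `records` and `purges` are names made up for this illustration. It works as follows:

- Each element updates per-key state and is emitted right away.
- Each key carries an event-time deadline of `eventTime + timeout`.
- When the watermark reaches a deadline, the key's state is dropped and one separate purge event with its statistics is emitted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of keyed state with an event-time timeout; not a Flink API.
class TimeoutStateModel {
    // Per-key state: how many events were seen and when (in event time) it expires.
    private static final class KeyState {
        long count;
        long deadline = Long.MIN_VALUE;
    }

    private final long timeoutMillis;
    // LinkedHashMap keeps keys in first-seen order, so purge output is deterministic.
    private final Map<String, KeyState> state = new LinkedHashMap<>();
    // Stand-ins for the two output streams: real-time records and purge events.
    final List<String> records = new ArrayList<>();
    final List<String> purges = new ArrayList<>();

    TimeoutStateModel(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Per element: update the state, emit the augmented record immediately,
    // and push the key's deadline to eventTime + timeout (never moving it backwards).
    void onElement(String key, long eventTime) {
        KeyState s = state.computeIfAbsent(key, k -> new KeyState());
        s.count++;
        s.deadline = Math.max(s.deadline, eventTime + timeoutMillis);
        records.add(key + "#" + s.count);
    }

    // When event time advances: expire every key whose deadline has been reached,
    // emitting one purge event with its statistics before dropping the state.
    void onWatermark(long watermark) {
        Iterator<Map.Entry<String, KeyState>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, KeyState> entry = it.next();
            if (entry.getValue().deadline <= watermark) {
                purges.add("purge " + entry.getKey() + " count=" + entry.getValue().count);
                it.remove();
            }
        }
    }
}
```

A key that shows up again after its purge starts from fresh state, which mirrors the cleanup Niels currently gets from PURGE in his custom Trigger.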