Hi Niels!

Quick question (I am probably overlooking something here): if you simply
want to emit each element (the Trigger's onElement) in addition to a special
data stream, can you not simply have two operators consume the original data
stream: the window operator, and an additional operator that emits each element?
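
As a rough sketch (all the My*/Per* class names below are just placeholders,
not your actual code):

// the original stream
DataStream<MyEvent> events = env.addSource(new MyEventSource());

// consumer 1: the window operator you already have
events
    .keyBy(new MyKeySelector())
    .window(GlobalWindows.create())
    .trigger(new MyTimeoutTrigger())
    .apply(new MyWindowFunction())
    .addSink(new WindowResultSink());

// consumer 2: a second operator reading the very same stream and
// emitting each element (possibly enriched) into its own sink
events
    .map(new PerElementMapper())
    .addSink(new PerElementSink());

Because both operators consume the same DataStream, the elements are simply
replicated to both of them, so no extra source is needed for the per-element
output.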

If you need each element to pass through the window function anyway, I
think it may almost be easier to use the custom-state-with-timeout example I
sent you a while back. There you have full flexibility and do not need to
separate trigger state, window state, etc.

Stephan



On Fri, Dec 11, 2015 at 1:59 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi Niels,
> I'm afraid this will not work (if I understood correctly what you are
> trying to do). When the trigger is serialized/deserialized, each parallel
> instance of the trigger gets its own copy of the QueueSource object. Plus, a
> separate instance of the QueueSource itself will be running in each parallel
> instance of the source operator. And there is no way for the trigger and the
> source to communicate, since they might not even run on the same machine in
> the end.
>
> Cheers,
> Aljoscha
> > On 11 Dec 2015, at 13:11, Niels Basjes <ni...@basjes.nl> wrote:
> >
> > Hi,
> >
> > Just to let you know: I tried passing a SourceFunction but I haven't
> been able to get that to work (yet).
> >
> > I passed an instance of this (see code below) into my Trigger and stored
> it there as:
> >     private QueueSource output;
> > and then, from onElement, I called something like:
> >    output.put("Foo",1234);
> >
> > When I run this from my IDE I get two distinct instances of the queue
> (effect: the stuff I put in doesn't come out at the other end).
> >
> > Any pointers on how (and if) this can be fixed are welcome.
> > Only if this works will I look into making this generic (I got some
> > type-related exceptions when I tried that).
> >
> > Niels
> >
> >
> > (The code below is under the Apache 2.0 License, so copy, adapt and improve it if you want to.)
> > package nl.basjes.flink.experiments;
> >
> > import org.apache.flink.configuration.Configuration;
> > import org.apache.flink.streaming.api.functions.source.RichEventTimeSourceFunction;
> >
> > import java.util.concurrent.ConcurrentLinkedQueue;
> >
> > public class QueueSource extends RichEventTimeSourceFunction<String> {
> >     private static final long serialVersionUID = 1L;
> >
> >     private volatile boolean isRunning = true;
> >
> >     private ConcurrentLinkedQueue<QueueElement> queue = new ConcurrentLinkedQueue<>();
> >
> >     @Override
> >     public void open(Configuration parameters) throws Exception {
> >         super.open(parameters);
> >     }
> >
> >     @Override
> >     public void close() throws Exception {
> >         super.close();
> >     }
> >
> >     @Override
> >     public void run(SourceContext<String> ctx) throws Exception {
> >         this.isRunning = true;
> >
> >         while (this.isRunning) {
> >             if (queue.isEmpty()) {
> >                 Thread.sleep(1); // Sleep 1 ms before retrying to dequeue again
> >                 continue;
> >             }
> >             QueueElement queueElement = queue.poll();
> >             ctx.collectWithTimestamp(queueElement.element, queueElement.timestamp);
> >         }
> >     }
> >
> >     @Override
> >     public void cancel() {
> >         this.isRunning = false;
> >     }
> >
> >     public void put(String element, long timestamp) {
> >         QueueElement queueElement = new QueueElement();
> >         queueElement.element = element;
> >         queueElement.timestamp = timestamp;
> >         queue.add(queueElement);
> >     }
> > }
> >
> > class QueueElement {
> >     String element;
> >     long timestamp;
> > }
> >
> >
> >
> > On Fri, Dec 11, 2015 at 11:07 AM, Niels Basjes <ni...@basjes.nl> wrote:
> > Thanks.
> >
> > The way I have solved it for now is by creating a class that persists data
> > into something external (right now HBase and/or Kafka) and using that from
> > the trigger to output the data.
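> >
> > A stripped-down sketch of the Kafka variant of that helper (not my actual
> > class; the ExternalEventWriter name, topic name and properties are just
> > examples):
> >
> > import java.io.Serializable;
> > import java.util.Properties;
> >
> > import org.apache.kafka.clients.producer.KafkaProducer;
> > import org.apache.kafka.clients.producer.ProducerRecord;
> >
> > // Hypothetical helper held by the Trigger. It must be Serializable because
> > // the trigger (and everything it references) is shipped to the workers;
> > // the producer itself is transient and created lazily on each worker.
> > public class ExternalEventWriter implements Serializable {
> >     private transient KafkaProducer<String, String> producer;
> >
> >     public void write(String key, String value) {
> >         if (producer == null) {
> >             Properties props = new Properties();
> >             props.put("bootstrap.servers", "broker:9092");
> >             props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
> >             props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
> >             producer = new KafkaProducer<>(props);
> >         }
> >         producer.send(new ProducerRecord<>("window-events", key, value));
> >     }
> > }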
> >
> > I have two follow-up questions:
> > 1) Is it possible to pass an instance of 'SourceFunction' as such a
> > parameter (without breaking Flink)?
> > 2) I want to save resources, so I'm using a single instance of my 'extra
> > data output' class in the Trigger instance, thus reusing the connections
> > to the outside across multiple Window instances. Can I assume that a
> > single Trigger instance will only be used by a single thread, i.e. that I
> > do not need locking and synchronization?
> >
> > Niels
> >
> > On Thu, Dec 10, 2015 at 4:14 PM, Stephan Ewen <se...@apache.org> wrote:
> > Hi Niels!
> >
> > I think there is no clean way to emit data from a trigger right now; you
> > can only emit data from the window functions.
> >
> > You can emit two different kinds of data using an "Either" type.
> > This is built into Scala; in Java we added it in 1.0-SNAPSHOT:
> >
> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java
> >
> > Maybe being able to emit different types of elements helps your use
> > case...
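> >
> > A rough sketch of how that could look in a window function (untested, and
> > Event / WindowStats are just placeholder types for your own element and
> > summary classes):
> >
> > import org.apache.flink.api.java.typeutils.Either;
> > import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
> > import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> > import org.apache.flink.util.Collector;
> >
> > // Emits every element as Left and one summary record per firing as Right.
> > // Event and WindowStats are placeholders, not existing Flink classes.
> > public class ElementsAndStats
> >         implements WindowFunction<Event, Either<Event, WindowStats>, String, TimeWindow> {
> >
> >     @Override
> >     public void apply(String key, TimeWindow window, Iterable<Event> values,
> >                       Collector<Either<Event, WindowStats>> out) {
> >         long count = 0;
> >         for (Event event : values) {
> >             out.collect(Either.<Event, WindowStats>Left(event));  // per-element output
> >             count++;
> >         }
> >         // one summary per firing, on the same output stream
> >         out.collect(Either.<Event, WindowStats>Right(new WindowStats(key, count)));
> >     }
> > }
> >
> > Downstream you can then separate the two again with two filters on
> > isLeft() / isRight().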
> >
> >
> > These types of questions have been coming up quite a bit, with people
> > looking to perform different actions inside the windows on different
> > triggers (on element, on event time).
> >
> > As per discussion with Aljoscha, one way to make this more flexible is
> to enhance what you can do with custom state:
> >   - State has timeouts (for cleanup)
> >   - Functions allow you to schedule event-time progress notifications
> >
> > Stephan
> >
> >
> >
> > On Thu, Dec 10, 2015 at 11:55 AM, Niels Basjes <ni...@basjes.nl> wrote:
> > Hi,
> >
> > I'm working on something that uses the Flink Window feature.
> > I have written a custom Trigger to build the Window I need.
> >
> > I am using the Window feature because I need state and I need to expire
> > (and clean) this state after a timeout (I use onEventTime to do that).
> > Because I need the data to stream in real time (augmented with the
> > mentioned state), I 'FIRE' after every event. Just before I 'PURGE' the
> > window, I need the fact of this purge (and some of the stats of this
> > Window) as a separate event in a separate 'DataStream'.
> >
> > Now the interfaces of the various classes only support output as a single
> > Java type (a very sane choice).
> > So what I do right now is put my events into something 'external'
> > (HBase/Kafka) and read them back in via a different Source implementation.
> >
> > My question: Is there a better way to do this?
> > Can I (for example) create a special 'Source' that I can pass as a
> > parameter to my Trigger and then, in onEventTime, just output a 'new event'?
> >
> > What do you recommend?
> >
> > --
> > Best regards / Met vriendelijke groeten,
> >
> > Niels Basjes
> >
> >
> >
> >
> > --
> > Best regards / Met vriendelijke groeten,
> >
> > Niels Basjes
> >
> >
> >
> > --
> > Best regards / Met vriendelijke groeten,
> >
> > Niels Basjes
>
>
