You can strip the parts that accumulate the data for writing the files;
then it becomes a very slim example to build on top of...

On Fri, Dec 11, 2015 at 3:20 PM, Niels Basjes <ni...@basjes.nl> wrote:

> I'll have another look at the example code you sent me.
> Thanks.
>
> On Fri, Dec 11, 2015 at 3:00 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi Niels!
>>
>> Quick question (I am probably overlooking something here): if you simply
>> want to emit each element (Trigger onElement) in addition to a special data
>> stream, can you not simply have two operators that consume the original
>> data stream, namely the window operator and the additional source?
>>
>> If you need each element to pass through the window function anyway, I
>> think it may almost be easier to use the custom state with timeout example
>> I sent you a while back. There you have full flexibility and need not
>> separate between trigger state, window state, etc.
>>
>> Stephan
>>
>>
>>
>> On Fri, Dec 11, 2015 at 1:59 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi Niels,
>>> I’m afraid this will not work (if I understood correctly what you are
>>> trying to do). When the trigger is serialized and deserialized, each
>>> parallel instance of the trigger gets its own copy of the QueueSource
>>> object. In addition, a separate instance of the QueueSource itself will be
>>> running in each parallel instance of the source operator. There is no way
>>> for the trigger and the source to communicate, since they might not even
>>> run on the same machine in the end.
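
A minimal plain-Java sketch of the effect described above, without any Flink dependency. It assumes the function objects are shipped with ordinary Java serialization; `SharedQueue` and `roundTrip` are hypothetical names made up for this illustration. Each round trip produces an independent object, so an element added to one copy never shows up in the other:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ConcurrentLinkedQueue;

class SerializedCopyDemo {
    // Hypothetical stand-in for a queue-backed object handed to two functions.
    static class SharedQueue implements Serializable {
        private static final long serialVersionUID = 1L;
        final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
    }

    // Round-trips an object through Java serialization, which is assumed here
    // to approximate what happens when a function object is shipped to a task.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T original)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SharedQueue original = new SharedQueue();
        SharedQueue triggerCopy = roundTrip(original); // copy held by the trigger
        SharedQueue sourceCopy = roundTrip(original);  // copy held by the source

        triggerCopy.queue.add("Foo");

        // The two copies do not share state.
        System.out.println("trigger copy size: " + triggerCopy.queue.size()); // 1
        System.out.println("source copy size:  " + sourceCopy.queue.size());  // 0
    }
}
```

This matches the symptom reported from the IDE run: two distinct queue instances, with nothing arriving at the other end.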
>>>
>>> Cheers,
>>> Aljoscha
>>> > On 11 Dec 2015, at 13:11, Niels Basjes <ni...@basjes.nl> wrote:
>>> >
>>> > Hi,
>>> >
>>> > Just to let you know: I tried passing a SourceFunction but I haven't
>>> > been able to get that to work (yet).
>>> >
>>> > I passed an instance of this (see code below) into my Trigger and
>>> > stored it there as:
>>> >     private QueueSource output;
>>> > and then I called from the onElement something like:
>>> >    output.put("Foo",1234);
>>> >
>>> > When I run this from my IDE I get two distinct instances of the queue
>>> > (effect: the stuff I put in doesn't come out at the other end).
>>> >
>>> > Any pointers how (and if) this can be fixed are welcome.
>>> > Only if this works will I look into making this generic (I got some
>>> > type-related exceptions when I tried that).
>>> >
>>> > Niels
>>> >
>>> >
>>> > (The code below is under the Apache 2.0 License, so copy, adapt, and
>>> > improve it if you want to.)
>>> > package nl.basjes.flink.experiments;
>>> >
>>> > import org.apache.flink.configuration.Configuration;
>>> > import org.apache.flink.streaming.api.functions.source.RichEventTimeSourceFunction;
>>> >
>>> > import java.util.concurrent.ConcurrentLinkedQueue;
>>> >
>>> > public class QueueSource extends RichEventTimeSourceFunction<String> {
>>> >     private static final long serialVersionUID = 1L;
>>> >
>>> >     private volatile boolean isRunning = true;
>>> >
>>> >     private ConcurrentLinkedQueue<QueueElement> queue = new ConcurrentLinkedQueue<>();
>>> >
>>> >     @Override
>>> >     public void open(Configuration parameters) throws Exception {
>>> >         super.open(parameters);
>>> >     }
>>> >
>>> >     @Override
>>> >     public void close() throws Exception {
>>> >         super.close();
>>> >     }
>>> >
>>> >     @Override
>>> >     public void run(SourceContext<String> ctx) throws Exception {
>>> >         this.isRunning = true;
>>> >
>>> >         while (this.isRunning) {
>>> >             if (queue.isEmpty()) {
>>> >                 Thread.sleep(1); // Sleep 1 ms before retrying to dequeue again
>>> >                 continue;
>>> >             }
>>> >             QueueElement queueElement = queue.poll();
>>> >             ctx.collectWithTimestamp(queueElement.element, queueElement.timestamp);
>>> >         }
>>> >     }
>>> >
>>> >     @Override
>>> >     public void cancel() {
>>> >         this.isRunning = false;
>>> >     }
>>> >
>>> >     public void put(String element, long timestamp) {
>>> >         QueueElement queueElement = new QueueElement();
>>> >         queueElement.element = element;
>>> >         queueElement.timestamp = timestamp;
>>> >         queue.add(queueElement);
>>> >     }
>>> > }
>>> >
>>> > class QueueElement {
>>> >     String element;
>>> >     long timestamp;
>>> > }
>>> >
>>> >
>>> >
>>> > On Fri, Dec 11, 2015 at 11:07 AM, Niels Basjes <ni...@basjes.nl>
>>> wrote:
>>> > Thanks.
>>> >
>>> > The way I solved it now is by creating a class that persists data into
>>> > something external (right now HBase and/or Kafka) and using that from the
>>> > trigger to output the data.
>>> >
>>> > I have two followup questions:
>>> > 1) Is it possible to pass an instance of 'SourceFunction' as such a
>>> > parameter (without breaking Flink)?
>>> > 2) I want to save resources, so I'm using a single instance of my
>>> > 'Extra data output class' in the instance of the Trigger, thus reusing the
>>> > connections to the outside across multiple Window instances. Can I assume
>>> > that a single instance of Trigger will only be used by a single thread,
>>> > i.e. that I do not need locking and synchronization?
>>> >
>>> > Niels
>>> >
>>> > On Thu, Dec 10, 2015 at 4:14 PM, Stephan Ewen <se...@apache.org>
>>> wrote:
>>> > Hi Niels!
>>> >
>>> > I think there is no clean way to emit data from a trigger right now;
>>> > you can only emit data from the window functions.
>>> >
>>> > You can emit two different kinds of data using an "Either" type.
>>> > This is built into Scala; in Java we added it in 1.0-SNAPSHOT:
>>> >
>>> > https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java
>>> >
>>> > Maybe being able to emit different types of elements helps your use
>>> > case...
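
A rough sketch of the "Either" idea in plain Java. The `Either` class below is a hypothetical, hand-written stand-in made up for this illustration; it is not Flink's class and its method names (`ofLeft`, `ofRight`, `getLeft`, `getRight`) are assumptions. The point is only that one output type can carry both the regular events and the purge statistics, which a downstream step then separates again:

```java
import java.util.ArrayList;
import java.util.List;

class EitherDemo {
    // Hypothetical minimal Either for illustration only (not Flink's API).
    static final class Either<L, R> {
        private final L left;
        private final R right;
        private final boolean isLeft;

        private Either(L left, R right, boolean isLeft) {
            this.left = left;
            this.right = right;
            this.isLeft = isLeft;
        }

        static <L, R> Either<L, R> ofLeft(L value) {
            return new Either<>(value, null, true);
        }

        static <L, R> Either<L, R> ofRight(R value) {
            return new Either<>(null, value, false);
        }

        boolean isLeft() {
            return isLeft;
        }

        L getLeft() {
            if (!isLeft) throw new IllegalStateException("not a left value");
            return left;
        }

        R getRight() {
            if (isLeft) throw new IllegalStateException("not a right value");
            return right;
        }
    }

    public static void main(String[] args) {
        // A single output "stream" carrying two kinds of records.
        List<Either<String, Long>> output = new ArrayList<>();
        output.add(Either.<String, Long>ofLeft("augmented event")); // regular element
        output.add(Either.<String, Long>ofRight(42L));              // statistic emitted on purge

        // Downstream, the two kinds are separated again.
        for (Either<String, Long> record : output) {
            if (record.isLeft()) {
                System.out.println("event: " + record.getLeft());
            } else {
                System.out.println("purge stats: " + record.getRight());
            }
        }
    }
}
```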
>>> >
>>> >
>>> > These types of questions have been coming up quite a bit: people are
>>> > looking to perform different actions inside the windows on different
>>> > triggers (on element, on event time).
>>> >
>>> > As per discussion with Aljoscha, one way to make this more flexible is
>>> to enhance what you can do with custom state:
>>> >   - State has timeouts (for cleanup)
>>> >   - Functions allow you to schedule event-time progress notifications
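
The two points above can be sketched in plain Java, independent of any Flink API. Everything here is an assumption for illustration: `SessionTracker`, `onElement`, and `onEventTime` are hypothetical names, and the "streams" are simple lists. Each element updates per-key state and is emitted right away; when event time advances, keys that have been idle longer than the timeout are cleaned up and reported as separate expiration records:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class TimeoutStateDemo {
    // Per-key state: number of events seen and timestamp of the latest one.
    static final class KeyState {
        long count;
        long lastSeen;
    }

    // Hypothetical holder of custom state with a timeout.
    static final class SessionTracker {
        private final long timeoutMillis;
        private final Map<String, KeyState> states = new HashMap<>();
        final List<String> events = new ArrayList<>();      // augmented elements
        final List<String> expirations = new ArrayList<>(); // one record per expired key

        SessionTracker(long timeoutMillis) {
            this.timeoutMillis = timeoutMillis;
        }

        // Called for every element: update the state and emit immediately.
        void onElement(String key, long timestamp) {
            KeyState state = states.computeIfAbsent(key, k -> new KeyState());
            state.count++;
            state.lastSeen = Math.max(state.lastSeen, timestamp);
            events.add(key + "#" + state.count);
        }

        // Called when event time has progressed to the given point:
        // expire and clean up every key that has been idle for too long.
        void onEventTime(long currentEventTime) {
            Iterator<Map.Entry<String, KeyState>> it = states.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, KeyState> entry = it.next();
                if (entry.getValue().lastSeen + timeoutMillis <= currentEventTime) {
                    expirations.add(entry.getKey() + " expired after "
                            + entry.getValue().count + " events");
                    it.remove();
                }
            }
        }

        int activeKeys() {
            return states.size();
        }
    }

    public static void main(String[] args) {
        SessionTracker tracker = new SessionTracker(1000);
        tracker.onElement("a", 0);
        tracker.onElement("a", 500);
        tracker.onElement("b", 900);
        tracker.onEventTime(1600); // "a" idle since 500, so it expires; "b" stays

        System.out.println(tracker.events);      // [a#1, a#2, b#1]
        System.out.println(tracker.expirations); // [a expired after 2 events]
    }
}
```

Keeping the state, the per-element output, and the expiration output in one place avoids having to split the logic between trigger state and window state.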
>>> >
>>> > Stephan
>>> >
>>> >
>>> >
>>> > On Thu, Dec 10, 2015 at 11:55 AM, Niels Basjes <ni...@basjes.nl>
>>> wrote:
>>> > Hi,
>>> >
>>> > I'm working on something that uses the Flink Window feature.
>>> > I have written a custom Trigger to build the Window I need.
>>> >
>>> > I am using the Window feature because I need state and I need to
>>> expire (and clean) this state after a timeout (I use the onEventTime to do
>>> that).
>>> > Because I need the data streaming in real time (augmented with the
>>> mentioned state) I 'FIRE' after every event. Just before I 'PURGE' the
>>> window I need the fact of this purge (and some of the stats of this Window)
>>> as a separate event in a separate 'DataStream'.
>>> >
>>> > Now the interfaces of the various classes only support output as a
>>> single java type (very sane choice).
>>> > So what I do right now is put my events on something 'external'
>>> (HBase/Kafka) and read it in via a different Source implementation.
>>> >
>>> > My question: Is there a better way to do this?
>>> > Can I (for example) create a special 'Source' that I can pass as a
>>> > parameter to my Trigger and then, in onEventTime, just output a 'new event'?
>>> >
>>> > What do you recommend?
>>> >
>>> > --
>>> > Best regards / Met vriendelijke groeten,
>>> >
>>> > Niels Basjes
>>>
>>>
>>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>
