Hi,

Just to let you know: I tried passing a SourceFunction but I haven't been
able to get that to work (yet).

I passed an instance of this (see code below) into my Trigger and stored it
there as:
    private QueueSource output;
and then, from onElement, I called something like:
    output.put("Foo", 1234);

When I run this from my IDE, I get two distinct instances of the queue
(effect: whatever I put in never comes out at the other end).
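A likely explanation: Flink serializes user functions (the Trigger included) when it builds the job and deserializes them inside each task, so the QueueSource referenced by the Trigger and the one running as the source end up as separate copies, even in a local IDE run. The plain-Java sketch below imitates that with a serialization round trip. QueueHolder and SerializationCopyDemo are hypothetical names, and the round trip only approximates what Flink does internally.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for QueueSource: only the queue field matters here.
class QueueHolder implements Serializable {
    private static final long serialVersionUID = 1L;
    final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
}

class SerializationCopyDemo {
    // Serialize and deserialize an object; a rough approximation of a
    // function being shipped to the task that runs it.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        QueueHolder original = new QueueHolder();
        QueueHolder sourceCopy = roundTrip(original);   // the copy the source task runs
        QueueHolder triggerCopy = roundTrip(original);  // the copy held by the Trigger

        triggerCopy.queue.add("Foo");
        System.out.println(sourceCopy == triggerCopy);   // false
        System.out.println(sourceCopy.queue.isEmpty());  // true: "Foo" never arrives
    }
}
```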

Any pointers on how (and whether) this can be fixed are welcome.
Only if this works will I look into making this generic (I got some
type-related exceptions when I tried that).
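The type-related exceptions with a generic version are probably caused by erasure: Flink's type extraction reads the output type from the class's generic signature, and a QueueSource<T> only exposes the type variable T there, whatever T is at the call site. The sketch shows this with plain reflection; BaseSource, StringQueueSource and GenericQueueSource are hypothetical. If I remember correctly, implementing Flink's ResultTypeQueryable interface to return the produced type explicitly is the usual way around this.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical base class standing in for RichEventTimeSourceFunction<OUT>.
abstract class BaseSource<OUT> {}

// Type argument fixed in the class declaration: recoverable by reflection.
class StringQueueSource extends BaseSource<String> {}

// Type argument left generic: only the type variable survives at runtime.
class GenericQueueSource<T> extends BaseSource<T> {}

class ErasureDemo {
    // Returns the type argument a subclass passes to BaseSource, as reflection sees it.
    static Type producedType(Class<?> sourceClass) {
        ParameterizedType superType = (ParameterizedType) sourceClass.getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }

    public static void main(String[] args) {
        // Prints "class java.lang.String": the concrete type is known.
        System.out.println(producedType(StringQueueSource.class));
        // Prints "T": even for new GenericQueueSource<String>(), String is erased.
        System.out.println(producedType(new GenericQueueSource<String>().getClass()));
    }
}
```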

Niels


(The code below is under the Apache 2.0 License, so feel free to copy, adapt
and improve it.)

package nl.basjes.flink.experiments;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichEventTimeSourceFunction;

import java.io.Serializable;
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueSource extends RichEventTimeSourceFunction<String> {
    private static final long serialVersionUID = 1L;

    private volatile boolean isRunning = true;

    // Elements handed over by put(); drained by run().
    private ConcurrentLinkedQueue<QueueElement> queue = new ConcurrentLinkedQueue<>();

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        this.isRunning = true;

        while (this.isRunning) {
            // poll() returns null when the queue is empty.
            QueueElement queueElement = queue.poll();
            if (queueElement == null) {
                Thread.sleep(1); // Sleep 1 ms before trying to dequeue again
                continue;
            }
            ctx.collectWithTimestamp(queueElement.element, queueElement.timestamp);
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }

    // Called from the Trigger; the element only reaches run() if the Trigger
    // and the running source share this very instance.
    public void put(String element, long timestamp) {
        QueueElement queueElement = new QueueElement();
        queueElement.element = element;
        queueElement.timestamp = timestamp;
        queue.add(queueElement);
    }
}

// Serializable in case the queue is not empty when the function is serialized.
class QueueElement implements Serializable {
    private static final long serialVersionUID = 1L;
    String element;
    long timestamp;
}
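Since the two copies cannot share a field, one workaround that might do for local experiments is to keep only a queue name in the serialized objects and look the actual queue up in a static, JVM-wide registry. This is a sketch under a big assumption: it only works while the source task and the window operator run in the same JVM and class loader (as in an IDE run); on a cluster they can end up on different TaskManagers, so it is not a real fix. QueueRegistry and NamedQueueHandle are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

// Hypothetical JVM-wide registry: every copy that knows the name gets the same queue.
final class QueueRegistry {
    private static final ConcurrentMap<String, ConcurrentLinkedQueue<String>> QUEUES =
            new ConcurrentHashMap<>();

    private QueueRegistry() {}

    static ConcurrentLinkedQueue<String> get(String name) {
        // computeIfAbsent is atomic, so concurrent callers receive the same queue.
        return QUEUES.computeIfAbsent(name, n -> new ConcurrentLinkedQueue<>());
    }
}

// Serializable handle: only the name is serialized, so deserialized copies
// (one in the Trigger, one in the source) still reach the same queue.
class NamedQueueHandle implements java.io.Serializable {
    private static final long serialVersionUID = 1L;
    private final String queueName;

    NamedQueueHandle(String queueName) {
        this.queueName = queueName;
    }

    void put(String element) {
        QueueRegistry.get(queueName).add(element);
    }

    // Returns null when the queue is empty.
    String poll() {
        return QueueRegistry.get(queueName).poll();
    }
}
```

Both the Trigger and QueueSource would hold a NamedQueueHandle instead of the queue itself. Note that the static map outlives the job, so in this sketch queues are never cleaned up.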




On Fri, Dec 11, 2015 at 11:07 AM, Niels Basjes <ni...@basjes.nl> wrote:

> Thanks.
>
> The way I solved it for now is by creating a class that persists data into
> something external (right now HBase and/or Kafka) and using that from the
> trigger to output the data.
>
> I have two followup questions:
> 1) Is it possible to pass an instance of 'SourceFunction' as such a
> parameter (without breaking Flink)?
> 2) I want to save resources, so I'm using a single instance of my 'Extra
> data output class' in the instance of the Trigger, thus reusing the
> connections to the outside over multiple Window instances. Can I assume
> that a single instance of Trigger will only be used by a single thread?
> I.e. can I assume that I do not need locking and synchronization?
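Whatever the answer to question 2, two points seem safe to rely on. Because the Trigger is serialized, a connection kept in one of its fields should be transient and opened lazily after deserialization, so each deserialized copy opens its own. And if single-threaded use cannot be confirmed, synchronizing the output methods is cheap when there is no contention. A minimal sketch; ExtraDataOutput and ExternalConnection are hypothetical stand-ins for the real HBase/Kafka client.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical non-serializable client (stands in for an HBase/Kafka connection).
class ExternalConnection {
    private int sent = 0;

    void send(String record) {
        sent++; // a real client would write the record to the external system
    }

    int sentCount() {
        return sent;
    }
}

// Hypothetical 'extra data output' kept as a field of the Trigger.
class ExtraDataOutput implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: not shipped with the serialized Trigger; opened on first use.
    private transient ExternalConnection connection;

    // synchronized, so correctness does not depend on single-threaded use.
    synchronized void emit(String record) {
        if (connection == null) {
            connection = new ExternalConnection();
        }
        connection.send(record);
    }

    synchronized boolean isConnected() {
        return connection != null;
    }

    // Serialization round trip, roughly what happens when the Trigger is shipped to a task.
    static ExtraDataOutput roundTrip(ExtraDataOutput output) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(output);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ExtraDataOutput) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```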
>
> Niels
>
> On Thu, Dec 10, 2015 at 4:14 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi Niels!
>>
>> I think there is no clean way to emit data from a trigger right now; you
>> can only emit data from the window functions.
>>
>> You can emit two different kinds of data types using an "Either" type.
>> This is built into Scala; in Java we added it in 1.0-SNAPSHOT:
>>
>> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java
>>
>> Maybe being able to emit different types of elements helps your use case...
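To make the Either suggestion concrete: the window function would emit Either<Event, PurgeStats>, using Left for the regular per-FIRE output and Right for the record emitted just before PURGE, and downstream the stream would be split back into two by checking isLeft(). The sketch uses a minimal hand-written Either so it runs without Flink; its Left/Right/isLeft/left/right names mirror Flink's Either class as far as I know, but it is not that class, and Event/PurgeStats are hypothetical payload types.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for an Either type (NOT Flink's class): holds exactly one
// of a left or a right value.
final class Either<L, R> {
    private final L left;
    private final R right;
    private final boolean isLeft;

    private Either(L left, R right, boolean isLeft) {
        this.left = left;
        this.right = right;
        this.isLeft = isLeft;
    }

    static <L, R> Either<L, R> Left(L value) { return new Either<>(value, null, true); }

    static <L, R> Either<L, R> Right(R value) { return new Either<>(null, value, false); }

    boolean isLeft() { return isLeft; }

    boolean isRight() { return !isLeft; }

    L left() {
        if (!isLeft) throw new IllegalStateException("not a Left");
        return left;
    }

    R right() {
        if (isLeft) throw new IllegalStateException("not a Right");
        return right;
    }
}

// Hypothetical payload types for the two kinds of output.
class Event {
    final String id;
    Event(String id) { this.id = id; }
}

class PurgeStats {
    final long elementCount;
    PurgeStats(long elementCount) { this.elementCount = elementCount; }
}

class EitherSplitDemo {
    // Splits the combined window output back into the two logical streams.
    static void split(List<Either<Event, PurgeStats>> combined,
                      List<Event> events, List<PurgeStats> purges) {
        for (Either<Event, PurgeStats> item : combined) {
            if (item.isLeft()) {
                events.add(item.left());
            } else {
                purges.add(item.right());
            }
        }
    }

    public static void main(String[] args) {
        List<Either<Event, PurgeStats>> windowOutput = new ArrayList<>();
        windowOutput.add(Either.Left(new Event("a")));      // emitted on a FIRE
        windowOutput.add(Either.Left(new Event("b")));      // emitted on a FIRE
        windowOutput.add(Either.Right(new PurgeStats(2)));  // emitted just before PURGE

        List<Event> events = new ArrayList<>();
        List<PurgeStats> purges = new ArrayList<>();
        split(windowOutput, events, purges);
        System.out.println(events.size() + " events, " + purges.size() + " purge record(s)");
    }
}
```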
>>
>>
>> These types of questions have been coming up quite a bit: people looking
>> to do different actions inside the windows on different triggers (on
>> element, on event time).
>>
>> As per a discussion with Aljoscha, one way to make this more flexible is to
>> enhance what you can do with custom state:
>>   - State has timeouts (for cleanup)
>>   - Functions allow you to schedule event-time progress notifications
>>
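A rough illustration of what "state with timeouts" plus event-time notifications could look like. This is only a plain-Java sketch of the idea under my own assumptions (TimeoutState, update and onEventTime are hypothetical names), not an existing or planned Flink API: each entry gets an expiry of its last event time plus a timeout, and when event time advances, expired entries are dropped and reported so the caller can react, for example by emitting stats.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical keyed state whose entries expire as event time progresses.
class TimeoutState<K, V> {
    private final long timeout;
    private final Map<K, V> values = new HashMap<>();
    private final Map<K, Long> expiry = new HashMap<>();

    TimeoutState(long timeout) {
        this.timeout = timeout;
    }

    // Stores a value and pushes its expiry to eventTime + timeout.
    void update(K key, V value, long eventTime) {
        values.put(key, value);
        expiry.put(key, eventTime + timeout);
    }

    V get(K key) {
        return values.get(key);
    }

    // Called when event time advances (e.g. on a watermark): removes every entry
    // whose expiry has been reached and returns the removed keys.
    List<K> onEventTime(long watermark) {
        List<K> expired = new ArrayList<>();
        Iterator<Map.Entry<K, Long>> it = expiry.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Long> entry = it.next();
            if (entry.getValue() <= watermark) {
                expired.add(entry.getKey());
                values.remove(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```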
>> Stephan
>>
>>
>>
>> On Thu, Dec 10, 2015 at 11:55 AM, Niels Basjes <ni...@basjes.nl> wrote:
>>
>>> Hi,
>>>
>>> I'm working on something that uses the Flink Window feature.
>>> I have written a custom Trigger to build the Window I need.
>>>
>>> I am using the Window feature because I need state and I need to expire
>>> (and clean) this state after a timeout (I use the onEventTime to do that).
>>> Because I need the data streaming in real time (augmented with the
>>> mentioned state) I 'FIRE' after every event. Just before I 'PURGE' the
>>> window I need the fact of this purge (and some of the stats of this Window)
>>> as a separate event in a separate 'DataStream'.
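The trigger described above could be simulated like this for a single window. It is a plain-Java sketch, not Flink code: the Result enum only mirrors the names of Flink's TriggerResult, FireThenPurgeTrigger is hypothetical, and in a real Trigger onElement would also register the timer through its TriggerContext. It also shows one detail that is easy to miss: every element pushes the timeout forward, so timers registered by earlier elements have to be ignored when they go off (unless they are deleted).

```java
import java.util.ArrayList;
import java.util.List;

// Mirrors the names of Flink's TriggerResult; this is not the real enum.
enum Result { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

// Hypothetical single-window simulation: FIRE on every element; once the
// inactivity timeout has passed, record the window's stats and PURGE.
class FireThenPurgeTrigger {
    private final long timeout;
    private long count = 0;
    private long latestTimer = Long.MIN_VALUE;

    // Stands in for the separate 'purge' DataStream.
    final List<String> purgeEvents = new ArrayList<>();

    FireThenPurgeTrigger(long timeout) {
        this.timeout = timeout;
    }

    // Counts the element and moves the timeout forward (a real trigger would
    // register latestTimer as an event-time timer here).
    Result onElement(long timestamp) {
        count++;
        latestTimer = Math.max(latestTimer, timestamp + timeout);
        return Result.FIRE;
    }

    Result onEventTime(long time) {
        if (time != latestTimer) {
            return Result.CONTINUE; // timer from an earlier element: window still active
        }
        purgeEvents.add("purge after " + count + " elements");
        count = 0;
        latestTimer = Long.MIN_VALUE;
        return Result.PURGE;
    }
}
```

The purgeEvents list is exactly the output that has no clean outlet from a real Trigger, which is what this thread is about.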
>>>
>>> Now the interfaces of the various classes only support output as a
>>> single Java type (a very sane choice).
>>> So what I do right now is put my events on something 'external'
>>> (HBase/Kafka) and read it in via a different Source implementation.
>>>
>>> My question: Is there a better way to do this?
>>> Can I (for example) create a special 'Source' that I can pass as a
>>> parameter to my Trigger and then, in onEventTime, just output a 'new event'?
>>>
>>> What do you recommend?
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>>
>>
>>
>
>


