It turns out that the problem is deeper than I originally thought. The Flink
dashboard reports that 0 records are being consumed, which is quite odd. Is
there a known issue with the 0.9 consumer on YARN?
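
For reference, a stripped-down job that only consumes and prints (with explicit
consumer properties) is one way to rule out the consumer itself. The sketch below
assumes the Flink 1.0.x import paths, and the broker, topic, and group id values
are placeholders, not the real ones:

    import java.util.Properties;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaSanityCheck {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker-1:9092"); // placeholder broker list
            props.setProperty("group.id", "flink-debug-consumer");   // placeholder group id
            props.setProperty("auto.offset.reset", "earliest");      // read the topic from the beginning

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.addSource(new FlinkKafkaConsumer09<>("input-topic", new SimpleStringSchema(), props))
               .print(); // records should show up in the TaskManager stdout logs on YARN
            env.execute("kafka-sanity-check");
        }
    }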

From: aljos...@apache.org
Date: Thu, 7 Apr 2016 09:56:42 +0000
Subject: Re: Multiple operations on a WindowedStream
To: user@flink.apache.org

Hi,
the code seems alright. Did you try looking at the Flink Dashboard to check
whether any of the operations are sending elements?

Cheers,
Aljoscha

On Tue, 5 Apr 2016 at 21:00 Kanak Biscuitwala  wrote:
This worked when I ran my test code locally, but I'm seeing nothing reach my
sink when I try to run this in YARN (previously, when I just echoed all sums
to my sink, it would work).
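
(Roughly, the earlier version that worked sank the per-key sums directly, i.e.
something like the following, without the timeWindowAll/apply step; the exact
map/sink wiring here is from memory, not a paste:)

        env.addSource(consumer)
                .flatMap(new CountRequests())
                .keyBy(0, 1)
                .timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS))
                .sum(2)
                .map(new ToString<Tuple3>())
                .addSink(new FlinkKafkaProducer09<>(OUTPUT_TOPIC, new SimpleStringSchema(), properties));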

Here's what my code looks like:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkKafkaConsumer09 consumer = new FlinkKafkaConsumer09<>(
                INPUT_TOPIC, new KafkaMessageDeserializer(), properties);
        env.enableCheckpointing(5000);

        // this (or event time) is required in order to do the double-windowing below
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        DataStream stream = env
                .addSource(consumer)
                .flatMap(new CountRequests())
                .keyBy(0, 1)
                .timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS))
                .sum(2)
                .timeWindowAll(Time.of(5, TimeUnit.SECONDS))
                .apply(new TopK(20))
                .map(new ToString<List<Tuple3>>());
        stream.addSink(new FlinkKafkaProducer09<>(OUTPUT_TOPIC, new SimpleStringSchema(), properties));
        env.execute(TASK_NAME);

Note that CountRequests produces Tuple3, TopK is an AllWindowFunction that 
produces List<Tuple3>, and ToString is a MapFunction that is just a wrapper on 
Object#toString().
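
For reference, helpers along those lines might look roughly like the sketches
below (shown together for brevity; in practice each would be its own file or a
static nested class). The element types -- String messages and
Tuple3<String, String, Long> counts -- and the field layout are assumptions for
illustration only, and the AllWindowFunction uses the Iterable-based apply()
signature, which differs slightly from the Flink 1.0 interface:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    // One count record per request, e.g. (host, endpoint, 1); field 2 is what .sum(2) aggregates.
    public class CountRequests implements FlatMapFunction<String, Tuple3<String, String, Long>> {
        @Override
        public void flatMap(String message, Collector<Tuple3<String, String, Long>> out) {
            String[] parts = message.split(",");  // assumed message format
            out.collect(new Tuple3<>(parts[0], parts[1], 1L));
        }
    }

    // Keeps only the k largest sums in each 5-second window.
    // Note: in Flink 1.0 the Iterable appears in the IN type parameter rather than in
    // the apply() signature, so the exact interface form differs between versions.
    public class TopK implements AllWindowFunction<Tuple3<String, String, Long>,
            List<Tuple3<String, String, Long>>, TimeWindow> {
        private final int k;

        public TopK(int k) {
            this.k = k;
        }

        @Override
        public void apply(TimeWindow window, Iterable<Tuple3<String, String, Long>> values,
                          Collector<List<Tuple3<String, String, Long>>> out) {
            List<Tuple3<String, String, Long>> all = new ArrayList<>();
            for (Tuple3<String, String, Long> v : values) {
                all.add(v);
            }
            all.sort((a, b) -> Long.compare(b.f2, a.f2)); // largest counts first
            out.collect(new ArrayList<>(all.subList(0, Math.min(k, all.size()))));
        }
    }

    // Just Object#toString(), so the Kafka sink can write plain strings.
    public class ToString<T> implements MapFunction<T, String> {
        @Override
        public String map(T value) {
            return value.toString();
        }
    }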

Anything obvious that I'm doing wrong?
________________________________
> From: aljos...@apache.org
> Date: Fri, 1 Apr 2016 09:41:12 +0000
> Subject: Re: Multiple operations on a WindowedStream
> To: user@flink.apache.org
>
> Hi,
> if you are using ingestion-time (or event-time) as your stream time
> characteristic, i.e.:
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) // or
> TimeCharacteristic.EventTime
>
> you can apply several window transforms after another and they will
> apply the same "time window" because they work on the element
> timestamps. What you can then do is have a window that does the
> aggregation and then another one (that has to be global) to select the
> top elements:
>
> result = input
> .keyBy()
> .timeWindow(Time.minutes(1), Time.seconds(5))
> .sum(2)
> .timeWindowAll(Time.seconds(5)) // notice how I put a non-sliding window
> // here, because the above will output a new window every 5 seconds
> .apply()
>
> I hope this helps.
>
> Cheers,
> Aljoscha
>
> On Fri, 1 Apr 2016 at 10:35 Balaji Rajagopalan
> <balaji.rajagopa...@olacabs.com>
> wrote:
> I had a similar use case and ended writing the aggregation logic in the
> apply function, could not find any better solution.
>
> On Fri, Apr 1, 2016 at 6:03 AM, Kanak Biscuitwala
> <kana...@hotmail.com> wrote:
> Hi,
>
> I would like to write something that does something like a word count,
> and then emits only the 10 highest counts for that window. Logically, I
> would want to do something like:
>
> stream.timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS)).sum(2).apply(getTopK(10))
>
> However, the window context is lost after I do the sum aggregation. Is
> there a straightforward way to express this logic in Flink 1.0? One way
> I can think of is to have a complex function in apply() that has state,
> but I would like to know if there is something a little cleaner than
> that.
>
> Thanks,
> Kanak
>