Hi,
the code seems alright. Did you try looking at the Flink Dashboard to check
whether any of the operations are sending elements?

Cheers,
Aljoscha
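
If the dashboard does not make it obvious where elements stop flowing, another
quick check is to tap an intermediate stream with a print() sink; on YARN the
output lands in the TaskManager stdout files. Below is a minimal sketch against
the pipeline quoted further down; splitting the chain and the name "counts" are
assumptions for illustration, not part of the original job (it also assumes the
same imports and variables as that job):

    // Hypothetical: break the chained pipeline after the keyed window sum
    // so the intermediate result can be inspected.
    DataStream<Tuple3<String, String, Integer>> counts = env
            .addSource(consumer)
            .flatMap(new CountRequests())
            .keyBy(0, 1)
            .timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS))
            .sum(2);

    // Debugging aid: writes every element of the stream to the TaskManager's
    // stdout, so you can see whether the keyed window fires at all.
    counts.print();

If counts shows data but nothing reaches the Kafka sink, the problem is
downstream of the keyed window; if counts is already empty, look at the source
or the timestamp assignment instead.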

On Tue, 5 Apr 2016 at 21:00 Kanak Biscuitwala <kana...@hotmail.com> wrote:

> This worked when I ran my test code locally, but I'm seeing nothing reach
> my sink when I try to run this in YARN (previously, when I just echoed all
> sums to my sink, it would work).
>
> Here's what my code looks like:
>
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         FlinkKafkaConsumer09<MirrorMessageRequest> consumer =
>                 new FlinkKafkaConsumer09<>(INPUT_TOPIC, new KafkaMessageDeserializer(), properties);
>         env.enableCheckpointing(5000);
>
>         // this (or event time) is required in order to do the double-windowing below
>         env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>
>         DataStream<String> stream = env
>                 .addSource(consumer)
>                 .flatMap(new CountRequests())
>                 .keyBy(0, 1)
>                 .timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS))
>                 .sum(2)
>                 .timeWindowAll(Time.of(5, TimeUnit.SECONDS))
>                 .apply(new TopK(20))
>                 .map(new ToString<List<Tuple3<String, String, Integer>>>());
>         stream.addSink(new FlinkKafkaProducer09<>(OUTPUT_TOPIC, new SimpleStringSchema(), properties));
>         env.execute(TASK_NAME);
>
> Note that CountRequests produces Tuple3<String, String, Integer>, TopK is
> an AllWindowFunction that produces List<Tuple3<String, String, Integer>>,
> and ToString is a MapFunction that is just a wrapper on Object#toString().
>
> Anything obvious that I'm doing wrong?
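
For readers following along: the helper classes are not shown in the mail, so
here is a rough sketch of what functions with the signatures described above
might look like. The class bodies, the MirrorMessageRequest getters, and the
choice of grouping fields are assumptions for illustration, not Kanak's actual
code; a matching top-K AllWindowFunction is sketched further down the thread.

    // Hypothetical flatMap: emits one (dimensionA, dimensionB, 1) tuple per
    // request so that sum(2) can count occurrences per key and window.
    public static class CountRequests
            implements FlatMapFunction<MirrorMessageRequest, Tuple3<String, String, Integer>> {
        @Override
        public void flatMap(MirrorMessageRequest msg,
                            Collector<Tuple3<String, String, Integer>> out) {
            // getEndpoint()/getMethod() are invented accessor names.
            out.collect(new Tuple3<>(msg.getEndpoint(), msg.getMethod(), 1));
        }
    }

    // Hypothetical map: a thin wrapper around Object#toString(), as described.
    public static class ToString<T> implements MapFunction<T, String> {
        @Override
        public String map(T value) {
            return value.toString();
        }
    }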
> ________________________________
> > From: aljos...@apache.org
> > Date: Fri, 1 Apr 2016 09:41:12 +0000
> > Subject: Re: Multiple operations on a WindowedStream
> > To: user@flink.apache.org
> >
> > Hi,
> > if you are using ingestion-time (or event-time) as your stream time
> > characteristic, i.e.:
> >
> > env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) // or
> > TimeCharacteristic.EventTime
> >
> > you can apply several window transforms one after another and they will
> > operate on the same "time window", because they work on the element
> > timestamps. What you can then do is have a window that does the
> > aggregation and then another one (which has to be a non-keyed all-window)
> > to select the top elements:
> >
> > result = input
> >     .keyBy(<some key>)
> >     .timeWindow(Time.minutes(1), Time.seconds(5))
> >     .sum(2)
> >     // notice how I put a non-sliding window here, because the above
> >     // will output a new window every 5 seconds
> >     .timeWindowAll(Time.seconds(5))
> >     .apply(<my custom window function>)
> >
> > I hope this helps.
> >
> > Cheers,
> > Aljoscha
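
As a concrete illustration of the "custom window function" placeholder above, a
top-K selector over the non-keyed 5-second window could look roughly like this
in the Flink 1.0-era Java API. The class name, constructor, and comparator are
assumptions, and the usual java.util and Flink windowing imports are omitted:

    // Hypothetical AllWindowFunction: gathers all per-key partial sums that
    // fall into the 5-second all-window and emits the k tuples with the
    // highest count (field f2).
    public static class TopK implements
            AllWindowFunction<Tuple3<String, String, Integer>,
                    List<Tuple3<String, String, Integer>>, TimeWindow> {

        private final int k;

        public TopK(int k) {
            this.k = k;
        }

        @Override
        public void apply(TimeWindow window,
                          Iterable<Tuple3<String, String, Integer>> values,
                          Collector<List<Tuple3<String, String, Integer>>> out) {
            List<Tuple3<String, String, Integer>> all = new ArrayList<>();
            for (Tuple3<String, String, Integer> t : values) {
                all.add(t);
            }
            // Sort descending by count and keep at most k entries.
            Collections.sort(all, new Comparator<Tuple3<String, String, Integer>>() {
                @Override
                public int compare(Tuple3<String, String, Integer> a,
                                   Tuple3<String, String, Integer> b) {
                    return Integer.compare(b.f2, a.f2);
                }
            });
            out.collect(new ArrayList<>(all.subList(0, Math.min(k, all.size()))));
        }
    }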
> >
> > On Fri, 1 Apr 2016 at 10:35 Balaji Rajagopalan
> > <balaji.rajagopa...@olacabs.com> wrote:
> > I had a similar use case and ended up writing the aggregation logic in the
> > apply function; I could not find any better solution.
> >
> > On Fri, Apr 1, 2016 at 6:03 AM, Kanak Biscuitwala
> > <kana...@hotmail.com> wrote:
> > Hi,
> >
> > I would like to write something that does something like a word count
> > and then emits only the 10 highest counts for each window. Logically, I
> > would want to do something like:
> >
> > stream
> >     .timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS))
> >     .sum(2)
> >     .apply(getTopK(10))
> >
> > However, the window context is lost after I do the sum aggregation. Is
> > there a straightforward way to express this logic in Flink 1.0? One way
> > I can think of is to have a complex function in apply() that has state,
> > but I would like to know if there is something a little cleaner than
> > that.
> >
> > Thanks,
> > Kanak
> >
>
