It turns out that the problem is deeper than I originally thought. The Flink dashboard reports that 0 records are being consumed, which is quite odd. Is there some issue with the 0.9 consumer on YARN?
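One thing worth ruling out on the Kafka side before digging into YARN specifics: with the 0.9 consumer, a group id that has no committed offsets starts reading from the latest offset by default, so a freshly deployed job pointed at an already-populated topic can legitimately report 0 records consumed until new messages arrive. A minimal sketch of the properties to double-check; the broker address and group id below are placeholders, not values from the job in this thread:

```java
import java.util.Properties;

public class ConsumerProps {
    // Hypothetical values for illustration only.
    static Properties kafkaProperties() {
        Properties props = new Properties();
        // Must be reachable from inside the YARN containers, not just the client machine.
        props.setProperty("bootstrap.servers", "broker1:9092");
        // A fresh group id rules out stale committed offsets from earlier test runs.
        props.setProperty("group.id", "my-flink-job");
        // Kafka 0.9's new consumer defaults to "latest" when a group has no
        // committed offsets; "earliest" makes existing messages visible.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }
}
```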
From: aljos...@apache.org
Date: Thu, 7 Apr 2016 09:56:42 +0000
Subject: Re: Multiple operations on a WindowedStream
To: user@flink.apache.org

Hi,
the code seems alright. Did you try looking at the Flink Dashboard to check whether any of the operations are sending elements?

Cheers,
Aljoscha

On Tue, 5 Apr 2016 at 21:00 Kanak Biscuitwala wrote:

This worked when I ran my test code locally, but I'm seeing nothing reach my sink when I try to run this in YARN (previously, when I just echoed all sums to my sink, it would work). Here's what my code looks like:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer09 consumer = new FlinkKafkaConsumer09<>(
        INPUT_TOPIC, new KafkaMessageDeserializer(), properties);

env.enableCheckpointing(5000);

// this (or event time) is required in order to do the double-windowing below
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

DataStream stream = env
        .addSource(consumer)
        .flatMap(new CountRequests())
        .keyBy(0, 1)
        .timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS))
        .sum(2)
        .timeWindowAll(Time.of(5, TimeUnit.SECONDS))
        .apply(new TopK(20))
        .map(new ToString<List<Tuple3>>());

stream.addSink(new FlinkKafkaProducer09<>(OUTPUT_TOPIC, new SimpleStringSchema(), properties));

env.execute(TASK_NAME);

Note that CountRequests produces Tuple3, TopK is an AllWindowFunction that produces List<Tuple3>, and ToString is a MapFunction that is just a wrapper around Object#toString(). Anything obvious that I'm doing wrong?
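Since the TopK class itself isn't shown anywhere in the thread, here is a hedged sketch of what the selection step inside such a function typically looks like: plain top-K over per-key counts using a bounded min-heap, so one pass over n entries costs O(n log k). The String key and Long count types are assumptions for illustration; the job's actual Tuple3 field types aren't given.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class TopKSelect {
    /** Returns the k entries with the largest counts, highest first. */
    static List<Map.Entry<String, Long>> topK(Map<String, Long> counts, int k) {
        // Min-heap ordered by count: the root is always the smallest of the
        // k candidates kept so far.
        PriorityQueue<Map.Entry<String, Long>> heap =
                new PriorityQueue<>((a, b) -> Long.compare(a.getValue(), b.getValue()));
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            heap.offer(e);
            if (heap.size() > k) {
                heap.poll(); // evict the current smallest candidate
            }
        }
        // The heap holds the winners in arbitrary order; sort descending for output.
        List<Map.Entry<String, Long>> result = new ArrayList<>(heap);
        result.sort(Map.Entry.<String, Long>comparingByValue().reversed());
        return result;
    }
}
```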
________________________________
> From: aljos...@apache.org
> Date: Fri, 1 Apr 2016 09:41:12 +0000
> Subject: Re: Multiple operations on a WindowedStream
> To: user@flink.apache.org
>
> Hi,
> if you are using ingestion-time (or event-time) as your stream time
> characteristic, i.e.:
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
> // or TimeCharacteristic.EventTime
>
> you can apply several window transforms one after another and they will
> apply to the same "time window" because they work on the element
> timestamps. What you can then do is have a window that does the
> aggregation and then another one (that has to be global) to select the
> top elements:
>
> result = input
>     .keyBy()
>     .timeWindow(Time.minutes(1), Time.seconds(5))
>     .sum(2)
>     .timeWindowAll(Time.seconds(5)) // notice how I put a non-sliding
>     // window here, because the above will output a new window every 5 seconds
>     .apply()
>
> I hope this helps.
>
> Cheers,
> Aljoscha
>
> On Fri, 1 Apr 2016 at 10:35 Balaji Rajagopalan
> <balaji.rajagopa...@olacabs.com> wrote:
>
> I had a similar use case and ended up writing the aggregation logic in
> the apply function; I could not find any better solution.
>
> On Fri, Apr 1, 2016 at 6:03 AM, Kanak Biscuitwala
> <kana...@hotmail.com> wrote:
>
> Hi,
>
> I would like to write something that does something like a word count,
> and then emits only the 10 highest counts for that window. Logically, I
> would want to do something like:
>
> stream.timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5,
> TimeUnit.SECONDS)).sum(2).apply(getTopK(10))
>
> However, the window context is lost after I do the sum aggregation. Is
> there a straightforward way to express this logic in Flink 1.0? One way
> I can think of is to have a complex function in apply() that has state,
> but I would like to know if there is something a little cleaner than
> that.
>
> Thanks,
> Kanak
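To make the alignment argument in Aljoscha's reply concrete: with a 1-minute window sliding every 5 seconds, every element timestamp falls into size/slide = 12 overlapping windows, and one of those windows fires every 5 seconds. That is why a non-sliding 5-second timeWindowAll downstream sees exactly one batch of per-key sums per slide. A back-of-the-envelope sketch of the sliding-window assignment rule (an illustration of the idea, not Flink's actual implementation, and it assumes a zero window offset):

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindows {
    /** Start timestamps of all sliding windows that contain {@code timestamp},
        for windows of {@code size} ms sliding every {@code slide} ms,
        most recent window start first. */
    static List<Long> windowStartsFor(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // The latest window containing this timestamp starts at the last slide boundary.
        long lastStart = timestamp - (timestamp % slide);
        // Walk back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

For size = 60000 and slide = 5000 this yields 12 window starts per element, matching the 1-minute/5-second windows in the job above.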