Hi,
the code looks alright to me. Did you try looking at the Flink Dashboard to check whether any of the operations are sending elements?
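If the numbers in the dashboard don't tell you much, you could also tap the intermediate streams, for example with print() (or a pass-through map that logs), to narrow down where elements stop flowing. Something roughly like the sketch below, reusing the names from your code; since CountRequests, TopK and ToString are your own classes, this is untested and only meant to show how you could split up the chain:

// Split the chained pipeline so the intermediate results can be inspected.
DataStream<Tuple3<String, String, Integer>> counted = env
        .addSource(consumer)
        .flatMap(new CountRequests());

// Goes to the TaskManager stdout (visible in the YARN container logs).
counted.print();

DataStream<Tuple3<String, String, Integer>> summed = counted
        .keyBy(0, 1)
        .timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS))
        .sum(2);

// If nothing shows up here, the keyed windows never fire.
summed.print();

summed.timeWindowAll(Time.of(5, TimeUnit.SECONDS))
        .apply(new TopK(20))
        .map(new ToString<List<Tuple3<String, String, Integer>>>())
        .addSink(new FlinkKafkaProducer09<>(OUTPUT_TOPIC,
                new SimpleStringSchema(), properties));

That way you can see whether elements already stop after the flatMap, after the keyed window, or only at the all-window/top-k step.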
Cheers,
Aljoscha

On Tue, 5 Apr 2016 at 21:00 Kanak Biscuitwala <kana...@hotmail.com> wrote:

> This worked when I ran my test code locally, but I'm seeing nothing reach
> my sink when I try to run this in YARN (previously, when I just echoed all
> sums to my sink, it would work).
>
> Here's what my code looks like:
>
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> FlinkKafkaConsumer09<MirrorMessageRequest> consumer = new FlinkKafkaConsumer09<>(
>         INPUT_TOPIC, new KafkaMessageDeserializer(), properties);
> env.enableCheckpointing(5000);
>
> // this (or event time) is required in order to do the double-windowing below
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>
> DataStream<String> stream = env
>         .addSource(consumer)
>         .flatMap(new CountRequests())
>         .keyBy(0, 1)
>         .timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS))
>         .sum(2)
>         .timeWindowAll(Time.of(5, TimeUnit.SECONDS))
>         .apply(new TopK(20))
>         .map(new ToString<List<Tuple3<String, String, Integer>>>());
> stream.addSink(new FlinkKafkaProducer09<>(OUTPUT_TOPIC,
>         new SimpleStringSchema(), properties));
> env.execute(TASK_NAME);
>
> Note that CountRequests produces Tuple3<String, String, Integer>, TopK is
> an AllWindowFunction that produces List<Tuple3<String, String, Integer>>,
> and ToString is a MapFunction that is just a wrapper on Object#toString().
>
> Anything obvious that I'm doing wrong?
> ________________________________
>
> > From: aljos...@apache.org
> > Date: Fri, 1 Apr 2016 09:41:12 +0000
> > Subject: Re: Multiple operations on a WindowedStream
> > To: user@flink.apache.org
> >
> > Hi,
> > if you are using ingestion time (or event time) as your stream time
> > characteristic, i.e.:
> >
> > env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) // or
> > TimeCharacteristic.EventTime
> >
> > you can apply several window transforms one after another and they will
> > apply to the same "time window" because they work on the element
> > timestamps. What you can then do is have a window that does the
> > aggregation and then another one (that has to be global) to select the
> > top elements:
> >
> > result = input
> >     .keyBy(<some key>)
> >     .timeWindow(Time.minutes(1), Time.seconds(5))
> >     .sum(2)
> >     .timeWindowAll(Time.seconds(5)) // notice how I put a non-sliding
> >     // window here, because the above will output a new window every 5 seconds
> >     .apply(<my custom window function>)
> >
> > I hope this helps.
> >
> > Cheers,
> > Aljoscha
> >
> > On Fri, 1 Apr 2016 at 10:35 Balaji Rajagopalan
> > <balaji.rajagopa...@olacabs.com> wrote:
> > I had a similar use case and ended up writing the aggregation logic in
> > the apply function; I could not find any better solution.
> >
> > On Fri, Apr 1, 2016 at 6:03 AM, Kanak Biscuitwala
> > <kana...@hotmail.com> wrote:
> > Hi,
> >
> > I would like to write something that does something like a word count,
> > and then emits only the 10 highest counts for that window. Logically, I
> > would want to do something like:
> >
> > stream.timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS))
> >     .sum(2)
> >     .apply(getTopK(10))
> >
> > However, the window context is lost after I do the sum aggregation. Is
> > there a straightforward way to express this logic in Flink 1.0? One way
> > I can think of is to have a complex function in apply() that has state,
> > but I would like to know if there is something a little cleaner than
> > that.
> >
> > Thanks,
> > Kanak