Hi,
the sources don't report records consumed. This is a bit confusing, but the records sent/records consumed statistics only talk about Flink-internal sending of records, so a Kafka source would only show sent records.

To really see each operator in isolation you should disable chaining for these tests: env.disableOperatorChaining()
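For example, it is a single extra call right after creating the environment (a minimal sketch; everything else in your job below stays the same):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// With chaining disabled, every operator (source, flatMap, the two windows, sink) runs as
// its own task, so the dashboard reports records sent/received for each one individually.
env.disableOperatorChaining();

// ... checkpointing, time characteristic, source, windows and sink exactly as before ...
env.execute(TASK_NAME);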
Cheers,
Aljoscha

On Sat, 9 Apr 2016 at 05:12 Kanak Biscuitwala <kana...@hotmail.com> wrote:

> It turns out that the problem is deeper than I originally thought. The
> Flink dashboard reports that 0 records are being consumed, which is quite
> odd. Is there some issue with the 0.9 consumer on YARN?
>
> From: aljos...@apache.org
> Date: Thu, 7 Apr 2016 09:56:42 +0000
> Subject: Re: Multiple operations on a WindowedStream
> To: user@flink.apache.org
>
> Hi,
> the code seems alright. Did you try looking at the Flink Dashboard to
> check out whether any of the operations are sending elements?
>
> Cheers,
> Aljoscha
>
> On Tue, 5 Apr 2016 at 21:00 Kanak Biscuitwala wrote:
>
> This worked when I ran my test code locally, but I'm seeing nothing reach
> my sink when I try to run this in YARN (previously, when I just echoed all
> sums to my sink, it would work). Here's what my code looks like:
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> FlinkKafkaConsumer09 consumer = new FlinkKafkaConsumer09<>(
>     INPUT_TOPIC, new KafkaMessageDeserializer(), properties);
>
> env.enableCheckpointing(5000);
> // this (or event time) is required in order to do the double-windowing below
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>
> DataStream stream = env
>     .addSource(consumer)
>     .flatMap(new CountRequests())
>     .keyBy(0, 1)
>     .timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS))
>     .sum(2)
>     .timeWindowAll(Time.of(5, TimeUnit.SECONDS))
>     .apply(new TopK(20))
>     .map(new ToString());
>
> stream.addSink(new FlinkKafkaProducer09<>(OUTPUT_TOPIC, new SimpleStringSchema(), properties));
> env.execute(TASK_NAME);
>
> Note that CountRequests produces a Tuple3, TopK is an AllWindowFunction
> that produces a List, and ToString is a MapFunction that is just a wrapper
> on Object#toString(). Anything obvious that I'm doing wrong?
>
> From: aljos...@apache.org
> Date: Fri, 1 Apr 2016 09:41:12 +0000
> Subject: Re: Multiple operations on a WindowedStream
> To: user@flink.apache.org
>
> Hi,
> if you are using ingestion-time (or event-time) as your stream time
> characteristic, i.e.:
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) // or TimeCharacteristic.EventTime
>
> you can apply several window transforms after another and they will apply
> the same "time window" because they work on the element timestamps. What
> you can then do is have a window that does the aggregation and then
> another one (that has to be global) to select the top elements:
>
> result = input
>     .keyBy()
>     .timeWindow(Time.minutes(1), Time.seconds(5))
>     .sum(2)
>     .timeWindowAll(Time.seconds(5)) // notice how I put a non-sliding window here,
>                                     // because the above will output a new window every 5 seconds
>     .apply()
>
> I hope this helps.
>
> Cheers,
> Aljoscha
>
> On Fri, 1 Apr 2016 at 10:35 Balaji Rajagopalan wrote:
>
> I had a similar use case and ended up writing the aggregation logic in the
> apply function, could not find any better solution.
>
> On Fri, Apr 1, 2016 at 6:03 AM, Kanak Biscuitwala wrote:
>
> Hi,
>
> I would like to write something that does something like a word count,
> and then emits only the 10 highest counts for that window.
> Logically, I would want to do something like:
>
> stream.timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS))
>     .sum(2)
>     .apply(getTopK(10))
>
> However, the window context is lost after I do the sum aggregation. Is
> there a straightforward way to express this logic in Flink 1.0? One way I
> can think of is to have a complex function in apply() that has state, but
> I would like to know if there is something a little cleaner than that.
>
> Thanks,
> Kanak
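P.S. In case a concrete shape helps, here is a rough sketch of what such a top-k AllWindowFunction could look like. This is only an illustration, not the actual TopK class from the job above: the element type Tuple3<String, String, Long> is a guess, and the apply() signature shown is the newer Iterable-based one, so the generic parameters would need a small adjustment on Flink 1.0.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Illustrative sketch only: selects the k entries with the highest count per window.
public class TopKExample implements
        AllWindowFunction<Tuple3<String, String, Long>, List<Tuple3<String, String, Long>>, TimeWindow> {

    private final int k;

    public TopKExample(int k) {
        this.k = k;
    }

    @Override
    public void apply(TimeWindow window,
                      Iterable<Tuple3<String, String, Long>> perKeySums,
                      Collector<List<Tuple3<String, String, Long>>> out) {
        // Collect all per-key sums that ended up in this window.
        List<Tuple3<String, String, Long>> all = new ArrayList<>();
        for (Tuple3<String, String, Long> sum : perKeySums) {
            all.add(sum);
        }
        // Sort by the count field (f2), highest first.
        Collections.sort(all, new Comparator<Tuple3<String, String, Long>>() {
            @Override
            public int compare(Tuple3<String, String, Long> a, Tuple3<String, String, Long> b) {
                return Long.compare(b.f2, a.f2);
            }
        });
        // Emit only the k largest counts as a single list for this window.
        out.collect(new ArrayList<>(all.subList(0, Math.min(k, all.size()))));
    }
}

It would then be plugged in just like TopK(20) above: .timeWindowAll(Time.seconds(5)).apply(new TopKExample(20)).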