Hi,
are you also using the timestamp extractor when you use
env.fromCollection()?

Could you maybe insert a dummy mapper after the Kafka source that just prints 
each element and forwards it unchanged? That would show whether the elements 
arrive from Kafka with a sensible timestamp in the first place.
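Such a debug mapper is just an identity map with a println. Here is a 
self-contained sketch of the idea; note that Flink's MapFunction interface and 
the Pojo class are stubbed out locally so the snippet compiles on its own. In 
the real job you would implement 
org.apache.flink.api.common.functions.MapFunction instead and insert it with 
stream.map(...) right after addSource:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DebugMapperSketch {

    // Minimal local stand-in for Flink's MapFunction<T, O> interface.
    interface MapFunction<T, O> {
        O map(T value);
    }

    // Hypothetical event POJO carrying a millisecond timestamp,
    // mirroring the Pojo from the original mail.
    static class Pojo {
        private final long time;
        Pojo(long time) { this.time = time; }
        long getTime() { return time; }
    }

    // The dummy mapper: print the element's timestamp, forward it unchanged.
    static class PrintAndForward implements MapFunction<Pojo, Pojo> {
        @Override
        public Pojo map(Pojo value) {
            System.out.println("timestamp from source: " + value.getTime());
            return value; // identity: the stream itself is untouched
        }
    }

    public static void main(String[] args) {
        // Simulate a few elements flowing through the mapper.
        List<Pojo> input = Arrays.asList(new Pojo(1000L), new Pojo(2000L));
        PrintAndForward mapper = new PrintAndForward();
        List<Pojo> output = new ArrayList<>();
        for (Pojo p : input) {
            output.add(mapper.map(p));
        }
        // The elements come out exactly as they went in.
        System.out.println(output.get(0).getTime() == 1000L
                && output.get(1).getTime() == 2000L);
    }
}
```

In the actual pipeline this would be stream.map(new PrintAndForward()) placed 
between the FlinkKafkaConsumer082 source and assignTimestamps, so the printed 
values show exactly what the consumer emits.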

Cheers,
Aljoscha
> On 15 Nov 2015, at 22:55, Konstantin Knauf <konstantin.kn...@tngtech.com> 
> wrote:
> 
> Hi everyone,
> 
> I have the following issue with Flink (0.10) and Kafka.
> 
> I am using a very simple TimestampExtractor like [1], which just
> extracts a millis timestamp from a POJO. In my streaming job, I read in
> these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
> 
> stream = env.addSource(new FlinkKafkaConsumer082<Pojo>(
>                parameterTool.getRequired("topic"),
>                new AvroPojoDeserializationSchema(),
>                parameterTool.getProperties()));
> 
> I have timestamps enabled, the TimeCharacteristic is EventTime, and the
> auto watermark interval is 500 ms.
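>
> In full, that setup looks roughly like this (a sketch with method names
> as in the Flink 0.10 ExecutionConfig, shown as an illustration rather
> than my exact code):
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.getConfig().enableTimestamps();
> env.getConfig().setAutoWatermarkInterval(500);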
> 
> The problem is, when I do something like:
> 
> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>       .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
>       .sum(..)
>       .print();
> 
> env.execute();
> 
> the windows never get triggered.
> 
> If I use ProcessingTime it works.
> If I use env.fromCollection(...) instead of the KafkaSource it works
> with EventTime, too.
> 
> Any ideas what I could be doing wrong are highly appreciated.
> 
> Cheers,
> 
> Konstantin
> 
> [1]:
> 
> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
> 
>    private final long maxDelay;
> 
>    public PojoTimestampExtractor(long maxDelay) {
>        this.maxDelay = maxDelay;
>    }
> 
>    @Override
>    public long extractTimestamp(Pojo pojo, long currentTimestamp) {
>        return pojo.getTime();
>    }
> 
>    @Override
>    public long extractWatermark(Pojo pojo, long currentTimestamp) {
>        return pojo.getTime() - maxDelay;
>    }
> 
>    @Override
>    public long getCurrentWatermark() {
>        return Long.MIN_VALUE;
>    }
> }
