Could this part of the extractor be the problem, Aljoscha?

    @Override
    public long getCurrentWatermark() {
        return Long.MIN_VALUE;
    }
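
For comparison, here is a minimal sketch of an extractor whose getCurrentWatermark() advances with the data instead of always returning Long.MIN_VALUE. I am assuming the runtime polls getCurrentWatermark() at the auto watermark interval, and that returning Long.MIN_VALUE means "no watermark yet". The local TimestampExtractor interface only mirrors the three methods of the quoted class so that the sketch compiles without Flink; BoundedDelayExtractor and the Pojo constructor are made-up names. I have not verified that this is what keeps the windows from firing.

```java
// Stand-in that mirrors the three methods quoted in this thread, so the
// sketch compiles without Flink on the classpath. Not the real interface.
interface TimestampExtractor<T> {
    long extractTimestamp(T element, long currentTimestamp);
    long extractWatermark(T element, long currentTimestamp);
    long getCurrentWatermark();
}

// Hypothetical event type carrying a millisecond timestamp.
class Pojo {
    private final long time;
    Pojo(long time) { this.time = time; }
    long getTime() { return time; }
}

// Hypothetical extractor: remembers the highest timestamp seen so far and
// reports (highest timestamp - maxDelay) when asked for the current watermark.
class BoundedDelayExtractor implements TimestampExtractor<Pojo> {
    private final long maxDelay;
    private long maxSeen = Long.MIN_VALUE;

    BoundedDelayExtractor(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    @Override
    public long extractTimestamp(Pojo pojo, long currentTimestamp) {
        maxSeen = Math.max(maxSeen, pojo.getTime());
        return pojo.getTime();
    }

    @Override
    public long extractWatermark(Pojo pojo, long currentTimestamp) {
        // Assumption: Long.MIN_VALUE means "no per-element watermark".
        return Long.MIN_VALUE;
    }

    @Override
    public long getCurrentWatermark() {
        // Nothing seen yet: report no progress (and avoid long underflow).
        return maxSeen == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeen - maxDelay;
    }
}
```

With maxDelay = 6000 and the example element quoted below (time 1447666537260), getCurrentWatermark() would return 1447666531260 once that element has passed through extractTimestamp().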

Gyula

Konstantin Knauf <konstantin.kn...@tngtech.com> wrote (on Mon, 16 Nov 2015,
10:39):

> Hi Aljoscha,
>
> thanks for your answer. Yes I am using the same TimestampExtractor-Class.
>
> The timestamps look good to me. Here is an example.
>
> {"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00
>
> The order now is
>
> stream
> .map(dummyMapper)
> .assignTimestamps(...)
> .timeWindow(...)
>
> Is there a way to print out the assigned timestamps after
> stream.assignTimestamps(...)?
>
> Cheers,
>
> Konstantin
>
>
> On 16.11.2015 10:31, Aljoscha Krettek wrote:
> > Hi,
> > are you also using the timestamp extractor when you are using
> > env.fromCollection()?
> >
> > Could you maybe insert a dummy mapper after the Kafka source that just
> > prints the element and forwards it? To see if the elements come with a good
> > timestamp from Kafka.
> >
> > Cheers,
> > Aljoscha
> >> On 15 Nov 2015, at 22:55, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
> >>
> >> Hi everyone,
> >>
> >> I have the following issue with Flink (0.10) and Kafka.
> >>
> >> I am using a very simple TimestampExtractor like [1], which just
> >> extracts a millis timestamp from a POJO. In my streaming job, I read in
> >> these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
> >>
> >> stream = env.addSource(new FlinkKafkaConsumer082<>(
> >>                parameterTool.getRequired("topic"),
> >>                new AvroPojoDeserializationSchema(),
> >>                parameterTool.getProperties()))
> >>
> >> I have timestamps enabled, the TimeCharacteristic is EventTime, and the
> >> AutoWatermarkInterval is 500.
> >>
> >> The problem is, when I do something like:
> >>
> >> stream.assignTimestamps(new PojoTimestampExtractor(6000))
> >> .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
> >> .sum(..)
> >> .print()
> >>
> >> env.execute();
> >>
> >> the windows never get triggered.
> >>
> >> If I use ProcessingTime it works.
> >> If I use env.fromCollection(...) instead of the KafkaSource it works
> >> with EventTime, too.
> >>
> >> Any ideas what I could be doing wrong are highly appreciated.
> >>
> >> Cheers,
> >>
> >> Konstantin
> >>
> >> [1]:
> >>
> >> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
> >>
> >>    final private long maxDelay;
> >>
> >>    public  PojoTimestampExtractor(long maxDelay) {
> >>        this.maxDelay = maxDelay;
> >>    }
> >>
> >>    @Override
> >>    public long extractTimestamp(Pojo pojo, long l) {
> >>        return pojo.getTime();
> >>    }
> >>
> >>    @Override
> >>    public long extractWatermark(Pojo pojo, long l) {
> >>        return pojo.getTime() - maxDelay;
> >>    }
> >>
> >>    @Override
> >>    public long getCurrentWatermark() {
> >>        return Long.MIN_VALUE;
> >>    }
> >> }
> >
> >
>
> --
> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
