Hi Till,

I’m using Flink version 1.11.2.

Yes, FlinkKafkaProducer011.setWriteTimestampToKafka(true) was set, and that was 
causing the issue.

Thank you for your help!


Regards,
Priyanka

From: Till Rohrmann <trohrm...@apache.org>
Sent: Tuesday, January 12, 2021 3:10 PM
To: Taher Koitawala <taher...@gmail.com>
Cc: Priyanka Kalra A <priyanka.a.ka...@ericsson.com>; user <user@flink.apache.org>
Subject: Re: Timestamp Issue with OutputTags

Hi Priyanka,

Could you tell us which Flink version you are using? Moreover, seeing the 
complete Flink job could be helpful. The only explanation I have at the moment 
is that you might have set 
FlinkKafkaProducer011.setWriteTimestampToKafka(true). If this is true then you 
have to set the TimeCharacteristic to EventTime because 
setWriteTimestampToKafka only works with event time.
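
To make the failure mode concrete, here is a minimal stand-alone sketch in plain Java. It does not use any Flink or Kafka classes. It assumes that a record without an event-time timestamp carries Long.MIN_VALUE (which is exactly the number in the reported error message) and models the "non-negative or null" rule quoted in that message. The class TimestampCheckSketch and the sanitize helper are hypothetical names used only for illustration.

```java
/**
 * Stand-alone model of the timestamp check; no Flink or Kafka classes are used.
 * All names in this class are hypothetical and exist only for illustration.
 */
public class TimestampCheckSketch {

    // Assumption: a record that has no event-time timestamp carries Long.MIN_VALUE.
    public static final long NO_TIMESTAMP = Long.MIN_VALUE;

    // Models the rule quoted in the error message: only null or non-negative values pass.
    public static void validate(Long timestamp) {
        if (timestamp != null && timestamp < 0) {
            throw new IllegalArgumentException(
                "Invalid timestamp: " + timestamp
                    + ". Timestamp should always be non-negative or null.");
        }
    }

    // Hypothetical guard: map the sentinel (or any negative value) to null.
    public static Long sanitize(long timestamp) {
        return timestamp < 0 ? null : Long.valueOf(timestamp);
    }

    public static void main(String[] args) {
        // The sentinel is the same number that appears in the reported exception.
        System.out.println(NO_TIMESTAMP);

        // A sanitized value passes the check because null is allowed.
        validate(sanitize(NO_TIMESTAMP));

        // The raw sentinel fails the check with the message seen in the stack trace.
        try {
            validate(NO_TIMESTAMP);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the real job the equivalent remedy is one of the two options mentioned above: either switch the TimeCharacteristic to EventTime so that records carry real timestamps, or do not call setWriteTimestampToKafka(true).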

Cheers,
Till

On Tue, Jan 12, 2021 at 7:48 AM Taher Koitawala <taher...@gmail.com> wrote:
Hi Priyanka,
      I see that you are generating dynamic output tags. AFAIK, dynamic 
tagging is causing that issue. I don't think we can add tags after the 
operators are running.

Can you try with statically named tags that are declared final, and output the 
data that way?
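
A rough sketch of what "define the tags once, up front" could look like, in plain Java. It does not use Flink: the nested Tag class is a hypothetical stand-in for OutputTag, and StaticTagsSketch is an illustrative name. The idea is only that the comma-separated tag names are parsed once in the constructor and the resulting map is never modified afterwards, so per-element processing becomes a lookup.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch; Tag is a hypothetical stand-in for Flink's OutputTag. */
public class StaticTagsSketch {

    /** Minimal stand-in that only carries an id. */
    public static final class Tag {
        private final String id;

        public Tag(String id) {
            this.id = id;
        }

        public String getId() {
            return id;
        }
    }

    // Built once in the constructor and never modified afterwards.
    private final Map<String, Tag> tags;

    public StaticTagsSketch(List<String> tagList) {
        Map<String, Tag> built = new LinkedHashMap<>();
        for (String entry : tagList) {
            // Each entry may hold several comma-separated tag names.
            for (String name : entry.split(",")) {
                built.putIfAbsent(name, new Tag(name));
            }
        }
        this.tags = Collections.unmodifiableMap(built);
    }

    /** Read-only view of the predefined tags, keyed by tag name. */
    public Map<String, Tag> tags() {
        return tags;
    }

    public static void main(String[] args) {
        StaticTagsSketch sketch = new StaticTagsSketch(Arrays.asList("a,b", "b,c"));
        // Duplicate names collapse to a single tag instance.
        System.out.println(sketch.tags().keySet());
    }
}
```

With a fixed set like this, the same tag instances can also be used where the side outputs are read downstream.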

Added Till

On Tue, Jan 12, 2021, 12:09 PM Priyanka Kalra A <priyanka.a.ka...@ericsson.com> wrote:
Below is the code:

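import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.avro.generic.GenericRecord; // assumed: Avro's GenericRecord
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class OutputTagProcessingFunction extends ProcessFunction<GenericRecord, GenericRecord> {
    private static final long serialVersionUID = 1L;
    // Cache of side-output tags, keyed by tag name and filled lazily.
    private HashMap<String, OutputTag<GenericRecord>> outputMap = new HashMap<>();
    // Each entry may contain several comma-separated tag names.
    private List<String> tagList;

    public OutputTagProcessingFunction(List<String> tagList) {
        super();
        this.tagList = tagList;
    }

    @Override
    public void processElement(final GenericRecord value, Context ctx,
            Collector<GenericRecord> out) throws Exception {
        // Collect the distinct tag names from all entries.
        Set<String> tagSet = new HashSet<>();
        for (String tag : tagList) {
            List<String> tags = Arrays.asList(tag.split(","));
            tagSet.addAll(tags);
        }

        // Emit the record to every side output, creating each tag on first use.
        for (String tag : tagSet) {
            outputMap.putIfAbsent(tag, new OutputTag<GenericRecord>(tag) {});
            ctx.output(outputMap.get(tag), value);
        }
    }
}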

The exception is thrown at the ctx.output(...) call, which was highlighted in 
the original message.


Regards,
Priyanka

From: Taher Koitawala <taher...@gmail.com>
Sent: Monday, January 11, 2021 6:50 PM
To: Priyanka Kalra A <priyanka.a.ka...@ericsson.com>
Cc: user <user@flink.apache.org>; Sushil Kumar Singh B <sushil.kumar.b.si...@ericsson.com>; Anuj Kumar Jain A <anuj.kumar.a.j...@ericsson.com>; Chirag Dewan <chirag.de...@ericsson.com>; Pankaj Kumar Aggarwal <pankaj.kumar.aggar...@ericsson.com>
Subject: Re: Timestamp Issue with OutputTags

Can you please share your code?

On Mon, Jan 11, 2021, 6:47 PM Priyanka Kalra A <priyanka.a.ka...@ericsson.com> wrote:
Hi Team,

We are generating multiple side-output tags and using the default processing 
time on a non-keyed stream. The class XXXX$YYY extends ProcessFunction<I, O> 
and provides an implementation of the processElement method. When we send valid 
data, it fails with the error "Invalid timestamp: -9223372036854775808. 
Timestamp should always be non-negative or null".


  *   Why is it not able to read the timestamp?
  *   Why is it not taking the system default time as the processing time?

Complete stack trace for reference:
java.lang.IllegalArgumentException: Invalid timestamp: -9223372036854775808. Timestamp should always be non-negative or null.
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:70) ~[kafka-clients-0.11.0.2.jar:?]
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:93) ~[kafka-clients-0.11.0.2.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:652) ~[flink-connector-kafka-0.11_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:97) ~[flink-connector-kafka-0.11_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:703) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:58) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at com.eee.dd.ccc.aaa.processing.XXXX$YYY.processElement(XXXX.java:166)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:151) ~[flink-connector-kafka-0.10_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:765) ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:757) ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) ~[flink-dist_2.11-1.11.2.jar:1.11.2]


Your help with this would be deeply appreciated!


Thanks & Regards,
Priyanka Kalra
