Hi Till,

I'm using Flink version 1.11.2.

Yes, FlinkKafkaProducer011.setWriteTimestampToKafka(true) was set and was causing the issue.

Thank you for your help!

Regards,
Priyanka

From: Till Rohrmann <trohrm...@apache.org>
Sent: Tuesday, January 12, 2021 3:10 PM
To: Taher Koitawala <taher...@gmail.com>
Cc: Priyanka Kalra A <priyanka.a.ka...@ericsson.com>; user <user@flink.apache.org>
Subject: Re: Timestamp Issue with OutputTags

Hi Priyanka,

Could you tell us which Flink version you are using? Moreover, seeing the complete Flink job would be helpful.

The only explanation I have at the moment is that you might have set FlinkKafkaProducer011.setWriteTimestampToKafka(true). If so, you have to set the TimeCharacteristic to EventTime, because setWriteTimestampToKafka only works with event time.

Cheers,
Till

On Tue, Jan 12, 2021 at 7:48 AM Taher Koitawala <taher...@gmail.com> wrote:

Hi Priyanka,

I see that you are generating dynamic output tags. AFAIK, dynamic tagging is causing that issue. I don't think we can add tags after operators are running. Can you try a statically named tag that is declared final, and output the data that way?
Added Till

On Tue, Jan 12, 2021, 12:09 PM Priyanka Kalra A <priyanka.a.ka...@ericsson.com> wrote:

Below is the code:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class OutputTagProcessingFunction extends ProcessFunction<GenericRecord, GenericRecord> {

        private static final long serialVersionUID = 1L;

        // Cache of side-output tags, created lazily by tag name.
        private HashMap<String, OutputTag<GenericRecord>> outputMap = new HashMap<>();
        // Each entry may hold several comma-separated tag names.
        private List<String> tagList;

        public OutputTagProcessingFunction(List<String> tagList) {
            super();
            this.tagList = tagList;
        }

        @Override
        public void processElement(final GenericRecord value, Context ctx, Collector<GenericRecord> out) throws Exception {
            // Split the comma-separated entries into a set of distinct tag names.
            Set<String> tagSet = new HashSet<>();
            for (String tag : tagList) {
                List<String> tags = Arrays.asList(tag.split(","));
                tagSet.addAll(tags);
            }

            // Emit the record to one side output per tag.
            for (String tag : tagSet) {
                outputMap.putIfAbsent(tag, new OutputTag<GenericRecord>(tag) {});
                // The stack trace points at this call (ProcessOperator$ContextImpl.output).
                ctx.output(outputMap.get(tag), value);
            }
        }
    }

The exception is thrown at the highlighted line.

Regards,
Priyanka

From: Taher Koitawala <taher...@gmail.com>
Sent: Monday, January 11, 2021 6:50 PM
To: Priyanka Kalra A <priyanka.a.ka...@ericsson.com>
Cc: user <user@flink.apache.org>; Sushil Kumar Singh B <sushil.kumar.b.si...@ericsson.com>; Anuj Kumar Jain A <anuj.kumar.a.j...@ericsson.com>; Chirag Dewan <chirag.de...@ericsson.com>; Pankaj Kumar Aggarwal <pankaj.kumar.aggar...@ericsson.com>
Subject: Re: Timestamp Issue with OutputTags

Can you please share your code?

On Mon, Jan 11, 2021, 6:47 PM Priyanka Kalra A <priyanka.a.ka...@ericsson.com> wrote:

Hi Team,

We are generating multiple side-output tags and using the default processing time on a non-keyed stream.
The class XXXX$YYY extends ProcessFunction<I, O> and provides an implementation of the processElement method. Upon sending valid data, it fails with the error "Invalid timestamp: -9223372036854775808. Timestamp should always be non-negative or null".

* Why is it not able to read the timestamp?
* Why is it not taking the system default time as processing time?

Complete stack trace for reference:

java.lang.IllegalArgumentException: Invalid timestamp: -9223372036854775808. Timestamp should always be non-negative or null.
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:70) ~[kafka-clients-0.11.0.2.jar:?]
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:93) ~[kafka-clients-0.11.0.2.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:652) ~[flink-connector-kafka-0.11_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:97) ~[flink-connector-kafka-0.11_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:703) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:58) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at com.eee.dd.ccc.aaa.processing.XXXX$YYY.processElement(XXXX.java:166)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:151) ~[flink-connector-kafka-0.10_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:765) ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:757) ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) ~[flink-dist_2.11-1.11.2.jar:1.11.2]

Your help with this would be deeply appreciated!

Thanks & Regards,
Priyanka Kalra
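
A note on the value in the error message: -9223372036854775808 is exactly Long.MIN_VALUE. That suggests the record reaching the Kafka sink carried a "no timestamp" placeholder rather than a real event time, which would fit Till's explanation, since under processing time nothing assigns event timestamps to the records. The fixes discussed in the thread are either not calling setWriteTimestampToKafka(true) or switching the job to event time. The plain-Java sketch below only illustrates the underlying check. TimestampGuard and toKafkaTimestamp are hypothetical names that are not part of Flink or the Kafka client, and treating every negative value as "no timestamp" is an assumption made for this illustration.

```java
/**
 * Hypothetical helper for illustration only; not part of Flink or Kafka.
 * It maps a primitive record timestamp to the nullable form that the
 * error message asks for ("non-negative or null").
 */
final class TimestampGuard {

    /** The value seen in the error message; identical to Long.MIN_VALUE. */
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    private TimestampGuard() {
    }

    /**
     * Returns null for negative inputs (assumed here to mean "no timestamp"),
     * otherwise the timestamp boxed as a Long.
     */
    static Long toKafkaTimestamp(long recordTimestamp) {
        return recordTimestamp < 0 ? null : Long.valueOf(recordTimestamp);
    }

    public static void main(String[] args) {
        // The literal from the stack trace is Long.MIN_VALUE.
        System.out.println(NO_TIMESTAMP == -9223372036854775808L); // prints "true"

        // A placeholder timestamp becomes null instead of an invalid negative value.
        System.out.println(toKafkaTimestamp(NO_TIMESTAMP)); // prints "null"

        // A real epoch-millisecond timestamp passes through unchanged.
        System.out.println(toKafkaTimestamp(1610448600000L)); // prints "1610448600000"
    }
}
```

A guard like this would belong in custom sink or serialization code. With the stock FlinkKafkaProducer011 the simpler route is the one confirmed in the thread: leave setWriteTimestampToKafka off unless the job runs on event time.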