Hi Priyanka,

Could you tell us which Flink version you are using? Seeing the complete Flink job would also help.

The only explanation I have at the moment is that you might have called FlinkKafkaProducer011.setWriteTimestampToKafka(true). If so, you have to set the TimeCharacteristic to EventTime, because setWriteTimestampToKafka only works with event time.
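
To make the failing check concrete, here is a minimal plain-Java sketch. It uses no Flink or Kafka classes: the class name KafkaTimestampCheck and both methods are hypothetical and only mirror the rule quoted in the exception message. The value -9223372036854775808 equals Long.MIN_VALUE; the sketch assumes that this is what a record without an event-time timestamp reports, which would explain why it shows up under processing time.

```java
public class KafkaTimestampCheck {

    // Assumption: a record without an event-time timestamp reports Long.MIN_VALUE,
    // which is exactly the -9223372036854775808 from the exception message.
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    // Mirrors the rule stated in the error message: null or non-negative only.
    static void checkTimestamp(Long timestamp) {
        if (timestamp != null && timestamp < 0) {
            throw new IllegalArgumentException("Invalid timestamp: " + timestamp
                    + ". Timestamp should always be non-negative or null.");
        }
    }

    // Hypothetical guard: pass a timestamp on only if writing timestamps is
    // enabled and the record actually carries one; otherwise pass null.
    static Long timestampForKafka(long recordTimestamp, boolean writeTimestamp) {
        if (!writeTimestamp || recordTimestamp == NO_TIMESTAMP) {
            return null;
        }
        return recordTimestamp;
    }

    public static void main(String[] args) {
        try {
            // The unguarded case: the "no timestamp" marker is rejected.
            checkTimestamp(NO_TIMESTAMP);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }

        // The guarded cases pass the check without throwing.
        checkTimestamp(timestampForKafka(NO_TIMESTAMP, true));
        checkTimestamp(timestampForKafka(1610434080000L, true));
        System.out.println("guarded timestamps accepted");
    }
}
```

In a real job the guard would not be written by hand. The two options mentioned above are to switch the job to EventTime so that records carry real timestamps, or to leave setWriteTimestampToKafka disabled so that no timestamp is passed to Kafka.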

Cheers,
Till

On Tue, Jan 12, 2021 at 7:48 AM Taher Koitawala <taher...@gmail.com> wrote:

> Hi Priyanka,
> I see that you are generating dynamic output tags. AFAIK, dynamic
> tagging is causing that issue. I don't think we can add tags after
> operators are running.
>
> Can you try a statically named tag that is declared final, and output
> data that way?
>
> Added Till
>
> On Tue, Jan 12, 2021, 12:09 PM Priyanka Kalra A <
> priyanka.a.ka...@ericsson.com> wrote:
>
>> Below is the code:
>>
>> public class OutputTagProcessingFunction extends ProcessFunction<GenericRecord, GenericRecord> {
>>
>>     private static final long serialVersionUID = 1L;
>>     private HashMap<String, OutputTag<GenericRecord>> outputMap = new HashMap<>();
>>     private List<String> tagList;
>>
>>     public OutputTagProcessingFunction(List<String> tagList) {
>>         super();
>>         this.tagList = tagList;
>>     }
>>
>>     @Override
>>     public void processElement(final GenericRecord value, Context ctx,
>>             Collector<GenericRecord> out) throws Exception {
>>         Set<String> tagSet = new HashSet<>();
>>         for (String tag : tagList) {
>>             List<String> tags = Arrays.asList(tag.split(","));
>>             tagSet.addAll(tags);
>>         }
>>
>>         for (String tag : tagSet) {
>>             outputMap.putIfAbsent(tag, new OutputTag<GenericRecord>(tag) {});
>>             ctx.output(outputMap.get(tag), value);
>>         }
>>     }
>> }
>>
>> Exception comes at highlighted line.
>>
>> Regards,
>> Priyanka
>>
>> *From:* Taher Koitawala <taher...@gmail.com>
>> *Sent:* Monday, January 11, 2021 6:50 PM
>> *To:* Priyanka Kalra A <priyanka.a.ka...@ericsson.com>
>> *Cc:* user <user@flink.apache.org>; Sushil Kumar Singh B <sushil.kumar.b.si...@ericsson.com>; Anuj Kumar Jain A <anuj.kumar.a.j...@ericsson.com>; Chirag Dewan <chirag.de...@ericsson.com>; Pankaj Kumar Aggarwal <pankaj.kumar.aggar...@ericsson.com>
>> *Subject:* Re: Timestamp Issue with OutputTags
>>
>> Can you please share your code?
>>
>> On Mon, Jan 11, 2021, 6:47 PM Priyanka Kalra A <
>> priyanka.a.ka...@ericsson.com> wrote:
>>
>> Hi Team,
>>
>> We are generating multiple side-output tags and using the default processing
>> time on a non-keyed stream. The class XXXX$YYY extends *ProcessFunction*<I, O>
>> and provides an implementation of the *processElement* method. Upon
>> sending valid data, it gives the error "*Invalid timestamp:
>> -9223372036854775808. Timestamp should always be non-negative or null*".
>>
>> - Why is it not able to read the timestamp?
>> - Why is it not taking the system default time as processing time?
>>
>> *Complete stack trace for reference:*
>>
>> java.lang.IllegalArgumentException: Invalid timestamp: -9223372036854775808. Timestamp should always be non-negative or null.
>>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:70) ~[kafka-clients-0.11.0.2.jar:?]
>>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:93) ~[kafka-clients-0.11.0.2.jar:?]
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:652) ~[flink-connector-kafka-0.11_2.11-1.11.2.jar:1.11.2]
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:97) ~[flink-connector-kafka-0.11_2.11-1.11.2.jar:1.11.2]
>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:703) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:58) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>     *at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102) ~[flink-dist_2.11-1.11.2.jar:1.11.2]*
>>     *at com.eee.dd.ccc.aaa.processing.XXXX$YYY.processElement(XXXX.java:166)*
>>     *at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) ~[flink-dist_2.11-1.11.2.jar:1.11.2]*
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:151) ~[flink-connector-kafka-0.10_2.11-1.11.2.jar:1.11.2]
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:765) ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:757) ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>> Your help with this would be deeply appreciated!
>>
>> Thanks & Regards,
>> Priyanka Kalra