Hi Priyanka,
I see that you are generating output tags dynamically. AFAIK, dynamic
tagging is what causes that issue. I don't think we can add tags after
the operators are running.
Can you try a statically named tag that is declared final, and output the
data that way?
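
A rough sketch of what I mean, under a few assumptions on my side: the tag
names are known before the job is submitted, and StaticTagNames and resolve
are hypothetical names I made up for illustration. It uses only the JDK so it
runs without Flink on the classpath; the OutputTag creation is shown only as a
comment. The idea is to resolve the comma-separated entries once up front, so
the set of tags is fixed and processElement only looks tags up:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: resolves the fixed set of side-output tag names once. */
final class StaticTagNames {

    private StaticTagNames() {}

    /**
     * Splits each comma-separated entry and returns the distinct names in
     * first-seen order. Trimming whitespace and skipping empty names is an
     * addition here; the code quoted below does neither.
     */
    static Set<String> resolve(List<String> tagList) {
        Set<String> names = new LinkedHashSet<>();
        for (String entry : tagList) {
            for (String name : entry.split(",")) {
                String trimmed = name.trim();
                if (!trimmed.isEmpty()) {
                    names.add(trimmed);
                }
            }
        }
        // Read-only view, so nothing can add a tag name once the job is running.
        return Collections.unmodifiableSet(names);
    }

    public static void main(String[] args) {
        // Example input is made up for illustration.
        Set<String> names = resolve(Arrays.asList("alarms,metrics", "metrics,audit"));
        // With Flink available, each name would back exactly one tag created up front, e.g.
        //   new OutputTag<GenericRecord>(name) {}
        // and processElement would only look the tag up before calling ctx.output(...).
        System.out.println(names);
    }
}
```

With the names fixed like this, the tags can be built once (for example in the
function's constructor, or as static final fields if the names are constants)
and the same tag instances reused when calling getSideOutput on the result
stream.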
I have added Till to this thread.
On Tue, Jan 12, 2021, 12:09 PM Priyanka Kalra A <
[email protected]> wrote:
> Below is the code:
>
> public class OutputTagProcessingFunction
>         extends ProcessFunction<GenericRecord, GenericRecord> {
>
>     private static final long serialVersionUID = 1L;
>
>     private HashMap<String, OutputTag<GenericRecord>> outputMap = new HashMap<>();
>     private List<String> tagList;
>
>     public OutputTagProcessingFunction(List<String> tagList) {
>         super();
>         this.tagList = tagList;
>     }
>
>     @Override
>     public void processElement(final GenericRecord value, Context ctx,
>             Collector<GenericRecord> out) throws Exception {
>         Set<String> tagSet = new HashSet<>();
>         for (String tag : tagList) {
>             List<String> tags = Arrays.asList(tag.split(","));
>             tagSet.addAll(tags);
>         }
>
>         for (String tag : tagSet) {
>             outputMap.putIfAbsent(tag, new OutputTag<GenericRecord>(tag) {});
>             ctx.output(outputMap.get(tag), value);
>         }
>     }
> }
>
>
>
> The exception comes at the highlighted line (the ctx.output call).
>
>
>
>
>
> Regards,
>
> Priyanka
>
> *From:* Taher Koitawala <[email protected]>
> *Sent:* Monday, January 11, 2021 6:50 PM
> *To:* Priyanka Kalra A <[email protected]>
> *Cc:* user <[email protected]>; Sushil Kumar Singh B <
> [email protected]>; Anuj Kumar Jain A <
> [email protected]>; Chirag Dewan <[email protected]>;
> Pankaj Kumar Aggarwal <[email protected]>
> *Subject:* Re: Timestamp Issue with OutputTags
>
>
>
> Can you please share your code?
>
>
>
> On Mon, Jan 11, 2021, 6:47 PM Priyanka Kalra A <
> [email protected]> wrote:
>
> Hi Team,
>
>
>
> We are generating multiple side-output tags and using the default processing
> time on a non-keyed stream. The class XXXX$YYY extends *ProcessFunction*<I,
> O> and provides an implementation of the *processElement* method. Upon
> sending valid data, it gives the error "*Invalid timestamp:
> -9223372036854775808. Timestamp should always be non-negative or null*".
>
>
>
> - Why is it not able to read the timestamp?
> - Why is it not taking the system default time as the processing time?
>
>
>
> *Complete stack trace for reference:*
>
> java.lang.IllegalArgumentException: Invalid timestamp: -9223372036854775808. Timestamp should always be non-negative or null.
>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:70) ~[kafka-clients-0.11.0.2.jar:?]
>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:93) ~[kafka-clients-0.11.0.2.jar:?]
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:652) ~[flink-connector-kafka-0.11_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:97) ~[flink-connector-kafka-0.11_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:703) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:58) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     *at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102) ~[flink-dist_2.11-1.11.2.jar:1.11.2]*
>     *at com.eee.dd.ccc.aaa.processing.XXXX$YYY.processElement(XXXX.java:166)*
>     *at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) ~[flink-dist_2.11-1.11.2.jar:1.11.2]*
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:151) ~[flink-connector-kafka-0.10_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:765) ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:757) ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
>
>
>
>
> Your help with this would be deeply appreciated!
>
>
>
>
>
> Thanks & Regards,
>
> Priyanka Kalra
>
>