Hi Priyanka,

Could you tell us which Flink version you are using? Moreover, seeing the
complete Flink job could be helpful. The only explanation I have at the
moment is that you might have set
FlinkKafkaProducer011.setWriteTimestampToKafka(true). If this is true then
you have to set the TimeCharacteristic to EventTime because
setWriteTimestampToKafka only works with event time.

Cheers,
Till

On Tue, Jan 12, 2021 at 7:48 AM Taher Koitawala <taher...@gmail.com> wrote:

> Hi Priyanka,
>       I see that your are generating dynamic output tags. AFAIK, dynamic
> tagging is causing that issue. I don't think we can add tags after
> operators are running.
>
> Can you try with a static named tag which is defined final. And output
> data that way.
>
> Added Till
>
> On Tue, Jan 12, 2021, 12:09 PM Priyanka Kalra A <
> priyanka.a.ka...@ericsson.com> wrote:
>
>> Below is the code:
>>
>> public class OutputTagProcessingFunction extends
>> ProcessFunction<GenericRecord, GenericRecord>
>>
>> {
>>
>>     private static final long serialVersionUID = 1L;
>>
>>     private HashMap<String, OutputTag<GenericRecord>> outputMap = new
>> HashMap<>();
>>
>>     private List<String> tagList;
>>
>>
>>
>>     public OutputTagProcessingFunction(List<String> tagList) {
>>
>>         super();
>>
>>         this.tagList = tagList;
>>
>>     }
>>
>>
>>
>>     @Override
>>
>>     public void processElement(final GenericRecord value, Context ctx,
>> Collector<GenericRecord> out) throws Exception {
>>
>>         Set<String> tagSet = new HashSet<>();
>>
>>         for (String tag : tagList) {
>>
>>             List<String> tags = Arrays.asList(tag.split(","));
>>
>>             tagSet.addAll(tags);
>>
>>         }
>>
>>
>>
>>         for (String tag : tagSet) {
>>
>>             outputMap.putIfAbsent(tag, new OutputTag<GenericRecord>(tag)
>> {});
>>
>>             ctx.output(outputMap.get(tag), value);
>>
>>         }
>>
>>     }
>>
>> }
>>
>>
>>
>> Exception comes at highlighted line.
>>
>>
>>
>>
>>
>> Regards,
>>
>> Priyanka
>>
>> *From:* Taher Koitawala <taher...@gmail.com>
>> *Sent:* Monday, January 11, 2021 6:50 PM
>> *To:* Priyanka Kalra A <priyanka.a.ka...@ericsson.com>
>> *Cc:* user <user@flink.apache.org>; Sushil Kumar Singh B <
>> sushil.kumar.b.si...@ericsson.com>; Anuj Kumar Jain A <
>> anuj.kumar.a.j...@ericsson.com>; Chirag Dewan <chirag.de...@ericsson.com>;
>> Pankaj Kumar Aggarwal <pankaj.kumar.aggar...@ericsson.com>
>> *Subject:* Re: Timestamp Issue with OutputTags
>>
>>
>>
>> Can you please share your code?
>>
>>
>>
>> On Mon, Jan 11, 2021, 6:47 PM Priyanka Kalra A <
>> priyanka.a.ka...@ericsson.com> wrote:
>>
>> Hi Team,
>>
>>
>>
>> We are generating multiple side-output tags and using default processing
>> time on non-keyed stream. The class XXXX$YYY extends *ProcessFunction*<I,
>> O> and implementation is provided for *processElement* method. Upon
>> sending valid data, it gives error "*Invalid timestamp:
>> -9223372036854775808. Timestamp should always be non-negative or null*".
>>
>>
>>
>>    - Why is it not able to read timestamp?
>>    - Why is not taking system default time as processing time?
>>
>>
>>
>> *Complete stack trace for reference:*
>>
>> java.lang.IllegalArgumentException: Invalid timestamp:
>> -9223372036854775808. Timestamp should always be non-negative or null.
>>
>>                 at
>> org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:70)
>> ~[kafka-clients-0.11.0.2.jar:?]
>>
>>                 at
>> org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:93)
>> ~[kafka-clients-0.11.0.2.jar:?]
>>
>>                 at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:652)
>> ~[flink-connector-kafka-0.11_2.11-1.11.2.jar:1.11.2]
>>
>>                 at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:97)
>> ~[flink-connector-kafka-0.11_2.11-1.11.2.jar:1.11.2]
>>
>>                 at
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>>                 at
>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>>                 at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>>                 at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:703)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>>                 at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:58)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>> *                at
>> org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]*
>>
>> *                at
>> com.eee.dd.ccc.aaa.processing.XXXX$YYY.processElement(XXXX.java:166)*
>>
>> *                at
>> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]*
>>
>>                 at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>>                 at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>>                 at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>>                 at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>>                 at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>>                 at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>>                 at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>>                 at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>> ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
>>
>>                 at
>> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:151)
>> ~[flink-connector-kafka-0.10_2.11-1.11.2.jar:1.11.2]
>>
>>                 at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:765)
>> ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
>>
>>                 at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:757)
>> ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
>>
>>                 at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>>                 at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>>                 at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>>
>>
>>
>>
>> Your help with this would be deeply appreciated!
>>
>>
>>
>>
>>
>> Thanks & Regards,
>>
>> Priyanka Kalra
>>
>>

Reply via email to