Joshua Grisham created KAFKA-10627:
--------------------------------------

             Summary: Connect TimestampConverter transform does not support 
multiple formats for the same field and only allows one field to be transformed 
at a time
                 Key: KAFKA-10627
                 URL: https://issues.apache.org/jira/browse/KAFKA-10627
             Project: Kafka
          Issue Type: Improvement
          Components: KafkaConnect
            Reporter: Joshua Grisham


Some of the limitations of the *TimestampConverter* transform are causing 
issues for us since we have a lot of different producers from different systems 
producing events to some of our topics.  We try our best to have governance on 
the data formats including strict usage of Avro schemas but there are still 
variations in timestamp data types that are allowed by the schema.

In the end there will be multiple formats coming into the same timestamp fields 
(for example, with and without milliseconds, with and without a timezone 
specifier, etc).

And then you get failed events in Connect with messages like this:
{noformat}
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error 
handler
        at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorror(RetryWithToleranceOperator.java:178)
        at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at 
org.apache.ntime.TransformationChain.apply(TransformationChain.java:50)
        at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
        at 
org.aect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)
        at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
        at org.apache.kafka.corkerSinkTask.iteration(WorkerSinkTask.java:228)
        at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
        at org.apache.kafka.connect.runtime.WorrkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        atrrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$WorolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Could not parse 
timestamp: value (2020-10-06T12:12:27h pattern (yyyy-MM-dd'T'HH:mm:ss.SSSX)
        at 
org.apache.kafka.connect.transforms.TimestampConverter$1.toRaw(TimestampConverter.java:120)
        at 
org.apache.kafka.connect.transformrter.convertTimestamp(TimestampConverter.java:450)
        at 
org.apache.kafka.connect.transforms.TimestampConverter.applyValueWithSchema(TimestampConverter.java:375)
        at 
org.apachtransforms.TimestampConverter.applyWithSchema(TimestampConverter.java:362)
        at 
org.apache.kafka.connect.transforms.TimestampConverter.apply(TimestampConverter.java:279)
        at 
.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
        at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithT.java:128)
        at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 14 more
Caused by: java.text.Unparseable date: \"2020-10-06T12:12:27Z\"
        at java.text.DateFormat.parse(DateFormat.java:366)
        at 
org.apache.kafka.connect.transforms.TimestampConverter$1.toRaw(TimestampCo)
        ... 21 more
{noformat}
 

My thinking is that maybe a good solution is to switch from using 
*java.util.Date* to instead using *java.util.Time*, then instead of 
*SimpleDateFormatter* switch to *DateTimeFormatter* which will allow usage of 
more sophisticated patterns in the config to match multiple different allowable 
formats.

For example instead of effectively doing this:
{code:java}
SimpleDateFormat format = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");{code}
It can be something like this:
{code:java}
DateTimeFormatter format = DateTimeFormatter.ofPattern("[yyyy-MM-dd[['T'][ 
]HH:mm:ss[.SSSSSSSz][.SSS[XXX][X]]]]");{code}
Also if there are multiple timestamp fields in the schema/events, then today 
you have to chain multiple *TimestampConverter* transforms together but I can 
see a little bit of a performance impact if there are many timestamps on large 
events and the topic has a lot of events coming through.

So it would be great actually if the field name could instead be a 
comma-separated list of field names (much like you can use with *Cast*, 
*ReplaceField*, etc transforms) and then it will just loop through each field 
in the list and apply the same logic (parse field based on string and give 
requested output type).

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to