Hi Martijn,

Do you know what could be causing this issue given our Flink version? Is
this possibly a bug with that version?

Thanks,
Tom

On Thu, Mar 17, 2022 at 9:59 AM Tom Thornton <thom...@yelp.com> wrote:

> Hi Martijn,
>
> We are using 1.11.6.
>
> Thank you for the help.
>
> On Thu, Mar 17, 2022 at 1:37 AM Martijn Visser <martijnvis...@apache.org>
> wrote:
>
>> Hi Tom,
>>
>> Which version of Flink are you using?
>>
>> Best regards,
>>
>> Martijn Visser
>> https://twitter.com/MartijnVisser82
>>
>>
>> On Wed, 16 Mar 2022 at 23:59, Tom Thornton <thom...@yelp.com> wrote:
>>
>>> Per the docs <https://flink.apache.org/gettinghelp.html#found-a-bug>,
>>> I'm hoping to confirm whether or not an error we are seeing is a bug with
>>> Flink. We have a job that uses a Kafka source to read Avro records. The
>>> kafka source is converted into a StreamTableSource. We are using the
>>> new Blink table planner to execute SQL on the table stream. The output is
>>> then put in a sink back to kafka as Avro records. Whenever a query selects
>>> a column that has an avro logicalType of date, we get this error (link to 
>>> full
>>> stack trace <https://pastebin.com/raw/duQaTAh6>).
>>>
>>> Caused by: java.lang.ClassCastException: class java.sql.Date cannot be cast 
>>> to class java.time.LocalDate (java.sql.Date is in module java.sql of loader 
>>> 'platform'; java.time.LocalDate is in module java.base of loader 
>>> 'bootstrap')
>>>         at 
>>> org.apache.flink.api.common.typeutils.base.LocalDateSerializer.copy(LocalDateSerializer.java:30)
>>>         at 
>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:128)
>>>         at 
>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:61)
>>>         at 
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:755)
>>>         at 
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>>>         at 
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>>>         at 
>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>>         at 
>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>>         at 
>>> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
>>>         at 
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:158)
>>>         at 
>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:191)
>>>         at 
>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:162)
>>>         at 
>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>         at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:374)
>>>         at 
>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:190)
>>>         at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:608)
>>>         at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:574)
>>>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:752)
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
>>>         at java.base/java.lang.Thread.run(Thread.java:829)
>>>
>>>
>>> The avro schema definition for a date field is as follows:
>>>
>>>             {
>>>                 "name": "date",
>>>                 "type": {
>>>                     "type": "int",
>>>                     "logicalType": "date"
>>>                 },
>>>                 "doc": "date"
>>>             },
>>>
>>> Any query that selects a date column would produce the error (and any query 
>>> without a column with type date will work). Example of a query that causes 
>>> the error:
>>>
>>> select `date` from table1
>>>
>>> As suggested in the docs, I also tried this with parent-first loading and 
>>> got the same error. When we run the same job without the Blink table 
>>> planner, i.e., useOldPlanner(), we do not get this error. Is this a bug 
>>> with Flink? Or is there something we can change in the application code to 
>>> prevent this error? Any help/suggestions would be appreciated.
>>>
>>>

Reply via email to