Hi Martijn,

Thank you for following up on this. We ended up changing two parts:

When creating the DataType, we used the following instead:

new AtomicDataType(new DateType(false), java.sql.Date.class);

This lets us override the conversion class through the constructor
<https://nightlies.apache.org/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/types/AtomicDataType.html#AtomicDataType-org.apache.flink.table.types.logical.LogicalType-java.lang.Class->.
We also changed the logic that converts an Avro schema to type
information, using the following in place of the default
LocalTimeTypeInfo[java.time.LocalDate]:

org.apache.flink.table.api.Types.SQL_DATE()
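
For anyone who hits the same error, here is a minimal standalone sketch
(plain Java, no Flink involved) of why the two date representations collide.
It assumes the Avro date logical type is an int counting days since
1970-01-01. The class and method names are made up for illustration and are
not part of Flink or of our job; castFails only imitates a serializer that
casts its input to java.time.LocalDate.

```java
import java.sql.Date;
import java.time.LocalDate;

public class AvroDateConversion {

    // Interpret an Avro "date" int (days since 1970-01-01) as java.time.LocalDate.
    static LocalDate toLocalDate(int epochDays) {
        return LocalDate.ofEpochDay(epochDays);
    }

    // Interpret the same int as java.sql.Date, the class we chose as conversion class.
    static Date toSqlDate(int epochDays) {
        return Date.valueOf(LocalDate.ofEpochDay(epochDays));
    }

    // Hypothetical stand-in for a serializer that expects LocalDate:
    // returns true when the cast throws ClassCastException.
    static boolean castFails(Object value) {
        try {
            LocalDate ignored = (LocalDate) value;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        int epochDays = 19083; // 2022-04-01 expressed as days since the epoch

        // Both objects describe the same calendar day...
        System.out.println(toLocalDate(epochDays));
        System.out.println(toSqlDate(epochDays).toLocalDate());

        // ...but only one of them is an instance of LocalDate.
        System.out.println(castFails(toSqlDate(epochDays)));
        System.out.println(castFails(toLocalDate(epochDays)));
    }
}
```

The point of both changes above is that the declared type information and
the runtime objects then agree on a single class (java.sql.Date in our case).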

Thank you for the help. We do want to upgrade versions, and doing so would
likely help. For now, we are using this workaround with our current version.

On Fri, Apr 1, 2022 at 4:28 AM Martijn Visser <martijnvis...@apache.org>
wrote:

> Hi Tom,
>
> Sorry for the late reply, I missed this. In the upcoming Flink 1.15, a
> number of improvements to CAST will be included [1]. Would you be able to
> test this with the current RC0 of Flink 1.15? [2]
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
> https://github.com/MartijnVisser
>
> [1] https://issues.apache.org/jira/browse/FLINK-24403
> [2] https://lists.apache.org/thread/qpzz298lh5zq5osxmoo0ky6kg0b0r5zg
>
>
> On Tue, 22 Mar 2022 at 18:06, Tom Thornton <thom...@yelp.com> wrote:
>
>> Hi Martijn,
>>
>> Do you know what could be causing this issue given our Flink version? Is
>> this possibly a bug with that version?
>>
>> Thanks,
>> Tom
>>
>> On Thu, Mar 17, 2022 at 9:59 AM Tom Thornton <thom...@yelp.com> wrote:
>>
>>> Hi Martijn,
>>>
>>> We are using 1.11.6.
>>>
>>> Thank you for the help.
>>>
>>> On Thu, Mar 17, 2022 at 1:37 AM Martijn Visser <martijnvis...@apache.org>
>>> wrote:
>>>
>>>> Hi Tom,
>>>>
>>>> Which version of Flink are you using?
>>>>
>>>> Best regards,
>>>>
>>>> Martijn Visser
>>>> https://twitter.com/MartijnVisser82
>>>>
>>>>
>>>> On Wed, 16 Mar 2022 at 23:59, Tom Thornton <thom...@yelp.com> wrote:
>>>>
>>>>> Per the docs <https://flink.apache.org/gettinghelp.html#found-a-bug>,
>>>>> I'm hoping to confirm whether or not an error we are seeing is a bug with
>>>>> Flink. We have a job that uses a Kafka source to read Avro records. The
>>>>> Kafka source is converted into a StreamTableSource. We are using the
>>>>> new Blink table planner to execute SQL on the table stream. The output is
>>>>> then written to a sink back to Kafka as Avro records. Whenever a query selects
>>>>> a column that has an Avro logicalType of date, we get this error (link to
>>>>> full stack trace <https://pastebin.com/raw/duQaTAh6>).
>>>>>
>>>>> Caused by: java.lang.ClassCastException: class java.sql.Date cannot be cast to class java.time.LocalDate (java.sql.Date is in module java.sql of loader 'platform'; java.time.LocalDate is in module java.base of loader 'bootstrap')
>>>>>         at org.apache.flink.api.common.typeutils.base.LocalDateSerializer.copy(LocalDateSerializer.java:30)
>>>>>         at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:128)
>>>>>         at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:61)
>>>>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:755)
>>>>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>>>>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>>>>>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>>>>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>>>>         at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
>>>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:158)
>>>>>         at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:191)
>>>>>         at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:162)
>>>>>         at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:374)
>>>>>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:190)
>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:608)
>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:574)
>>>>>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:752)
>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
>>>>>         at java.base/java.lang.Thread.run(Thread.java:829)
>>>>>
>>>>>
>>>>> The Avro schema definition for a date field is as follows:
>>>>>
>>>>>             {
>>>>>                 "name": "date",
>>>>>                 "type": {
>>>>>                     "type": "int",
>>>>>                     "logicalType": "date"
>>>>>                 },
>>>>>                 "doc": "date"
>>>>>             },
>>>>>
>>>>> Any query that selects a date column produces the error (and any
>>>>> query without a column of type date works). Example of a query that
>>>>> causes the error:
>>>>>
>>>>> select `date` from table1
>>>>>
>>>>> As suggested in the docs, I also tried this with parent-first loading and 
>>>>> got the same error. When we run the same job without the Blink table 
>>>>> planner, i.e., useOldPlanner(), we do not get this error. Is this a bug 
>>>>> with Flink? Or is there something we can change in the application code 
>>>>> to prevent this error? Any help/suggestions would be appreciated.
>>>>>
>>>>>
