Hi Martijn, Do you know what could be causing this issue given our Flink version? Is this possibly a bug with that version?
Thanks, Tom On Thu, Mar 17, 2022 at 9:59 AM Tom Thornton <thom...@yelp.com> wrote: > Hi Martijn, > > We are using 1.11.6. > > Thank you for the help. > > On Thu, Mar 17, 2022 at 1:37 AM Martijn Visser <martijnvis...@apache.org> > wrote: > >> Hi Tom, >> >> Which version of Flink are you using? >> >> Best regards, >> >> Martijn Visser >> https://twitter.com/MartijnVisser82 >> >> >> On Wed, 16 Mar 2022 at 23:59, Tom Thornton <thom...@yelp.com> wrote: >> >>> Per the docs <https://flink.apache.org/gettinghelp.html#found-a-bug>, >>> I'm hoping to confirm whether or not an error we are seeing is a bug with >>> Flink. We have a job that uses a Kafka source to read Avro records. The >>> kafka source is converted into a StreamTableSource. We are using the >>> new Blink table planner to execute SQL on the table stream. The output is >>> then put in a sink back to kafka as Avro records. Whenever a query selects >>> a column that has an avro logicalType of date, we get this error (link to >>> full >>> stack trace <https://pastebin.com/raw/duQaTAh6>). >>> >>> Caused by: java.lang.ClassCastException: class java.sql.Date cannot be cast >>> to class java.time.LocalDate (java.sql.Date is in module java.sql of loader >>> 'platform'; java.time.LocalDate is in module java.base of loader >>> 'bootstrap') >>> at >>> org.apache.flink.api.common.typeutils.base.LocalDateSerializer.copy(LocalDateSerializer.java:30) >>> at >>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:128) >>> at >>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:61) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:755) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712) >>> at >>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) >>> at >>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) >>> at >>> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) >>> at >>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:158) >>> at >>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:191) >>> at >>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:162) >>> at >>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:374) >>> at >>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:190) >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:608) >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:574) >>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:752) >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569) >>> at java.base/java.lang.Thread.run(Thread.java:829) >>> >>> >>> The avro schema definition for a date field is as follows: >>> >>> { >>> "name": "date", >>> "type": { >>> "type": "int", >>> "logicalType": "date" >>> }, >>> "doc": "date" >>> }, >>> >>> Any query that selects a date column would produce the error (and any query >>> without a column with type date will work). Example of a query that causes >>> the error: >>> >>> select `date` from table1 >>> >>> As suggested in the docs, I also tried this with parent-first loading and >>> got the same error. When we run the same job without the Blink table >>> planner, i.e., useOldPlanner(), we do not get this error. Is this a bug >>> with Flink? Or is there something we can change in the application code to >>> prevent this error? Any help/suggestions would be appreciated. >>> >>>