Flink SQL/Table needs to know the field data types explicitly. You could apply a MapFunction before `toTable` to convert/normalize the data and its types.
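A minimal sketch of that normalization step, using only the JDK so it can be checked in isolation. `RawEvent`, `NormalizedEvent`, and `Normalize` are hypothetical names standing in for the upstream type and its SQL-friendly counterpart; the field names are taken from the thread, everything else is an assumption.

```scala
import java.sql.Timestamp
import java.util.Date

// Hypothetical stand-in for a type defined in the upstream library.
final case class RawEvent(_isComplete: Boolean, _startTime: Date)

// Hypothetical normalized type: every field has a SQL-compatible Java type.
final case class NormalizedEvent(isComplete: Boolean, startTime: Timestamp)

object Normalize {
  // Convert java.util.Date to java.sql.Timestamp, keeping the epoch millis.
  // A null input stays null so that missing values are not invented.
  def toTimestamp(d: Date): Timestamp =
    if (d == null) null else new Timestamp(d.getTime)

  // The per-record conversion that a map step would apply.
  def normalize(e: RawEvent): NormalizedEvent =
    NormalizedEvent(e._isComplete, toTimestamp(e._startTime))
}
```

In a job this function would presumably be passed to the stream's `map` (for example `stream.map(Normalize.normalize _)`) before calling `toTable`, so that the Table API sees `java.sql.Timestamp` instead of `java.util.Date`. With many upstream types, one such function per type is still needed, which is the cost YI points out below.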
Best,
Jark

On Thu, 18 Jun 2020 at 14:12, YI <uuu...@protonmail.com> wrote:

> Hi Jark,
>
> Thank you for your suggestion. My current problem is that there are quite
> a few data types. All of them are defined upstream, where I have no
> control.
> I don't think I can easily change the type information of a specific
> field. Can I? Things become nasty when there are so many `java.util.Date`
> fields I need to change.
>
> The reason I want to use Flink tables is that they let me easily join
> several tables. As an alternative, I think I can use the stream join operator.
> My only complaint is that it becomes tedious when I want to join more than
> once. I think I would need to define all the intermediate data types.
>
> Best,
> Yi
>
> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
> On Thursday, June 18, 2020 12:11 PM, Jark Wu <imj...@gmail.com> wrote:
>
> Hi YI,
>
> Flink doesn't have a TypeInformation for `java.util.Date`, but
> only SqlTimeTypeInfo.DATE for `java.sql.Date`.
> That's why TypeInformation.of(java.util.Date) is being recognized as a
> RAW type.
>
> To resolve your problem, I think in `TypeInformation.of(..)` you should
> use a concrete type in place of `java.util.Date`, e.g. `java.sql.Timestamp`,
> `java.sql.Date`, `java.sql.Time`.
>
> Best,
> Jark
>
> On Thu, 18 Jun 2020 at 10:32, YI <uuu...@protonmail.com> wrote:
>
>> Hi all,
>>
>> I am using Flink to process external data. The source format is JSON, and
>> the underlying data types are defined in an external library.
>> I generated the table schema with `TableSchema.fromTypeInfo` and
>> `TypeInformation.of[_]`. From what I read, this method is deprecated,
>> but I didn't find any alternatives. Manually tweaking the table schema is not
>> viable as there are simply too many types.
>>
>> One of the fields in the source type is a `java.util.Date`. I tried to
>> convert the obtained table to a datastream with Table.toAppendStream.
>> When I ran
>> `tEnv.from("rawEvent").select('_isComplete).toAppendStream[(Boolean)].print()`,
>> the following exception occurred.
>>
>> Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: Date
>>     at org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:350)
>>     at org.apache.flink.table.calcite.FlinkTypeFactory.createTypeFromTypeInfo(FlinkTypeFactory.scala:63)
>>     at org.apache.flink.table.calcite.FlinkTypeFactory.$anonfun$buildLogicalRowType$1(FlinkTypeFactory.scala:201)
>>     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>>     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>>     at org.apache.flink.table.calcite.FlinkTypeFactory.buildLogicalRowType(FlinkTypeFactory.scala:198)
>>     at org.apache.flink.table.plan.schema.TableSourceTable.getRowType(TableSourceTable.scala:96)
>>     at org.apache.calcite.prepare.CalciteCatalogReader.getTable(CalciteCatalogReader.java:131)
>>     at org.apache.calcite.prepare.CalciteCatalogReader.getTableForMember(CalciteCatalogReader.java:228)
>>     at org.apache.calcite.prepare.CalciteCatalogReader.getTableForMember(CalciteCatalogReader.java:84)
>>     at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1068)
>>     at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1094)
>>     at org.apache.flink.table.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:268)
>>     at org.apache.flink.table.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:134)
>>     at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
>>     at org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:131)
>>     at org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:111)
>>     at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:91)
>>     at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
>>     at org.apache.flink.table.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:130)
>>     at java.util.Collections$SingletonList.forEach(Collections.java:4824)
>>     at org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:130)
>>     at org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:111)
>>     at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:46)
>>     at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
>>     at org.apache.flink.table.calcite.FlinkRelBuilder.tableOperation(FlinkRelBuilder.scala:106)
>>     at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:390)
>>     at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:185)
>>     at org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:117)
>>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
>>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>>     at scala.collection.TraversableLike.map(TraversableLike.scala:273)
>>     at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
>>     at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>>     at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117)
>>     at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:210)
>>     at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.scala:107)
>>     at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:101)
>>     at io.redacted.test.package$.testJoin(package.scala:31)
>>     at io.redacted.test.package$.process(package.scala:26)
>>     at io.redacted.DataAggregator$.main(DataAggregator.scala:15)
>>     at io.redacted.DataAggregator.main(DataAggregator.scala)
>>
>> This exception is thrown even though I didn't select the RAW data field
>> `_startTime`, which is of type `java.util.Date`. I believe this exception is
>> undesirable.
>> Is there any way to obtain RAW data from Flink tables? If there isn't,
>> how do I work around my current issue? Do I need to manually update every
>> table schema?
>>
>> There is a relevant thread at
>> http://mail-archives.apache.org/mod_mbox/flink-user/201907.mbox/%3CCA+3UsY2-L1OKTjNBwX2ajG3o6v5M6QS=jbwyybemzlvdm5x...@mail.gmail.com%3E
>> but unfortunately I didn't find a satisfactory solution there.
>>
>> Cheers,
>> Yi