twalthr commented on a change in pull request #13373: URL: https://github.com/apache/flink/pull/13373#discussion_r487046691
########## File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java ########## @@ -340,10 +357,22 @@ private Time convertToTime(Object object, @Nullable JodaConverter jodaConverter) return new Time(millis - LOCAL_TZ.getOffset(millis)); } - private Timestamp convertToTimestamp(Object object) { + private Timestamp convertToTimestamp(Object object, @Nullable JodaConverter jodaConverter, boolean isMicros) { final long millis; if (object instanceof Long) { - millis = (Long) object; + if (isMicros) { + long micros = (Long) object; + Instant instant = Instant.ofEpochSecond(0) Review comment: maybe to much optimization but should we just do the long arithmetic ourselves here and below? We are creating a lot of objects for the hot path. ########## File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java ########## @@ -272,11 +276,14 @@ private static DataType convertToDataType(Schema schema) { return DataTypes.TIMESTAMP(3) .bridgedTo(java.sql.Timestamp.class) .notNull(); - } - if (schema.getLogicalType() == LogicalTypes.timestampMicros()) { + } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) { return DataTypes.TIMESTAMP(6) .bridgedTo(java.sql.Timestamp.class) .notNull(); + } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) { + return DataTypes.TIME(6) + .bridgedTo(LocalTime.class) Review comment: Shouldn't this be a SQL type like the others? Is this method actually used? ########## File path: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java ########## @@ -193,7 +193,7 @@ private void validateUserSchema(DataType actual) { DataTypes.FIELD("type_bytes", DataTypes.ARRAY(DataTypes.TINYINT().bridgedTo(Byte.class)).notNull()), DataTypes.FIELD("type_date", DataTypes.DATE().bridgedTo(java.sql.Date.class).notNull()), DataTypes.FIELD("type_time_millis", DataTypes.TIME().bridgedTo(java.sql.Time.class).notNull()), - DataTypes.FIELD("type_time_micros", DataTypes.INT().notNull()), + DataTypes.FIELD("type_time_micros", DataTypes.TIME(6).notNull()), Review comment: same question as above why is this not `sql.Time`? ########## File path: flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java ########## @@ -58,7 +55,7 @@ @RunWith(Parameterized.class) public class AvroTypesITCase extends TableProgramsClusterTestBase { - private static final User USER_1 = User.newBuilder() + private static final SimpleUser USER_1 = SimpleUser.newBuilder() Review comment: But it would still be nice to test all types for Avro but maybe to the new Blink planner. Could we add least have a separate commit for this class? Then we can revert the changes because after FLIP-136 this test should definitely work again, no? Or does it work already today? ########## File path: flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java ########## @@ -58,7 +55,7 @@ @RunWith(Parameterized.class) public class AvroTypesITCase extends TableProgramsClusterTestBase { - private static final User USER_1 = User.newBuilder() + private static final SimpleUser USER_1 = SimpleUser.newBuilder() Review comment: Thanks for the explanation. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org