dawidwys commented on a change in pull request #13373: URL: https://github.com/apache/flink/pull/13373#discussion_r487027309
##########
File path: flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
##########
@@ -58,7 +55,7 @@
 @RunWith(Parameterized.class)
 public class AvroTypesITCase extends TableProgramsClusterTestBase {

-	private static final User USER_1 = User.newBuilder()
+	private static final SimpleUser USER_1 = SimpleUser.newBuilder()

Review comment:
Had to drop the logical types, because the `BatchTableEnvironment` fails if we have a `java.time.Instant` in the input DataSet. We extract `BasicTypeInfo.INSTANT_TYPE_INFO` for it, which is translated to `TIMESTAMP_WITH_LOCAL_TIME_ZONE`, which fails in the legacy planner.

After the upgrade to Avro 1.10, one of the fields of the `User` class is of type `java.time.Instant`; previously it was Joda's `DateTime`.

##########
File path: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
##########
@@ -176,14 +181,25 @@ public void getFlatComparator(List<TypeComparator> flatComparators) {
 	 */
 	public final Object accessField(Field field, Object object) {
 		try {
-			object = field.get(object);
+			if (field.isAccessible()) {

Review comment:
It would. I changed it.
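For readers without the full diff at hand, here is a minimal sketch of the kind of reflective field read that the `accessField` hunk is concerned with. It is not the `PojoComparator` code: the `Point` class and the `readByName` helper are hypothetical, and the sketch simply makes the field accessible before reading it, which may differ from what the PR finally does.

```java
import java.lang.reflect.Field;

public class FieldAccessSketch {

    // Hypothetical POJO with one private and one public field.
    static class Point {
        private int x = 7;
        public int y = 3;
    }

    // Reads a field reflectively, enabling access first if the
    // accessible flag has not been set on this Field object yet.
    @SuppressWarnings("deprecation") // isAccessible() is deprecated since Java 9
    static Object accessField(Field field, Object object) {
        try {
            if (!field.isAccessible()) {
                field.setAccessible(true);
            }
            return field.get(object);
        } catch (IllegalAccessException e) {
            throw new RuntimeException("Could not read field " + field.getName(), e);
        }
    }

    // Hypothetical convenience wrapper: looks the field up by name.
    static Object readByName(Object object, String fieldName) {
        try {
            return accessField(object.getClass().getDeclaredField(fieldName), object);
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No such field: " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        Point p = new Point();
        System.out.println(readByName(p, "x"));
        System.out.println(readByName(p, "y"));
    }
}
```

Note that `isAccessible()` only reports the flag on the `Field` object, not the Java visibility of the field, so it returns `false` for a freshly looked-up public field as well.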
##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
##########
@@ -218,7 +231,7 @@ private Object convertAvroType(Schema schema, TypeInformation<?> info, Object ob
 		switch (schema.getType()) {
 			case RECORD:
 				if (object instanceof IndexedRecord) {
-					return convertAvroRecordToRow(schema, (RowTypeInfo) info, (IndexedRecord) object);
+					return convertAvroRecordToRow(schema, (RowTypeInfo) info, (IndexedRecord) object, jodaConverter);

Review comment:
Right, in this class we can. I was too quick to follow the same scheme as in `AvroRowDataDeserializationSchema`, where we have runtime converters instead of member methods.

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##########
@@ -272,11 +276,14 @@ private static DataType convertToDataType(Schema schema) {
 				return DataTypes.TIMESTAMP(3)
 						.bridgedTo(java.sql.Timestamp.class)
 						.notNull();
-			}
-			if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
+			} else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
 				return DataTypes.TIMESTAMP(6)
 						.bridgedTo(java.sql.Timestamp.class)
 						.notNull();
+			} else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
+				return DataTypes.TIME(6)
+						.bridgedTo(LocalTime.class)

Review comment:
Actually, those should be the default conversion classes from `java.time`. I changed that. It is not used, at least for now. We might need that later, e.g. in the schema registry, where we will need to convert a schema retrieved from the schema registry to a SQL schema.
It was added as a complementary method to `org.apache.flink.formats.avro.typeutils.AvroSchemaConverter#convertToSchema(org.apache.flink.table.types.logical.LogicalType)`
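To make the `java.time` conversion classes mentioned for the `time-micros` and `timestamp-micros` hunk concrete, here is a small standalone sketch. It is not Flink's or Avro's converter code; the class and method names are hypothetical. It relies only on the Avro specification's definitions: `time-micros` is a long counting microseconds after midnight, and `timestamp-micros` is a long counting microseconds since the Unix epoch.

```java
import java.time.Instant;
import java.time.LocalTime;

public class AvroTimeSketch {

    // time-micros: microseconds after midnight -> java.time.LocalTime.
    static LocalTime timeMicrosToLocalTime(long microsOfDay) {
        return LocalTime.ofNanoOfDay(microsOfDay * 1_000L);
    }

    // timestamp-micros: microseconds since the epoch -> java.time.Instant.
    // floorDiv/floorMod keep the sub-second part non-negative for
    // timestamps before 1970.
    static Instant timestampMicrosToInstant(long epochMicros) {
        long seconds = Math.floorDiv(epochMicros, 1_000_000L);
        long microsOfSecond = Math.floorMod(epochMicros, 1_000_000L);
        return Instant.ofEpochSecond(seconds, microsOfSecond * 1_000L);
    }

    public static void main(String[] args) {
        System.out.println(timeMicrosToLocalTime(3_600_000_001L));
        System.out.println(timestampMicrosToInstant(1_500_000L));
    }
}
```

Both target types keep microsecond precision, which is consistent with the precision 6 used in the `DataTypes.TIMESTAMP(6)` and `DataTypes.TIME(6)` declarations in the hunk.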
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org