dawidwys commented on a change in pull request #13373:
URL: https://github.com/apache/flink/pull/13373#discussion_r487027309



##########
File path: flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
##########
@@ -58,7 +55,7 @@
 @RunWith(Parameterized.class)
 public class AvroTypesITCase extends TableProgramsClusterTestBase {
 
-       private static final User USER_1 = User.newBuilder()
+       private static final SimpleUser USER_1 = SimpleUser.newBuilder()

Review comment:
       Had to drop the logical types, because the `BatchTableEnvironment` fails if we have a `java.time.Instant` in the input DataSet. We extract `BasicTypeInfo.INSTANT_TYPE_INFO` for it, which is translated to `TIMESTAMP_WITH_LOCAL_TIME_ZONE`, which fails in the legacy planner.
   
   After the upgrade to Avro 1.10, one of the fields of the `User` class is of type `java.time.Instant`; previously it was Joda's `DateTime`.
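A minimal, standalone sketch of the field-type change described above (class and method names here are hypothetical, not the generated Avro `User` record): Avro 1.10 materializes `timestamp-millis` fields as `java.time.Instant` instead of Joda's `DateTime`.

```java
import java.time.Instant;

public class InstantFieldSketch {

    // Avro 1.10 default conversion: java.time.Instant
    // (previously this would have been an org.joda.time.DateTime).
    public static Instant fromEpochMillis(long millis) {
        return Instant.ofEpochMilli(millis);
    }

    public static void main(String[] args) {
        System.out.println(fromEpochMillis(1_000L)); // 1970-01-01T00:00:01Z
    }
}
```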

##########
File path: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
##########
@@ -176,14 +181,25 @@ public void getFlatComparator(List<TypeComparator> flatComparators) {
         */
        public final Object accessField(Field field, Object object) {
                try {
-                       object = field.get(object);
+                       if (field.isAccessible()) {

Review comment:
       It would. I changed it.
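The pattern in the diff above can be sketched as follows (a simplified standalone version, not the exact Flink code): use `isAccessible()` as a fast path so that `setAccessible(true)` is only invoked when access has not already been granted.

```java
import java.lang.reflect.Field;

public class FieldAccess {

    /** Reads a field reflectively, granting access lazily. */
    public static Object accessField(Field field, Object object) {
        try {
            if (!field.isAccessible()) {
                // Only pay for the access check once; subsequent calls
                // take the isAccessible() fast path.
                field.setAccessible(true);
            }
            return field.get(object);
        } catch (IllegalAccessException e) {
            throw new RuntimeException("Could not access field " + field.getName(), e);
        }
    }

    public static class Pojo {
        private final int id;
        public Pojo(int id) { this.id = id; }
    }

    public static Object readId(Pojo pojo) {
        try {
            return accessField(Pojo.class.getDeclaredField("id"), pojo);
        } catch (NoSuchFieldException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readId(new Pojo(42))); // prints 42
    }
}
```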

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
##########
@@ -218,7 +231,7 @@ private Object convertAvroType(Schema schema, TypeInformation<?> info, Object ob
                switch (schema.getType()) {
                        case RECORD:
                                if (object instanceof IndexedRecord) {
-                                       return convertAvroRecordToRow(schema, (RowTypeInfo) info, (IndexedRecord) object);
+                                       return convertAvroRecordToRow(schema, (RowTypeInfo) info, (IndexedRecord) object, jodaConverter);

Review comment:
       Right, in this class we can. I was too quick to follow the same scheme as in the `AvroRowDataDeserializationSchema`, where we have runtime converters instead of member methods.
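The difference between the two schemes discussed above can be sketched as follows (names are hypothetical, not the actual Flink signatures): a converter resolved once at construction time can either live as an instance field or be threaded through every call as a parameter; the latter is what a static runtime-converter style forces.

```java
import java.util.function.LongFunction;

public class ConverterWiring {

    // Member-method style: the converter is an instance field, so helper
    // methods need no extra parameter.
    private final LongFunction<Object> timestampConverter;

    public ConverterWiring(LongFunction<Object> timestampConverter) {
        this.timestampConverter = timestampConverter;
    }

    public Object convertViaMember(long micros) {
        return timestampConverter.apply(micros);
    }

    // Runtime-converter style: the converter travels explicitly with each
    // call, which is required when the conversion helpers are static.
    public static Object convertViaParameter(long micros, LongFunction<Object> converter) {
        return converter.apply(micros);
    }

    public static void main(String[] args) {
        LongFunction<Object> toSeconds = m -> m / 1_000_000L;
        ConverterWiring w = new ConverterWiring(toSeconds);
        System.out.println(w.convertViaMember(3_000_000L));             // 3
        System.out.println(convertViaParameter(3_000_000L, toSeconds)); // 3
    }
}
```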

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##########
@@ -272,11 +276,14 @@ private static DataType convertToDataType(Schema schema) {
                                return DataTypes.TIMESTAMP(3)
                                                .bridgedTo(java.sql.Timestamp.class)
                                                .notNull();
-                       }
-                       if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
+                       } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
                                return DataTypes.TIMESTAMP(6)
                                                .bridgedTo(java.sql.Timestamp.class)
                                                .notNull();
+                       } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
+                               return DataTypes.TIME(6)
+                                       .bridgedTo(LocalTime.class)
Review comment:
       Actually, those should be the default conversion classes from `java.time`. I changed that. It is not used, at least for now. We might need it later, e.g. in the schema-registry integration, where we will need to convert a schema retrieved from the schema registry to a SQL schema.
   
   It was added as a complementary method to `org.apache.flink.formats.avro.typeutils.AvroSchemaConverter#convertToSchema(org.apache.flink.table.types.logical.LogicalType)`.
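The branch logic in the diff above, written as a standalone sketch over Avro's logical-type names as plain strings so it runs without Flink or Avro on the classpath (the real method dispatches on `LogicalTypes` instances and returns `DataType` objects, not strings):

```java
public class LogicalTypeMapping {

    // Simplified stand-in for the convertToDataType branches shown above.
    public static String toSqlType(String logicalTypeName) {
        switch (logicalTypeName) {
            case "timestamp-millis":
                return "TIMESTAMP(3) NOT NULL"; // bridged to java.sql.Timestamp
            case "timestamp-micros":
                return "TIMESTAMP(6) NOT NULL"; // bridged to java.sql.Timestamp
            case "time-micros":
                return "TIME(6) NOT NULL";      // new branch, bridged to java.time.LocalTime
            default:
                return "UNKNOWN";
        }
    }

    public static void main(String[] args) {
        System.out.println(toSqlType("time-micros")); // TIME(6) NOT NULL
    }
}
```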




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

