codope commented on code in PR #10500:
URL: https://github.com/apache/hudi/pull/10500#discussion_r1463439259


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala:
##########
@@ -199,6 +199,31 @@ object HoodieSparkUtils extends SparkAdapterSupport with 
SparkVersionsSupport wi
     }
   }
 
+  def getJsonRdd(df: DataFrame): RDD[String] = {
+    df.toJSON.rdd

Review Comment:
   Should we handle an empty DataFrame here?



##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -202,6 +202,21 @@ public static byte[] avroToJson(GenericRecord record, 
boolean pretty) throws IOE
     return out.toByteArray();
   }
 
+  /**
+   * Convert a given avro record to json and return the string
+   *
+   * @param record The GenericRecord to convert
+   * @param pretty Whether to pretty-print the json output
+   */
+  public static String avroToJsonString(GenericRecord record, boolean pretty) 
throws IOException {

Review Comment:
   Can we reuse the existing `public static byte[] avroToJson(GenericRecord record, 
boolean pretty)` here?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java:
##########
@@ -116,16 +128,32 @@ static Option<JavaRDD<HoodieRecord>> 
createHoodieRecords(HoodieStreamer.Config c
 
           return new CloseableMappingIterator<>(ClosableIterator.wrap(itr), 
rec -> {
             InternalRow row = (InternalRow) 
deserializer.deserialize(rec).get();
-            String recordKey = builtinKeyGenerator.getRecordKey(row, 
baseStructType).toString();
-            String partitionPath = builtinKeyGenerator.getPartitionPath(row, 
baseStructType).toString();
-            return new HoodieSparkRecord(new HoodieKey(recordKey, 
partitionPath),
-                
HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType, 
targetStructType).apply(row), targetStructType, false);
+            try {
+              String recordKey = builtinKeyGenerator.getRecordKey(row, 
baseStructType).toString();
+              String partitionPath = builtinKeyGenerator.getPartitionPath(row, 
baseStructType).toString();
+              return Either.left(new HoodieSparkRecord(new 
HoodieKey(recordKey, partitionPath),
+                  
HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType, 
targetStructType).apply(row), targetStructType, false));
+            } catch (Exception e) {
+              if (!shouldErrorTable) {
+                throw e;
+              }
+              try {
+                return Either.right(HoodieAvroUtils.avroToJsonString(rec, 
false));
+              } catch (IOException ex) {
+                throw new RuntimeException(ex);

Review Comment:
   Would it be better to throw `HoodieIOException` here instead of a generic `RuntimeException`?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java:
##########
@@ -116,16 +128,32 @@ static Option<JavaRDD<HoodieRecord>> 
createHoodieRecords(HoodieStreamer.Config c
 
           return new CloseableMappingIterator<>(ClosableIterator.wrap(itr), 
rec -> {
             InternalRow row = (InternalRow) 
deserializer.deserialize(rec).get();
-            String recordKey = builtinKeyGenerator.getRecordKey(row, 
baseStructType).toString();
-            String partitionPath = builtinKeyGenerator.getPartitionPath(row, 
baseStructType).toString();
-            return new HoodieSparkRecord(new HoodieKey(recordKey, 
partitionPath),
-                
HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType, 
targetStructType).apply(row), targetStructType, false);
+            try {
+              String recordKey = builtinKeyGenerator.getRecordKey(row, 
baseStructType).toString();
+              String partitionPath = builtinKeyGenerator.getPartitionPath(row, 
baseStructType).toString();
+              return Either.left(new HoodieSparkRecord(new 
HoodieKey(recordKey, partitionPath),
+                  
HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType, 
targetStructType).apply(row), targetStructType, false));
+            } catch (Exception e) {
+              if (!shouldErrorTable) {
+                throw e;
+              }
+              try {
+                return Either.right(HoodieAvroUtils.avroToJsonString(rec, 
false));
+              } catch (IOException ex) {
+                throw new RuntimeException(ex);
+              }
+            }
           });
+
         });
       } else {
         throw new UnsupportedOperationException(recordType.name());
       }
-      return records;
+      if (shouldErrorTable) {
+        
errorTableWriter.get().addErrorEvents(records.filter(Either::isRight).map(Either::asRight).map(evStr
 -> new ErrorEvent<>(evStr,

Review Comment:
   Should we also check whether `errorTableWriter` is present before calling `get()` on it?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala:
##########
@@ -199,6 +199,31 @@ object HoodieSparkUtils extends SparkAdapterSupport with 
SparkVersionsSupport wi
     }
   }
 
+  def getJsonRdd(df: DataFrame): RDD[String] = {
+    df.toJSON.rdd
+  }
+
+  /**
+   * Rewrite the record into the target schema.
+   * Return tuple of rewritten records and records that could not be converted
+   */
+  def safeRewriteRDD(df: RDD[GenericRecord], serializedTargetSchema: String): 
Tuple2[RDD[GenericRecord], RDD[String]] = {

Review Comment:
   It would be nice to cover these utility methods with unit tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to