FrommyMind commented on issue #8909:
URL: https://github.com/apache/seatunnel/issues/8909#issuecomment-2739367339

   I tested your configuration and example data with SeaTunnel 2.3.9 and 
encountered the same error.
   
   
   However, with the configuration below, I encountered another exception.
   
   
   ```
   env {
     execution.parallelism = 1,
     job.mode = BATCH,
     job.name = test,
     checkpoint.interval = 10000
   }
   source {
     Kafka {
   
       bootstrap.servers = "127.0.0.1:9092",
       topic = "seatunnel_test",
       format = "json",
       consumer.group = "seatunnel_group"
   //    auto.offset.reset = "earliest",
       schema = {
         columns = [
           {
             name = "os",
             type = "string"
           },
           {
             name = "spm",
             type = "string"
           },
           {
             name = "ip",
             type = "string"
           },
           {
             name = "_lang",
             type = "string"
           },
           {
             name = "_version",
             type = "string"
           },
           {
             name = "app_id",
             type = "string"
           },
           {
             name = "channel_id",
             type = "string"
           },
           {
             name = "user_id",
             type = "string"
           },
           {
             name = "device_id",
             type = "string"
           },
           {
             name = "request_id",
             type = "string"
           },
           {
             name = "page_url",
             type = "string"
           },
           {
             name = "user_agent",
             type = "string"
           },
           {
             name = "screen_height",
             type = "string"
           },
           {
             name = "screen_width",
             type = "string"
           },
           {
             name = "lib_type",
             type = "string"
           },
           {
             name = "lib_version",
             type = "string"
           },
           {
             name = "timezone_offset",
             type = "string"
           },
           {
             name = "event_time",
             type = "string"
           },
   
           {
             name = "event_type",
             type = "string"
           },
           {
             name = "event_detail",
             type = "string"
           },
           {
             name = "collect_time",
             type = "string"
           },
           {
             name = "data_type",
             type = "string"
           },
           {
             name = "site_id",
             type = long
           },
           {
             name = "year",
             type = "string"
           },
           {
             name = "month",
             type = "string"
           },
           {
             name = "day",
             type = "string"
           }]
       }
     }
   }
   
   
   sink {
     S3File {
   
       "bucket" = "s3a://xxxxxxxx",
       "path" = "/seatunel-test",
       "tmp_path" = "/seatunnel_test",
       "fs.s3a.endpoint" = "s3.xxxxxxxx.amazonaws.com/",
       "fs.s3a.aws.credentials.provider" = 
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
       "access_key" = "xxx",
       "secret_key" = "xxxx",
       "have_partition" = true,
       "partition_by" = ["site_id", "year", "month", "day"],
       "sink_columns" = ["site_id", "year", "month", "day", "app_id", 
"channel_id", "user_id", "device_id", "request_id", "page_url", "os", 
"user_agent", "screen_height", "screen_width", "lib_type", "lib_version", 
"timezone_offset", "spm", "event_time", "event_type", "event_detail", "ip", 
"collect_time", "data_type", "_lang", "_version"],
       "partition_dir_expression" = 
"${k0}=${v0}/${k1}=${v1}/${k2}=${v2}/${k3}=${v3}/",
       "is_partition_field_write_in_file" = false,
       "custom_filename" = true,
       "file_name_expression" = "${transactionId}_${now}",
       "filename_time_format" = "yyyy.MM.dd",
       "is_enable_transaction" = true,
       "file_format_type" = "parquet",
       "data_save_mode" = "APPEND_DATA"
     }
   }
   ```
   
   ```java
   Caused by: 
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: 
java.lang.RuntimeException: java.lang.RuntimeException: table seatunnel_test 
sink throw error
        at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:302)
        at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:70)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
        at 
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:75)
        at 
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
        at 
org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
        at 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:72)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:169)
        at 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:77)
        at 
org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)
        at 
org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1019)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
        at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
   Caused by: java.lang.RuntimeException: table seatunnel_test sink throw error
        at 
org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.subSinkErrorCheck(MultiTableSinkWriter.java:140)
        at 
org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.write(MultiTableSinkWriter.java:184)
        at 
org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.write(MultiTableSinkWriter.java:47)
        at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:268)
        ... 17 more
   Caused by: java.lang.NoSuchFieldError: NULL_VALUE
        at 
org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:295)
        at 
org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:279)
        at 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ParquetWriteStrategy.buildAvroSchemaWithRowType(ParquetWriteStrategy.java:426)
        at 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ParquetWriteStrategy.getOrCreateOutputStream(ParquetWriteStrategy.java:161)
        at 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ParquetWriteStrategy.write(ParquetWriteStrategy.java:122)
        at 
org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSinkWriter.write(BaseFileSinkWriter.java:157)
        at 
org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSinkWriter.write(BaseFileSinkWriter.java:50)
        at 
org.apache.seatunnel.api.sink.multitablesink.MultiTableWriterRunnable.run(MultiTableWriterRunnable.java:67)
        ... 6 more
   ```
   
   The error is thrown at this line in the parquet-avro source code:
   
`org/apache/parquet/parquet-avro/1.12.3/parquet-avro-1.12.3-sources.jar!/org/apache/parquet/avro/AvroSchemaConverter.java:295`
   ```java
   fields.add(new Schema.Field(
               parquetType.getName(), optional(fieldSchema), null, NULL_VALUE));
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to