FrommyMind commented on issue #8909: URL: https://github.com/apache/seatunnel/issues/8909#issuecomment-2739367339
I tested your configuration and example data with SeaTunnel 2.3.9 and encountered the same error. However, with the configuration below, I encountered a different exception. ``` env { execution.parallelism = 1, job.mode = BATCH, job.name = test, checkpoint.interval = 10000 } source { Kafka { bootstrap.servers = "127.0.0.1:9092", topic = "seatunnel_test", format = "json", consumer.group = "seatunnel_group" // auto.offset.reset = "earliest", schema = { columns = [ { name = "os", type = "string" }, { name = "spm", type = "string" }, { name = "ip", type = "string" }, { name = "_lang", type = "string" }, { name = "_version", type = "string" }, { name = "app_id", type = "string" }, { name = "channel_id", type = "string" }, { name = "user_id", type = "string" }, { name = "device_id", type = "string" }, { name = "request_id", type = "string" }, { name = "page_url", type = "string" }, { name = "user_agent", type = "string" }, { name = "screen_height", type = "string" }, { name = "screen_width", type = "string" }, { name = "lib_type", type = "string" }, { name = "lib_version", type = "string" }, { name = "timezone_offset", type = "string" }, { name = "event_time", type = "string" }, { name = "event_type", type = "string" }, { name = "event_detail", type = "string" }, { name = "collect_time", type = "string" }, { name = "data_type", type = "string" }, { name = "site_id", type = long }, { name = "year", type = "string" }, { name = "month", type = "string" }, { name = "day", type = "string" }] } } } sink { S3File { "bucket" = "s3a://xxxxxxxx", "path" = "/seatunel-test", "tmp_path" = "/seatunnel_test", "fs.s3a.endpoint" = "s3.xxxxxxxx.amazonaws.com/", "fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider", "access_key" = "xxx", "secret_key" = "xxxx", "have_partition" = true, "partition_by" = ["site_id", "year", "month", "day"], "sink_columns" = ["site_id", "year", "month", "day", "app_id", "channel_id", "user_id", "device_id", "request_id", 
"page_url", "os", "user_agent", "screen_height", "screen_width", "lib_type", "lib_version", "timezone_offset", "spm", "event_time", "event_type", "event_detail", "ip", "collect_time", "data_type", "_lang", "_version"], "partition_dir_expression" = "${k0}=${v0}/${k1}=${v1}/${k2}=${v2}/${k3}=${v3}/", "is_partition_field_write_in_file" = false, "custom_filename" = true, "file_name_expression" = "${transactionId}_${now}", "filename_time_format" = "yyyy.MM.dd", "is_enable_transaction" = true, "file_format_type" = "parquet", "data_save_mode" = "APPEND_DATA" } } ``` ```java Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.lang.RuntimeException: table seatunnel_test sink throw error at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:302) at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:70) at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39) at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27) at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:75) at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50) at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51) at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:72) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:169) at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:77) at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694) at 
org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1019) at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:840) Caused by: java.lang.RuntimeException: table seatunnel_test sink throw error at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.subSinkErrorCheck(MultiTableSinkWriter.java:140) at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.write(MultiTableSinkWriter.java:184) at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.write(MultiTableSinkWriter.java:47) at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:268) ... 
17 more Caused by: java.lang.NoSuchFieldError: NULL_VALUE at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:295) at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:279) at org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ParquetWriteStrategy.buildAvroSchemaWithRowType(ParquetWriteStrategy.java:426) at org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ParquetWriteStrategy.getOrCreateOutputStream(ParquetWriteStrategy.java:161) at org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ParquetWriteStrategy.write(ParquetWriteStrategy.java:122) at org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSinkWriter.write(BaseFileSinkWriter.java:157) at org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSinkWriter.write(BaseFileSinkWriter.java:50) at org.apache.seatunnel.api.sink.multitablesink.MultiTableWriterRunnable.run(MultiTableWriterRunnable.java:67) ... 6 more ``` The error originates at this line in the source code: `org/apache/parquet/parquet-avro/1.12.3/parquet-avro-1.12.3-sources.jar!/org/apache/parquet/avro/AvroSchemaConverter.java:295` ```java fields.add(new Schema.Field( parquetType.getName(), optional(fieldSchema), null, NULL_VALUE)); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org