YuriyGavrilov commented on issue #8604: URL: https://github.com/apache/seatunnel/issues/8604#issuecomment-2638901983
I updated the config to filter rows, like this:

```
env {
  parallelism = 1
  job.mode = "BATCH"
  read_limit.rows_per_second = 120
}

source {
  LocalFile {
    schema {
      fields {
        no = string
        answer = string
      }
    }
    path = "/config/2data44.csv"
    file_format_type = "text"
    field_delimiter = "\t"
    csv.quote_char = "\""
    skip_header_row_number = 1
    read_columns = ["no", "answer"]
    plugin_output = "source_table"
    parse_options {
      skip_empty_rows = true
      allow_missing_values = true
      error_handling = "skip_row"
    }
  }
}

transform {
  Sql {
    plugin_input = "source_table"
    source_table_name = "source_table"
    result_table_name = "cleaned_table"
    plugin_output = "cleaned_output"
    query = """
      SELECT
        REGEXP_REPLACE(
          no,
          '[^\\sа-яА-ЯёЁ0-9\\.,!\\?;:\\\"«»\\(\\)\\-]',
          ' '
        ) AS cleaned_no,
        REGEXP_REPLACE(
          answer,
          '[^\\sа-яА-ЯёЁ0-9\\.,!\\?;:\\\"«»\\(\\)\\-]',
          ' '
        ) AS cleaned_answer
      FROM source_table
    """
  }

  LLM {
    plugin_input = "cleaned_output"
    source_table_name = "cleaned_table"
    result_table_name = "llm_processed_table"
    plugin_output = "llm_output"
    model_provider = OPENAI
    inference_columns = ["cleaned_answer"]
    model = gpt-4o-mini
    # error_handling = "SKIP"
    timeout = 950000
    retry_times = 3
    batch_size = 1
    delay_between_requests = 2000
    normalize_input = true
    min_message_length = 2
    api_key = "XXXXX"
    #
    # output_column = "llm_result" # Add output_column
    api_path = "https://api.XXXX/openai/v1/chat/completions"
    error_handling {
      mode = "SKIP"
      default_value = "999"
    }
    request_limits {
      max_requests_per_minute = 100
      max_tokens_per_minute = 200000
    }
    request_config {
      temperature = 0.0
      max_tokens = 30000
      top_p = 1.0
    }
    output_data_type = "STRING"
    input_fields = ["cleaned_answer"]
    result_field = "llm_summary"
    append_fields = ["cleaned_no", "cleaned_answer"]
    prompt = "XXXX ${cleaned_answer}. 
XXXXX"
  }
}

sink {
  LocalFile {
    plugin_input = "llm_output"
    columns = ["cleaned_no", "cleaned_answer", "llm_summary"]
    path = "/config/3data_out.csv"
    file_format_type = "text"
    field_delimiter = "\t"
    encoding = "UTF-8"
    rolling_policy.size = "128MB"
    data_save_mode = "APPEND_DATA"
    enable_checkpoint = true
    tmp_path = "/config/tmp/sea"
    write_options {
      max_rows_in_memory = 100000
      batch_size = 500
      retry_times = 3
      retry_interval = 1000
    }
  }
}
```

I received an error close to the end of the main file:

```
2025-02-05 22:20:55,446 ERROR [o.a.s.c.s.SeaTunnel ] [main] - ===============================================================================
2025-02-05 22:20:55,446 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Fatal Error,
2025-02-05 22:20:55,446 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues
2025-02-05 22:20:55,446 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Reason:SeaTunnel job executed failed
2025-02-05 22:20:55,447 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException: ErrorCode:[FILE-08], ErrorDescription:[File read failed] - Read data from this file [source_table_file:/config/2data44.csv] failed
    at org.apache.seatunnel.connectors.seatunnel.file.source.reader.MultipleTableFileSourceReader.pollNext(MultipleTableFileSourceReader.java:85)
    at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
    at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:169)
    at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)
    at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1019)
    at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: Failed to inference model with row SeaTunnelRow{tableId=, kind=+I, fields=[2094884, "REMOVED THIS TEXT but it is looks like good visually"]}
    at org.apache.seatunnel.transform.nlpmodel.llm.LLMTransform.getOutputFieldValue(LLMTransform.java:160)
    at org.apache.seatunnel.transform.common.SingleFieldOutputTransform.transformRow(SingleFieldOutputTransform.java:47)
    at org.apache.seatunnel.transform.common.SingleFieldOutputTransform.transformRow(SingleFieldOutputTransform.java:35)
    at org.apache.seatunnel.transform.common.AbstractSeaTunnelTransform.transform(AbstractSeaTunnelTransform.java:80)
    at org.apache.seatunnel.transform.common.AbstractCatalogSupportMapTransform.map(AbstractCatalogSupportMapTransform.java:42)
    at org.apache.seatunnel.transform.common.AbstractCatalogSupportMapTransform.map(AbstractCatalogSupportMapTransform.java:27)
    at org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform.map(AbstractMultiCatalogMapTransform.java:40)
    at org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform.map(AbstractMultiCatalogMapTransform.java:28)
    at org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle.transform(TransformFlowLifeCycle.java:160)
    at org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle.received(TransformFlowLifeCycle.java:122)
    at org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle.received(TransformFlowLifeCycle.java:43)
    at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.sendRecordToNext(SeaTunnelSourceCollector.java:195)
    at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.collect(SeaTunnelSourceCollector.java:112)
    at org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy.lambda$readProcess$0(TextReadStrategy.java:133)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.stream.SliceOps$1$1.accept(SliceOps.java:204)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
    at org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy.readProcess(TextReadStrategy.java:104)
    at org.apache.seatunnel.connectors.seatunnel.file.source.reader.AbstractReadStrategy.resolveArchiveCompressedInputStream(AbstractReadStrategy.java:268)
    at org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy.read(TextReadStrategy.java:71)
    at org.apache.seatunnel.connectors.seatunnel.file.source.reader.MultipleTableFileSourceReader.pollNext(MultipleTableFileSourceReader.java:81)
    ... 12 more
Caused by: java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:464)
    at sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:68)
    at sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1346)
    at sun.security.ssl.SSLSocketImpl.access$300(SSLSocketImpl.java:73)
    at sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:962)
    at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:139)
    at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:155)
    at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:284)
    at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
    at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
    at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261)
    at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:165)
    at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:157)
    at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272)
    at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124)
    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
    at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
    at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
    at org.apache.seatunnel.transform.nlpmodel.llm.remote.kimiai.KimiAIModel.chatWithModel(KimiAIModel.java:75)
    at org.apache.seatunnel.transform.nlpmodel.llm.remote.AbstractModel.inference(AbstractModel.java:97)
    at org.apache.seatunnel.transform.nlpmodel.llm.LLMTransform.getOutputFieldValue(LLMTransform.java:142)
    ... 39 more
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
    ... 2 more
```

This time it processed 7300 lines, so I will dig into the filtering more. It is interesting, though, that the LLM transform didn't show anything and can't skip errors.
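For reference, the skip behaviour that settings such as `retry_times = 3` and `error_handling { mode = "SKIP" default_value = "999" }` seem to ask for could be sketched roughly as below. This is only an illustration in Python, not SeaTunnel code: `infer_or_default`, `call_model` and `retry_interval_s` are hypothetical names, and reading `retry_times` as "retries after the first attempt" is an assumption. The log above does not confirm whether the LLM transform honours these keys at all.

```python
import socket
import time
from typing import Callable


def infer_or_default(
    call_model: Callable[[str], str],  # hypothetical stand-in for the remote LLM call
    text: str,
    retry_times: int = 3,              # assumed to mean retries after the first attempt
    retry_interval_s: float = 0.0,     # hypothetical pause between attempts
    default_value: str = "999",
) -> str:
    """Return the model output, or default_value if every attempt times out."""
    for attempt in range(retry_times + 1):
        try:
            return call_model(text)
        except (socket.timeout, TimeoutError):
            # A read timeout only fails this attempt; the row is not fatal to the job.
            if attempt < retry_times:
                time.sleep(retry_interval_s)
    # All attempts timed out: skip the row by emitting the fallback value.
    return default_value


def always_times_out(text: str) -> str:
    """Stub that mimics the 'Read timed out' failure from the stack trace."""
    raise socket.timeout("Read timed out")


rows = ["first row", "second row"]
results = [infer_or_default(always_times_out, row) for row in rows]
print(results)
```

With a wrapper like this, a single slow response would cost one row rather than the whole batch job, which is the behaviour the `SKIP` mode name suggests.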
```
2025-02-05 22:20:46,888 INFO [o.a.s.e.c.j.JobMetricsRunner ] [job-metrics-runner-939621343589040129] -
***********************************************
           Job Progress Information
***********************************************
Job Id                    : 939621343589040129
Read Count So Far         : 7364
Write Count So Far        : 7360
Average Read Count        : 1/s
Average Write Count       : 1/s
Last Statistic Time       : 2025-02-05 22:19:46
Current Statistic Time    : 2025-02-05 22:20:46
***********************************************
2025-02-05 22:20:52,306 WARN [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=939621343589040129, pipelineId=1, taskGroupId=50000}] - [localhost]:5801 [seatunnel-1050] [5.1] Exception in org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask@60c5a77f
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException: ErrorCode:[FILE-08], ErrorDescription:[File read failed] - Read data from this file [source_table_file:/config/2data44.csv] failed
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org