YuriyGavrilov commented on issue #8604:
URL: https://github.com/apache/seatunnel/issues/8604#issuecomment-2638901983

   I made some config updates to filter rows, like this:
   
   ```
   env {
     parallelism = 1
     job.mode = "BATCH"
     read_limit.rows_per_second = 120
   }
   
   source {
     LocalFile {
       schema {
         fields {
           no = string
           answer = string
         }
       }
       path = "/config/2data44.csv"
       file_format_type = "text"
       field_delimiter = "\t"
       csv.quote_char = "\""
       skip_header_row_number = 1
       read_columns = ["no", "answer"]
   
       plugin_output = "source_table"
   
       parse_options {
         skip_empty_rows = true
         allow_missing_values = true
         error_handling = "skip_row"
       }
   
     }
   }
   
   transform {
     
     Sql {
       plugin_input = "source_table"
       source_table_name = "source_table"  
       result_table_name = "cleaned_table"
       plugin_output = "cleaned_output"
       query = """
          SELECT
           REGEXP_REPLACE(
             no,
             '[^\\sа-яА-ЯёЁ0-9\\.,!\\?;:\\\"«»\\(\\)\\-]',
             ' '
           ) AS cleaned_no,
           REGEXP_REPLACE(
             answer,
             '[^\\sа-яА-ЯёЁ0-9\\.,!\\?;:\\\"«»\\(\\)\\-]',
             ' '
           ) AS cleaned_answer
         FROM source_table
       """
     }
   
   
     LLM {
       plugin_input = "cleaned_output"
       source_table_name = "cleaned_table"
       result_table_name = "llm_processed_table"
       plugin_output = "llm_output"
   
       model_provider = OPENAI
       inference_columns = ["cleaned_answer"]
       model = gpt-4o-mini
       # error_handling = "SKIP" 
       timeout = 950000 
       retry_times = 3 
   
   
   
       batch_size = 1 
       delay_between_requests = 2000 
       normalize_input = true
       min_message_length = 2
        api_key = "XXXXX"
        # output_column = "llm_result" # Add output_column
        api_path = "https://api.XXXX/openai/v1/chat/completions"
   
       error_handling {
         mode = "SKIP"
         default_value = "999"
       }
   
       request_limits {
         max_requests_per_minute = 100
         max_tokens_per_minute = 200000
       }
   
   
       request_config {
         temperature = 0.0
         max_tokens = 30000
         top_p = 1.0
       }
   
       output_data_type = "STRING"
   
       input_fields = ["cleaned_answer"]
       result_field = "llm_summary"
       append_fields = ["cleaned_no", "cleaned_answer"]
   
   
       prompt = "XXXX ${cleaned_answer}. XXXXX"
   
     }
   
   }
   
   sink {
     LocalFile {
       plugin_input = "llm_output"
       columns = ["cleaned_no", "cleaned_answer", "llm_summary"]
   
       path = "/config/3data_out.csv"
       file_format_type = "text" 
       field_delimiter = "\t"
       encoding = "UTF-8"
       rolling_policy.size = "128MB"
       data_save_mode = "APPEND_DATA"
       enable_checkpoint = true
       tmp_path = "/config/tmp/sea"
   
       write_options {
         max_rows_in_memory = 100000
         batch_size = 500
         retry_times = 3
         retry_interval = 1000
       }
   
     }
   }
   
   ```
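   To check what the `REGEXP_REPLACE` step in the `Sql` transform keeps, here is a minimal Java sketch of the same character-class cleanup. It assumes the regex engine ultimately sees the pattern `[^\sа-яА-ЯёЁ0-9.,!?;:"«»()\-]` (whitespace, Cyrillic letters, digits and basic punctuation allowed, everything else replaced by a space). How many backslashes actually survive HOCON triple-quoting and SQL string parsing is not verified here, and the class name `RegexCleanDemo` is hypothetical.

   ```java
   import java.util.regex.Pattern;

   // Hypothetical demo class, not part of SeaTunnel: mirrors the intent of the
   // REGEXP_REPLACE pattern used in the Sql transform.
   final class RegexCleanDemo {

       // Allowed characters: whitespace, digits 0-9, Cyrillic U+0430-U+044F (lower case)
       // and U+0410-U+042F (upper case), U+0451 / U+0401 (yo), guillemets U+00AB / U+00BB,
       // and the punctuation . , ! ? ; : " ( ) -
       // Unicode escapes keep this source file ASCII-only.
       private static final Pattern NOT_ALLOWED = Pattern.compile(
               "[^\\s0-9\u0430-\u044F\u0410-\u042F\u0451\u0401.,!?;:\"\u00AB\u00BB()\\-]");

       private RegexCleanDemo() {}

       /** Replaces every character outside the allowed set with a single space. */
       static String clean(String input) {
           return NOT_ALLOWED.matcher(input).replaceAll(" ");
       }

       public static void main(String[] args) {
           // Latin letters are not in the allowed set, so they become spaces too.
           System.out.println("[" + clean("Answer: 42! (ok)") + "]");
       }
   }
   ```

   Because Latin letters are outside the allowed set, any English words in `no` or `answer` are blanked out as well, which may or may not be intended.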
   
   
   I received this error close to the end of the main file:
   
   ```
   2025-02-05 22:20:55,446 ERROR [o.a.s.c.s.SeaTunnel           ] [main] -

   ===============================================================================

   2025-02-05 22:20:55,446 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Fatal Error,

   2025-02-05 22:20:55,446 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues

   2025-02-05 22:20:55,446 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Reason:SeaTunnel job executed failed

   2025-02-05 22:20:55,447 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
           at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
           at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
           at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
   Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException: ErrorCode:[FILE-08], ErrorDescription:[File read failed] - Read data from this file [source_table_file:/config/2data44.csv] failed
           at org.apache.seatunnel.connectors.seatunnel.file.source.reader.MultipleTableFileSourceReader.pollNext(MultipleTableFileSourceReader.java:85)
           at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
           at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
           at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:169)
           at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
           at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)
           at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1019)
           at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:750)
   Caused by: java.lang.RuntimeException: Failed to inference model with row SeaTunnelRow{tableId=, kind=+I, fields=[2094884, "REMOVED THIS TEXT but it looks good visually"]}
           at org.apache.seatunnel.transform.nlpmodel.llm.LLMTransform.getOutputFieldValue(LLMTransform.java:160)
           at org.apache.seatunnel.transform.common.SingleFieldOutputTransform.transformRow(SingleFieldOutputTransform.java:47)
           at org.apache.seatunnel.transform.common.SingleFieldOutputTransform.transformRow(SingleFieldOutputTransform.java:35)
           at org.apache.seatunnel.transform.common.AbstractSeaTunnelTransform.transform(AbstractSeaTunnelTransform.java:80)
           at org.apache.seatunnel.transform.common.AbstractCatalogSupportMapTransform.map(AbstractCatalogSupportMapTransform.java:42)
           at org.apache.seatunnel.transform.common.AbstractCatalogSupportMapTransform.map(AbstractCatalogSupportMapTransform.java:27)
           at org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform.map(AbstractMultiCatalogMapTransform.java:40)
           at org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform.map(AbstractMultiCatalogMapTransform.java:28)
           at org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle.transform(TransformFlowLifeCycle.java:160)
           at org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle.received(TransformFlowLifeCycle.java:122)
           at org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle.received(TransformFlowLifeCycle.java:43)
           at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.sendRecordToNext(SeaTunnelSourceCollector.java:195)
           at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.collect(SeaTunnelSourceCollector.java:112)
           at org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy.lambda$readProcess$0(TextReadStrategy.java:133)
           at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
           at java.util.stream.SliceOps$1$1.accept(SliceOps.java:204)
           at java.util.Iterator.forEachRemaining(Iterator.java:116)
           at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
           at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
           at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
           at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
           at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
           at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
           at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
           at org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy.readProcess(TextReadStrategy.java:104)
           at org.apache.seatunnel.connectors.seatunnel.file.source.reader.AbstractReadStrategy.resolveArchiveCompressedInputStream(AbstractReadStrategy.java:268)
           at org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy.read(TextReadStrategy.java:71)
           at org.apache.seatunnel.connectors.seatunnel.file.source.reader.MultipleTableFileSourceReader.pollNext(MultipleTableFileSourceReader.java:81)
           ... 12 more
   Caused by: java.net.SocketTimeoutException: Read timed out
           at java.net.SocketInputStream.socketRead0(Native Method)
           at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
           at java.net.SocketInputStream.read(SocketInputStream.java:171)
           at java.net.SocketInputStream.read(SocketInputStream.java:141)
           at sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:464)
           at sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:68)
           at sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1346)
           at sun.security.ssl.SSLSocketImpl.access$300(SSLSocketImpl.java:73)
           at sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:962)
           at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:139)
           at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:155)
           at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:284)
           at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
           at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
           at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261)
           at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:165)
           at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:157)
           at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272)
           at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124)
           at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
           at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
           at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
           at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
           at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
           at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
           at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
           at org.apache.seatunnel.transform.nlpmodel.llm.remote.kimiai.KimiAIModel.chatWithModel(KimiAIModel.java:75)
           at org.apache.seatunnel.transform.nlpmodel.llm.remote.AbstractModel.inference(AbstractModel.java:97)
           at org.apache.seatunnel.transform.nlpmodel.llm.LLMTransform.getOutputFieldValue(LLMTransform.java:142)
           ... 39 more
   
           at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
           ... 2 more
    
   ```
   
   This time it processed about 7,300 lines, so I will dig into the filtering more. But it is interesting that the LLM transform didn't show anything and can't skip errors.
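   What the `retry_times` and `error_handling { mode = "SKIP", default_value = "999" }` settings were presumably meant to do can be sketched in plain Java: retry the inference call a few times and, if every attempt fails (for example with the `java.net.SocketTimeoutException: Read timed out` above), return a default value instead of failing the whole job. This is a hypothetical helper (`InferenceFallback.inferWithFallback` is not a SeaTunnel API), and whether SeaTunnel's LLM transform honors those config keys is not confirmed here; in the trace above, the exception propagated out of `LLMTransform.getOutputFieldValue` and stopped the job.

   ```java
   import java.util.concurrent.Callable;

   // Hypothetical helper, not part of SeaTunnel: retry an inference call and fall back
   // to a default value instead of letting the exception fail the whole job.
   final class InferenceFallback {

       private InferenceFallback() {}

       /**
        * @param call         one inference request (for example an HTTP call to the model API)
        * @param retryTimes   extra attempts after the first failure
        * @param delayMs      pause between attempts, in milliseconds
        * @param defaultValue value returned when every attempt fails (like "999" in the config)
        */
       static String inferWithFallback(Callable<String> call, int retryTimes,
                                       long delayMs, String defaultValue) {
           for (int attempt = 0; attempt <= retryTimes; attempt++) {
               try {
                   return call.call();
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt(); // keep the interrupt flag set
                   return defaultValue;
               } catch (Exception e) {
                   // e.g. java.net.SocketTimeoutException: Read timed out
                   if (attempt < retryTimes && delayMs > 0) {
                       try {
                           Thread.sleep(delayMs);
                       } catch (InterruptedException ie) {
                           Thread.currentThread().interrupt();
                           return defaultValue;
                       }
                   }
               }
           }
           // Every attempt failed: "skip" this row's inference by returning the placeholder.
           return defaultValue;
       }

       public static void main(String[] args) {
           String result = inferWithFallback(() -> {
               throw new java.net.SocketTimeoutException("Read timed out");
           }, 3, 100L, "999");
           System.out.println(result); // prints 999
       }
   }
   ```

   Handling the failure per row keeps one slow API response from aborting a long batch, at the cost of writing the placeholder value for that row into the sink.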
   
   ```
   2025-02-05 22:20:46,888 INFO  [o.a.s.e.c.j.JobMetricsRunner  ] [job-metrics-runner-939621343589040129] -
   ***********************************************
              Job Progress Information
   ***********************************************
   Job Id                    :  939621343589040129
   Read Count So Far         :                7364
   Write Count So Far        :                7360
   Average Read Count        :                 1/s
   Average Write Count       :                 1/s
   Last Statistic Time       : 2025-02-05 22:19:46
   Current Statistic Time    : 2025-02-05 22:20:46
   ***********************************************
   
   2025-02-05 22:20:52,306 WARN  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=939621343589040129, pipelineId=1, taskGroupId=50000}] - [localhost]:5801 [seatunnel-1050] [5.1] Exception in org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask@60c5a77f
   org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException: ErrorCode:[FILE-08], ErrorDescription:[File read failed] - Read data from this file [source_table_file:/config/2data44.csv] failed
   ```
   
   
   
   
   


