TyrantLucifer opened a new issue, #4587:
URL: https://github.com/apache/seatunnel/issues/4587

   ### Search before asking
   
   - [X] I have searched in the [feature](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22) and found no similar feature requirement.
   
   
   ### Description
   
   # Design of Dirty Data Collection Functionality for Apache SeaTunnel
   
   ## Introduction
   
   In a data integration framework, dirty data collection helps users detect data quality problems promptly, which in turn helps keep data synchronization accurate and stable.
   
   ## Functional Requirements
   
   The dirty data collection function needs to support the following features:
   
   1. Collect and record the content of every dirty record, together with its exception information.
   2. Support extension through plug-ins, so that new collector types can be added.
   
   ## Technical Solution
   
   ### Interface Design
   
   Add interface `DirtyRecordCollector`:
   
   ```java
   import java.io.Serializable;
   
   import org.apache.seatunnel.api.table.type.SeaTunnelRow;
   
   /** Base interface for dirty record collectors. */
   public interface DirtyRecordCollector extends Serializable {
   
       /** Collects a dirty record together with its exception and an error message. */
       void collect(
               final int subTaskIndex,
               final SeaTunnelRow dirtyRecord,
               final Throwable exception,
               final String errorMessage);
   
       /** Collects a dirty record together with its exception, using an empty error message. */
       default void collect(
               final int subTaskIndex, final SeaTunnelRow dirtyRecord, final Throwable exception) {
           collect(subTaskIndex, dirtyRecord, exception, "");
       }
   }
   ```
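   To illustrate how an implementation might look, here is a minimal sketch of a collector for the `type = log` setting used in the configuration example. `LogDirtyRecordCollector`, its `getCollectedCount` counter, and the simplified `SeaTunnelRow` stand-in are hypothetical; the interface is repeated in trimmed form only so the sketch compiles with the JDK alone.
   
   ```java
   import java.io.Serializable;
   import java.util.Arrays;
   import java.util.concurrent.atomic.AtomicLong;
   import java.util.logging.Logger;
   
   /** Demo entry point: reports one dirty row and prints how many rows were collected. */
   class LogCollectorDemo {
       public static void main(String[] args) {
           LogDirtyRecordCollector collector = new LogDirtyRecordCollector();
           SeaTunnelRow bad = new SeaTunnelRow(new Object[] {"Alice", "not-an-int"});
           collector.collect(0, bad, new NumberFormatException("For input string: \"not-an-int\""));
           System.out.println(collector.getCollectedCount()); // prints 1
       }
   }
   
   /** Simplified stand-in for SeaTunnel's SeaTunnelRow: it only holds field values. */
   class SeaTunnelRow implements Serializable {
       private final Object[] fields;
   
       SeaTunnelRow(Object[] fields) {
           this.fields = fields;
       }
   
       Object[] getFields() {
           return fields;
       }
   }
   
   /** The interface proposed above, repeated so the sketch compiles on its own. */
   interface DirtyRecordCollector extends Serializable {
       void collect(int subTaskIndex, SeaTunnelRow dirtyRecord, Throwable exception, String errorMessage);
   
       default void collect(int subTaskIndex, SeaTunnelRow dirtyRecord, Throwable exception) {
           collect(subTaskIndex, dirtyRecord, exception, "");
       }
   }
   
   /** Hypothetical collector for `type = log`: logs each dirty record and counts it. */
   class LogDirtyRecordCollector implements DirtyRecordCollector {
       // java.util.logging.Logger is not serializable, so keep it static instead of per instance.
       private static final Logger LOG = Logger.getLogger(LogDirtyRecordCollector.class.getName());
   
       // Running total, e.g. to feed a dirty-record metric; AtomicLong is serializable.
       private final AtomicLong collectedCount = new AtomicLong();
   
       @Override
       public void collect(
               int subTaskIndex, SeaTunnelRow dirtyRecord, Throwable exception, String errorMessage) {
           collectedCount.incrementAndGet();
           LOG.warning(String.format(
                   "Dirty record in subtask %d: %s, message: '%s', exception: %s",
                   subTaskIndex, Arrays.toString(dirtyRecord.getFields()), errorMessage, exception));
       }
   
       long getCollectedCount() {
           return collectedCount.get();
       }
   }
   ```
   
   Keeping the logger static matters because the collector itself must be `Serializable`, as the interface requires.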
   
   Add method `getDirtyRecordCollector` to `SinkWriter.Context`:
   
   ```java
   interface Context extends Serializable {
   
       /** @return The index of this subtask. */
       int getIndexOfSubtask();
   
       /** @return metricsContext of this writer. */
       MetricsContext getMetricsContext();
   
       /** @return The dirty record collector of this writer. */
       DirtyRecordCollector getDirtyRecordCollector();
   }
   ```
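   The following is a minimal sketch, assuming a writer that must convert a string field to an integer, of how a sink writer could use `getDirtyRecordCollector()` when a conversion fails. `ExampleSinkWriter` and the trimmed `Context`, `DirtyRecordCollector`, and `SeaTunnelRow` stand-ins are hypothetical. The proposal does not say whether a writer should skip the row or also fail after reporting it; this sketch simply skips it.
   
   ```java
   import java.io.Serializable;
   import java.util.ArrayList;
   import java.util.List;
   
   /** Demo entry point: writes one valid and one invalid row through a hypothetical writer. */
   class WriterDemo {
       public static void main(String[] args) {
           List<String> collected = new ArrayList<>();
           // Context for subtask 2 whose collector only remembers "subtask: message" strings.
           Context context = new Context() {
               @Override
               public int getIndexOfSubtask() {
                   return 2;
               }
   
               @Override
               public DirtyRecordCollector getDirtyRecordCollector() {
                   return (subTask, row, exception, message) -> collected.add(subTask + ": " + message);
               }
           };
   
           ExampleSinkWriter writer = new ExampleSinkWriter(context);
           writer.write(new SeaTunnelRow(new Object[] {"Alice", "30"}));
           writer.write(new SeaTunnelRow(new Object[] {"Bob", "thirty"}));
           System.out.println(writer.getWritten()); // prints [30]
           System.out.println(collected); // prints [2: cannot convert age: thirty]
       }
   }
   
   /** Simplified stand-in for SeaTunnelRow. */
   class SeaTunnelRow implements Serializable {
       private final Object[] fields;
   
       SeaTunnelRow(Object[] fields) {
           this.fields = fields;
       }
   
       Object[] getFields() {
           return fields;
       }
   }
   
   /** Trimmed stand-in for the proposed collector interface (abstract method only). */
   interface DirtyRecordCollector extends Serializable {
       void collect(int subTaskIndex, SeaTunnelRow dirtyRecord, Throwable exception, String errorMessage);
   }
   
   /** Trimmed stand-in for SinkWriter.Context with only the methods used here. */
   interface Context extends Serializable {
       int getIndexOfSubtask();
   
       DirtyRecordCollector getDirtyRecordCollector();
   }
   
   /** Hypothetical writer whose conversion step parses the second field as an integer age. */
   class ExampleSinkWriter {
       private final Context context;
       private final List<Integer> written = new ArrayList<>();
   
       ExampleSinkWriter(Context context) {
           this.context = context;
       }
   
       void write(SeaTunnelRow row) {
           Object rawAge = row.getFields()[1];
           try {
               written.add(Integer.parseInt(String.valueOf(rawAge)));
           } catch (NumberFormatException e) {
               // Report the row instead of writing it; skipping it afterwards is an assumption.
               context.getDirtyRecordCollector()
                       .collect(context.getIndexOfSubtask(), row, e, "cannot convert age: " + rawAge);
           }
       }
   
       List<Integer> getWritten() {
           return written;
       }
   }
   ```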
   
   Add method `getPluginConfig` to `SeaTunnelPluginLifeCycle`:
   
   ```java
   /** @return The configuration of this plugin. */
   Config getPluginConfig();
   ```
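   Because the sink is serialized during transmission, whatever `getPluginConfig` returns must survive that step. The sketch below assumes a hypothetical `ExampleSink` that keeps the configuration passed to a `prepare` method, and uses a `HashMap` as a stand-in for the shaded, serializable `Config`. It round-trips the sink through Java serialization to show that the configuration is still available on the copy.
   
   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.ObjectInputStream;
   import java.io.ObjectOutputStream;
   import java.io.Serializable;
   import java.util.HashMap;
   import java.util.Map;
   
   /** Demo entry point: prepares a sink, serializes it, and reads the config from the copy. */
   class PluginConfigDemo {
       public static void main(String[] args) {
           Map<String, String> config = new HashMap<>();
           config.put("dirty.collector.type", "log");
   
           ExampleSink sink = new ExampleSink();
           sink.prepare(config);
           ExampleSink copy = roundTrip(sink);
           System.out.println(copy.getPluginConfig().get("dirty.collector.type")); // prints log
       }
   
       /** Serializes and deserializes a value, mimicking what happens to a sink in transit. */
       @SuppressWarnings("unchecked")
       static <T extends Serializable> T roundTrip(T value) {
           try {
               ByteArrayOutputStream bytes = new ByteArrayOutputStream();
               try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                   out.writeObject(value);
               }
               try (ObjectInputStream in =
                       new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                   return (T) in.readObject();
               }
           } catch (IOException | ClassNotFoundException e) {
               throw new IllegalStateException("Serialization round trip failed", e);
           }
       }
   }
   
   /** Hypothetical sink; a HashMap stands in for the shaded, serializable Config. */
   class ExampleSink implements Serializable {
       private HashMap<String, String> pluginConfig = new HashMap<>();
   
       /** Prepare phase: keep a copy of the (already merged) plugin configuration. */
       void prepare(Map<String, String> pluginConfig) {
           this.pluginConfig = new HashMap<>(pluginConfig);
       }
   
       /** Counterpart of the proposed getPluginConfig(): returns what prepare() stored. */
       Map<String, String> getPluginConfig() {
           return pluginConfig;
       }
   }
   ```
   
   If the field held a non-serializable configuration object instead, `writeObject` would fail with a `NotSerializableException`, which is the problem the `seatunnel-config-shade` change is meant to avoid.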
   
   ### Configuration File Example
   
   ```hocon
   env {
     job.mode = BATCH
     dirty.collector = {
       type = log
     }
   }
   
   source {
     FakeSource {
       row.num = 100
       schema {
         fields {
           name = string
           age = int
         }
       }
     }
   }
   
   sink {
     Console {}
   }
   ```
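   One way the `dirty.collector.type` value could be resolved into a collector instance is a registry keyed by type name. This sketch is hypothetical: `DirtyCollectorFactory`, `LogCollector`, and `NoOpCollector` are illustrative names, the configuration is flattened into a `Map` instead of a `Config`, and falling back to a no-op collector when nothing is configured is an assumption. A real plug-in mechanism might instead discover implementations with `java.util.ServiceLoader`.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.function.Supplier;
   
   /** Demo entry point: resolves the collector configured as dirty.collector = { type = log }. */
   class CollectorFactoryDemo {
       public static void main(String[] args) {
           Map<String, String> env = new HashMap<>();
           env.put("dirty.collector.type", "log"); // flattened form of the env block above
           DirtyRecordCollector collector = DirtyCollectorFactory.create(env);
           System.out.println(collector.getClass().getSimpleName()); // prints LogCollector
       }
   }
   
   /** Stand-in for the proposed interface; rows are typed as Object to keep the sketch small. */
   interface DirtyRecordCollector {
       void collect(int subTaskIndex, Object dirtyRecord, Throwable exception, String errorMessage);
   }
   
   /** Hypothetical collector for type = log: prints the record to standard error. */
   class LogCollector implements DirtyRecordCollector {
       @Override
       public void collect(int subTaskIndex, Object dirtyRecord, Throwable exception, String errorMessage) {
           System.err.printf(
                   "subtask %d dirty record %s: %s (%s)%n", subTaskIndex, dirtyRecord, errorMessage, exception);
       }
   }
   
   /** Hypothetical collector used when nothing is configured: ignores every record. */
   class NoOpCollector implements DirtyRecordCollector {
       @Override
       public void collect(int subTaskIndex, Object dirtyRecord, Throwable exception, String errorMessage) {}
   }
   
   /** Hypothetical registry mapping a dirty.collector.type value to a collector constructor. */
   final class DirtyCollectorFactory {
       private static final Map<String, Supplier<DirtyRecordCollector>> REGISTRY = new HashMap<>();
   
       static {
           // Supporting a new collector type means registering one more entry here.
           REGISTRY.put("log", LogCollector::new);
       }
   
       private DirtyCollectorFactory() {}
   
       static DirtyRecordCollector create(Map<String, String> config) {
           String type = config.get("dirty.collector.type");
           if (type == null) {
               return new NoOpCollector(); // assumption: no collector configured means records are ignored
           }
           Supplier<DirtyRecordCollector> supplier = REGISTRY.get(type);
           if (supplier == null) {
               throw new IllegalArgumentException("Unknown dirty.collector.type: " + type);
           }
           return supplier.get();
       }
   }
   ```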
   
   ### Technical Implementation
   
   1. Modify `seatunnel-config-shade`. `SeaTunnelSink` is serialized while it is being transmitted, but the underlying `Config` interface of `typesafe-config` does not support serialization. `Config` must therefore be made serializable so that the plugin configuration is not lost in transit. See #4586 for implementation details.
   2. Modify every `SeaTunnelSink` to implement `getPluginConfig` and to keep the `pluginConfig` injected during the prepare phase. Before the sinks are instantiated, merge the dirty data collector configuration from the `env` section into the plugin configuration of every sink.
   3. Modify `DefaultSinkWriterContext` and `SinkWriterContext` to instantiate the `DirtyRecordCollector` from this configuration and to implement the `getDirtyRecordCollector` method.
   4. Modify every `SeaTunnelSink` so that, when writing or converting data fails, it reports the failed record to the dirty data collector and updates the related metrics.
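   Step 2 can be sketched with flattened maps standing in for `Config`. This is a hypothetical illustration: `ConfigMerger` is an invented name, and the rule that a sink's own `dirty.collector.*` keys override the `env` values is an assumption (with `typesafe-config`, a comparable effect could come from `withFallback`).
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   /** Demo entry point: merges the env-level collector settings into one sink's config. */
   class MergeDemo {
       public static void main(String[] args) {
           Map<String, String> env = new HashMap<>();
           env.put("job.mode", "BATCH");
           env.put("dirty.collector.type", "log");
   
           Map<String, String> sinkConfig = new HashMap<>();
           sinkConfig.put("parallelism", "1");
   
           Map<String, String> merged = ConfigMerger.mergeDirtyCollectorConfig(env, sinkConfig);
           System.out.println(merged.get("dirty.collector.type")); // prints log
           System.out.println(merged.containsKey("job.mode")); // prints false
       }
   }
   
   /** Hypothetical helper for step 2; run it once per sink before the sink is instantiated. */
   final class ConfigMerger {
       static final String PREFIX = "dirty.collector.";
   
       private ConfigMerger() {}
   
       /** Copies env keys under "dirty.collector." into a new map, then applies the sink's own keys. */
       static Map<String, String> mergeDirtyCollectorConfig(
               Map<String, String> env, Map<String, String> sinkConfig) {
           Map<String, String> merged = new HashMap<>();
           env.forEach((key, value) -> {
               if (key.startsWith(PREFIX)) {
                   merged.put(key, value);
               }
           });
           // Assumption: a sink-level setting overrides the env-level default.
           merged.putAll(sinkConfig);
           return merged;
       }
   }
   ```
   
   Only the `dirty.collector.*` keys are copied, so unrelated `env` settings such as `job.mode` do not leak into the sink configuration.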
   
   ### Process Design
   
   
![drawio](https://user-images.githubusercontent.com/51053924/232326690-42a0680a-e002-4c0b-8dd0-6e056efe8484.png)
   
   
   ### Usage Scenario
   
   _No response_
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

