monologuist opened a new pull request, #4458:
URL: https://github.com/apache/flink-cdc/pull/4458

   ## What is changed
   
   This PR reduces incremental deserialization overhead in the MySQL pipeline 
connector by caching the inferred row `DataType` per table in 
`MySqlEventDeserializer`.
   
   Previously, the deserializer inferred the row `DataType` from the Debezium 
struct for every incoming record. Under hotspot UPDATE workloads, this repeated 
inference becomes a visible CPU hotspot in the incremental synchronization path.
   
   This PR makes the following changes:
   
   - cache inferred row `DataType` by `TableId` in `MySqlEventDeserializer`
   - invalidate the cache when schema change events are observed
   - normalize table identifiers consistently in case-insensitive mode so cache 
writes and invalidation use the same key
   - add a narrow protected helper in `DebeziumEventDeserializationSchema` so 
subclasses can reuse the existing converter logic with a precomputed `DataType`
   
   ## Why
   
   In a MySQL-to-Doris pipeline with large-table hotspot UPDATE traffic, we 
observed low incremental synchronization throughput and lag accumulation 
between upstream and downstream.
   
   Based on flame graph analysis, a noticeable portion of CPU time was spent in 
the MySQL pipeline deserialization path, especially on repeated schema/data 
type inference for the same table.
   
   [FLINK-35715](https://issues.apache.org/jira/browse/FLINK-35715) addressed 
the correctness issue caused by timestamp precision inference and 
`BinaryRecordData` layout mismatch. However, repeated schema inference is still 
present on the current master branch and remains a measurable CPU hotspot in 
MySQL pipeline deserialization.
   
   This PR focuses on that remaining performance overhead.
   
   The optimization relies on a natural assumption: for the same table, the row 
`DataType` remains stable until a schema change event arrives. Based on this 
assumption, the inferred row `DataType` can be safely reused across records and 
invalidated when schema changes are observed.
   
   ## Tests
   
   The following tests were added or updated:
   
   - verify the cache lifecycle across repeated records and schema change 
invalidation
   - verify cache isolation across different tables
   - verify case-insensitive table IDs are normalized consistently for cache 
reuse and invalidation
   
   In our internal verification, this optimization improved throughput by about 
18% in a test environment and about 19% in a production-like workload.
   
   ## Notes
   
   This PR focuses on reducing redundant row type inference in the MySQL 
pipeline deserialization path. It does not change changelog semantics or 
introduce new user-facing configuration.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to