[ https://issues.apache.org/jira/browse/FLINK-35237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Leonard Xu resolved FLINK-35237.
--------------------------------
    Resolution: Implemented

via master(3.2-SNAPSHOT): 26ff6d2a081181f3df7aa49d65d804c57c634122

> Allow Sink to Choose HashFunction in PrePartitionOperator
> ---------------------------------------------------------
>
>                 Key: FLINK-35237
>                 URL: https://issues.apache.org/jira/browse/FLINK-35237
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: cdc-3.1.1
>            Reporter: zhangdingxin
>            Assignee: zhangdingxin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: cdc-3.2.0
>
>
> The {{PrePartitionOperator}} in its current implementation only supports a
> fixed {{HashFunction}}
> ({{org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.HashFunction}}).
> This limits the ability of Sink implementations to customize the
> partitioning logic for {{DataChangeEvent}}s. For example, in the case of
> partitioned tables, it would be advantageous to allow hashing based on
> partition keys, hashing by table name, or using the database engine's
> internal primary-key hash function (as with the MaxCompute DataSink).
> When users require such custom partitioning logic, they are forced to
> implement their own partition operator, which undermines the utility of
> {{PrePartitionOperator}}.
> To address this limitation, it would be highly desirable for
> {{PrePartitionOperator}} to support user-specified custom
> {{HashFunction}}s (Function<DataChangeEvent, Integer>). A possible
> solution could involve a mechanism analogous to the {{DataSink}} interface,
> allowing a {{HashFunctionProvider}} class path to be specified in the
> configuration file. This enhancement would make it much easier for users
> to tailor partitioning strategies to their specific application needs.
> Concretely, I want to introduce two new interfaces, {{HashFunctionProvider}}
> and {{HashFunction}}:
> {code:java}
> public interface HashFunctionProvider {
>     HashFunction getHashFunction(Schema schema);
> }
>
> public interface HashFunction extends Function<DataChangeEvent, Integer> {
>     Integer apply(DataChangeEvent event);
> }
> {code}
> add a {{getHashFunctionProvider}} method to {{DataSink}}:
> {code:java}
> public interface DataSink {
>
>     /** Get the {@link EventSinkProvider} for writing changed data to external systems. */
>     EventSinkProvider getEventSinkProvider();
>
>     /** Get the {@link MetadataApplier} for applying metadata changes to external systems. */
>     MetadataApplier getMetadataApplier();
>
>     /** Get the {@link HashFunctionProvider} used to partition {@code DataChangeEvent}s before the sink. */
>     default HashFunctionProvider getHashFunctionProvider() {
>         return new DefaultHashFunctionProvider();
>     }
> }
> {code}
> and re-implement the {{PrePartitionOperator#recreateHashFunction}} method:
> {code:java}
> private HashFunction recreateHashFunction(TableId tableId) {
>     return hashFunctionProvider.getHashFunction(loadLatestSchemaFromRegistry(tableId));
> }
> {code}
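
For illustration, a minimal sketch of what a sink-specific provider could look like under this proposal, covering the "hashing by table name" case mentioned in the description. Only the two proposed interfaces are taken from the issue itself; the class names here are hypothetical, and the sketch assumes {{DataChangeEvent}} exposes its table via {{tableId()}} and that the common classes live under the {{org.apache.flink.cdc.common}} packages.

{code:java}
// Hypothetical implementation of the proposed interfaces (not part of the issue):
// route every change event of the same table to the same downstream channel.
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.schema.Schema;

public class TableNameHashFunctionProvider implements HashFunctionProvider {

    @Override
    public HashFunction getHashFunction(Schema schema) {
        // Table-name hashing does not need the schema; a partition-key-aware
        // provider would inspect the schema here instead.
        return new TableNameHashFunction();
    }

    /** Hashes by table identity so all events of one table stay together. */
    private static class TableNameHashFunction implements HashFunction {
        @Override
        public Integer apply(DataChangeEvent event) {
            // Assumes DataChangeEvent exposes its TableId via tableId().
            return event.tableId().hashCode();
        }
    }
}
{code}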
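A sink could then opt in by overriding the proposed default method on {{DataSink}}. The sink class below is a placeholder; the methods unrelated to hashing are stubbed out for brevity.

{code:java}
// Hypothetical sink wiring: only the hashing-related override matters here.
public class MyPartitionedTableDataSink implements DataSink {

    @Override
    public EventSinkProvider getEventSinkProvider() {
        throw new UnsupportedOperationException("omitted in this sketch");
    }

    @Override
    public MetadataApplier getMetadataApplier() {
        throw new UnsupportedOperationException("omitted in this sketch");
    }

    @Override
    public HashFunctionProvider getHashFunctionProvider() {
        // Replace the fixed default hashing with table-name based hashing.
        return new TableNameHashFunctionProvider();
    }
}
{code}

With that wiring, the re-implemented {{recreateHashFunction}} would hand each table's latest schema to the sink-provided {{HashFunctionProvider}}, so no custom partition operator is needed.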