Hi, Andrew.
Yes, currently the output of the Kafka pipeline sink does not contain schema 
information, so the Paimon action treats every field as a STRING type.
Your suggestion is very useful. I plan to support this feature in the next 
version of Flink CDC (Flink CDC 3.3), where it may be enabled through a 
parameter [1].
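
To make the difference concrete, here is a minimal sketch in Python. The 
helper name has_schema_envelope is hypothetical, and the "schema"/"payload" 
layout is an assumption based on what Kafka Connect's JSON converter writes 
when schemas are enabled; it is not something Flink CDC produces today.

```python
import json


def has_schema_envelope(raw: str) -> bool:
    """Return True if the JSON message carries a top-level 'schema' object."""
    msg = json.loads(raw)
    return isinstance(msg, dict) and isinstance(msg.get("schema"), dict)


# Shape currently written by the Flink CDC Kafka pipeline sink
# (taken from the example in this thread, with the elided fields left out).
without_schema = '{"before": null, "after": {"rev_id": 37}, "op": "c"}'

# Assumed shape when a schema is included: the change event is wrapped in a
# "payload" object next to a "schema" object. The empty field list is only a
# placeholder for illustration.
with_schema = json.dumps({
    "schema": {"type": "struct", "fields": []},
    "payload": {"before": None, "after": {"rev_id": 37}, "op": "c"},
})

print(has_schema_envelope(without_schema))
print(has_schema_envelope(with_schema))
```

A message of the first shape gives a consumer nothing to derive column types 
from, which is why the Paimon action falls back to STRING for it.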

Also, a Paimon sink is already available in Flink CDC, so you can write data 
directly from MariaDB to Paimon, which reduces the number of components and 
links that need to be maintained. We will also follow up on any issues 
encountered with the Paimon pipeline sink.

[1] https://issues.apache.org/jira/browse/FLINK-36611


> On Oct 26, 2024, at 11:32, Andrew Otto <o...@wikimedia.org> wrote:
> 
> Hi!
> 
> I really like Flink CDC's pipeline connectors 
> <https://nightlies.apache.org/flink/flink-cdc-docs-release-3.2/docs/connectors/pipeline-connectors/overview/>!
>   So simple!  
> I also like Paimon's CDC ingestion action CLI 
> <https://paimon.apache.org/docs/master/flink/cdc-ingestion/overview/>. 
> 
> I like these because I don't need to specify the schemas; they are inferred 
> from the source.  I also like the schema evolution support!
> 
> Paimon's recent Iceberg Compatibility 
> <https://paimon.apache.org/docs/master/migration/iceberg-compatibility/> mode 
> looks cool!
> 
> I'd like to accomplish the following:
> - MariaDB CDC -> Kafka
> - Kafka -> Paimon
> - Query with Spark+Iceberg
> I can do MariaDB CDC -> Kafka with the flink-cdc pipeline connector 
> <https://nightlies.apache.org/flink/flink-cdc-docs-release-3.2/docs/connectors/pipeline-connectors/kafka/>.
> However, I'm stuck on Kafka -> Paimon.
> 
> Paimon's Kafka CDC action docs 
> <https://paimon.apache.org/docs/0.9/flink/cdc-ingestion/kafka-cdc/> say:
> > Usually, debezium-json contains ‘schema’ field, from which Paimon will 
> > retrieve data types. Make sure your debezium json has this field, or Paimon 
> > will use ‘STRING’ type.
> 
> However, the messages generated by flink-cdc's pipeline connector do not have 
> a schema field.  They look like:
> 
> {
>   "before": null,
>   "after": {
>     "rev_id": 37,
>      ...
>   },
>   "op": "c"
> }
> 
> The only reference I can find to a schema field is in the debezium-json 
> format documentation 
> <https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/table/formats/debezium/#debezium-json-ddl>.
> > users may setup the Debezium Kafka Connect with the Kafka configuration 
> > 'value.converter.schemas.enable' enabled to include schema in the message
> 
> This leads me to believe that the schema field that Paimon's doc is referring 
> to is added not by debezium or flink-cdc, but by Kafka Connect when it is 
> used with Debezium proper to write CDC messages to Kafka.
> Does this mean that the CDC messages generated by the flink-cdc kafka 
> pipeline connector are not compatible with Paimon's kafka_sync_database 
> action? 
> Or, is there a way to cause flink-cdc pipeline connectors to include schema 
> in the message?
> 
> I could be misunderstanding something. Is Kafka Connect+Debezium used by 
> Flink to support debezium-json formatted messages? I tried passing 
> properties.value.converter.schemas.enable: true to the flink-cdc pipeline 
> kafka sink but that did not work (as expected).
> 
> Thank you!
> 
> -Andrew Otto
>  Wikimedia Foundation
> 
> P.S. Context for what we are trying to do is here: T373144 [SPIKE] Learn and 
> document how to use Flink-CDC from MediaWiki MariaDB locally 
> <https://phabricator.wikimedia.org/T373144>
> 
> 
> 
