[ https://issues.apache.org/jira/browse/FLINK-35911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gang Yang updated FLINK-35911:
------------------------------
    Description:
Problem description: The business team reported data loss. Investigation showed that data in some partitions of the upstream Kafka source was not being consumed. The job was also restarted from state several times, but the problem persisted.

Workaround: To help locate the problem, we later set the parameter pipeline.operator-chaining = 'false' and then restarted the job from state. Strangely, the job resumed consuming normally.

Business scenario: simple data synchronization, writing from Kafka to HDFS.

The source is defined as follows:
{code:java}
// code placeholder
CREATE TABLE `play_log_source` (
  `appName` VARCHAR,
  `appInfo.channel` VARCHAR,
  `channel_name` AS `appInfo.channel`,
  `appInfo.packageName` VARCHAR,
  `package_name` AS `appInfo.packageName`,
  `deviceInfo.deviceId` VARCHAR,
  `device_id` AS `deviceInfo.deviceId`,
  `deviceInfo.deviceName` VARCHAR
) WITH (
  'nested-json.key.fields.deserialize-min.enabled' = 'true',
  'connector' = 'kafka',
  'format' = 'nested-json'
);{code}

Flink version: Flink-1.18.1

  was:
Problem description: The business team reported data loss. Investigation showed that data in some partitions of the upstream Kafka source was not being consumed. The job was also restarted from state several times, but the problem persisted.

Workaround: To help locate the problem, we later set the parameter pipeline.operator-chaining = 'false' and then restarted the job from state, after which the job returned to normal.

Business scenario: simple data synchronization, writing from Kafka to HDFS.

The source is defined as follows:
{code:java}
// code placeholder
CREATE TABLE `play_log_source` (
  `appName` VARCHAR,
  `appInfo.channel` VARCHAR,
  `channel_name` AS `appInfo.channel`,
  `appInfo.packageName` VARCHAR,
  `package_name` AS `appInfo.packageName`,
  `deviceInfo.deviceId` VARCHAR,
  `device_id` AS `deviceInfo.deviceId`,
  `deviceInfo.deviceName` VARCHAR
) WITH (
  'nested-json.key.fields.deserialize-min.enabled' = 'true',
  'connector' = 'kafka',
  'format' = 'nested-json'
);{code}

Flink version: Flink-1.18.1

> Flink kafka-source connector
> ----------------------------
>
>                 Key: FLINK-35911
>                 URL: https://issues.apache.org/jira/browse/FLINK-35911
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: kafka-3.0.1
>            Reporter: Gang Yang
>            Priority: Major
>
> Problem description: The business team reported data loss. Investigation showed that data in some partitions of the upstream Kafka source was not being consumed. The job was also restarted from state several times, but the problem persisted.
> Workaround: To help locate the problem, we later set the parameter pipeline.operator-chaining = 'false' and then restarted the job from state. Strangely, the job resumed consuming normally.
> Business scenario: simple data synchronization, writing from Kafka to HDFS.
> The source is defined as follows:
> {code:java}
> // code placeholder
> CREATE TABLE `play_log_source` (
>   `appName` VARCHAR,
>   `appInfo.channel` VARCHAR,
>   `channel_name` AS `appInfo.channel`,
>   `appInfo.packageName` VARCHAR,
>   `package_name` AS `appInfo.packageName`,
>   `deviceInfo.deviceId` VARCHAR,
>   `device_id` AS `deviceInfo.deviceId`,
>   `deviceInfo.deviceName` VARCHAR
> ) WITH (
>   'nested-json.key.fields.deserialize-min.enabled' = 'true',
>   'connector' = 'kafka',
>   'format' = 'nested-json'
> );{code}
>
> Flink version: Flink-1.18.1

--
This message was sent by Atlassian Jira
(v8.20.10#820010)