[ 
https://issues.apache.org/jira/browse/FLINK-35911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gang Yang updated FLINK-35911:
------------------------------
    Description: 
Problem description: The business team reported data loss. Investigation showed that some partitions of the upstream Kafka source were not being consumed. The job was also restarted from state several times, but the problem persisted.

Workaround: To narrow down the problem, we set pipeline.operator-chaining = 'false' and restarted the job from state. Strangely, the job then resumed consuming normally.
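For reference, here is how the workaround above can be applied per job from the Flink SQL client (a minimal sketch using the standard configuration key; it can equally be set in flink-conf.yaml):
{code:sql}
-- Disable operator chaining before submitting the job,
-- so source and downstream operators run as separate tasks.
SET 'pipeline.operator-chaining' = 'false';
{code}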

Business scenario: simple data synchronization, Kafka write to HDFS.

The source is defined as follows:
{code:sql}
CREATE TABLE `play_log_source` (
  `appName` VARCHAR,
  `appInfo.channel` VARCHAR,
  `channel_name` AS `appInfo.channel`,
  `appInfo.packageName` VARCHAR,
  `package_name` AS `appInfo.packageName`,
  `deviceInfo.deviceId` VARCHAR,
  `device_id` AS `deviceInfo.deviceId`,
  `deviceInfo.deviceName` VARCHAR
) WITH (
  'nested-json.key.fields.deserialize-min.enabled' = 'true',
  'connector' = 'kafka',
  'format' = 'nested-json'
);{code}
 

Flink version: Flink-1.18.1

  was:
Problem description: The business team reported data loss. Investigation showed that some partitions of the upstream Kafka source were not being consumed. The job was also restarted from state several times, but the problem persisted.

Workaround: To narrow down the problem, we set pipeline.operator-chaining = 'false' and restarted the job from state, after which the job returned to normal.

Business scenario: simple data synchronization, Kafka write to HDFS.

The source is defined as follows:
{code:sql}
CREATE TABLE `play_log_source` (
  `appName` VARCHAR,
  `appInfo.channel` VARCHAR,
  `channel_name` AS `appInfo.channel`,
  `appInfo.packageName` VARCHAR,
  `package_name` AS `appInfo.packageName`,
  `deviceInfo.deviceId` VARCHAR,
  `device_id` AS `deviceInfo.deviceId`,
  `deviceInfo.deviceName` VARCHAR
) WITH (
  'nested-json.key.fields.deserialize-min.enabled' = 'true',
  'connector' = 'kafka',
  'format' = 'nested-json'
);{code}
 

Flink version: Flink-1.18.1


> Flink kafka-source connector
> ----------------------------
>
>                 Key: FLINK-35911
>                 URL: https://issues.apache.org/jira/browse/FLINK-35911
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: kafka-3.0.1
>            Reporter: Gang Yang
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
