[ https://issues.apache.org/jira/browse/FLINK-23674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395843#comment-17395843 ]
Arvid Heise commented on FLINK-23674:
-------------------------------------

Can you please provide the full log?

flink restart with checkpoint, kafka producer throws exception
--------------------------------------------------------------

Key: FLINK-23674
URL: https://issues.apache.org/jira/browse/FLINK-23674
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.13.1
Environment: flink: flink-1.13.1
             kafka: kafka_2.12-2.5.0
             java: 1.8.0_161
Reporter: meetsong
Priority: Major

While testing Flink exactly-once semantics (EOS) with a Kafka sink, I first clicked the Cancel button on the Flink web UI and then ran the following command on the console:

{code:java}
bin/flink run -n -c com.shanjiancaofu.live.job.ChargeJob -s file:/soft/opt/checkpoint/072c0a72343c6e1f06b9bd37c5147cc0/chk-1/_metadata ./ad-live-process-0.11-jar-with-dependencies.jar
{code}

About 10 seconds later the job threw this exception:

{code:java}
Caused by: org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
{code}
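For context on the error text: the broker fences an older transactional producer whenever a newer producer registers the same transactional.id, bumping the producer epoch. The reported failure surfaces while handling InitProducerIdResponse during recovery, but the underlying fencing mechanism can be reproduced with the plain Kafka client. A minimal sketch, assuming a hypothetical broker at localhost:9092, a hypothetical topic demo-topic, and reusing the job's transactional id purely for illustration:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;

public class FencingDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "user-event-processd-tr");

        // First producer registers the transactional id and receives some epoch N.
        KafkaProducer<String, String> oldProducer = new KafkaProducer<>(props);
        oldProducer.initTransactions();

        // A second producer with the SAME transactional id bumps the epoch to N+1 ...
        KafkaProducer<String, String> newProducer = new KafkaProducer<>(props);
        newProducer.initTransactions();

        // ... so any transactional operation by the old producer is now fenced.
        try {
            oldProducer.beginTransaction();
            oldProducer.send(new ProducerRecord<>("demo-topic", "k", "v"));
            oldProducer.commitTransaction();
        } catch (ProducerFencedException e) {
            // "Producer attempted an operation with an old epoch" -- same root message
            System.out.println("Old producer fenced: " + e.getMessage());
        } finally {
            oldProducer.close();
            newProducer.close();
        }
    }
}
{code}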
And my code is:

{code:java}
package com.shanjiancaofu.live.job;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.SystemUtils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.IsolationLevel;

import java.util.*;

@Slf4j
public class ChargeJob1 {

    static class RecordScheme implements KafkaDeserializationSchema<ConsumerRecord<String, UserEvent>> {
        @Override
        public boolean isEndOfStream(ConsumerRecord<String, UserEvent> record) {
            return false;
        }

        @Override
        public ConsumerRecord<String, UserEvent> deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
            String key = null;
            UserEvent userEvent = null;
            if (consumerRecord.key() != null) {
                key = new String(consumerRecord.key());
            }
            if (consumerRecord.value() != null) {
                userEvent = JSON.parseObject(new String(consumerRecord.value()), UserEvent.class);
            }
            return new ConsumerRecord<>(
                    consumerRecord.topic(),
                    consumerRecord.partition(),
                    consumerRecord.offset(),
                    consumerRecord.timestamp(),
                    consumerRecord.timestampType(),
                    consumerRecord.checksum(),
                    consumerRecord.serializedKeySize(),
                    consumerRecord.serializedValueSize(),
                    key, userEvent);
        }

        @Override
        public TypeInformation<ConsumerRecord<String, UserEvent>> getProducedType() {
            return TypeInformation.of(new TypeHint<ConsumerRecord<String, UserEvent>>() {
            });
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        if (args != null) {
            // pass global parameters
            configuration.setString("args", String.join(" ", args));
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.setRestartStrategy(new RestartStrategies.FailureRateRestartStrategyConfiguration(1, Time.minutes(5), Time.seconds(10)));
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // local checkpoint configuration
        env.enableCheckpointing(1000 * 60L);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // minimum pause between checkpoints
        checkpointConfig.setMinPauseBetweenCheckpoints(1000 * 5L);
        checkpointConfig.setCheckpointTimeout(1000 * 60L);
        // retain checkpoint data after the job is cancelled, so it can be restored later as needed
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        if (SystemUtils.IS_OS_WINDOWS) {
            env.setStateBackend(new FsStateBackend("file:///soft/opt/checkpoint"));
        } else {
            env.setStateBackend(new FsStateBackend("file:///soft/opt/checkpoint"));
        }

        // 2. read data
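        // NOTE (a reviewer assumption, not something this report establishes): with
        // Semantic.EXACTLY_ONCE, FlinkKafkaProducer is documented to manage its own
        // pool of generated transactional ids per subtask, so the fixed
        // TRANSACTIONAL_ID_CONFIG set below is, as I understand it, not the id that
        // actually goes on the wire. If a single id really were shared by all 3
        // parallel sink subtasks, the brokers would fence them against each other
        // with exactly this "old epoch" error.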
        // Kafka sink configuration
        Properties sinkProperties = new Properties();
        sinkProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.17.81:9092,192.168.17.82:9092,192.168.17.83:9092");
        sinkProperties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 + "");
        sinkProperties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        sinkProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        sinkProperties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        sinkProperties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "user-event-processd-tr");
        sinkProperties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner");
        FlinkKafkaProducer<String> stringFlinkKafkaProducer = new FlinkKafkaProducer<>("dsp-user-event-processd",
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                sinkProperties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        // Kafka source configuration
        Properties consumerProp = new Properties();
        consumerProp.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.17.81:9092,192.168.17.82:9092,192.168.17.83:9092");
        consumerProp.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "dsp-user-event-group");
        consumerProp.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProp.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProp.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        consumerProp.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
        consumerProp.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");

        env.addSource(new FlinkKafkaConsumer<>("dsp-user-event", new RecordScheme(), consumerProp))
                .name("dsp-user-event-source")
                .keyBy(new KeySelector<ConsumerRecord<String, UserEvent>, Long>() {
                    @Override
                    public Long getKey(ConsumerRecord<String, UserEvent> value) throws Exception {
                        return value.value().getUserId();
                    }
                })
                .process(new ChargeProcess()).setParallelism(3)
                .map(obj -> obj)
                .addSink(stringFlinkKafkaProducer)
                .name("dsp-user-event-sink").uid("dsp-user-event-sink-uid");
        env.execute("chargeJob");
    }

    public static class ChargeProcess extends KeyedProcessFunction<Long, ConsumerRecord<String, UserEvent>, String> {
        ListState<String> listState = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            listState = getRuntimeContext().getListState(new ListStateDescriptor<String>("ad-ip", String.class));
        }

        @Override
        public void close() throws Exception {
            super.close();
        }

        @Override
        public void processElement(ConsumerRecord<String, UserEvent> record,
                                   Context context,
                                   Collector<String> collector) throws Exception {
            UserEvent value = record.value();
            Iterable<String> strings = listState.get();
            Iterator<String> iterator = strings.iterator();
            List<String> objects = new ArrayList<>();
            iterator.forEachRemaining(objects::add);
            objects.add(value.getUserId() + "");
            try {
                // an idempotent operation is enough here; self processing
                boolean result = true;
                if (result) {
                    listState.update(objects);
                    collector.collect(JSON.toJSONString(value));
                }
                log.info(Thread.currentThread().getId() + ": processing: " + value.getEventMd5() + " " + listState.get().toString());
                System.out.println(Thread.currentThread().getId() + ": processing: " + value.getEventMd5() + " " + listState.get().toString());
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserEvent {
        private Long userId;
        private String eventMd5;
    }
}
{code}
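Separately, one configuration detail may be relevant, offered as a guess rather than a diagnosis: transaction.timeout.ms is set to only 60 s above, while the exception message explicitly allows for "the producer's transaction has been expired by the broker". If the gap between cancelling and restoring exceeds that minute, the broker aborts the pending transaction and recovery can trip over it. A sketch of a more forgiving value, which must stay at or below the broker's transaction.max.timeout.ms (15 minutes by default):

{code:java}
// Hypothetical adjustment to the sink properties above: give in-flight
// transactions enough time to survive a cancel-and-restore cycle.
// Must remain <= the broker's transaction.max.timeout.ms (default 15 min).
sinkProperties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
        String.valueOf(15 * 60 * 1000)); // 15 minutes instead of 60 seconds
{code}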
--
This message was sent by Atlassian Jira
(v8.3.4#803005)