hi 既然你只能消费到一个分区的数据,那么可以肯定的是消费能拿到的只是一个分区的数据,另外看到你说
> 用于本地开发调试 kafka(连到一个堡垒机xxx-b-1,转发 9797 到 xxx-a-1) 建议看看是不是这个转发有问题,只转发了一个节点 Best zhisheng Lynn Chen <[email protected]> 于2020年10月23日周五 上午11:01写道: > > > > hi, zhisheng: > > > 我解析 json 后: > (xxx, xxx, xxx, topic, partition, offset) > => > > > (false,1603420582310,"INSERT","test3.order",2,75) > (false,1603421312803,"INSERT","test3.order",2,76) > (false,1603421344819,"INSERT","test3.order",2,77) > (false,1603421344819,"INSERT","test3.order",2,78) > > > 我增加十几条数据, 拿到的都是 partition 2 的数据(4 条), 1跟 3 的没有拿到 > > > 我的猜想: > > > 我做了一个 9797 外网端口, 用于本地开发调试 kafka(连到一个堡垒机xxx-b-1,转发 9797 到 xxx-a-1) > > > broker1 配置: > > > listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797 > advertised.listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797 > listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT > security.inter.broker.protocol=PLAINTEXT > > > broker2 配置: > > > listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797 > advertised.listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797 > listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT > security.inter.broker.protocol=PLAINTEXT > > > > > > > > broker3 配置: > > > listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797 > advertised.listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797 > listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT > security.inter.broker.protocol=PLAINTEXT > > > 本机连接kafka: > properties.setProperty("bootstrap.servers", "xxx-b-1:9797") > > > 是跟这个配置有关吗? > > > > > > > > > > > 在 2020-10-23 08:37:14,"zhisheng" <[email protected]> 写道: > >hi > > > >如果是要排查问题的话可以在消费 kafka 的时候通过 JSONKeyValueDeserializationSchema > >来将数据的元数据(topic/parttion/offset)获取,这样可以排查你的数据到底来自哪些分区,这样你就不会再有困惑了。 > > > >eg: > > > > env.addSource(new FlinkKafkaConsumer011<>( > >parameters.get("topic"), new > >JSONKeyValueDeserializationSchema(true), > >buildKafkaProps(parameters))) .flatMap(new > >FlatMapFunction<ObjectNode, ObjectNode>() { > >@Override public void flatMap(ObjectNode jsonNodes, > >Collector<ObjectNode> collector) throws Exception { > > System.out.println(jsonNodes.get("value")); > >System.out.println(jsonNodes.get("metadata").get("topic").asText()); > > > >System.out.println(jsonNodes.get("metadata").get("offset").asText()); > > > >System.out.println(jsonNodes.get("metadata").get("partition").asText()); > > collector.collect(jsonNodes); > > } }) .print(); > > > >Best > > > >zhisheng > > > > > >Lynn Chen <[email protected]> 于2020年10月23日周五 上午12:13写道: > > > >> > >> > >> > >> > >> > >> > >> hi, Qijun Feng: > >> > >> > >> 我也遇到了类似的问题, 请问您后来是怎么解决的哈? > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> 在 2020-04-03 09:27:52,"LakeShen" <[email protected]> 写道: > >> >Hi Qijun, > >> > > >> >看下 kafka 是不是所有分区都有数据呢,或者在这个时间截后:1585670400000L,后面是不是只有分区3写入数据,个人的想法。 > >> > > >> >Best, > >> >LakeShen > >> > > >> >Qijun Feng <[email protected]> 于2020年4月2日周四 下午5:44写道: > >> > > >> >> Dear All, > >> >> > >> >> 我的 Kafka cluster 有三个机器,topic 也分了三个 partition, 我用 Flink 读取 Kafka > >> >> 消息的时候,感觉只从一个 partition 读取了东西, 一开始我的 bootstrap.servers 只写了一个地址, > >> >> 现在改成了所有地址,也换了 group.id > >> >> > >> >> > >> >> Properties properties = new Properties(); > >> >> properties.setProperty("bootstrap.servers", "10.216.85.201:9092, > >> >> 10.216.77.170:9092,10.216.77.188:9092"); > >> >> properties.setProperty("group.id", "behavior-logs-aggregator"); > >> >> > >> >> FlinkKafkaConsumer010<BehaviorLog> kafkaConsumer010 = > >> >> new FlinkKafkaConsumer010<BehaviorLog>("behavior-logs_dev", > new > >> >> BehaviorLogDeserializationSchema(), properties); > >> >> kafkaConsumer010.setStartFromTimestamp(1585670400000L); //2020/04/01 > >> >> > >> >> 处理完的数据,写到数据库里,看下了感觉少数据, 从 Log 里看到,也是。。,只有 partition=3, 没有 > partiton=1,或者 > >> 2 > >> >> 的, > >> >> > >> >> 2020-04-02 14:54:58,532 INFO > >> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - > >> >> Consumer subtask 0 creating fetcher with offsets > >> >> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}. > >> >> > >> >> > >> >> 是哪里有问题吗? > >> >> > >> >> > >> >
