[ https://issues.apache.org/jira/browse/KAFKA-5390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16038779#comment-16038779 ]
Ismael Juma commented on KAFKA-5390: ------------------------------------ Thanks for the report. Pasting the code so that it's easier to reference. There's an issue in the code below. You are not passing a callback to `send` or calling `get` on the Future. This means that any errors during `send` are not captured. Passing the callback seems to be the simplest option in this case and that will tell us if there are errors during `send`. {code} package com.reftel.magnus.kafkasequence; import java.io.IOException; import java.net.ServerSocket; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.zookeeper.server.NIOServerCnxnFactory; import org.apache.zookeeper.server.ServerCnxnFactory; import org.apache.zookeeper.server.ZooKeeperServer; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; public class SequenceTest { @Rule public final TemporaryFolder temp = new TemporaryFolder(); private ServerCnxnFactory factory; private KafkaServerStartable broker; private int kafkaPort; @Before public void before() throws Throwable { factory = NIOServerCnxnFactory.createFactory(null, 10); factory.startup(new ZooKeeperServer( temp.newFolder("zk-snapshot"), temp.newFolder("zk-log"), ZooKeeperServer.DEFAULT_TICK_TIME )); try (ServerSocket socket = new ServerSocket(0)) { kafkaPort = socket.getLocalPort(); } Properties props = new Properties(); props.put("listeners", String.format("PLAINTEXT://%s:%d", "localhost", kafkaPort)); props.put("log.dir", temp.newFolder("kafka").toString()); props.put("log.segment.bytes", "4000"); props.put("num.partitions", "1"); props.put("zookeeper.connect", String.format("localhost:%d", factory.getLocalPort())); broker = new KafkaServerStartable(new KafkaConfig(props)); broker.startup(); } @After public void after() { broker.shutdown(); factory.shutdown(); } private KafkaProducer<String, String> buildProducer() throws IOException { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, String.format("localhost:%d", kafkaPort)); props.put(ProducerConfig.LINGER_MS_CONFIG, Integer.toString(60000)); return new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); } private KafkaConsumer<String, String> buildConsumer() throws IOException { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.format("localhost:%d", kafkaPort)); return new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer()); } @Test public void testSequence() throws IOException { List<ProducerRecord<String, String>> records = new ArrayList<>(9); for (int i = 0; i < 9; i++) { int dataLength = 2000; StringBuilder sb = new StringBuilder(dataLength); for (int n = 0; n < dataLength; n++) { sb.append('x'); } records.add(new ProducerRecord<>("t", Integer.toString(i), sb.toString())); } final KafkaProducer<String, String> producer = buildProducer(); records.forEach(producer::send); producer.flush(); producer.close(); final KafkaConsumer<String, String> consumer = buildConsumer(); consumer.assign(Collections.singleton(new TopicPartition("t", 0))); consumer.seekToBeginning(consumer.assignment()); final ConsumerRecords<String, String> read = consumer.poll(1000); if (!read.isEmpty()) { final ConsumerRecord<String, String> first = read.iterator().next(); Assert.assertEquals(first.key(), "0"); } } } {code} > First records in batch rejected but others accepted when rolling log > -------------------------------------------------------------------- > > Key: KAFKA-5390 > URL: https://issues.apache.org/jira/browse/KAFKA-5390 > Project: Kafka > Issue Type: Bug > Affects Versions: 0.10.2.1 > Reporter: Magnus Reftel > Attachments: kafka-sequence.tar.gz > > > When sending a sequence of records in a batch right when the broker needs to > roll a new segment, it's possible for the first few records to fail, while > other records in the batch are accepted. If records have dependencies on > earlier records, e.g. in the case of a sequence of events in an event-sourced > system, then a producer cannot use the batching functionality, since it then > risks consumers receiving a record without first receiving the records it > depends on. > See attached testcase (kafka-sequence.tar.gz). -- This message was sent by Atlassian JIRA (v6.3.15#6346)