[ 
https://issues.apache.org/jira/browse/KAFKA-5390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16038779#comment-16038779
 ] 

Ismael Juma commented on KAFKA-5390:
------------------------------------

Thanks for the report. Pasting the code so that it's easier to reference. 
There's an issue in the code below. You are not passing a callback to `send` or 
calling `get` on the Future. This means that any errors during `send` are not 
captured. Passing the callback seems to be the simplest option in this case and 
that will tell us if there are errors during `send`.

{code}
package com.reftel.magnus.kafkasequence;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;

public class SequenceTest {
    @Rule
    public final TemporaryFolder temp = new TemporaryFolder();

    private ServerCnxnFactory factory;
    private KafkaServerStartable broker;
    private int kafkaPort;

    @Before
    public void before() throws Throwable {
        factory = NIOServerCnxnFactory.createFactory(null, 10);

        factory.startup(new ZooKeeperServer(
            temp.newFolder("zk-snapshot"), temp.newFolder("zk-log"), 
ZooKeeperServer.DEFAULT_TICK_TIME
        ));

        try (ServerSocket socket = new ServerSocket(0)) {
            kafkaPort = socket.getLocalPort();
        }

        Properties props = new Properties();
        props.put("listeners", String.format("PLAINTEXT://%s:%d", "localhost", 
kafkaPort));
        props.put("log.dir", temp.newFolder("kafka").toString());
        props.put("log.segment.bytes", "4000");
        props.put("num.partitions", "1");
        props.put("zookeeper.connect", String.format("localhost:%d", 
factory.getLocalPort()));

        broker = new KafkaServerStartable(new KafkaConfig(props));
        broker.startup();
    }

    @After
    public void after() {
        broker.shutdown();
        factory.shutdown();
    }

    private KafkaProducer<String, String> buildProducer() throws IOException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
String.format("localhost:%d", kafkaPort));
        props.put(ProducerConfig.LINGER_MS_CONFIG, Integer.toString(60000));
        return new KafkaProducer<>(props, new StringSerializer(), new 
StringSerializer());
    }

    private KafkaConsumer<String, String> buildConsumer() throws IOException {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
String.format("localhost:%d", kafkaPort));
        return new KafkaConsumer<>(props, new StringDeserializer(), new 
StringDeserializer());
    }

    @Test
    public void testSequence() throws IOException {
        List<ProducerRecord<String, String>> records = new ArrayList<>(9);
        for (int i = 0; i < 9; i++) {
            int dataLength = 2000;
            StringBuilder sb = new StringBuilder(dataLength);
            for (int n = 0; n < dataLength; n++) {
                sb.append('x');
            }
            records.add(new ProducerRecord<>("t", Integer.toString(i), 
sb.toString()));
        }

        final KafkaProducer<String, String> producer = buildProducer();
        records.forEach(producer::send);
        producer.flush();
        producer.close();

        final KafkaConsumer<String, String> consumer = buildConsumer();
        consumer.assign(Collections.singleton(new TopicPartition("t", 0)));
        consumer.seekToBeginning(consumer.assignment());
        final ConsumerRecords<String, String> read = consumer.poll(1000);
        if (!read.isEmpty()) {
            final ConsumerRecord<String, String> first = read.iterator().next();
            Assert.assertEquals(first.key(), "0");
        }
    }
}
{code}

> First records in batch rejected but others accepted when rolling log
> --------------------------------------------------------------------
>
>                 Key: KAFKA-5390
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5390
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.2.1
>            Reporter: Magnus Reftel
>         Attachments: kafka-sequence.tar.gz
>
>
> When sending a sequence of records in a batch right when the broker needs to 
> roll a new segment, it's possible for the first few records to fail, while 
> other records in the batch are accepted. If records have dependencies on 
> earlier records, e.g. in the case of a sequence of events in an event-sourced 
> system, then a producer cannot use the batching functionality, since it then 
> risks consumers receiving a record without first receiving the records it 
> depends on.
> See attached testcase (kafka-sequence.tar.gz).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to