I have published a variation of this question before, regarding the 'old' publisher. I 
have since figured out how to use the 'new' redesigned publisher, and I have a similar 
question. Basically, I want to:

#1) Detect when the broker is down while using the asynchronous publisher. If the 
broker is down, I want to generate an alert and stop publication.

#2) Most of all, I want to avoid message gaps when the broker recovers - in other 
words, I do not want to lose messages while the broker is down and then silently 
resume publication when it is up again.

Apparently, this is not possible with the old publisher when asynchronous mode is 
used.
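
From reading the new producer's configuration options (several of which show up in the 
log below), I assume settings along these lines would at least prevent retries from 
reordering messages. This is only a sketch of what I think is relevant, not something 
I have verified to actually close the gap:

// Sketch only: my guesses at the relevant settings, based on the config docs.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "cno-d-igoberman2:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Wait for full acknowledgement of every write instead of leader-only acks.
props.put(ProducerConfig.ACKS_CONFIG, "all");
// Retry transient failures instead of failing the record immediately (default is 0).
props.put(ProducerConfig.RETRIES_CONFIG, "3");
// With more than one in-flight request, a retry can reorder messages.
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
// Block the caller when the buffer fills rather than erroring asynchronously.
props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true");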


My test program is below. I do not see a way to detect that the broker is down: I see 
errors printed in the log, but my publisher keeps publishing without reporting any 
errors via the API. Once the broker connection is up again, it reports exceptions in 
the Callback for the messages that it failed to publish. However, I suspect it would 
also publish new messages at the point when the exception is reported, creating a 
'gap'. (A sketch of the kind of guard I have in mind follows the test program.)


Is there a way to address these concerns?


I would hate to use the sync publisher - my publishing rate drops dramatically, from 
300,000 to 7,000 messages per second.
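
By 'sync publisher' I mean blocking on the Future returned by send, i.e. the .get() 
call that is commented out in the test program below:

// Synchronous variant: block until the broker acknowledges each record.
// Broker-down errors surface immediately as exceptions, and since each send
// completes before the next is issued, nothing beyond the failed record has
// been handed to the producer - but every request is serialized, hence the
// throughput drop.
try {
    RecordMetadata metadata = producer.send(producerRecord).get(20, TimeUnit.SECONDS);
    System.out.println("Acknowledged at offset " + metadata.offset());
}
catch (Exception e) {
    // ExecutionException (publish failed) or TimeoutException (broker unreachable).
    e.printStackTrace();
}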


package com.kcg.kafka.test;

import java.util.*;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestProducer {
    public static void main(String[] args) {
        System.setProperty("log4j.debug", "true");
        long events = Long.parseLong(args[0]);

        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "cno-d-igoberman2:9092");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "1");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        long start = System.currentTimeMillis();

        long nEvents = 0;
        for (; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            final String key = Long.toString(nEvents);
            // Pad the payload out to ~300 bytes.
            String msg = runtime + ": " + key + new String(new byte[300]);
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<String, String>("test", key, msg);

            try {
                System.out.println("Sending " + key);

                // 'send' will not throw an exception when the broker is down.
                producer.send(producerRecord, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if (e != null) {
                            e.printStackTrace();
                            // Stop publisher. But is there a guarantee that no
                            // more messages are published after this one?
                        }
                        else {
                            System.out.println("The offset of the record we just sent is: "
                                    + metadata.offset() + " " + key);
                        }
                    }
                });//.get(20, TimeUnit.SECONDS);
            }
            catch (Throwable e) {
                e.printStackTrace();
            }

            if (nEvents % 10000 == 0) {
                System.out.println("" + key);
            }

            try {
                Thread.sleep(1000);
            }
            catch (InterruptedException e) {
            }
        }

        long duration = (System.currentTimeMillis() - start);
        System.out.println("Published " + nEvents + " messages in " + duration 
+ "ms. " + (int)((double)nEvents/((double)duration / 1000.0)) + " m/s.");

        producer.close();
    }
}
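
For #1, the only approach I can think of is to latch the first Callback error and 
refuse further sends, along these lines (the AtomicBoolean guard and the 
GuardedProducer wrapper are my own invention, not anything in the Kafka API). Even 
then, I do not see how to guarantee that records already buffered inside the producer 
are not sent after the flag flips, which is exactly the gap I am worried about:

import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class GuardedProducer {
    // Latched to true by the callback on the first failed record.
    private final AtomicBoolean failed = new AtomicBoolean(false);
    private final KafkaProducer<String, String> producer;

    public GuardedProducer(Properties props) {
        this.producer = new KafkaProducer<String, String>(props);
    }

    public void publish(String topic, String key, String msg) {
        if (failed.get()) {
            // Alert and stop publication instead of silently resuming.
            throw new IllegalStateException("Broker failure reported; publication stopped.");
        }
        producer.send(new ProducerRecord<String, String>(topic, key, msg),
                new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if (e != null) {
                            failed.set(true);
                            e.printStackTrace();
                        }
                    }
                });
        // Open question: records already queued in the producer's buffer can
        // still go out after 'failed' flips, so new sends stop but earlier
        // in-flight batches may still succeed or fail, creating a gap.
    }
}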



log4j: Trying to find [log4j.xml] using context classloader sun.misc.Launcher$AppClassLoader@6da21389.
log4j: Trying to find [log4j.xml] using sun.misc.Launcher$AppClassLoader@6da21389 class loader.
log4j: Trying to find [log4j.xml] using ClassLoader.getSystemResource().
log4j: Trying to find [log4j.properties] using context classloader sun.misc.Launcher$AppClassLoader@6da21389.
log4j: Using URL [file:/home/apprun/kafkatest/TestProducer/bin/log4j.properties] for automatic log4j configuration.
log4j: Reading configuration from URL file:/home/apprun/kafkatest/TestProducer/bin/log4j.properties
log4j: Parsing for [root] with value=[INFO, stdout].
log4j: Level token is [INFO].
log4j: Category root set to INFO
log4j: Parsing appender named "stdout".
log4j: Parsing layout options for "stdout".
log4j: Setting property [conversionPattern] to [%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n].
log4j: End of parsing for "stdout".
log4j: Setting property [target] to [System.out].
log4j: Parsed "stdout" options.
log4j: Finished configuring.
2015-11-10 10:23:16 INFO  ProducerConfig:113 - ProducerConfig values:
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    block.on.buffer.full = true
    retry.backoff.ms = 100
    buffer.memory = 33554432
    batch.size = 16384
    metrics.sample.window.ms = 30000
    metadata.max.age.ms = 300000
    receive.buffer.bytes = 32768
    timeout.ms = 30000
    max.in.flight.requests.per.connection = 5
    bootstrap.servers = [cno-d-igoberman2:9092]
    metric.reporters = []
    client.id =
    compression.type = none
    retries = 0
    max.request.size = 1048576
    send.buffer.bytes = 131072
    acks = 1
    reconnect.backoff.ms = 10
    linger.ms = 0
    metrics.num.samples = 2
    metadata.fetch.timeout.ms = 60000

Sending 0
0
The offset of the record we just sent is: 12 0
Sending 1
The offset of the record we just sent is: 13 1
Sending 2
The offset of the record we just sent is: 14 2
Sending 3
The offset of the record we just sent is: 15 3
Sending 4
The offset of the record we just sent is: 16 4
Sending 5
The offset of the record we just sent is: 17 5
Sending 6
The offset of the record we just sent is: 18 6
Sending 7
The offset of the record we just sent is: 19 7
Sending 8
The offset of the record we just sent is: 20 8
Sending 9
The offset of the record we just sent is: 21 9
2015-11-10 10:23:25 WARN  Selector:276 - Error in I/O with cno-d-igoberman2/10.83.55.13
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
    at java.lang.Thread.run(Thread.java:745)

....

Sending 10

