
Jonathan Bond commented on KAFKA-3112:

diff --git 
new file mode 100644
index 0000000..a32ff87
--- /dev/null
@@ -0,0 +1,188 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.kafka.clients.producer.BufferExhaustedException;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.SystemTime;
+public class BufferPoolTest {
+    public static void main(String[] args) throws InterruptedException {
+        final BufferPool bp = new BufferPool(16 * 1024 * 1024, 16384, false, 
new Metrics(), new SystemTime(),
+                "metricGroupName", Collections.emptyMap());
+        final AtomicLong allocated = new AtomicLong(0);
+        final AtomicLong bufferFull = new AtomicLong(0);
+        final long numberOfMessages = 1000000000;
+        final AtomicLong messages = new AtomicLong(numberOfMessages);
+        final int threadCount = 16;
+        final CountDownLatch latch = new CountDownLatch(threadCount + 1);
+        final Object lock = new Object();
+        final ReentrantLock rl = new ReentrantLock();
+        final LinkedBlockingQueue<ByteBuffer> queue = new 
+        final AtomicLong latency = new AtomicLong();
+        Thread reaperThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                List<ByteBuffer> currenlyAllocated = new 
+                while (true) {
+                    try {
+                        Thread.sleep(100);
+                        queue.drainTo(currenlyAllocated, 2048);
+                        for (ByteBuffer byteBuffer : currenlyAllocated) {
+                            bp.deallocate(byteBuffer);
+                        }
+                        // System.out.println("drained 
+                        currenlyAllocated.clear();
+                    } catch (Exception x) {
+                        System.out.println(x);
+                    }
+                }
+            }
+        });
+        reaperThread.setDaemon(true);
+        reaperThread.start();
+        for (int i = 0; i < threadCount; i++) {
+            Thread th = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    latch.countDown();
+                    try {
+                        latch.await();
+                    } catch (InterruptedException e) {
+                        // TODO Auto-generated catch block
+                        e.printStackTrace();
+                    }
+                    while (messages.getAndDecrement() > 0) {
+                        try {
+                            // synchronized (lock) {
+                            //
+                            // }
+                            // rl.lock();
+                            //
+                            // rl.unlock();
+                            long start = System.nanoTime();
+                            try {
+                                //ByteBuffer bb = bp.allocate(1000);
+                                // Thread.yield();
+                                ByteBuffer bb = allocate(bp, 1000);
+                                if (bb != null) {
+                                    queue.offer(bb);
+                                    allocated.incrementAndGet();
+                                } else {
+                                    bufferFull.incrementAndGet();
+                                }
+                            } catch (BufferExhaustedException bee) {
+                                bufferFull.incrementAndGet();
+                            }
+                            long end = System.nanoTime();
+                            latency.addAndGet((end - start));
+                        } catch (Exception e) {
+                            System.out.println(e);
+                        }
+                    }
+                }
+            });
+            th.setPriority(Thread.MIN_PRIORITY);
+            th.setDaemon(true);
+            th.start();
+        }
+        latch.countDown();
+        long time = System.currentTimeMillis();
+        long start = time;
+        long previousF = bufferFull.get();
+        long previousA = allocated.get();
+        long nextSleep = 1000;
+        while (true) {
+            Thread.sleep(nextSleep);
+            long now = System.currentTimeMillis();
+            long fullV = bufferFull.get();
+            long allocateV = allocated.get();
+            long newVal = fullV + allocateV;
+            long latencyMs =latency.get();
+            System.out.println(now - start + "," + (double) (newVal - 
(previousF + previousA)) / (double) (now - time) * 1000
+                    + "," + (double) (allocateV) / (double) (newVal) + "," + 
fullV + "," + allocateV + ","
+                    + theGovernator.availablePermits() + "," + 
(fullV+allocateV) + "," + latencyMs
+                    / (numberOfMessages - messages.get()));
+            time = now;
+            previousF = fullV;
+            previousA = allocateV;
+//            if (now - start > 120000)
+//                break;
+            long diff = System.currentTimeMillis() - now;
+            nextSleep = 1000 - diff;
+        }
+    }
+    private static final Semaphore theGovernator = new Semaphore(256 * 1000);
+    private static ByteBuffer allocate(BufferPool bp, int size) throws 
InterruptedException {
+        ByteBuffer result;
+        final Boolean youMayPass = theGovernator.tryAcquire(size);
+        try {
+            if (youMayPass) {
+                result = bp.allocate(size);
+            } else {
+                result = null;
+            }
+        } finally {
+            if (youMayPass)
+                theGovernator.release(size);
+        }
+        return result;
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..ea8efd2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/AvgTest.java
@@ -0,0 +1,26 @@
+package org.apache.kafka.common.metrics.stats;
+import org.apache.kafka.common.metrics.MetricConfig;
+public class AvgTest {
+    public static void main(String []args) throws Exception
+    {
+        Avg a = new Avg();
+        MetricConfig cfg = new MetricConfig();
+        for(int i=0;i<15;i++)
+        {
+            a.record(cfg, i, System.currentTimeMillis());
+            System.out.println(i+" "+a.measure(cfg, 
+            Thread.sleep(1000);
+        }
+        for(int i=0;i<150;i++)
+        {
+            System.out.println(Double.isFinite(a.measure(cfg, 
+            Thread.sleep(1000);
+        }
+    }
diff --git 
index d5af354..05143ac 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
@@ -39,16 +39,18 @@
                     throw new ConfigException("Invalid url in " + 
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
                 try {
                     InetSocketAddress address = new InetSocketAddress(host, 
-                    if (address.isUnresolved())
-                        throw new ConfigException("DNS resolution failed for 
url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
-                    addresses.add(address);
+                    if (address.isUnresolved()) {
+                        log.warn("DNS resolution failed for url in " + 
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
+                    } else {
+                        addresses.add(address);
+                    }
                 } catch (NumberFormatException e) {
                     throw new ConfigException("Invalid port in " + 
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
         if (addresses.size() < 1)
-            throw new ConfigException("No bootstrap urls given in " + 
+            throw new ConfigException("No resolvable bootstrap urls given in " 
         return addresses;

> One incorrect bootstrap server will prevent Kafka producer from opening
> -----------------------------------------------------------------------
>                 Key: KAFKA-3112
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3112
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions:
>            Reporter: Jonathan Bond
>            Priority: Critical
> If any of the servers specified in bootstrap.servers are not resolvable 
> through DNS the configuration is taken as an error, and the client won't 
> start up. We pass in 30 possible servers, and one had an issue so the client 
> wouldn't start. 
> It would be better if the client will attempt to start if there is at least 
> one server available from DNS.

This message was sent by Atlassian JIRA

Reply via email to