[ https://issues.apache.org/jira/browse/KAFKA-3112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15103456#comment-15103456 ]
Jonathan Bond commented on KAFKA-3112: -------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java new file mode 100644 index 0000000..a32ff87 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.producer.internals; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.kafka.clients.producer.BufferExhaustedException; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.SystemTime; + +public class BufferPoolTest { + + public static void main(String[] args) throws InterruptedException { + + final BufferPool bp = new BufferPool(16 * 1024 * 1024, 16384, false, new Metrics(), new SystemTime(), + "metricGroupName", Collections.emptyMap()); + final AtomicLong allocated = new AtomicLong(0); + final AtomicLong bufferFull = new AtomicLong(0); + final long numberOfMessages = 1000000000; + final AtomicLong messages = new AtomicLong(numberOfMessages); + final int threadCount = 16; + final CountDownLatch latch = new CountDownLatch(threadCount + 1); + final Object lock = new Object(); + final ReentrantLock rl = new ReentrantLock(); + final LinkedBlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>(); + final AtomicLong latency = new AtomicLong(); + + Thread reaperThread = new Thread(new Runnable() { + + @Override + public void run() { + List<ByteBuffer> currenlyAllocated = new ArrayList<ByteBuffer>(2048); + while (true) { + + try { + Thread.sleep(100); + queue.drainTo(currenlyAllocated, 2048); + + for (ByteBuffer byteBuffer : currenlyAllocated) { + bp.deallocate(byteBuffer); + } + // System.out.println("drained "+currenlyAllocated.size()); + currenlyAllocated.clear(); + } catch (Exception x) { + System.out.println(x); + } + } + + } + }); + reaperThread.setDaemon(true); + reaperThread.start(); + for (int i = 0; i < threadCount; i++) { 
+ Thread th = new Thread(new Runnable() { + + @Override + public void run() { + latch.countDown(); + try { + + latch.await(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + while (messages.getAndDecrement() > 0) { + try { + // synchronized (lock) { + // + // } + + // rl.lock(); + // + // rl.unlock(); + long start = System.nanoTime(); + try { + + //ByteBuffer bb = bp.allocate(1000); + // Thread.yield(); + ByteBuffer bb = allocate(bp, 1000); + + if (bb != null) { + queue.offer(bb); + allocated.incrementAndGet(); + } else { + bufferFull.incrementAndGet(); + } + } catch (BufferExhaustedException bee) { + bufferFull.incrementAndGet(); + } + + long end = System.nanoTime(); + latency.addAndGet((end - start)); + + } catch (Exception e) { + System.out.println(e); + } + + } + } + }); + th.setPriority(Thread.MIN_PRIORITY); + th.setDaemon(true); + th.start(); + + } + latch.countDown(); + + long time = System.currentTimeMillis(); + long start = time; + long previousF = bufferFull.get(); + long previousA = allocated.get(); + long nextSleep = 1000; + + while (true) { + + Thread.sleep(nextSleep); + long now = System.currentTimeMillis(); + long fullV = bufferFull.get(); + long allocateV = allocated.get(); + long newVal = fullV + allocateV; + + long latencyMs =latency.get(); + + System.out.println(now - start + "," + (double) (newVal - (previousF + previousA)) / (double) (now - time) * 1000 + + "," + (double) (allocateV) / (double) (newVal) + "," + fullV + "," + allocateV + "," + + theGovernator.availablePermits() + "," + (fullV+allocateV) + "," + latencyMs + / (numberOfMessages - messages.get())); + + time = now; + previousF = fullV; + previousA = allocateV; + +// if (now - start > 120000) +// break; + + long diff = System.currentTimeMillis() - now; + nextSleep = 1000 - diff; + } + + } + + private static final Semaphore theGovernator = new Semaphore(256 * 1000); + + private static ByteBuffer allocate(BufferPool bp, int size) 
throws InterruptedException { + ByteBuffer result; + + final Boolean youMayPass = theGovernator.tryAcquire(size); + try { + if (youMayPass) { + result = bp.allocate(size); + } else { + result = null; + } + } finally { + if (youMayPass) + theGovernator.release(size); + } + + return result; + } + +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/AvgTest.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/AvgTest.java new file mode 100644 index 0000000..ea8efd2 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/AvgTest.java @@ -0,0 +1,26 @@ +package org.apache.kafka.common.metrics.stats; + +import org.apache.kafka.common.metrics.MetricConfig; + +public class AvgTest { + + public static void main(String []args) throws Exception + { + Avg a = new Avg(); + MetricConfig cfg = new MetricConfig(); + for(int i=0;i<15;i++) + { + a.record(cfg, i, System.currentTimeMillis()); + System.out.println(i+" "+a.measure(cfg, System.currentTimeMillis())); + Thread.sleep(1000); + } + + for(int i=0;i<150;i++) + { + + System.out.println(Double.isFinite(a.measure(cfg, System.currentTimeMillis()))); + Thread.sleep(1000); + } + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java index d5af354..05143ac 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java @@ -39,16 +39,18 @@ throw new ConfigException("Invalid url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url); try { InetSocketAddress address = new InetSocketAddress(host, port); - if (address.isUnresolved()) - throw new ConfigException("DNS resolution failed for url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url); - addresses.add(address); + if (address.isUnresolved()) { + log.warn("DNS resolution failed for url in 
" + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url); + } else { + addresses.add(address); + } } catch (NumberFormatException e) { throw new ConfigException("Invalid port in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url); } } } if (addresses.size() < 1) - throw new ConfigException("No bootstrap urls given in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG); + throw new ConfigException("No resolvable bootstrap urls given in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG); return addresses; } > One incorrect bootstrap server will prevent Kafka producer from opening > ----------------------------------------------------------------------- > > Key: KAFKA-3112 > URL: https://issues.apache.org/jira/browse/KAFKA-3112 > Project: Kafka > Issue Type: Bug > Components: clients > Affects Versions: 0.8.2.1 > Reporter: Jonathan Bond > Priority: Critical > > If any of the servers specified in bootstrap.servers is not resolvable > through DNS, the configuration is treated as an error, and the client won't > start up. We pass in 30 possible servers, and one had an issue, so the client > wouldn't start. > It would be better if the client attempted to start as long as at least > one server is resolvable through DNS. -- This message was sent by Atlassian JIRA (v6.3.4#6332)