It is accurate since it’s an API/implementation problem and therefore container independent. Sure if everything is configured correctly and broker is accessible then things do work, but try to shut down consumer when broker is not accessible. And when I mean shut down I am not implying shutting down the JVM via System.exit(), I am simply saying that poll() will block indefinitely and so will close().
In fact it’s a very common problem in Kafka components (see below) https://issues.apache.org/jira/browse/KAFKA-3540 https://issues.apache.org/jira/browse/KAFKA-1894 https://issues.apache.org/jira/browse/KAFKA-3539 Cheers Oleg On Aug 16, 2016, at 5:06 AM, Jaikiran Pai <jai.forums2...@gmail.com<mailto:jai.forums2...@gmail.com>> wrote: On Friday 12 August 2016 08:45 PM, Oleg Zhurakousky wrote: It hangs indefinitely in any container. I don't think that's accurate. We have been running Kafka brokers and consumers/producers in docker containers for a while now and they are functional. Of course, you need to make sure you use the IP addresses instead of localhost/127.0.0.1 to make sure that the brokers are accessible to the consumers/producers and you don't run into the situation that you explain about the broker connection not happening successfully. By the way, I am not saying that the consumer.poll() doesn't have that issue you state. -Jaikiran It’s a known issue and has been brought up many times on this list, yet there is not fix for it. The problem is with the fact that while poll() attempts to create an elusion that it is async and even allows you to set a timeout it is essentially very misleading if you look inside its implementation. The first call it makes is to fetch topic metadata. That call is not part of the Future it returns so if connection to broker is not available you’re dead since Kafka attempts to reconnect and there s no property to set reconnect attempts, so it attempts to reconnect indefinitely. Cheers Oleg On Aug 12, 2016, at 9:22 AM, Brem, Robert <robert.b...@adesso.ch<mailto:robert.b...@adesso.ch>> wrote: Hy I'm new to Kafka and messaging at all. I have a simple java application that contains a consumer and a producer. It is working on the host system but if I try to run it in a docker container (Kafka is not in the container, it is still on the host) consumer.poll() hangs up and does not return. telnet tells me that inside the container the host:port 172.17.0.1:9092 is open. In the docker container on startup Kafka tells me: Marking the coordinator ... dead for group ... Can you give me a hint, in which direction I should look? Thanks! That's the output on the host, with the working application: 15:03:42,657 INFO [org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService Thread Pool -- 63) ConsumerConfig values: metric.reporters = [] metadata.max.age.ms = 300000 partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] reconnect.backoff.ms = 50 sasl.kerberos.ticket.renew.window.factor = 0.8 max.partition.fetch.bytes = 1048576 bootstrap.servers = [172.17.0.1:9092] ssl.keystore.type = JKS enable.auto.commit = true sasl.mechanism = GSSAPI interceptor.classes = null exclude.internal.topics = true ssl.truststore.password = null client.id = consumer-1 ssl.endpoint.identification.algorithm = null max.poll.records = 2147483647 check.crcs = true request.timeout.ms = 40000 heartbeat.interval.ms = 3000 auto.commit.interval.ms = 5000 receive.buffer.bytes = 65536 ssl.truststore.type = JKS ssl.truststore.location = null ssl.keystore.password = null fetch.min.bytes = 1 send.buffer.bytes = 131072 value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer group.id = pokertracker retry.backoff.ms = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 ssl.trustmanager.algorithm = PKIX ssl.key.password = null fetch.max.wait.ms = 500 sasl.kerberos.min.time.before.relogin = 60000 connections.max.idle.ms = 540000 session.timeout.ms = 10000 metrics.num.samples = 2 key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer ssl.protocol = TLS ssl.provider = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.keystore.location = null ssl.cipher.suites = null security.protocol = PLAINTEXT ssl.keymanager.algorithm = SunX509 metrics.sample.window.ms = 30000 auto.offset.reset = latest 15:03:42,680 INFO [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 63) Kafka version : 0.10.0.0 15:03:42,680 INFO [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 63) Kafka commitId : b8642491e78c5a13 15:03:42,681 WARN [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 63) Error registering AppInfo mbean: javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-1 at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) at org.jboss.as.jmx.PluggableMBeanServerImpl$TcclMBeanServer.registerMBean(PluggableMBeanServerImpl.java:1527) at org.jboss.as.jmx.PluggableMBeanServerImpl.registerMBean(PluggableMBeanServerImpl.java:871) at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:694) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569) at com.optimist.pokerstats.pokertracker.kafka.control.KafkaProvider.createConsumer(KafkaProvider.java:44) at com.optimist.pokerstats.pokertracker.kafka.control.KafkaProvider.init(KafkaProvider.java:29) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.jboss.weld.injection.producer.DefaultLifecycleCallbackInvoker.invokeMethods(DefaultLifecycleCallbackInvoker.java:98) at org.jboss.weld.injection.producer.DefaultLifecycleCallbackInvoker.postConstruct(DefaultLifecycleCallbackInvoker.java:81) at org.jboss.weld.injection.producer.BasicInjectionTarget.postConstruct(BasicInjectionTarget.java:126) at org.jboss.weld.bean.ManagedBean.create(ManagedBean.java:171) at org.jboss.weld.context.AbstractContext.get(AbstractContext.java:96) at org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101) at org.jboss.weld.bean.ContextualInstanceStrategy$ApplicationScopedContextualInstanceStrategy.get(ContextualInstanceStrategy.java:141) at org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50) at org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742) at org.jboss.weld.injection.producer.AbstractMemberProducer.getReceiver(AbstractMemberProducer.java:123) at org.jboss.weld.injection.producer.AbstractMemberProducer.produce(AbstractMemberProducer.java:158) at org.jboss.weld.bean.AbstractProducerBean.create(AbstractProducerBean.java:181) at org.jboss.weld.context.unbound.DependentContextImpl.get(DependentContextImpl.java:70) at org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101) at org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50) at org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742) at org.jboss.weld.manager.BeanManagerImpl.getInjectableReference(BeanManagerImpl.java:842) at org.jboss.weld.injection.FieldInjectionPoint.inject(FieldInjectionPoint.java:92) at org.jboss.weld.util.Beans.injectBoundFields(Beans.java:378) at org.jboss.weld.util.Beans.injectFieldsAndInitializers(Beans.java:389) at org.jboss.weld.injection.producer.ResourceInjector$1.proceed(ResourceInjector.java:70) at org.jboss.weld.injection.InjectionContextImpl.run(InjectionContextImpl.java:48) at org.jboss.weld.injection.producer.ResourceInjector.inject(ResourceInjector.java:72) at org.jboss.weld.injection.producer.BasicInjectionTarget.inject(BasicInjectionTarget.java:121) at org.jboss.weld.bean.ManagedBean.create(ManagedBean.java:159) at org.jboss.weld.context.unbound.DependentContextImpl.get(DependentContextImpl.java:70) at org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101) at org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50) at org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742) at org.jboss.weld.manager.BeanManagerImpl.getInjectableReference(BeanManagerImpl.java:842) at org.jboss.weld.injection.FieldInjectionPoint.inject(FieldInjectionPoint.java:92) at org.jboss.weld.util.Beans.injectBoundFields(Beans.java:378) at org.jboss.weld.util.Beans.injectFieldsAndInitializers(Beans.java:389) at org.jboss.weld.injection.producer.DefaultInjector$1.proceed(DefaultInjector.java:71) at org.jboss.weld.injection.InjectionContextImpl.run(InjectionContextImpl.java:48) at org.jboss.weld.injection.producer.DefaultInjector.inject(DefaultInjector.java:73) at org.jboss.weld.injection.producer.ejb.SessionBeanInjectionTarget.inject(SessionBeanInjectionTarget.java:140) at org.jboss.as.weld.injection.WeldInjectionContext.inject(WeldInjectionContext.java:39) at org.jboss.as.weld.injection.WeldInjectionInterceptor.processInvocation(WeldInjectionInterceptor.java:51) at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340) at org.jboss.as.ee.component.AroundConstructInterceptorFactory$1.processInvocation(AroundConstructInterceptorFactory.java:28) at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340) at org.jboss.as.weld.injection.WeldInterceptorInjectionInterceptor.processInvocation(WeldInterceptorInjectionInterceptor.java:56) at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340) at org.jboss.as.weld.ejb.Jsr299BindingsCreateInterceptor.processInvocation(Jsr299BindingsCreateInterceptor.java:100) at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340) at org.jboss.as.ee.component.NamespaceContextInterceptor.processInvocation(NamespaceContextInterceptor.java:50) at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340) at org.jboss.as.ejb3.tx.CMTTxInterceptor.invokeInOurTx(CMTTxInterceptor.java:275) at org.jboss.as.ejb3.tx.CMTTxInterceptor.requiresNew(CMTTxInterceptor.java:349) at org.jboss.as.ejb3.tx.LifecycleCMTTxInterceptor.processInvocation(LifecycleCMTTxInterceptor.java:66) at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340) at org.jboss.as.weld.injection.WeldInjectionContextInterceptor.processInvocation(WeldInjectionContextInterceptor.java:43) at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340) at org.jboss.as.ejb3.component.interceptors.CurrentInvocationContextInterceptor.processInvocation(CurrentInvocationContextInterceptor.java:41) at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340) at org.jboss.as.ee.concurrent.ConcurrentContextInterceptor.processInvocation(ConcurrentContextInterceptor.java:45) at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340) at org.jboss.invocation.ContextClassLoaderInterceptor.processInvocation(ContextClassLoaderInterceptor.java:64) at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340) at org.jboss.invocation.InterceptorContext.run(InterceptorContext.java:356) at org.jboss.invocation.PrivilegedWithCombinerInterceptor.processInvocation(PrivilegedWithCombinerInterceptor.java:80) at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340) at org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61) at org.jboss.as.ee.component.BasicComponent.constructComponentInstance(BasicComponent.java:161) at org.jboss.as.ee.component.BasicComponent.constructComponentInstance(BasicComponent.java:134) at org.jboss.as.ee.component.BasicComponent.createInstance(BasicComponent.java:88) at org.jboss.as.ejb3.component.singleton.SingletonComponent.getComponentInstance(SingletonComponent.java:124) at org.jboss.as.ejb3.component.singleton.SingletonComponent.start(SingletonComponent.java:138) at org.jboss.as.ee.component.ComponentStartService$1.run(ComponentStartService.java:54) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) at org.jboss.threads.JBossThread.run(JBossThread.java:320) 15:03:42,815 WARN [com.datastax.driver.core.Cluster] (cluster1-worker-1) Re-preparing already prepared query select DATA, VERSION from EVENTS where NAME = :name allow filtering. Please note that preparing the same query more than once is generally an anti-pattern and will likely affect performance. Consider preparing the statement only once. 15:03:43,221 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Discovered coordinator contabo:9092 (id: 2147483647 rack: null) for group pokertracker. 15:03:43,223 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Revoking previously assigned partitions [] for group pokertracker 15:03:43,224 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-2) (Re-)joining group pokertracker 15:03:43,246 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Successfully joined group pokertracker with generation 1 15:03:43,247 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Setting newly assigned partitions [test-0] for group pokertracker And here Is the log output from the docker container: 13:13:33,627 INFO [org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService Thread Pool -- 61) ConsumerConfig values: metric.reporters = [] metadata.max.age.ms = 300000 partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] reconnect.backoff.ms = 50 sasl.kerberos.ticket.renew.window.factor = 0.8 max.partition.fetch.bytes = 1048576 bootstrap.servers = [172.17.0.1:9092] ssl.keystore.type = JKS enable.auto.commit = true sasl.mechanism = GSSAPI interceptor.classes = null exclude.internal.topics = true ssl.truststore.password = null client.id = consumer-1 ssl.endpoint.identification.algorithm = null max.poll.records = 2147483647 check.crcs = true request.timeout.ms = 40000 heartbeat.interval.ms = 3000 auto.commit.interval.ms = 5000 receive.buffer.bytes = 65536 ssl.truststore.type = JKS ssl.truststore.location = null ssl.keystore.password = null fetch.min.bytes = 1 send.buffer.bytes = 131072 value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer group.id = pokertracker retry.backoff.ms = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 ssl.trustmanager.algorithm = PKIX ssl.key.password = null fetch.max.wait.ms = 500 sasl.kerberos.min.time.before.relogin = 60000 connections.max.idle.ms = 540000 session.timeout.ms = 10000 metrics.num.samples = 2 key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer ssl.protocol = TLS ssl.provider = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.keystore.location = null ssl.cipher.suites = null security.protocol = PLAINTEXT ssl.keymanager.algorithm = SunX509 metrics.sample.window.ms = 30000 auto.offset.reset = latest 13:13:33,650 INFO [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 61) Kafka version : 0.10.0.0 13:13:33,650 INFO [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 61) Kafka commitId : b8642491e78c5a13 13:13:33,711 WARN [com.datastax.driver.core.Cluster] (cluster1-worker-1) Re-preparing already prepared query select DATA, VERSION from EVENTS where NAME = :name allow filtering. Please note that preparing the same query more than once is generally an anti-pattern and will likely affect performance. Consider preparing the statement only once. 13:13:33,948 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-1) Discovered coordinator contabo:9092 (id: 2147483647 rack: null) for group pokertracker. 13:13:34,009 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-1) Marking the coordinator contabo:9092 (id: 2147483647 rack: null) dead for group pokertracker Here is the java code: @Startup @ConcurrencyManagement(ConcurrencyManagementType.BEAN) @Singleton public class InMemoryCache { @Inject KafkaConsumer<String, String> consumer; @Dedicated @Inject ExecutorService kafka; ... @PostConstruct public void onInit() { ... CompletableFuture .runAsync(this::handleKafkaEvent, kafka); } public void handleKafkaEvent() { while (true) { ConsumerRecords<String, String> records = consumer.poll(200); for (ConsumerRecord<String, String> record : records) { switch (record.topic()) { case KafkaProvider.TOPIC: System.out.println("record.value() = " + record.value()); List<CoreEvent> events = converter.convertToEvents(record.value()); for (CoreEvent event : events) { handle(event); } break; default: throw new IllegalArgumentException("Illegal message type: "); } } } } ...