Hy I'm new to Kafka and messaging at all. I have a simple java application that contains a consumer and a producer. It is working on the host system but if I try to run it in a docker container (Kafka is not in the container, it is still on the host) consumer.poll() hangs up and does not return. telnet tells me that inside the container the host:port 172.17.0.1:9092 is open.
In the docker container on startup Kafka tells me: Marking the coordinator ... dead for group ... Can you give me a hint, in which direction I should look? Thanks! That's the output on the host, with the working application: 15:03:42,657 INFO [org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService Thread Pool -- 63) ConsumerConfig values: metric.reporters = [] metadata.max.age.ms = 300000 partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] reconnect.backoff.ms = 50 sasl.kerberos.ticket.renew.window.factor = 0.8 max.partition.fetch.bytes = 1048576 bootstrap.servers = [172.17.0.1:9092] ssl.keystore.type = JKS enable.auto.commit = true sasl.mechanism = GSSAPI interceptor.classes = null exclude.internal.topics = true ssl.truststore.password = null client.id = consumer-1 ssl.endpoint.identification.algorithm = null max.poll.records = 2147483647 check.crcs = true request.timeout.ms = 40000 heartbeat.interval.ms = 3000 auto.commit.interval.ms = 5000 receive.buffer.bytes = 65536 ssl.truststore.type = JKS ssl.truststore.location = null ssl.keystore.password = null fetch.min.bytes = 1 send.buffer.bytes = 131072 value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer group.id = pokertracker retry.backoff.ms = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 ssl.trustmanager.algorithm = PKIX ssl.key.password = null fetch.max.wait.ms = 500 sasl.kerberos.min.time.before.relogin = 60000 connections.max.idle.ms = 540000 session.timeout.ms = 10000 metrics.num.samples = 2 key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer ssl.protocol = TLS ssl.provider = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.keystore.location = null ssl.cipher.suites = null security.protocol = PLAINTEXT ssl.keymanager.algorithm = SunX509 metrics.sample.window.ms = 30000 auto.offset.reset = latest 15:03:42,680 INFO [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 63) Kafka version : 0.10.0.0 15:03:42,680 INFO [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 63) Kafka commitId : b8642491e78c5a13 15:03:42,681 WARN [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 63) Error registering AppInfo mbean: javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-1 at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) at org.jboss.as.jmx.PluggableMBeanServerImpl$TcclMBeanServer.registerMBean(PluggableMBeanServerImpl.java:1527) at org.jboss.as.jmx.PluggableMBeanServerImpl.registerMBean(PluggableMBeanServerImpl.java:871) at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:694) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569) at com.optimist.pokerstats.pokertracker.kafka.control.KafkaProvider.createConsumer(KafkaProvider.java:44) at com.optimist.pokerstats.pokertracker.kafka.control.KafkaProvider.init(KafkaProvider.java:29) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.jboss.weld.injection.producer.DefaultLifecycleCallbackInvoker.invokeMethods(DefaultLifecycleCallbackInvoker.java:98) at org.jboss.weld.injection.producer.DefaultLifecycleCallbackInvoker.postConstruct(DefaultLifecycleCallbackInvoker.java:81) at org.jboss.weld.injection.producer.BasicInjectionTarget.postConstruct(BasicInjectionTarget.java:126) at org.jboss.weld.bean.ManagedBean.create(ManagedBean.java:171) at org.jboss.weld.context.AbstractContext.get(AbstractContext.java:96) at org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101) at org.jboss.weld.bean.ContextualInstanceStrategy$ApplicationScopedContextualInstanceStrategy.get(ContextualInstanceStrategy.java:141) at org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50) at org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742) at org.jboss.weld.injection.producer.AbstractMemberProducer.getReceiver(AbstractMemberProducer.java:123) at org.jboss.weld.injection.producer.AbstractMemberProducer.produce(AbstractMemberProducer.java:158) at org.jboss.weld.bean.AbstractProducerBean.create(AbstractProducerBean.java:181) at org.jboss.weld.context.unbound.DependentContextImpl.get(DependentContextImpl.java:70) at org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101) at org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50) at org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742) at org.jboss.weld.manager.BeanManagerImpl.getInjectableReference(BeanManagerImpl.java:842) at org.jboss.weld.injection.FieldInjectionPoint.inject(FieldInjectionPoint.java:92) at org.jboss.weld.util.Beans.injectBoundFields(Beans.java:378) at org.jboss.weld.util.Beans.injectFieldsAndInitializers(Beans.java:389) at org.jboss.weld.injection.producer.ResourceInjector$1.proceed(ResourceInjector.java:70) at org.jboss.weld.injection.InjectionContextImpl.run(InjectionContextImpl.java:48) at org.jboss.weld.injection.producer.ResourceInjector.inject(ResourceInjector.java:72) at org.jboss.weld.injection.producer.BasicInjectionTarget.inject(BasicInjectionTarget.java:121) at org.jboss.weld.bean.ManagedBean.create(ManagedBean.java:159) at org.jboss.weld.context.unbound.DependentContextImpl.get(DependentContextImpl.java:70) at org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101) at org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50) at org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742) at org.jboss.weld.manager.BeanManagerImpl.getInjectableReference(BeanManagerImpl.java:842) at org.jboss.weld.injection.FieldInjectionPoint.inject(FieldInjectionPoint.java:92) at org.jboss.weld.util.Beans.injectBoundFields(Beans.java:378) at org.jboss.weld.util.Beans.injectFieldsAndInitializers(Beans.java:389) at org.jboss.weld.injection.producer.DefaultInjector$1.proceed(DefaultInjector.java:71) at org.jboss.weld.injection.InjectionContextImpl.run(InjectionContextImpl.java:48) at org.jboss.weld.injection.producer.DefaultInjector.inject(DefaultInjector.java:73) at org.jboss.weld.injection.producer.ejb.SessionBeanInjectionTarget.inject(SessionBeanInjectionTarget.java:140) at org.jboss.as.weld.injection.WeldInjectionContext.inject(WeldInjectionContext.java:39) at org.jboss.as.weld.injection.WeldInjectionInterceptor.processInvocation(WeldInjectionInterceptor.java:51) at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340) at org.jboss.as.ee.component.AroundConstructInterceptorFactory$1.processInvocation(AroundConstructInterceptorFactory.java:28) at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340) at org.jboss.as.weld.injection.WeldInterceptorInjectionInterceptor.processInvocation(WeldInterceptorInjectionInterceptor.java:56) at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340) at org.jboss.as.weld.ejb.Jsr299BindingsCreateInterceptor.processInvocation(Jsr299BindingsCreateInterceptor.java:100) at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340) at org.jboss.as.ee.component.NamespaceContextInterceptor.processInvocation(NamespaceContextInterceptor.java:50) at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340) at org.jboss.as.ejb3.tx.CMTTxInterceptor.invokeInOurTx(CMTTxInterceptor.java:275) at org.jboss.as.ejb3.tx.CMTTxInterceptor.requiresNew(CMTTxInterceptor.java:349) at org.jboss.as.ejb3.tx.LifecycleCMTTxInterceptor.processInvocation(LifecycleCMTTxInterceptor.java:66) at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340) at org.jboss.as.weld.injection.WeldInjectionContextInterceptor.processInvocation(WeldInjectionContextInterceptor.java:43) at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340) at org.jboss.as.ejb3.component.interceptors.CurrentInvocationContextInterceptor.processInvocation(CurrentInvocationContextInterceptor.java:41) at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340) at org.jboss.as.ee.concurrent.ConcurrentContextInterceptor.processInvocation(ConcurrentContextInterceptor.java:45) at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340) at org.jboss.invocation.ContextClassLoaderInterceptor.processInvocation(ContextClassLoaderInterceptor.java:64) at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340) at org.jboss.invocation.InterceptorContext.run(InterceptorContext.java:356) at org.jboss.invocation.PrivilegedWithCombinerInterceptor.processInvocation(PrivilegedWithCombinerInterceptor.java:80) at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340) at org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61) at org.jboss.as.ee.component.BasicComponent.constructComponentInstance(BasicComponent.java:161) at org.jboss.as.ee.component.BasicComponent.constructComponentInstance(BasicComponent.java:134) at org.jboss.as.ee.component.BasicComponent.createInstance(BasicComponent.java:88) at org.jboss.as.ejb3.component.singleton.SingletonComponent.getComponentInstance(SingletonComponent.java:124) at org.jboss.as.ejb3.component.singleton.SingletonComponent.start(SingletonComponent.java:138) at org.jboss.as.ee.component.ComponentStartService$1.run(ComponentStartService.java:54) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) at org.jboss.threads.JBossThread.run(JBossThread.java:320) 15:03:42,815 WARN [com.datastax.driver.core.Cluster] (cluster1-worker-1) Re-preparing already prepared query select DATA, VERSION from EVENTS where NAME = :name allow filtering. Please note that preparing the same query more than once is generally an anti-pattern and will likely affect performance. Consider preparing the statement only once. 15:03:43,221 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Discovered coordinator contabo:9092 (id: 2147483647 rack: null) for group pokertracker. 15:03:43,223 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Revoking previously assigned partitions [] for group pokertracker 15:03:43,224 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-2) (Re-)joining group pokertracker 15:03:43,246 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Successfully joined group pokertracker with generation 1 15:03:43,247 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Setting newly assigned partitions [test-0] for group pokertracker And here Is the log output from the docker container: 13:13:33,627 INFO [org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService Thread Pool -- 61) ConsumerConfig values: metric.reporters = [] metadata.max.age.ms = 300000 partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] reconnect.backoff.ms = 50 sasl.kerberos.ticket.renew.window.factor = 0.8 max.partition.fetch.bytes = 1048576 bootstrap.servers = [172.17.0.1:9092] ssl.keystore.type = JKS enable.auto.commit = true sasl.mechanism = GSSAPI interceptor.classes = null exclude.internal.topics = true ssl.truststore.password = null client.id = consumer-1 ssl.endpoint.identification.algorithm = null max.poll.records = 2147483647 check.crcs = true request.timeout.ms = 40000 heartbeat.interval.ms = 3000 auto.commit.interval.ms = 5000 receive.buffer.bytes = 65536 ssl.truststore.type = JKS ssl.truststore.location = null ssl.keystore.password = null fetch.min.bytes = 1 send.buffer.bytes = 131072 value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer group.id = pokertracker retry.backoff.ms = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 ssl.trustmanager.algorithm = PKIX ssl.key.password = null fetch.max.wait.ms = 500 sasl.kerberos.min.time.before.relogin = 60000 connections.max.idle.ms = 540000 session.timeout.ms = 10000 metrics.num.samples = 2 key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer ssl.protocol = TLS ssl.provider = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.keystore.location = null ssl.cipher.suites = null security.protocol = PLAINTEXT ssl.keymanager.algorithm = SunX509 metrics.sample.window.ms = 30000 auto.offset.reset = latest 13:13:33,650 INFO [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 61) Kafka version : 0.10.0.0 13:13:33,650 INFO [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 61) Kafka commitId : b8642491e78c5a13 13:13:33,711 WARN [com.datastax.driver.core.Cluster] (cluster1-worker-1) Re-preparing already prepared query select DATA, VERSION from EVENTS where NAME = :name allow filtering. Please note that preparing the same query more than once is generally an anti-pattern and will likely affect performance. Consider preparing the statement only once. 13:13:33,948 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-1) Discovered coordinator contabo:9092 (id: 2147483647 rack: null) for group pokertracker. 13:13:34,009 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-1) Marking the coordinator contabo:9092 (id: 2147483647 rack: null) dead for group pokertracker Here is the java code: @Startup @ConcurrencyManagement(ConcurrencyManagementType.BEAN) @Singleton public class InMemoryCache { @Inject KafkaConsumer<String, String> consumer; @Dedicated @Inject ExecutorService kafka; ... @PostConstruct public void onInit() { ... CompletableFuture .runAsync(this::handleKafkaEvent, kafka); } public void handleKafkaEvent() { while (true) { ConsumerRecords<String, String> records = consumer.poll(200); for (ConsumerRecord<String, String> record : records) { switch (record.topic()) { case KafkaProvider.TOPIC: System.out.println("record.value() = " + record.value()); List<CoreEvent> events = converter.convertToEvents(record.value()); for (CoreEvent event : events) { handle(event); } break; default: throw new IllegalArgumentException("Illegal message type: "); } } } } ...