Hi, I'm new to Kafka and to messaging in general.

I have a simple Java application that contains a consumer and a producer. It
works on the host system, but when I run it in a Docker container (Kafka is not
in the container; it still runs on the host), consumer.poll() hangs and never
returns.
telnet shows that, from inside the container, 172.17.0.1:9092 is reachable.
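
For reference, the telnet check can be repeated from Java with something like
the sketch below. It is only an illustration: the class BrokerReachability and
its two helper methods are names made up for this mail, and the 2000 ms timeout
is an arbitrary choice. The two addresses come from the logs further down:
172.17.0.1:9092 is my bootstrap.servers value, and contabo:9092 is the
coordinator address that shows up in the "Discovered coordinator" line. My
guess is that a name which does not resolve inside the container could be
related to the coordinator being marked dead, but I have not confirmed that.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;

// Hypothetical helper, not part of the application or of the Kafka client.
final class BrokerReachability {

    /** Returns true if the host name (or IP literal) can be resolved. */
    public static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers unresolvable hosts, refused connections and timeouts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Addresses taken from the logs in this mail: the bootstrap address
        // and the coordinator address reported in the consumer log.
        String[] hosts = {"172.17.0.1", "contabo"};
        int port = 9092;
        for (String host : hosts) {
            System.out.printf("%s:%d resolves=%b connects=%b%n",
                    host, port, resolves(host), canConnect(host, port, 2000));
        }
    }
}
```

Running this once on the host and once inside the container should show whether
both addresses behave the same in the two environments.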

In the Docker container, the Kafka client logs on startup: Marking the
coordinator ... dead for group ...

Can you give me a hint as to which direction I should look in?
Thanks!

This is the output on the host, with the working application:

15:03:42,657 INFO  [org.apache.kafka.clients.consumer.ConsumerConfig] 
(ServerService Thread Pool -- 63) ConsumerConfig values:
                metric.reporters = []
                metadata.max.age.ms = 300000
                partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor]
                reconnect.backoff.ms = 50
                sasl.kerberos.ticket.renew.window.factor = 0.8
                max.partition.fetch.bytes = 1048576
                bootstrap.servers = [172.17.0.1:9092]
                ssl.keystore.type = JKS
                enable.auto.commit = true
                sasl.mechanism = GSSAPI
                interceptor.classes = null
                exclude.internal.topics = true
                ssl.truststore.password = null
                client.id = consumer-1
                ssl.endpoint.identification.algorithm = null
                max.poll.records = 2147483647
                check.crcs = true
                request.timeout.ms = 40000
                heartbeat.interval.ms = 3000
                auto.commit.interval.ms = 5000
                receive.buffer.bytes = 65536
                ssl.truststore.type = JKS
                ssl.truststore.location = null
                ssl.keystore.password = null
                fetch.min.bytes = 1
                send.buffer.bytes = 131072
                value.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer
                group.id = pokertracker
                retry.backoff.ms = 100
                sasl.kerberos.kinit.cmd = /usr/bin/kinit
                sasl.kerberos.service.name = null
                sasl.kerberos.ticket.renew.jitter = 0.05
                ssl.trustmanager.algorithm = PKIX
                ssl.key.password = null
                fetch.max.wait.ms = 500
                sasl.kerberos.min.time.before.relogin = 60000
                connections.max.idle.ms = 540000
                session.timeout.ms = 10000
                metrics.num.samples = 2
                key.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer
                ssl.protocol = TLS
                ssl.provider = null
                ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
                ssl.keystore.location = null
                ssl.cipher.suites = null
                security.protocol = PLAINTEXT
                ssl.keymanager.algorithm = SunX509
                metrics.sample.window.ms = 30000
                auto.offset.reset = latest

15:03:42,680 INFO  [org.apache.kafka.common.utils.AppInfoParser] (ServerService 
Thread Pool -- 63) Kafka version : 0.10.0.0
15:03:42,680 INFO  [org.apache.kafka.common.utils.AppInfoParser] (ServerService 
Thread Pool -- 63) Kafka commitId : b8642491e78c5a13
15:03:42,681 WARN  [org.apache.kafka.common.utils.AppInfoParser] (ServerService 
Thread Pool -- 63) Error registering AppInfo mbean: 
javax.management.InstanceAlreadyExistsException: 
kafka.consumer:type=app-info,id=consumer-1
                at 
com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
                at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
                at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
                at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
                at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
                at 
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
                at 
org.jboss.as.jmx.PluggableMBeanServerImpl$TcclMBeanServer.registerMBean(PluggableMBeanServerImpl.java:1527)
                at 
org.jboss.as.jmx.PluggableMBeanServerImpl.registerMBean(PluggableMBeanServerImpl.java:871)
                at 
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)
                at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:694)
                at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587)
                at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569)
                at 
com.optimist.pokerstats.pokertracker.kafka.control.KafkaProvider.createConsumer(KafkaProvider.java:44)
                at 
com.optimist.pokerstats.pokertracker.kafka.control.KafkaProvider.init(KafkaProvider.java:29)
                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                at java.lang.reflect.Method.invoke(Method.java:498)
                at 
org.jboss.weld.injection.producer.DefaultLifecycleCallbackInvoker.invokeMethods(DefaultLifecycleCallbackInvoker.java:98)
                at 
org.jboss.weld.injection.producer.DefaultLifecycleCallbackInvoker.postConstruct(DefaultLifecycleCallbackInvoker.java:81)
                at 
org.jboss.weld.injection.producer.BasicInjectionTarget.postConstruct(BasicInjectionTarget.java:126)
                at org.jboss.weld.bean.ManagedBean.create(ManagedBean.java:171)
                at 
org.jboss.weld.context.AbstractContext.get(AbstractContext.java:96)
                at 
org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101)
                at 
org.jboss.weld.bean.ContextualInstanceStrategy$ApplicationScopedContextualInstanceStrategy.get(ContextualInstanceStrategy.java:141)
                at 
org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50)
                at 
org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742)
                at 
org.jboss.weld.injection.producer.AbstractMemberProducer.getReceiver(AbstractMemberProducer.java:123)
                at 
org.jboss.weld.injection.producer.AbstractMemberProducer.produce(AbstractMemberProducer.java:158)
                at 
org.jboss.weld.bean.AbstractProducerBean.create(AbstractProducerBean.java:181)
                at 
org.jboss.weld.context.unbound.DependentContextImpl.get(DependentContextImpl.java:70)
                at 
org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101)
                at 
org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50)
                at 
org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742)
                at 
org.jboss.weld.manager.BeanManagerImpl.getInjectableReference(BeanManagerImpl.java:842)
                at 
org.jboss.weld.injection.FieldInjectionPoint.inject(FieldInjectionPoint.java:92)
                at org.jboss.weld.util.Beans.injectBoundFields(Beans.java:378)
                at 
org.jboss.weld.util.Beans.injectFieldsAndInitializers(Beans.java:389)
                at 
org.jboss.weld.injection.producer.ResourceInjector$1.proceed(ResourceInjector.java:70)
                at 
org.jboss.weld.injection.InjectionContextImpl.run(InjectionContextImpl.java:48)
                at 
org.jboss.weld.injection.producer.ResourceInjector.inject(ResourceInjector.java:72)
                at 
org.jboss.weld.injection.producer.BasicInjectionTarget.inject(BasicInjectionTarget.java:121)
                at org.jboss.weld.bean.ManagedBean.create(ManagedBean.java:159)
                at 
org.jboss.weld.context.unbound.DependentContextImpl.get(DependentContextImpl.java:70)
                at 
org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101)
                at 
org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50)
                at 
org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742)
                at 
org.jboss.weld.manager.BeanManagerImpl.getInjectableReference(BeanManagerImpl.java:842)
                at 
org.jboss.weld.injection.FieldInjectionPoint.inject(FieldInjectionPoint.java:92)
                at org.jboss.weld.util.Beans.injectBoundFields(Beans.java:378)
                at 
org.jboss.weld.util.Beans.injectFieldsAndInitializers(Beans.java:389)
                at 
org.jboss.weld.injection.producer.DefaultInjector$1.proceed(DefaultInjector.java:71)
                at 
org.jboss.weld.injection.InjectionContextImpl.run(InjectionContextImpl.java:48)
                at 
org.jboss.weld.injection.producer.DefaultInjector.inject(DefaultInjector.java:73)
                at 
org.jboss.weld.injection.producer.ejb.SessionBeanInjectionTarget.inject(SessionBeanInjectionTarget.java:140)
                at 
org.jboss.as.weld.injection.WeldInjectionContext.inject(WeldInjectionContext.java:39)
                at 
org.jboss.as.weld.injection.WeldInjectionInterceptor.processInvocation(WeldInjectionInterceptor.java:51)
                at 
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
                at 
org.jboss.as.ee.component.AroundConstructInterceptorFactory$1.processInvocation(AroundConstructInterceptorFactory.java:28)
                at 
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
                at 
org.jboss.as.weld.injection.WeldInterceptorInjectionInterceptor.processInvocation(WeldInterceptorInjectionInterceptor.java:56)
                at 
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
                at 
org.jboss.as.weld.ejb.Jsr299BindingsCreateInterceptor.processInvocation(Jsr299BindingsCreateInterceptor.java:100)
                at 
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
                at 
org.jboss.as.ee.component.NamespaceContextInterceptor.processInvocation(NamespaceContextInterceptor.java:50)
                at 
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
                at 
org.jboss.as.ejb3.tx.CMTTxInterceptor.invokeInOurTx(CMTTxInterceptor.java:275)
                at 
org.jboss.as.ejb3.tx.CMTTxInterceptor.requiresNew(CMTTxInterceptor.java:349)
                at 
org.jboss.as.ejb3.tx.LifecycleCMTTxInterceptor.processInvocation(LifecycleCMTTxInterceptor.java:66)
                at 
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
                at 
org.jboss.as.weld.injection.WeldInjectionContextInterceptor.processInvocation(WeldInjectionContextInterceptor.java:43)
                at 
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
                at 
org.jboss.as.ejb3.component.interceptors.CurrentInvocationContextInterceptor.processInvocation(CurrentInvocationContextInterceptor.java:41)
                at 
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
                at 
org.jboss.as.ee.concurrent.ConcurrentContextInterceptor.processInvocation(ConcurrentContextInterceptor.java:45)
                at 
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
                at 
org.jboss.invocation.ContextClassLoaderInterceptor.processInvocation(ContextClassLoaderInterceptor.java:64)
                at 
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
                at 
org.jboss.invocation.InterceptorContext.run(InterceptorContext.java:356)
                at 
org.jboss.invocation.PrivilegedWithCombinerInterceptor.processInvocation(PrivilegedWithCombinerInterceptor.java:80)
                at 
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
                at 
org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61)
                at 
org.jboss.as.ee.component.BasicComponent.constructComponentInstance(BasicComponent.java:161)
                at 
org.jboss.as.ee.component.BasicComponent.constructComponentInstance(BasicComponent.java:134)
                at 
org.jboss.as.ee.component.BasicComponent.createInstance(BasicComponent.java:88)
                at 
org.jboss.as.ejb3.component.singleton.SingletonComponent.getComponentInstance(SingletonComponent.java:124)
                at 
org.jboss.as.ejb3.component.singleton.SingletonComponent.start(SingletonComponent.java:138)
                at 
org.jboss.as.ee.component.ComponentStartService$1.run(ComponentStartService.java:54)
                at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
                at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
                at java.lang.Thread.run(Thread.java:745)
                at org.jboss.threads.JBossThread.run(JBossThread.java:320)

15:03:42,815 WARN  [com.datastax.driver.core.Cluster] (cluster1-worker-1) 
Re-preparing already prepared query select DATA, VERSION from EVENTS where NAME 
= :name allow filtering. Please note that preparing the same query more than 
once is generally an anti-pattern and will likely affect performance. Consider 
preparing the statement only once.
15:03:43,221 INFO  
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 
(EE-ManagedThreadFactory-default-Thread-2) Discovered coordinator contabo:9092 
(id: 2147483647 rack: null) for group pokertracker.
15:03:43,223 INFO  
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 
(EE-ManagedThreadFactory-default-Thread-2) Revoking previously assigned 
partitions [] for group pokertracker
15:03:43,224 INFO  
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 
(EE-ManagedThreadFactory-default-Thread-2) (Re-)joining group pokertracker
15:03:43,246 INFO  
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 
(EE-ManagedThreadFactory-default-Thread-2) Successfully joined group 
pokertracker with generation 1
15:03:43,247 INFO  
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 
(EE-ManagedThreadFactory-default-Thread-2) Setting newly assigned partitions 
[test-0] for group pokertracker


And here is the log output from the Docker container:

13:13:33,627 INFO  [org.apache.kafka.clients.consumer.ConsumerConfig] 
(ServerService Thread Pool -- 61) ConsumerConfig values:
                metric.reporters = []
                metadata.max.age.ms = 300000
                partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor]
                reconnect.backoff.ms = 50
                sasl.kerberos.ticket.renew.window.factor = 0.8
                max.partition.fetch.bytes = 1048576
                bootstrap.servers = [172.17.0.1:9092]
                ssl.keystore.type = JKS
                enable.auto.commit = true
                sasl.mechanism = GSSAPI
                interceptor.classes = null
                exclude.internal.topics = true
                ssl.truststore.password = null
                client.id = consumer-1
                ssl.endpoint.identification.algorithm = null
                max.poll.records = 2147483647
                check.crcs = true
                request.timeout.ms = 40000
                heartbeat.interval.ms = 3000
                auto.commit.interval.ms = 5000
                receive.buffer.bytes = 65536
                ssl.truststore.type = JKS
                ssl.truststore.location = null
                ssl.keystore.password = null
                fetch.min.bytes = 1
                send.buffer.bytes = 131072
                value.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer
                group.id = pokertracker
                retry.backoff.ms = 100
                sasl.kerberos.kinit.cmd = /usr/bin/kinit
                sasl.kerberos.service.name = null
                sasl.kerberos.ticket.renew.jitter = 0.05
                ssl.trustmanager.algorithm = PKIX
                ssl.key.password = null
                fetch.max.wait.ms = 500
                sasl.kerberos.min.time.before.relogin = 60000
                connections.max.idle.ms = 540000
                session.timeout.ms = 10000
                metrics.num.samples = 2
                key.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer
                ssl.protocol = TLS
                ssl.provider = null
                ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
                ssl.keystore.location = null
                ssl.cipher.suites = null
                security.protocol = PLAINTEXT
                ssl.keymanager.algorithm = SunX509
                metrics.sample.window.ms = 30000
                auto.offset.reset = latest

13:13:33,650 INFO  [org.apache.kafka.common.utils.AppInfoParser] (ServerService 
Thread Pool -- 61) Kafka version : 0.10.0.0
13:13:33,650 INFO  [org.apache.kafka.common.utils.AppInfoParser] (ServerService 
Thread Pool -- 61) Kafka commitId : b8642491e78c5a13
13:13:33,711 WARN  [com.datastax.driver.core.Cluster] (cluster1-worker-1) 
Re-preparing already prepared query select DATA, VERSION from EVENTS where NAME 
= :name allow filtering. Please note that preparing the same query more than 
once is generally an anti-pattern and will likely affect performance. Consider 
preparing the statement only once.
13:13:33,948 INFO  
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 
(EE-ManagedThreadFactory-default-Thread-1) Discovered coordinator contabo:9092 
(id: 2147483647 rack: null) for group pokertracker.
13:13:34,009 INFO  
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 
(EE-ManagedThreadFactory-default-Thread-1) Marking the coordinator contabo:9092 
(id: 2147483647 rack: null) dead for group pokertracker


Here is the Java code:

@Startup
@ConcurrencyManagement(ConcurrencyManagementType.BEAN)
@Singleton
public class InMemoryCache {

    @Inject
    KafkaConsumer<String, String> consumer;

    @Dedicated
    @Inject
    ExecutorService kafka;

    ...

    @PostConstruct
    public void onInit() {
...
        CompletableFuture
                .runAsync(this::handleKafkaEvent, kafka);
    }

    public void handleKafkaEvent() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(200);
            for (ConsumerRecord<String, String> record : records) {
                switch (record.topic()) {
                    case KafkaProvider.TOPIC:
                        System.out.println("record.value() = " + record.value());
                        List<CoreEvent> events = converter.convertToEvents(record.value());
                        for (CoreEvent event : events) {
                            handle(event);
                        }
                        break;
                    default:
                        throw new IllegalArgumentException("Illegal message type: " + record.topic());
                }
            }
        }
    }

...
