Is there a JIRA for it? Could you point to where the issue exists in the
code?

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com


On August 12, 2016 at 5:15:33 PM, Oleg Zhurakousky (
ozhurakou...@hortonworks.com) wrote:

It hangs indefinitely in any container. It’s a known issue and has been
brought up many times on this list, yet there is not fix for it.
The problem is with the fact that while poll() attempts to create an
elusion that it is async and even allows you to set a timeout it is
essentially very misleading if you look inside its implementation. The
first call it makes is to fetch topic metadata. That call is not part of
the Future it returns so if connection to broker is not available you’re
dead since Kafka attempts to reconnect and there s no property to set
reconnect attempts, so it attempts to reconnect indefinitely.

Cheers
Oleg

> On Aug 12, 2016, at 9:22 AM, Brem, Robert <robert.b...@adesso.ch> wrote:
>
> Hy I'm new to Kafka and messaging at all.
>
> I have a simple java application that contains a consumer and a producer.
It is working on the host system but if I try to run it in a docker
container (Kafka is not in the container, it is still on the host)
consumer.poll() hangs up and does not return.
> telnet tells me that inside the container the host:port 172.17.0.1:9092
is open.
>
> In the docker container on startup Kafka tells me: Marking the
coordinator ... dead for group ...
>
> Can you give me a hint, in which direction I should look?
> Thanks!
>
> That's the output on the host, with the working application:
>
> 15:03:42,657 INFO [org.apache.kafka.clients.consumer.ConsumerConfig]
(ServerService Thread Pool -- 63) ConsumerConfig values:
> metric.reporters = []
> metadata.max.age.ms = 300000
> partition.assignment.strategy =
[org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [172.17.0.1:9092]
> ssl.keystore.type = JKS
> enable.auto.commit = true
> sasl.mechanism = GSSAPI
> interceptor.classes = null
> exclude.internal.topics = true
> ssl.truststore.password = null
> client.id = consumer-1
> ssl.endpoint.identification.algorithm = null
> max.poll.records = 2147483647
> check.crcs = true
> request.timeout.ms = 40000
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 65536
> ssl.truststore.type = JKS
> ssl.truststore.location = null
> ssl.keystore.password = null
> fetch.min.bytes = 1
> send.buffer.bytes = 131072
> value.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer
> group.id = pokertracker
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.trustmanager.algorithm = PKIX
> ssl.key.password = null
> fetch.max.wait.ms = 500
> sasl.kerberos.min.time.before.relogin = 60000
> connections.max.idle.ms = 540000
> session.timeout.ms = 10000
> metrics.num.samples = 2
> key.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer
> ssl.protocol = TLS
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> ssl.cipher.suites = null
> security.protocol = PLAINTEXT
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 30000
> auto.offset.reset = latest
>
> 15:03:42,680 INFO [org.apache.kafka.common.utils.AppInfoParser]
(ServerService Thread Pool -- 63) Kafka version : 0.10.0.0
> 15:03:42,680 INFO [org.apache.kafka.common.utils.AppInfoParser]
(ServerService Thread Pool -- 63) Kafka commitId : b8642491e78c5a13
> 15:03:42,681 WARN [org.apache.kafka.common.utils.AppInfoParser]
(ServerService Thread Pool -- 63) Error registering AppInfo mbean:
javax.management.InstanceAlreadyExistsException:
kafka.consumer:type=app-info,id=consumer-1
> at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
> at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)

> at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)

> at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)

> at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)

> at
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)

> at
org.jboss.as.jmx.PluggableMBeanServerImpl$TcclMBeanServer.registerMBean(PluggableMBeanServerImpl.java:1527)

> at
org.jboss.as.jmx.PluggableMBeanServerImpl.registerMBean(PluggableMBeanServerImpl.java:871)

> at
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)

> at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:694)

> at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587)

> at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569)

> at
com.optimist.pokerstats.pokertracker.kafka.control.KafkaProvider.createConsumer(KafkaProvider.java:44)

> at
com.optimist.pokerstats.pokertracker.kafka.control.KafkaProvider.init(KafkaProvider.java:29)

> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

> at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

> at java.lang.reflect.Method.invoke(Method.java:498)
> at
org.jboss.weld.injection.producer.DefaultLifecycleCallbackInvoker.invokeMethods(DefaultLifecycleCallbackInvoker.java:98)

> at
org.jboss.weld.injection.producer.DefaultLifecycleCallbackInvoker.postConstruct(DefaultLifecycleCallbackInvoker.java:81)

> at
org.jboss.weld.injection.producer.BasicInjectionTarget.postConstruct(BasicInjectionTarget.java:126)

> at org.jboss.weld.bean.ManagedBean.create(ManagedBean.java:171)
> at org.jboss.weld.context.AbstractContext.get(AbstractContext.java:96)
> at
org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101)

> at
org.jboss.weld.bean.ContextualInstanceStrategy$ApplicationScopedContextualInstanceStrategy.get(ContextualInstanceStrategy.java:141)

> at org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50)
> at
org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742)

> at
org.jboss.weld.injection.producer.AbstractMemberProducer.getReceiver(AbstractMemberProducer.java:123)

> at
org.jboss.weld.injection.producer.AbstractMemberProducer.produce(AbstractMemberProducer.java:158)

> at
org.jboss.weld.bean.AbstractProducerBean.create(AbstractProducerBean.java:181)

> at
org.jboss.weld.context.unbound.DependentContextImpl.get(DependentContextImpl.java:70)

> at
org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101)

> at org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50)
> at
org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742)

> at
org.jboss.weld.manager.BeanManagerImpl.getInjectableReference(BeanManagerImpl.java:842)

> at
org.jboss.weld.injection.FieldInjectionPoint.inject(FieldInjectionPoint.java:92)

> at org.jboss.weld.util.Beans.injectBoundFields(Beans.java:378)
> at org.jboss.weld.util.Beans.injectFieldsAndInitializers(Beans.java:389)
> at
org.jboss.weld.injection.producer.ResourceInjector$1.proceed(ResourceInjector.java:70)

> at
org.jboss.weld.injection.InjectionContextImpl.run(InjectionContextImpl.java:48)

> at
org.jboss.weld.injection.producer.ResourceInjector.inject(ResourceInjector.java:72)

> at
org.jboss.weld.injection.producer.BasicInjectionTarget.inject(BasicInjectionTarget.java:121)

> at org.jboss.weld.bean.ManagedBean.create(ManagedBean.java:159)
> at
org.jboss.weld.context.unbound.DependentContextImpl.get(DependentContextImpl.java:70)

> at
org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101)

> at org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50)
> at
org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742)

> at
org.jboss.weld.manager.BeanManagerImpl.getInjectableReference(BeanManagerImpl.java:842)

> at
org.jboss.weld.injection.FieldInjectionPoint.inject(FieldInjectionPoint.java:92)

> at org.jboss.weld.util.Beans.injectBoundFields(Beans.java:378)
> at org.jboss.weld.util.Beans.injectFieldsAndInitializers(Beans.java:389)
> at
org.jboss.weld.injection.producer.DefaultInjector$1.proceed(DefaultInjector.java:71)

> at
org.jboss.weld.injection.InjectionContextImpl.run(InjectionContextImpl.java:48)

> at
org.jboss.weld.injection.producer.DefaultInjector.inject(DefaultInjector.java:73)

> at
org.jboss.weld.injection.producer.ejb.SessionBeanInjectionTarget.inject(SessionBeanInjectionTarget.java:140)

> at
org.jboss.as.weld.injection.WeldInjectionContext.inject(WeldInjectionContext.java:39)

> at
org.jboss.as.weld.injection.WeldInjectionInterceptor.processInvocation(WeldInjectionInterceptor.java:51)

> at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)

> at
org.jboss.as.ee.component.AroundConstructInterceptorFactory$1.processInvocation(AroundConstructInterceptorFactory.java:28)

> at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)

> at
org.jboss.as.weld.injection.WeldInterceptorInjectionInterceptor.processInvocation(WeldInterceptorInjectionInterceptor.java:56)

> at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)

> at
org.jboss.as.weld.ejb.Jsr299BindingsCreateInterceptor.processInvocation(Jsr299BindingsCreateInterceptor.java:100)

> at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)

> at
org.jboss.as.ee.component.NamespaceContextInterceptor.processInvocation(NamespaceContextInterceptor.java:50)

> at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)

> at
org.jboss.as.ejb3.tx.CMTTxInterceptor.invokeInOurTx(CMTTxInterceptor.java:275)

> at
org.jboss.as.ejb3.tx.CMTTxInterceptor.requiresNew(CMTTxInterceptor.java:349)

> at
org.jboss.as.ejb3.tx.LifecycleCMTTxInterceptor.processInvocation(LifecycleCMTTxInterceptor.java:66)

> at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)

> at
org.jboss.as.weld.injection.WeldInjectionContextInterceptor.processInvocation(WeldInjectionContextInterceptor.java:43)

> at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)

> at
org.jboss.as.ejb3.component.interceptors.CurrentInvocationContextInterceptor.processInvocation(CurrentInvocationContextInterceptor.java:41)

> at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)

> at
org.jboss.as.ee.concurrent.ConcurrentContextInterceptor.processInvocation(ConcurrentContextInterceptor.java:45)

> at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)

> at
org.jboss.invocation.ContextClassLoaderInterceptor.processInvocation(ContextClassLoaderInterceptor.java:64)

> at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)

> at
org.jboss.invocation.InterceptorContext.run(InterceptorContext.java:356)
> at
org.jboss.invocation.PrivilegedWithCombinerInterceptor.processInvocation(PrivilegedWithCombinerInterceptor.java:80)

> at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)

> at
org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61)

> at
org.jboss.as.ee.component.BasicComponent.constructComponentInstance(BasicComponent.java:161)

> at
org.jboss.as.ee.component.BasicComponent.constructComponentInstance(BasicComponent.java:134)

> at
org.jboss.as.ee.component.BasicComponent.createInstance(BasicComponent.java:88)

> at
org.jboss.as.ejb3.component.singleton.SingletonComponent.getComponentInstance(SingletonComponent.java:124)

> at
org.jboss.as.ejb3.component.singleton.SingletonComponent.start(SingletonComponent.java:138)

> at
org.jboss.as.ee.component.ComponentStartService$1.run(ComponentStartService.java:54)

> at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

> at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

> at java.lang.Thread.run(Thread.java:745)
> at org.jboss.threads.JBossThread.run(JBossThread.java:320)
>
> 15:03:42,815 WARN [com.datastax.driver.core.Cluster] (cluster1-worker-1)
Re-preparing already prepared query select DATA, VERSION from EVENTS where
NAME = :name allow filtering. Please note that preparing the same query
more than once is generally an anti-pattern and will likely affect
performance. Consider preparing the statement only once.
> 15:03:43,221 INFO
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
(EE-ManagedThreadFactory-default-Thread-2) Discovered coordinator
contabo:9092 (id: 2147483647 rack: null) for group pokertracker.
> 15:03:43,223 INFO
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
(EE-ManagedThreadFactory-default-Thread-2) Revoking previously assigned
partitions [] for group pokertracker
> 15:03:43,224 INFO
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
(EE-ManagedThreadFactory-default-Thread-2) (Re-)joining group pokertracker
> 15:03:43,246 INFO
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
(EE-ManagedThreadFactory-default-Thread-2) Successfully joined group
pokertracker with generation 1
> 15:03:43,247 INFO
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
(EE-ManagedThreadFactory-default-Thread-2) Setting newly assigned
partitions [test-0] for group pokertracker
>
>
> And here Is the log output from the docker container:
>
> 13:13:33,627 INFO [org.apache.kafka.clients.consumer.ConsumerConfig]
(ServerService Thread Pool -- 61) ConsumerConfig values:
> metric.reporters = []
> metadata.max.age.ms = 300000
> partition.assignment.strategy =
[org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [172.17.0.1:9092]
> ssl.keystore.type = JKS
> enable.auto.commit = true
> sasl.mechanism = GSSAPI
> interceptor.classes = null
> exclude.internal.topics = true
> ssl.truststore.password = null
> client.id = consumer-1
> ssl.endpoint.identification.algorithm = null
> max.poll.records = 2147483647
> check.crcs = true
> request.timeout.ms = 40000
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 65536
> ssl.truststore.type = JKS
> ssl.truststore.location = null
> ssl.keystore.password = null
> fetch.min.bytes = 1
> send.buffer.bytes = 131072
> value.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer
> group.id = pokertracker
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.trustmanager.algorithm = PKIX
> ssl.key.password = null
> fetch.max.wait.ms = 500
> sasl.kerberos.min.time.before.relogin = 60000
> connections.max.idle.ms = 540000
> session.timeout.ms = 10000
> metrics.num.samples = 2
> key.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer
> ssl.protocol = TLS
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> ssl.cipher.suites = null
> security.protocol = PLAINTEXT
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 30000
> auto.offset.reset = latest
>
> 13:13:33,650 INFO [org.apache.kafka.common.utils.AppInfoParser]
(ServerService Thread Pool -- 61) Kafka version : 0.10.0.0
> 13:13:33,650 INFO [org.apache.kafka.common.utils.AppInfoParser]
(ServerService Thread Pool -- 61) Kafka commitId : b8642491e78c5a13
> 13:13:33,711 WARN [com.datastax.driver.core.Cluster] (cluster1-worker-1)
Re-preparing already prepared query select DATA, VERSION from EVENTS where
NAME = :name allow filtering. Please note that preparing the same query
more than once is generally an anti-pattern and will likely affect
performance. Consider preparing the statement only once.
> 13:13:33,948 INFO
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
(EE-ManagedThreadFactory-default-Thread-1) Discovered coordinator
contabo:9092 (id: 2147483647 rack: null) for group pokertracker.
> 13:13:34,009 INFO
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
(EE-ManagedThreadFactory-default-Thread-1) Marking the coordinator
contabo:9092 (id: 2147483647 rack: null) dead for group pokertracker
>
>
> Here is the java code:
>
> @Startup
> @ConcurrencyManagement(ConcurrencyManagementType.BEAN)
> @Singleton
> public class InMemoryCache {
>
> @Inject
> KafkaConsumer<String, String> consumer;
>
> @Dedicated
> @Inject
> ExecutorService kafka;
>
> ...
>
> @PostConstruct
> public void onInit() {
> ...
> CompletableFuture
> .runAsync(this::handleKafkaEvent, kafka);
> }
>
> public void handleKafkaEvent() {
> while (true) {
> ConsumerRecords<String, String> records = consumer.poll(200);
> for (ConsumerRecord<String, String> record : records) {
> switch (record.topic()) {
> case KafkaProvider.TOPIC:
> System.out.println("record.value() = " + record.value());
> List<CoreEvent> events = converter.convertToEvents(record.value());
> for (CoreEvent event : events) {
> handle(event);
> }
> break;
> default:
> throw new IllegalArgumentException("Illegal message type: ");
> }
> }
> }
> }
>
> ...
>

Reply via email to