This sounds related to https://issues.apache.org/jira/browse/KAFKA-1894 Currently the consumer won't timeout when it cannot contact the brokers.
-Ewen On Wed, Dec 14, 2016 at 5:21 AM, Costache, Vlad < vlad.costa...@metrosystems.net> wrote: > Hello, > > > > We are trying to make a consumer for kafka, (client code alone and camel > integrated) and we ended in a blocking point. > > Can you please give us an advice, or any other idea? > > > > Our problem: > > - We create a kafka consumer that connects to a wrong server > (wrong ip/port), and the consumer get stuck in “poll” method even the > connection is not created. > > - We tried also with camel, but the same problem (as the same > kafka client is called) > > - It seems that there is a bug in kafka java client that the > connection ends up in a loop > > - For producer everything is fine. > > > > Do you have any advice, or can you confirm that there is a bug? Do you > plan to fix this? Our production code will be with camel, so there will be > needed an exception to be thrown so we can make our data error handling. > > > > *How can we make the consumer throw exception if the connection to server > is not created successfully or the connection is lost at some point?* > > > > > > Our environment: > > Standalone: > > <dependencies> > > <dependency> > > <groupId>org.apache.kafka</groupId> > > <artifactId>kafka_2.11</artifactId> > > <version>0.10.0.0</version> > > </dependency> > > > > With camel: > > <dependency> > > <groupId>org.apache.camel</groupId> > > <artifactId>camel-*kafka*</artifactId> > > <version>2.17.0.redhat-630187</version> > > </dependency> > > > > > > The dummy code example is attached. > > > > Or camel: > > > > //server do not exist > > from("kafka:10.97.210.222:8093?topic=testTopic_ > mip133&groupId=testing&autoOffsetReset=earliest&consumersCount=1" > ).process(*new* Processor( > > ) { > > > > @Override > > *public* *void* process(Exchange exchange) *throws* Exception > { > > *LOG*.info(*new* MsbHeaderImpl(), "Am primit header: " + > exchange.getIn().getHeaders()); > > *LOG*.info(*new* MsbHeaderImpl(), "Am primit body: " + > exchange.getIn().getBody()); > > > > } > > }); > > > > > > Kafka server version: kafka_2.10-0.10.1.0 > > > > > > Thanks, > > Vlad > > Geschaeftsanschrift/Business Address: METRO SYSTEMS GmbH, Metro-Strasse > 12, 40235 Duesseldorf, Germany > Aufsichtsrat/Supervisory Board: Heiko Hutmacher (Vorsitzender/ Chairman) > Geschaeftsfuehrung/Management Board: Dr. Dirk Toepfer (Vorsitzender/CEO), > Wim van Herwijnen > Sitz Duesseldorf, Amtsgericht Duesseldorf, HRB 18232/Registered Office > Duesseldorf, Commercial Register of the Duesseldorf Local Court, HRB 18232 > > --- > Betreffend Mails von *@metrosystems.net > > Die in dieser E-Mail enthaltenen Nachrichten und Anhaenge sind > ausschliesslich fuer den bezeichneten Adressaten bestimmt. > Sie koennen rechtlich geschuetzte, vertrauliche Informationen enthalten. > Falls Sie nicht der bezeichnete Empfaenger oder zum Empfang dieser E-Mail > nicht berechtigt sind, ist die Verwendung, Vervielfaeltigung oder > Weitergabe der Nachrichten und Anhaenge untersagt. Falls Sie diese E-Mail > irrtuemlich erhalten haben, informieren Sie bitte unverzueglich den > Absender und vernichten Sie die E-Mail. > > Regarding mails from *@metrosystems.net > > This e-mail message and any attachment are intended exclusively for the > named addressee. > They may contain confidential information which may also be protected by > professional secrecy. Unless you are the named addressee (or authorised to > receive for the addressee) you may not copy or use this message or any > attachment or disclose the contents to anyone else. If this e-mail was sent > to you by mistake please notify the sender immediately and delete this > e-mail. >