Hi Robert, After we restarted our Kafka / Zookeeper cluster the consumer worked. Some of our topics had some problems. The flink's consumer for Kafka 0.9 works as expected.
Thanks! On 19 February 2016 at 12:03, Lopez, Javier <javier.lo...@zalando.de> wrote: > Hi, these are the properties: > > Properties properties = new Properties(); > properties.setProperty("bootstrap.servers", > ".87:9092,.41:9092,.35:9092"); //full IPs removed for security reasons > properties.setProperty("zookeeper.connect", ".37:2181"); > properties.setProperty("group.id", "test"); > properties.setProperty("client.id", "flink_test"); > properties.setProperty("auto.offset.reset", "earliest"); > properties.put("enable.auto.commit", "true"); > properties.put("auto.commit.interval.ms", "1000"); > properties.put("session.timeout.ms", "30000"); > > We have tested with these as well: > > Properties properties = new Properties(); > properties.setProperty("bootstrap.servers", > ".87:9092,.41:9092,.35:9092"); > properties.setProperty("zookeeper.connect", ".37:2181"); > properties.setProperty("group.id", "test"); > properties.setProperty("client.id", "flink_test"); > properties.setProperty("auto.offset.reset", "earliest"); > > > and these: > > Properties properties = new Properties(); > properties.setProperty("bootstrap.servers", > ".87:9092,.41:9092,.35:9092"); > properties.setProperty("zookeeper.connect", ".37:2181"); > properties.setProperty("group.id", "test"); > properties.setProperty("client.id", "flink_test"); > properties.setProperty("auto.offset.reset", "earliest"); > properties.put("enable.auto.commit", "true"); > properties.put("auto.commit.interval.ms", "1000"); > properties.put("session.timeout.ms", "30000"); > properties.put("key.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > properties.put("value.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > > With all three different configurations we get the same result. > > On 19 February 2016 at 11:55, Robert Metzger <rmetz...@apache.org> wrote: > >> Thank you. Can you send me also the list of properties you are passing to >> the kafka consumer? Are you only setting the "bootstrap.servers" or more? >> >> On Fri, Feb 19, 2016 at 11:46 AM, Lopez, Javier <javier.lo...@zalando.de> >> wrote: >> >>> Hi Robert, >>> >>> Please find attached the full logs of one of our latest executions. We >>> are basically trying to read from our kafka cluster and then writing the >>> data to elasticsearch. >>> >>> Thanks for your help! >>> >>> On 18 February 2016 at 11:19, Robert Metzger <rmetz...@apache.org> >>> wrote: >>> >>>> Hi Javier, >>>> >>>> sorry for the late response. In the Error Mapping of Kafka, it says >>>> that code 15 means: ConsumerCoordinatorNotAvailableCode. >>>> >>>> https://github.com/apache/kafka/blob/0.9.0/core/src/main/scala/kafka/common/ErrorMapping.scala >>>> >>>> How many brokers did you put into the list of bootstrap servers? >>>> Can you maybe send me the full log of one of the Flink TaskManagers >>>> reading from Kafka? >>>> >>>> >>>> On Wed, Feb 17, 2016 at 11:10 AM, Lopez, Javier < >>>> javier.lo...@zalando.de> wrote: >>>> >>>>> Hi guys, >>>>> >>>>> We are using Flink 1.0-SNAPSHOT with Kafka 0.9 Consumer and we have >>>>> not been able to retrieve data from our Kafka Cluster. The DEBUG data >>>>> reports the following: >>>>> >>>>> 10:53:24,365 DEBUG org.apache.kafka.clients.NetworkClient >>>>> - Sending metadata request ClientRequest(expectResponse=true, >>>>> callback=null, >>>>> request=RequestSend(header={api_key=3,api_version=0,correlation_id=1673,client_id=flink_test}, >>>>> body={topics=[stream_test_3]}), isInitiatedByNetworkClient, >>>>> createdTimeMs=1455702804364, sendTimeMs=0) to node 35 >>>>> 10:53:24,398 DEBUG org.apache.kafka.clients.Metadata >>>>> - Updated cluster metadata version 838 to Cluster(nodes = >>>>> [Node(41, ip-XXXX.eu-west-1.compute.internal, 9092), Node(35, >>>>> ip-XXXX.eu-west-1.compute.internal, 9092), Node(87, >>>>> ip-XXXX.eu-west-1.compute.internal, 9092)], partitions = [Partition(topic >>>>> = >>>>> stream_test_3, partition = 0, leader = 87, replicas = [87,41,35,], isr = >>>>> [87,41,35,], Partition(topic = stream_test_3, partition = 1, leader = 35, >>>>> replicas = [35,41,87,], isr = [35,41,87,], Partition(topic = >>>>> stream_test_3, >>>>> partition = 4, leader = 87, replicas = [87,41,35,], isr = [87,41,35,], >>>>> Partition(topic = stream_test_3, partition = 3, leader = 35, replicas = >>>>> [35,87,41,], isr = [35,87,41,], Partition(topic = stream_test_3, partition >>>>> = 2, leader = 41, replicas = [41,87,35,], isr = [41,87,35,]]) >>>>> 10:53:24,398 DEBUG >>>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Issuing >>>>> group metadata request to broker 35 >>>>> 10:53:24,432 DEBUG >>>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Group >>>>> metadata response ClientResponse(receivedTimeMs=1455702804432, >>>>> disconnected=false, request=ClientRequest(expectResponse=true, >>>>> callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@63b68d94, >>>>> request=RequestSend(header={api_key=10,api_version=0,correlation_id=1674,client_id=flink_test}, >>>>> body={group_id=test}), createdTimeMs=1455702804398, >>>>> sendTimeMs=1455702804398), >>>>> responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) >>>>> >>>>> >>>>> We receive this message all the time. What we don't know understand is >>>>> this >>>>> "responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}", >>>>> as we see an error_code we suppose there was a problem. Our Kafka cluster >>>>> works and we have some clients extracting data from it, so we don't know >>>>> if >>>>> this could be a Kafka issue or a Flink issue. >>>>> >>>>> Does anyone know, or understand, this response we are getting from >>>>> Kafka? >>>>> >>>>> Thanks. >>>>> >>>> >>>> >>> >> >