Hi, these are the properties: Properties properties = new Properties(); properties.setProperty("bootstrap.servers", ".87:9092,.41:9092,.35:9092"); //full IPs removed for security reasons properties.setProperty("zookeeper.connect", ".37:2181"); properties.setProperty("group.id", "test"); properties.setProperty("client.id", "flink_test"); properties.setProperty("auto.offset.reset", "earliest"); properties.put("enable.auto.commit", "true"); properties.put("auto.commit.interval.ms", "1000"); properties.put("session.timeout.ms", "30000");
We have tested with these as well: Properties properties = new Properties(); properties.setProperty("bootstrap.servers", ".87:9092,.41:9092,.35:9092"); properties.setProperty("zookeeper.connect", ".37:2181"); properties.setProperty("group.id", "test"); properties.setProperty("client.id", "flink_test"); properties.setProperty("auto.offset.reset", "earliest"); and these: Properties properties = new Properties(); properties.setProperty("bootstrap.servers", ".87:9092,.41:9092,.35:9092"); properties.setProperty("zookeeper.connect", ".37:2181"); properties.setProperty("group.id", "test"); properties.setProperty("client.id", "flink_test"); properties.setProperty("auto.offset.reset", "earliest"); properties.put("enable.auto.commit", "true"); properties.put("auto.commit.interval.ms", "1000"); properties.put("session.timeout.ms", "30000"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); With all three different configurations we get the same result. On 19 February 2016 at 11:55, Robert Metzger <rmetz...@apache.org> wrote: > Thank you. Can you send me also the list of properties you are passing to > the kafka consumer? Are you only setting the "bootstrap.servers" or more? > > On Fri, Feb 19, 2016 at 11:46 AM, Lopez, Javier <javier.lo...@zalando.de> > wrote: > >> Hi Robert, >> >> Please find attached the full logs of one of our latest executions. We >> are basically trying to read from our kafka cluster and then writing the >> data to elasticsearch. >> >> Thanks for your help! >> >> On 18 February 2016 at 11:19, Robert Metzger <rmetz...@apache.org> wrote: >> >>> Hi Javier, >>> >>> sorry for the late response. In the Error Mapping of Kafka, it says that >>> code 15 means: ConsumerCoordinatorNotAvailableCode. >>> >>> https://github.com/apache/kafka/blob/0.9.0/core/src/main/scala/kafka/common/ErrorMapping.scala >>> >>> How many brokers did you put into the list of bootstrap servers? >>> Can you maybe send me the full log of one of the Flink TaskManagers >>> reading from Kafka? >>> >>> >>> On Wed, Feb 17, 2016 at 11:10 AM, Lopez, Javier <javier.lo...@zalando.de >>> > wrote: >>> >>>> Hi guys, >>>> >>>> We are using Flink 1.0-SNAPSHOT with Kafka 0.9 Consumer and we have not >>>> been able to retrieve data from our Kafka Cluster. The DEBUG data reports >>>> the following: >>>> >>>> 10:53:24,365 DEBUG org.apache.kafka.clients.NetworkClient >>>> - Sending metadata request ClientRequest(expectResponse=true, >>>> callback=null, >>>> request=RequestSend(header={api_key=3,api_version=0,correlation_id=1673,client_id=flink_test}, >>>> body={topics=[stream_test_3]}), isInitiatedByNetworkClient, >>>> createdTimeMs=1455702804364, sendTimeMs=0) to node 35 >>>> 10:53:24,398 DEBUG org.apache.kafka.clients.Metadata >>>> - Updated cluster metadata version 838 to Cluster(nodes = >>>> [Node(41, ip-XXXX.eu-west-1.compute.internal, 9092), Node(35, >>>> ip-XXXX.eu-west-1.compute.internal, 9092), Node(87, >>>> ip-XXXX.eu-west-1.compute.internal, 9092)], partitions = [Partition(topic = >>>> stream_test_3, partition = 0, leader = 87, replicas = [87,41,35,], isr = >>>> [87,41,35,], Partition(topic = stream_test_3, partition = 1, leader = 35, >>>> replicas = [35,41,87,], isr = [35,41,87,], Partition(topic = stream_test_3, >>>> partition = 4, leader = 87, replicas = [87,41,35,], isr = [87,41,35,], >>>> Partition(topic = stream_test_3, partition = 3, leader = 35, replicas = >>>> [35,87,41,], isr = [35,87,41,], Partition(topic = stream_test_3, partition >>>> = 2, leader = 41, replicas = [41,87,35,], isr = [41,87,35,]]) >>>> 10:53:24,398 DEBUG >>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Issuing >>>> group metadata request to broker 35 >>>> 10:53:24,432 DEBUG >>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Group >>>> metadata response ClientResponse(receivedTimeMs=1455702804432, >>>> disconnected=false, request=ClientRequest(expectResponse=true, >>>> callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@63b68d94, >>>> request=RequestSend(header={api_key=10,api_version=0,correlation_id=1674,client_id=flink_test}, >>>> body={group_id=test}), createdTimeMs=1455702804398, >>>> sendTimeMs=1455702804398), >>>> responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) >>>> >>>> >>>> We receive this message all the time. What we don't know understand is >>>> this "responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}", >>>> as we see an error_code we suppose there was a problem. Our Kafka cluster >>>> works and we have some clients extracting data from it, so we don't know if >>>> this could be a Kafka issue or a Flink issue. >>>> >>>> Does anyone know, or understand, this response we are getting from >>>> Kafka? >>>> >>>> Thanks. >>>> >>> >>> >> >