Kashyap Ivaturi created KAFKA-7543:
--------------------------------------

             Summary: Kafka Connect JDBC Sink failing to establish connection to Topic, while the connection is working fine with standalone consumer
                 Key: KAFKA-7543
                 URL: https://issues.apache.org/jira/browse/KAFKA-7543
             Project: Kafka
          Issue Type: Task
          Components: KafkaConnect
            Reporter: Kashyap Ivaturi

Hi,

I am trying to set up a Kafka Connect JDBC Sink Connector so that my DB is updated with the data arriving on a Kafka topic. I have implemented JDBC Source Connectors before, and they worked very well. In this case, however, when I run the Sink Connector, its internal consumer fails to connect to the topic and is disconnected from the Kafka broker, and this repeats in a loop. With TRACE logging enabled I got the details below.

Any idea why the consumer is unable to connect to the topic? A standalone consumer in another application of mine connects to the topic and reads messages from it without problems. Please let me know if you have any suggestions.

[2018-10-24 23:03:24,134] INFO WorkerSinkTask{id=hrmsAckEvents-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:268)
[2018-10-24 23:03:24,135] TRACE WorkerSinkTask{id=hrmsAckEvents-0} Polling consumer with timeout 4875 ms (org.apache.kafka.connect.runtime.WorkerSinkTask:282)
[2018-10-24 23:03:24,136] TRACE [Consumer clientId=consumer-1, groupId=hrmsack] Found least loaded node messaging-rtp3.cisco.com:9093 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:123)
[2018-10-24 23:03:24,136] DEBUG [Consumer clientId=consumer-1, groupId=hrmsack] Sending GroupCoordinator request to broker messaging-rtp3.cisco.com:9093 (id: -1 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:183)
[2018-10-24 23:03:24,281] DEBUG [Consumer clientId=consumer-1, groupId=hrmsack] Initiating connection to node messaging-rtp3.cisco.com:9093 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:183)
[2018-10-24 23:03:24,293] TRACE [Consumer clientId=consumer-1, groupId=hrmsack] Found least loaded node messaging-rtp3.cisco.com:9093 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:123)
[2018-10-24 23:03:24,295] TRACE [Consumer clientId=consumer-1, groupId=hrmsack] Found least loaded node messaging-rtp3.cisco.com:9093 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:123)
[2018-10-24 23:03:24,346] TRACE [Consumer clientId=consumer-1, groupId=hrmsack] Found least loaded node messaging-rtp3.cisco.com:9093 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:123)
[2018-10-24 23:03:24,365] DEBUG Added sensor with name node--1.bytes-sent (org.apache.kafka.common.metrics.Metrics:404)
[2018-10-24 23:03:24,367] DEBUG Added sensor with name node--1.bytes-received (org.apache.kafka.common.metrics.Metrics:404)
[2018-10-24 23:03:24,374] DEBUG Added sensor with name node--1.latency (org.apache.kafka.common.metrics.Metrics:404)
[2018-10-24 23:03:24,376] DEBUG [Consumer clientId=consumer-1, groupId=hrmsack] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1 (org.apache.kafka.common.network.Selector:195)
[2018-10-24 23:03:24,377] DEBUG [Consumer clientId=consumer-1, groupId=hrmsack] Completed connection to node -1. Fetching API versions. (org.apache.kafka.clients.NetworkClient:183)
[2018-10-24 23:03:24,377] DEBUG [Consumer clientId=consumer-1, groupId=hrmsack] Initiating API versions fetch from node -1. (org.apache.kafka.clients.NetworkClient:183)
[2018-10-24 23:03:24,378] TRACE [Consumer clientId=consumer-1, groupId=hrmsack] No version information found when sending API_VERSIONS with correlation id 1 to node -1. Assuming version 1. (org.apache.kafka.clients.NetworkClient:135)
[2018-10-24 23:03:24,380] TRACE [Consumer clientId=consumer-1, groupId=hrmsack] Sending API_VERSIONS {} with correlation id 1 to node -1 (org.apache.kafka.clients.NetworkClient:135)
[2018-10-24 23:03:24,385] TRACE [Consumer clientId=consumer-1, groupId=hrmsack] Found least loaded node messaging-rtp3.cisco.com:9093 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:123)
[2018-10-24 23:03:24,389] TRACE [Consumer clientId=consumer-1, groupId=hrmsack] Found least loaded node messaging-rtp3.cisco.com:9093 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:123)
[2018-10-24 23:03:24,724] TRACE [Consumer clientId=consumer-1, groupId=hrmsack] Found least loaded node messaging-rtp3.cisco.com:9093 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:123)
[2018-10-24 23:03:24,725] DEBUG [Consumer clientId=consumer-1, groupId=hrmsack] Connection with messaging-rtp3.cisco.com/64.101.96.6 disconnected (org.apache.kafka.common.network.Selector:189)
java.io.EOFException
	at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:147)
	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:231)
	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:192)
	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:528)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:469)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:398)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:190)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:219)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:284)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:410)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:283)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[2018-10-24 23:03:24,729] DEBUG [Consumer clientId=consumer-1, groupId=hrmsack] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:183)
[2018-10-24 23:03:24,732] TRACE [Consumer clientId=consumer-1, groupId=hrmsack] Cancelled request {} with correlation id 1 due to node -1 being disconnected (org.apache.kafka.clients.NetworkClient:135)
[2018-10-24 23:03:24,733] DEBUG [Consumer clientId=consumer-1, groupId=hrmsack] Cancelled FIND_COORDINATOR request RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=0) with correlation id 0 due to node -1 being disconnected (org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:195)
[2018-10-24 23:03:24,734] DEBUG [Consumer clientId=consumer-1, groupId=hrmsack] Coordinator discovery failed, refreshing metadata (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:177)
[2018-10-24 23:03:24,734] TRACE [Consumer clientId=consumer-1, groupId=hrmsack] Removing node messaging-rtp3.cisco.com:9093 (id: -1 rack: null) from least loaded node selection: is-blacked-out: true, in-flight-requests: 0 (org.apache.kafka.clients.NetworkClient:135)

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
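
For anyone reading the trace above, a small script can help pick out how quickly the broker closed the connection. This is only a rough sketch: the regular expression assumes the log layout shown in this report (`[timestamp] LEVEL message (logger:line)`), and the names `LOG_LINE`, `parse_line`, `ms_between` and `SAMPLE` are made up for illustration. The two sample entries are copied from the trace.

```python
import re
from datetime import datetime, timedelta

# Assumed layout, taken from the trace in this report:
# [yyyy-MM-dd HH:mm:ss,SSS] LEVEL message (logger:line)
LOG_LINE = re.compile(
    r"^\[(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\] "
    r"(?P<level>[A-Z]+) (?P<msg>.*) \((?P<logger>[\w.$]+):(?P<line>\d+)\)$"
)


def parse_line(line):
    """Return a dict for one log entry, or None for non-entry lines
    such as stack-trace frames."""
    match = LOG_LINE.match(line.strip())
    if match is None:
        return None
    record = match.groupdict()
    record["ts"] = datetime.strptime(record["ts"], "%Y-%m-%d %H:%M:%S,%f")
    return record


def ms_between(earlier, later):
    """Whole milliseconds elapsed between two parsed entries."""
    return (later["ts"] - earlier["ts"]) // timedelta(milliseconds=1)


# Two entries copied from the trace, plus the first stack-trace line.
SAMPLE = [
    "[2018-10-24 23:03:24,377] DEBUG [Consumer clientId=consumer-1, groupId=hrmsack] "
    "Completed connection to node -1. Fetching API versions. "
    "(org.apache.kafka.clients.NetworkClient:183)",
    "[2018-10-24 23:03:24,725] DEBUG [Consumer clientId=consumer-1, groupId=hrmsack] "
    "Connection with messaging-rtp3.cisco.com/64.101.96.6 disconnected "
    "(org.apache.kafka.common.network.Selector:189)",
    "java.io.EOFException",
]

records = [r for r in map(parse_line, SAMPLE) if r is not None]
connected, disconnected = records
print(ms_between(connected, disconnected))  # 348
```

In this trace the TCP connection completes at 23:03:24,377 and the EOFException is logged at 23:03:24,725, so the broker side closes the socket about 348 ms after the connection is established and before any API_VERSIONS response is logged.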