[
https://issues.apache.org/jira/browse/KAFKA-1910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14357366#comment-14357366
]
Guozhang Wang commented on KAFKA-1910:
--------------------------------------
Got some problems with RB, uploading the patch here for a quick review:
{code}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index e972efb..436f9b2 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -129,7 +129,7 @@ public final class Coordinator {
// process the response
JoinGroupResponse response = new
JoinGroupResponse(resp.responseBody());
- // TODO: needs to handle disconnects and errors
+ // TODO: needs to handle disconnects and errors, should not just throw
exceptions
Errors.forCode(response.errorCode()).maybeThrow();
this.consumerId = response.consumerId();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 27c78b8..8b71fba 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -231,11 +231,12 @@ public class Fetcher<K, V> {
log.debug("Fetched offset {} for partition {}",
offset, topicPartition);
return offset;
} else if (errorCode ==
Errors.NOT_LEADER_FOR_PARTITION.code()
- || errorCode == Errors.LEADER_NOT_AVAILABLE.code()) {
+ || errorCode ==
Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
log.warn("Attempt to fetch offsets for partition {}
failed due to obsolete leadership information, retrying.",
topicPartition);
awaitMetadataUpdate();
} else {
+ // TODO: we should not just throw exceptions but
should handle and log it.
Errors.forCode(errorCode).maybeThrow();
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index af704f3..f706086 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -45,7 +45,9 @@ public class ListOffsetResponse extends
AbstractRequestResponse {
/**
* Possible error code:
*
- * TODO
+ * UNKNOWN_TOPIC_OR_PARTITION (3)
+ * NOT_LEADER_FOR_PARTITION (6)
+ * UNKNOWN (-1)
*/
private static final String OFFSETS_KEY_NAME = "offsets";
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index fed37e3..8eae1ab 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -260,8 +260,10 @@ class ConsumerTest extends IntegrationTestHarness with
Logging {
var iter: Int = 0
override def doWork(): Unit = {
- killRandomBroker()
+ info("Killed broker %d".format(killRandomBroker()))
+ Thread.sleep(500)
restartDeadBrokers()
+ info("Restarted all brokers")
iter += 1
if (iter == numIters)
{code}
> Refactor KafkaConsumer
> ----------------------
>
> Key: KAFKA-1910
> URL: https://issues.apache.org/jira/browse/KAFKA-1910
> Project: Kafka
> Issue Type: Sub-task
> Components: consumer
> Reporter: Guozhang Wang
> Assignee: Guozhang Wang
> Attachments: KAFKA-1910.patch, KAFKA-1910_2015-03-05_14:55:33.patch
>
>
> KafkaConsumer now contains all the logic on the consumer side, making it a
> very huge class file, better re-factoring it to have multiple layers on top
> of KafkaClient.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)