cmccabe commented on a change in pull request #11689:
URL: https://github.com/apache/kafka/pull/11689#discussion_r788922915
##########
File path:
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -1506,6 +1508,56 @@ public void testNullTopicName() {
"key".getBytes(StandardCharsets.UTF_8),
"value".getBytes(StandardCharsets.UTF_8)));
}
+ @Test
+ public void testCallbackHandlesError() throws Exception {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+ configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+
+ Time time = new MockTime();
+ ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
+ MockClient client = new MockClient(time, producerMetadata);
+
+ String invalidTopicName = "topic abc"; // Invalid topic name due to
space
+
+ try (Producer<String, String> producer = kafkaProducer(configs, new
StringSerializer(), new StringSerializer(),
+ producerMetadata, client, null, time)) {
+ ProducerRecord<String, String> record = new
ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+ // Here's the important piece of the test. Let's make sure that
the RecordMetadata we get
+ // is non-null and adheres to the onCompletion contract.
+ Callback callBack = (recordMetadata, exception) -> {
+ assertNotNull(exception);
+ assertNotNull(recordMetadata);
+
+ try {
+ assertNotNull(recordMetadata.topic());
+ } catch (NullPointerException e) {
+ fail("Topic name should be valid even on send failure", e);
Review comment:
It's not necessary to do this. If you want to display a special error
message when the assert fails, there is a three-argument form which lets you
specify the error message.
##########
File path:
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -1506,6 +1508,56 @@ public void testNullTopicName() {
"key".getBytes(StandardCharsets.UTF_8),
"value".getBytes(StandardCharsets.UTF_8)));
}
+ @Test
+ public void testCallbackHandlesError() throws Exception {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+ configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+
+ Time time = new MockTime();
+ ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
+ MockClient client = new MockClient(time, producerMetadata);
+
+ String invalidTopicName = "topic abc"; // Invalid topic name due to
space
+
+ try (Producer<String, String> producer = kafkaProducer(configs, new
StringSerializer(), new StringSerializer(),
+ producerMetadata, client, null, time)) {
+ ProducerRecord<String, String> record = new
ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+ // Here's the important piece of the test. Let's make sure that
the RecordMetadata we get
+ // is non-null and adheres to the onCompletion contract.
+ Callback callBack = (recordMetadata, exception) -> {
+ assertNotNull(exception);
+ assertNotNull(recordMetadata);
+
+ try {
+ assertNotNull(recordMetadata.topic());
+ } catch (NullPointerException e) {
+ fail("Topic name should be valid even on send failure", e);
+ }
+
+ assertEquals(invalidTopicName, recordMetadata.topic());
+
+ try {
+ assertEquals(RecordMetadata.UNKNOWN_PARTITION,
recordMetadata.partition());
Review comment:
As before, it's not necessary to do this. If you want to display a
special error message when the assert fails, there is a three-argument form
which lets you specify the error message.
##########
File path:
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -1506,6 +1508,56 @@ public void testNullTopicName() {
"key".getBytes(StandardCharsets.UTF_8),
"value".getBytes(StandardCharsets.UTF_8)));
}
+ @Test
+ public void testCallbackHandlesError() throws Exception {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+ configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+
+ Time time = new MockTime();
+ ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
+ MockClient client = new MockClient(time, producerMetadata);
+
+ String invalidTopicName = "topic abc"; // Invalid topic name due to
space
+
+ try (Producer<String, String> producer = kafkaProducer(configs, new
StringSerializer(), new StringSerializer(),
+ producerMetadata, client, null, time)) {
+ ProducerRecord<String, String> record = new
ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+ // Here's the important piece of the test. Let's make sure that
the RecordMetadata we get
+ // is non-null and adheres to the onCompletion contract.
+ Callback callBack = (recordMetadata, exception) -> {
+ assertNotNull(exception);
+ assertNotNull(recordMetadata);
+
+ try {
+ assertNotNull(recordMetadata.topic());
+ } catch (NullPointerException e) {
+ fail("Topic name should be valid even on send failure", e);
+ }
+
+ assertEquals(invalidTopicName, recordMetadata.topic());
+
+ try {
+ assertEquals(RecordMetadata.UNKNOWN_PARTITION,
recordMetadata.partition());
Review comment:
As before, it's not necessary to do this. If you want to display a
special error message when the assert fails, there is a three-argument form
which lets you specify the error message.
##########
File path:
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -1506,6 +1508,56 @@ public void testNullTopicName() {
"key".getBytes(StandardCharsets.UTF_8),
"value".getBytes(StandardCharsets.UTF_8)));
}
+ @Test
+ public void testCallbackHandlesError() throws Exception {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+ configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+
+ Time time = new MockTime();
+ ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
+ MockClient client = new MockClient(time, producerMetadata);
+
+ String invalidTopicName = "topic abc"; // Invalid topic name due to
space
+
+ try (Producer<String, String> producer = kafkaProducer(configs, new
StringSerializer(), new StringSerializer(),
+ producerMetadata, client, null, time)) {
+ ProducerRecord<String, String> record = new
ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+ // Here's the important piece of the test. Let's make sure that
the RecordMetadata we get
+ // is non-null and adheres to the onCompletion contract.
+ Callback callBack = (recordMetadata, exception) -> {
+ assertNotNull(exception);
+ assertNotNull(recordMetadata);
+
+ try {
+ assertNotNull(recordMetadata.topic());
+ } catch (NullPointerException e) {
+ fail("Topic name should be valid even on send failure", e);
Review comment:
It's not necessary to do this. If you want to display a special error
message when the assert fails, there is a three-argument form which lets you
specify the error message.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]