shanthoosh commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0
URL: https://github.com/apache/samza/pull/951#discussion_r268385422
 
 

 ##########
 File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
 ##########
 @@ -628,10 +589,12 @@ public void validateStream(StreamSpec streamSpec) throws StreamValidationExcepti
   @Override
   public void deleteMessages(Map<SystemStreamPartition, String> offsets) {
     if (deleteCommittedMessages) {
-      if (adminClientForDelete == null) {
-        adminClientForDelete = kafka.admin.AdminClient.create(createAdminClientProperties());
-      }
-      KafkaSystemAdminUtilsScala.deleteMessages(adminClientForDelete, offsets);
+      Map<TopicPartition, RecordsToDelete> recordsToDelete = offsets.entrySet()
+          .stream()
+          .collect(Collectors.toMap(entry ->
+              new TopicPartition(entry.getKey().getStream(), entry.getKey().getPartition().getPartitionId()),
+              entry -> RecordsToDelete.beforeOffset(Long.parseLong(entry.getValue()) + 1)));
+      adminClient.deleteRecords(recordsToDelete);
 
 Review comment:
   1. Please correct me if I'm wrong.
   `KafkaAdminClient` exposes [`KafkaFuture`-based](https://kafka.apache.org/20/javadoc/org/apache/kafka/common/KafkaFuture.html#KafkaFuture--) asynchronous APIs, so we have to take the result of the `deleteRecords` call and wait for the request to complete, something like:
   
   ```java
   // deleteRecords() returns immediately; block until the request completes or the timeout expires
   DeleteRecordsResult deleteRecordsResult = adminClient.deleteRecords(recordsToDelete);
   deleteRecordsResult.all().get(timeout, TimeUnit.MILLISECONDS);
   ```
   We do something similar for [createTopic](https://github.com/apache/samza/blob/cb51c54fb1d1e0bf94e14cf4288f0a18b72a6bee/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java#L480) in this class. Just invoking the `deleteRecords` API wouldn't suffice, right?
   
   2. For the delete-messages operation, we're relying on the admin client's default request timeout (120 seconds, see [config-ref](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L137)), because no timeout is set on the delete request itself. However, we use `KafkaSystemAdmin.KAFKA_ADMIN_OPS_TIMEOUT_MS` uniformly for the other admin-client operations in this class. It would be better to stick to the same admin timeout, or to override the delete timeout explicitly here.
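Taken together, the two points suggest a pattern: submit the request, then block on the returned future for a bounded time, and treat a failure or timeout as an error instead of returning silently. The sketch below illustrates that pattern using only the JDK, as a stand-in under stated assumptions rather than the actual Samza change: `CompletableFuture` plays the role of the `KafkaFuture` returned by `DeleteRecordsResult.all()` (both implement `java.util.concurrent.Future`), and `AsyncDeleter`, `toDeleteBeforeOffsets`, and `deleteAndWait` are hypothetical names, not Samza or Kafka APIs. With the real client, the equivalent would presumably be `adminClient.deleteRecords(recordsToDelete, new DeleteRecordsOptions().timeoutMs(timeoutMs)).all().get(timeoutMs, TimeUnit.MILLISECONDS)`, with `timeoutMs` taken from `KAFKA_ADMIN_OPS_TIMEOUT_MS`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DeleteAndWaitSketch {

  /** Hypothetical stand-in for an async admin call such as AdminClient#deleteRecords. */
  interface AsyncDeleter {
    Future<Void> deleteBefore(Map<String, Long> beforeOffsets);
  }

  /**
   * Hypothetical helper: turns checkpointed offsets (last processed offset, as a string)
   * into "delete everything before" offsets, i.e. last processed + 1, mirroring
   * RecordsToDelete.beforeOffset(Long.parseLong(offset) + 1) in the diff.
   */
  static Map<String, Long> toDeleteBeforeOffsets(Map<String, String> checkpointed) {
    Map<String, Long> result = new HashMap<>();
    checkpointed.forEach((partition, offset) -> result.put(partition, Long.parseLong(offset) + 1));
    return result;
  }

  /** Hypothetical helper: issues the async delete, then waits at most timeoutMs for it. */
  static void deleteAndWait(AsyncDeleter deleter, Map<String, String> checkpointed, long timeoutMs) {
    Future<Void> pending = deleter.deleteBefore(toDeleteBeforeOffsets(checkpointed));
    try {
      // Blocking here is what turns "request sent" into "request completed".
      pending.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while deleting records", e);
    } catch (ExecutionException e) {
      // The request itself failed (e.g. the broker rejected it); surface the cause.
      throw new RuntimeException("Delete-records request failed", e.getCause());
    } catch (TimeoutException e) {
      pending.cancel(true);
      throw new RuntimeException("Delete-records request timed out after " + timeoutMs + " ms", e);
    }
  }

  public static void main(String[] args) {
    Map<String, String> checkpointed = new HashMap<>();
    checkpointed.put("topicA-0", "41");

    // A deleter whose future completes immediately: the call returns normally.
    Map<String, Long> seen = new HashMap<>();
    deleteAndWait(offsets -> {
      seen.putAll(offsets);
      return CompletableFuture.completedFuture(null);
    }, checkpointed, 1000);
    System.out.println(seen); // prints {topicA-0=42}

    // A deleter whose future never completes: the timeout surfaces as an exception.
    try {
      deleteAndWait(offsets -> new CompletableFuture<>(), checkpointed, 50);
    } catch (RuntimeException e) {
      System.out.println(e.getMessage()); // prints "Delete-records request timed out after 50 ms"
    }
  }
}
```

Wrapping the checked exceptions in `RuntimeException` is only a choice made for the sketch; code inside `KafkaSystemAdmin` would more likely rethrow them as a `SamzaException`. Cancelling the future on timeout only abandons the client-side wait; the brokers may still complete the deletion.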

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
