shanthoosh commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0
URL: https://github.com/apache/samza/pull/951#discussion_r268385422
##########
File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
##########
@@ -628,10 +589,12 @@ public void validateStream(StreamSpec streamSpec) throws StreamValidationExcepti
   @Override
   public void deleteMessages(Map<SystemStreamPartition, String> offsets) {
     if (deleteCommittedMessages) {
-      if (adminClientForDelete == null) {
-        adminClientForDelete = kafka.admin.AdminClient.create(createAdminClientProperties());
-      }
-      KafkaSystemAdminUtilsScala.deleteMessages(adminClientForDelete, offsets);
+      Map<TopicPartition, RecordsToDelete> recordsToDelete = offsets.entrySet()
+          .stream()
+          .collect(Collectors.toMap(entry ->
+              new TopicPartition(entry.getKey().getStream(), entry.getKey().getPartition().getPartitionId()),
+              entry -> RecordsToDelete.beforeOffset(Long.parseLong(entry.getValue()) + 1)));
+      adminClient.deleteRecords(recordsToDelete);

Review comment:
1. Please correct me if I'm wrong: KafkaAdminClient exposes asynchronous APIs that return a [KafkaFuture](https://kafka.apache.org/20/javadoc/org/apache/kafka/common/KafkaFuture.html#KafkaFuture--) (similar in spirit to a CompletableFuture). So we have to get the result of the delete-records API call and wait for the request to complete. Something like:
```java
DeleteRecordsResult deletionRecordsResult = adminClient.deleteRecords(recordsToDelete);
deletionRecordsResult.all().get(timeout, TimeUnit.MILLISECONDS);
```
We do something similar for [createTopic](https://github.com/apache/samza/blob/cb51c54fb1d1e0bf94e14cf4288f0a18b72a6bee/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java#L480) in this class. Just invoking the `deleteRecords` API wouldn't suffice, right?

2. For the delete-messages operation, we're using the admin client's default request timeout (120 seconds, see [config-ref](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L137)) whenever no delete-specific request timeout is defined.
However, we use `KafkaSystemAdmin.KAFKA_ADMIN_OPS_TIMEOUT_MS` uniformly for the other admin-client operations in this class. It would be better to stick to the same admin timeout here, or to override the delete timeout explicitly.
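Both points can be illustrated with a small JDK-only sketch: translate each checkpointed offset to a "delete before `offset + 1`" position (as the diff does with `RecordsToDelete.beforeOffset`), issue the asynchronous call, then block on its result bounded by the shared admin timeout so failures and timeouts reach the caller. To keep it runnable without a broker or the kafka-clients jar, `deleteRecordsAsync` is a hypothetical `CompletableFuture`-based stand-in for `AdminClient#deleteRecords`, partitions are hypothetical plain strings such as `orders-0`, and `ADMIN_OPS_TIMEOUT_MS` is a made-up value standing in for `KafkaSystemAdmin.KAFKA_ADMIN_OPS_TIMEOUT_MS`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch only: the "issue async admin call, then wait with a bounded timeout" pattern.
// deleteRecordsAsync is a HYPOTHETICAL stand-in, not a Kafka API.
class DeleteRecordsSketch {

  // Hypothetical value standing in for KafkaSystemAdmin.KAFKA_ADMIN_OPS_TIMEOUT_MS.
  static final long ADMIN_OPS_TIMEOUT_MS = 5_000L;

  // A checkpointed offset is the last processed message, so every record strictly
  // before offset + 1 may be deleted (mirrors RecordsToDelete.beforeOffset(offset + 1)).
  static Map<String, Long> toBeforeOffsets(Map<String, String> checkpointedOffsets) {
    Map<String, Long> beforeOffsets = new HashMap<>();
    checkpointedOffsets.forEach((partition, offset) ->
        beforeOffsets.put(partition, Long.parseLong(offset) + 1));
    return beforeOffsets;
  }

  // Hypothetical async call: completes on another thread with the new
  // low watermark per partition.
  static CompletableFuture<Map<String, Long>> deleteRecordsAsync(Map<String, Long> beforeOffsets) {
    return CompletableFuture.supplyAsync(() -> new HashMap<String, Long>(beforeOffsets));
  }

  // Issue the request, then wait for it using the same timeout as the other admin
  // operations; merely invoking the async call would drop errors and timeouts silently.
  static Map<String, Long> deleteMessages(Map<String, String> checkpointedOffsets) {
    CompletableFuture<Map<String, Long>> pending =
        deleteRecordsAsync(toBeforeOffsets(checkpointedOffsets));
    try {
      return pending.get(ADMIN_OPS_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      throw new IllegalStateException(
          "deleteRecords did not complete within " + ADMIN_OPS_TIMEOUT_MS + " ms", e);
    } catch (ExecutionException e) {
      // Surface the underlying failure rather than the wrapper.
      throw new IllegalStateException("deleteRecords failed", e.getCause());
    } catch (InterruptedException e) {
      // Restore the interrupt flag before propagating.
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for deleteRecords", e);
    }
  }
}
```

Against the real client, the same shape would be `adminClient.deleteRecords(recordsToDelete).all().get(KAFKA_ADMIN_OPS_TIMEOUT_MS, TimeUnit.MILLISECONDS)`. One option for also aligning the request's own timeout is the `deleteRecords` overload that takes a `DeleteRecordsOptions`, set via its `timeoutMs(...)` method, assuming the constant fits in an `Integer`.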