shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257523873
##########
File path:
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
##########
@@ -60,16 +74,22 @@
private final Config config;
private final boolean fetchThresholdBytesEnabled;
private final KafkaSystemConsumerMetrics metrics;
+ private final KafkaStartpointRegistrationHandler
kafkaStartpointRegistrationHandler;
// This sink is used to transfer the messages from the proxy/consumer to the
BlockingEnvelopeMap.
final KafkaConsumerMessageSink messageSink;
// This proxy contains a separate thread, which reads kafka messages (with
consumer.poll()) and populates
// BlockingEnvelopMap's buffers.
- final private KafkaConsumerProxy proxy;
+ @VisibleForTesting
+ KafkaConsumerProxy proxy;
// keep registration data until the start - mapping between registered SSPs
and topicPartitions, and their offsets
final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>();
Review comment:
I'm aware that we'd to do this change. My initial plan was to do it after
migrating all the system consumer to implement at-least the startpoint-specific
visitor.
Discussed offline and we agreed to do this change in this patch itself.
1. Removed `topicPartitionstoOffset` and delegate `register(ssp, offset)`
API calls to start-point registration API.
2. Moved the offset comparator logic to `SystemConsumers` layer.
A. Only few implementations of SystemConsumer.register API
currently use systemAdmin.offsetComparator to compare offsets. However, the
comparator logic is common to all implementations of SystemConsumer.register
API. Moving it to the upper-layer(`SystemConsumers`) would ensure functional
correctness.
B. All other upper-layers except `SystemConsumers` that invoke
register(ssp, offset) API already pass the lowest offset for a SSP. We don't
have startpoint comparator abstraction yet. To delegate calls from
register(ssp, offset) to register-startpoint API we had to get the lowest
offset for a SSP from SystemConsumers layer.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services