shanthoosh commented on a change in pull request #940: SAMZA-2121: Add 
checkpoint offset field to IncomingMessageEnvelope
URL: https://github.com/apache/samza/pull/940#discussion_r263628931
 
 

 ##########
 File path: 
samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
 ##########
 @@ -86,7 +101,29 @@ public IncomingMessageEnvelope(SystemStreamPartition 
systemStreamPartition, Stri
    */
   public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, 
String offset,
       Object key, Object message, int size, long eventTime, long arrivalTime) {
-    this(systemStreamPartition, offset, key, message, size);
+    this(systemStreamPartition, offset, offset, key, message, size, eventTime, 
arrivalTime);
+  }
+
+  /**
+   * Constructs a new IncomingMessageEnvelope from specified components
+   * @param systemStreamPartition The aggregate object representing the 
incoming stream name, the name of the cluster
+   * from which the stream came, and the partition of the stream from which 
the message was received.
+   * @param offset The offset in the partition that the message was received 
from.
+   * @param checkpointOffset offset that can be checkpointed when this {@link 
IncomingMessageEnvelope} is processed
+   * @param key A deserialized key received from the partition offset.
+   * @param message A deserialized message received from the partition offset.
+   * @param size size of the message and key in bytes.
+   * @param eventTime the timestamp (in epochMillis) of when this event 
happened
+   * @param arrivalTime the timestamp (in epochMillis) of when this event 
arrived to (i.e., was picked-up by) Samza
+   */
+  public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, 
String offset, String checkpointOffset,
+      Object key, Object message, int size, long eventTime, long arrivalTime) {
+    this.systemStreamPartition = systemStreamPartition;
+    this.offset = offset;
+    this.checkpointOffset = checkpointOffset;
 
 Review comment:
   1. Can you please share the rationale behind adding checkpoint offset to 
`IncomingMessageEnvelope` public API. 
   2. Can you please throw some light on the other options considered here? Is 
it not plausible to do this making this change in public API. 
   
   IMHO, it'll be better not to proliferate the public API with these internal 
details which will be hard to remove later on. Anyone who reads this in the 
future, would wonder why there are two different offsets in 
`IncomingMessageEnvelope` contract. Is it possible to maintain the 
mapping(association we want for safe-checkpoint) internally within the samza 
framework side and not expose it to samza-users.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to