Shekharrajak commented on issue #18439:
URL: https://github.com/apache/druid/issues/18439#issuecomment-4188637372

   To make sure there is no data loss, each polled batch moves through these phases:
   
   ```
   Phase 1 — POLL
     records = consumer.poll(pollTimeout)
     Broker: acquisition lock set on each record (TTL = lock.duration.ms)
     Local:  recordsInFlight = new LinkedHashMap<>()  // (topic,partition,offset) -> record
     RENEW thread: started (see Section 10)
   
   Phase 2 — PARSE
     for each record r:
       try:
         row = inputRowParser.parse(r)
         pendingRows.add(row)
         pendingAccept.add(r)
       catch ParseException:
         parseExceptionCount++
         if parseExceptionCount > maxParseExceptions:
           // Too many bad records: the operator must fix the schema. Fail the task.
           // RELEASE all records: the broker redelivers them after lock expiry.
           // (Do not REJECT on task failure; preserve redelivery for records
           // that parsed successfully.)
           releaseAllAndFail()         // does not return
         pendingReject.add(r)          // REJECT: permanent schema error, no value in retrying
   
   Phase 3 — INGEST
     appenderator.add(pendingRows)   // write to JVM heap buffer
   
   Phase 4 — PERSIST  (local disk — crash-safe boundary 1)
     if rowCount >= maxRowsInMemory OR elapsed >= intermediatePersistPeriod:
       try:
         appenderator.persist()      // flush to local disk; survives JVM crash
       catch IOException:
         pendingRelease.addAll(pendingAccept)
         pendingAccept.clear()
         commitAcknowledgements()    // RELEASE: broker redelivers; no data loss
         continue                    // do not fail task; retry on next cycle
   
   Phase 5 — PUSH  (deep storage — crash-safe boundary 2)
     try:
       pushedSegments = driver.push(appenderator.getSegments())
     catch Exception:
       pendingRelease.addAll(pendingAccept)
       pendingAccept.clear()
       commitAcknowledgements()      // RELEASE — broker redelivers
       if retryCount < maxPushRetries: retry else: fail task
   
   Phase 6 — WRITE PENDING-ACK INTENT  (WAL for two-phase ACK)
     INSERT INTO kafka_share_pending_acks
       (supervisor_id, task_id, topic, partition, offsets_blob, segment_ids, state)
     VALUES
       (?, ?, ?, ?, ?, ?, 'SEGMENTS_PUSHED_PENDING_REGISTER')
   
   Phase 7 — REGISTER  (metadata store — point of no return)
     try:
       coordinator.announceHistoricalSegments(pushedSegments)
     catch Exception:
       pendingRelease.addAll(pendingAccept)
       pendingAccept.clear()
       commitAcknowledgements()      // RELEASE: segments are in deep storage but not queryable
       // Orphaned segment cleanup: a coordinator background thread removes
       // unregistered segments
       return                        // retry registration or fail task
   
     UPDATE kafka_share_pending_acks SET state = 'SEGMENTS_REGISTERED_PENDING_ACK'
   
   Phase 8 — ACK  (after registration — safe to tell broker "done")
     for r in pendingAccept:  consumer.acknowledge(r, AcknowledgeType.ACCEPT)
     for r in pendingReject:  consumer.acknowledge(r, AcknowledgeType.REJECT)
     for r in pendingRelease: consumer.acknowledge(r, AcknowledgeType.RELEASE)
   
     Map<TopicIdPartition, Optional<KafkaException>> results =
         consumer.commitSync(Duration.ofMillis(ackTimeoutMs))
   
     for each (partition, maybeException) in results:
       if maybeException.isPresent():
         // Segments for this partition ARE registered and queryable.
         // Records will be redelivered. Duplicates possible; no data loss.
         log.warn("ACK failed for {}, records will be redelivered", partition)
         emitMetric("ingest/share/ackFailure", 1)
   
     UPDATE kafka_share_pending_acks SET state = 'ACKED'
     recordsInFlight.clear()
     RENEW thread: stopped
   ```
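
Phase 2's classification rule can be sketched in Java as follows. This is a minimal sketch under assumptions: records and rows are generic, the parser is any `Function`, and `ParseFailure`, `TooManyParseExceptions`, `ParseOutcome` and `ParsePhase` are hypothetical names (not Druid classes), with `ParseFailure` standing in for the parser's `ParseException`. Throwing when the budget is exceeded takes the place of `releaseAllAndFail()`, so the caller is assumed to RELEASE the whole batch and fail the task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the parser's ParseException.
final class ParseFailure extends RuntimeException {
  ParseFailure(String message) { super(message); }
}

// Hypothetical signal that the parse-exception budget is exhausted; the caller
// is expected to RELEASE every polled record and fail the task.
final class TooManyParseExceptions extends RuntimeException {
  TooManyParseExceptions(int count) { super("parse exceptions: " + count); }
}

// Result of classifying one poll batch (hypothetical).
final class ParseOutcome<R, T> {
  final List<T> rows = new ArrayList<>();    // handed to the appenderator in Phase 3
  final List<R> accept = new ArrayList<>();  // parsed OK; ACCEPTed only after Phase 7
  final List<R> reject = new ArrayList<>();  // unparseable; REJECTed, never redelivered
}

final class ParsePhase {
  static <R, T> ParseOutcome<R, T> classify(
      List<R> records, Function<R, T> parser, int maxParseExceptions) {
    ParseOutcome<R, T> out = new ParseOutcome<>();
    int parseExceptionCount = 0;
    for (R r : records) {
      try {
        T row = parser.apply(r);
        out.rows.add(row);
        out.accept.add(r);
      } catch (ParseFailure e) {
        parseExceptionCount++;
        if (parseExceptionCount > maxParseExceptions) {
          // Budget exceeded: REJECT nothing; the caller releases the whole batch.
          throw new TooManyParseExceptions(parseExceptionCount);
        }
        out.reject.add(r);
      }
    }
    return out;
  }
}
```

Because the budget check runs before `pendingReject.add(r)`, the record that crosses the threshold is never REJECTed; it goes back to the broker with the rest of the batch.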
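
The `state` column written in Phases 6 through 8 behaves as a forward-only state machine. The sketch below is a hypothetical Java model of it (the enum and its methods are not part of Druid or Kafka). It rejects skipped transitions, such as jumping from `SEGMENTS_PUSHED_PENDING_REGISTER` straight to `ACKED`, and encodes the Phase 8 observation that once registration succeeds, a failed ACK can only produce duplicates.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the kafka_share_pending_acks.state column.
enum PendingAckState {
  SEGMENTS_PUSHED_PENDING_REGISTER,  // written in Phase 6
  SEGMENTS_REGISTERED_PENDING_ACK,   // set after Phase 7 succeeds
  ACKED;                             // set after Phase 8

  // Allowed successors, in the order Phases 6, 7 and 8 perform them.
  Set<PendingAckState> next() {
    switch (this) {
      case SEGMENTS_PUSHED_PENDING_REGISTER:
        return EnumSet.of(SEGMENTS_REGISTERED_PENDING_ACK);
      case SEGMENTS_REGISTERED_PENDING_ACK:
        return EnumSet.of(ACKED);
      default:
        return EnumSet.noneOf(PendingAckState.class);
    }
  }

  // Returns the new state, or throws if the transition skips or reverses a phase.
  PendingAckState advanceTo(PendingAckState target) {
    if (!next().contains(target)) {
      throw new IllegalStateException(this + " -> " + target);
    }
    return target;
  }

  // After registration the segments are queryable, so a failed ACK means
  // redelivery (duplicates), never data loss.
  boolean segmentsQueryable() {
    return this != SEGMENTS_PUSHED_PENDING_REGISTER;
  }
}
```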
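
The bookkeeping shared by the failure paths of Phases 4, 5 and 7 and the final dispatch in Phase 8 can be modeled with plain collections. This sketch assumes a local `AckType` enum that mirrors the `ACCEPT`, `RELEASE` and `REJECT` values of Kafka's `AcknowledgeType`, so it runs without kafka-clients; `RecordKey` and `AckBatch` are hypothetical names, and the `record` syntax needs Java 16 or later. `releaseAccepted()` always clears the accept list after moving its contents, so a record cannot be released and later accepted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Local stand-in mirroring ACCEPT / RELEASE / REJECT of Kafka's AcknowledgeType.
enum AckType { ACCEPT, RELEASE, REJECT }

// Composite key matching recordsInFlight: (topic, partition, offset).
record RecordKey(String topic, int partition, long offset) {}

// Hypothetical holder for the three pending acknowledgement lists.
final class AckBatch {
  private final List<RecordKey> pendingAccept = new ArrayList<>();
  private final List<RecordKey> pendingReject = new ArrayList<>();
  private final List<RecordKey> pendingRelease = new ArrayList<>();

  void accept(RecordKey k) { pendingAccept.add(k); }
  void reject(RecordKey k) { pendingReject.add(k); }

  // Failure path for PERSIST, PUSH and REGISTER: nothing may be ACCEPTed yet,
  // so move every accepted record to RELEASE and clear the accept list.
  void releaseAccepted() {
    pendingRelease.addAll(pendingAccept);
    pendingAccept.clear();
  }

  // Phase 8: the acknowledgement each record would receive before commitSync.
  Map<RecordKey, AckType> dispositions() {
    Map<RecordKey, AckType> out = new LinkedHashMap<>();
    pendingAccept.forEach(k -> out.put(k, AckType.ACCEPT));
    pendingReject.forEach(k -> out.put(k, AckType.REJECT));
    pendingRelease.forEach(k -> out.put(k, AckType.RELEASE));
    return out;
  }
}
```

REJECTed records keep their disposition on every failure path, which matches the pseudocode: a schema error is permanent, so only records that parsed successfully are handed back for redelivery.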

