Shekharrajak commented on issue #18439:
URL: https://github.com/apache/druid/issues/18439#issuecomment-4188637372
To make sure there is no data loss, each batch of polled records goes through the following phases:
```
Phase 1: POLL
  records = consumer.poll(pollTimeout)
  // Broker: sets an acquisition lock on each delivered record (TTL = lock.duration.ms)
  recordsInFlight = new LinkedHashMap<>()   // local: (topic, partition, offset) -> record
  for r in records: recordsInFlight.put((r.topic, r.partition, r.offset), r)
  RENEW thread: started (see Section 10)
Phase 2: PARSE
  for each record r in records:
    try:
      row = inputRowParser.parse(r)
      pendingRows.add(row)
      pendingAccept.add(r)
    catch ParseException:
      parseExceptionCount++
      if parseExceptionCount > maxParseExceptions:
        // Too many bad records: the operator must fix the schema. Fail the task.
        // RELEASE all records so the broker redelivers them (if the task dies
        // before the release is committed, the acquisition locks expire instead).
        // Do not REJECT on task failure: that would also discard the records
        // that parsed successfully.
        releaseAllAndFail()
      pendingReject.add(r)   // REJECT: permanent schema error, no value in retrying
Phase 3: INGEST
  appenderator.add(pendingRows)   // write to the JVM heap buffer
Phase 4: PERSIST (local disk, crash-safe boundary 1)
  if rowCount >= maxRowsInMemory OR elapsed >= intermediatePersistPeriod:
    try:
      appenderator.persist()   // flush to local disk; survives a JVM crash
    catch IOException:
      pendingRelease.addAll(pendingAccept)
      pendingAccept.clear()
      commitAcknowledgements()   // RELEASE: broker redelivers; no data loss
      continue                   // do not fail the task; retry on the next cycle
Phase 5: PUSH (deep storage, crash-safe boundary 2)
  try:
    pushedSegments = driver.push(appenderator.getSegments())
  catch Exception:
    pendingRelease.addAll(pendingAccept)
    pendingAccept.clear()
    commitAcknowledgements()   // RELEASE: broker redelivers
    if retryCount < maxPushRetries: retry
    else: fail task
Phase 6: WRITE PENDING-ACK INTENT (WAL for the two-phase ACK)
  INSERT INTO kafka_share_pending_acks
    (supervisor_id, task_id, topic, partition, offsets_blob, segment_ids, state)
  VALUES
    (?, ?, ?, ?, ?, ?, 'SEGMENTS_PUSHED_PENDING_REGISTER')
Phase 7: REGISTER (metadata store, point of no return)
  try:
    coordinator.announceHistoricalSegments(pushedSegments)
  catch Exception:
    pendingRelease.addAll(pendingAccept)
    pendingAccept.clear()
    commitAcknowledgements()   // RELEASE: segments are in deep storage but not queryable
    // Orphaned-segment cleanup: a coordinator background thread removes unregistered segments
    return                     // retry registration or fail the task
  UPDATE kafka_share_pending_acks
    SET state = 'SEGMENTS_REGISTERED_PENDING_ACK'
    WHERE supervisor_id = ? AND task_id = ?
Phase 8: ACK (after registration, safe to tell the broker "done")
  for r in pendingAccept:  consumer.acknowledge(r, AcknowledgeType.ACCEPT)
  for r in pendingReject:  consumer.acknowledge(r, AcknowledgeType.REJECT)
  for r in pendingRelease: consumer.acknowledge(r, AcknowledgeType.RELEASE)
  Map<TopicIdPartition, Optional<KafkaException>> results =
      consumer.commitSync(Duration.ofMillis(ackTimeoutMs))
  for each (partition, maybeException) in results:
    if maybeException.isPresent():
      // Segments for this partition ARE registered and queryable.
      // Records will be redelivered: duplicates are possible, but no data is lost.
      log.warn("ACK failed for {}, records will be redelivered", partition)
      emitMetric("ingest/share/ackFailure", 1)
  UPDATE kafka_share_pending_acks
    SET state = 'ACKED'
    WHERE supervisor_id = ? AND task_id = ?
  recordsInFlight.clear()
  RENEW thread: cleared
```
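The in-flight map from Phase 1 can be sketched in plain Java. It assumes a hypothetical `PolledRecord` type in place of the real consumer record. Keying on a `(topic, partition, offset)` record gives value-based equality, and `LinkedHashMap` keeps the poll order. `InFlightTracker` and its methods are illustrative names, not part of Druid or Kafka.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a record returned by a share-group poll.
record PolledRecord(String topic, int partition, long offset, String value) {}

// Composite key: a Java record derives equals/hashCode from its components.
record InFlightKey(String topic, int partition, long offset) {}

final class InFlightTracker {
    // LinkedHashMap preserves the order in which records were polled.
    private final Map<InFlightKey, PolledRecord> recordsInFlight = new LinkedHashMap<>();

    void track(List<PolledRecord> polled) {
        for (PolledRecord r : polled) {
            // Re-tracking the same (topic, partition, offset) replaces the value
            // but keeps the entry's original position in the iteration order.
            recordsInFlight.put(new InFlightKey(r.topic(), r.partition(), r.offset()), r);
        }
    }

    boolean isInFlight(String topic, int partition, long offset) {
        return recordsInFlight.containsKey(new InFlightKey(topic, partition, offset));
    }

    List<Long> offsetsInPollOrder() {
        List<Long> offsets = new ArrayList<>();
        for (InFlightKey key : recordsInFlight.keySet()) {
            offsets.add(key.offset());
        }
        return offsets;
    }

    int size() {
        return recordsInFlight.size();
    }

    // Called once the Phase 8 acknowledgements have been committed.
    void clear() {
        recordsInFlight.clear();
    }
}
```

If the same offset is tracked twice, `put` replaces the stored record but keeps its original position. The map therefore never holds two entries for one record.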
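Here is a minimal sketch of the Phase 2 bucketing. It assumes a generic parser function and a hypothetical `RowParseException` in place of Druid's parse exception. Rows that parse go to `pendingAccept`. Bad rows go to `pendingReject` until the cumulative count exceeds `maxParseExceptions`. At that point the hypothetical `TooManyParseErrorsException` stands in for `releaseAllAndFail()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the parser's ParseException.
class RowParseException extends RuntimeException {
    RowParseException(String message) {
        super(message);
    }
}

// Signals the releaseAllAndFail() branch: the caller should RELEASE every record
// of the batch (including those already in pendingAccept) and fail the task.
class TooManyParseErrorsException extends RuntimeException {
    TooManyParseErrorsException(int count, int max) {
        super("parse exceptions " + count + " exceeded maxParseExceptions " + max);
    }
}

final class ParsePhase<R, T> {
    final List<T> pendingRows = new ArrayList<>();
    final List<R> pendingAccept = new ArrayList<>();
    final List<R> pendingReject = new ArrayList<>();

    private final int maxParseExceptions;
    private int parseExceptionCount = 0; // kept across run() calls, like parseExceptionCount above

    ParsePhase(int maxParseExceptions) {
        this.maxParseExceptions = maxParseExceptions;
    }

    void run(List<R> records, Function<R, T> parser) {
        for (R r : records) {
            try {
                T row = parser.apply(r);
                pendingRows.add(row);
                pendingAccept.add(r);
            } catch (RowParseException e) {
                parseExceptionCount++;
                if (parseExceptionCount > maxParseExceptions) {
                    throw new TooManyParseErrorsException(parseExceptionCount, maxParseExceptions);
                }
                // Permanent schema error: REJECT rather than retry.
                pendingReject.add(r);
            }
        }
    }
}
```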
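The Phase 4 trigger and its failure path could look like the sketch below. `Persister`, `PersistOutcome` and `PersistPhase` are hypothetical names, and `Persister.persist()` stands in for `appenderator.persist()`. The sketch only moves records between the pending lists. Issuing the RELEASE acknowledgements (`commitAcknowledgements()` in the pseudocode) is left to the caller.

```java
import java.io.IOException;
import java.time.Duration;
import java.util.List;

// Hypothetical hook standing in for appenderator.persist().
interface Persister {
    void persist() throws IOException;
}

enum PersistOutcome { NOT_DUE, PERSISTED, FAILED_RELEASED }

final class PersistPhase<R> {
    private final int maxRowsInMemory;
    private final Duration intermediatePersistPeriod;

    PersistPhase(int maxRowsInMemory, Duration intermediatePersistPeriod) {
        this.maxRowsInMemory = maxRowsInMemory;
        this.intermediatePersistPeriod = intermediatePersistPeriod;
    }

    // rowCount >= maxRowsInMemory OR elapsed >= intermediatePersistPeriod
    boolean isDue(int rowCount, Duration elapsed) {
        return rowCount >= maxRowsInMemory || elapsed.compareTo(intermediatePersistPeriod) >= 0;
    }

    PersistOutcome maybePersist(int rowCount, Duration elapsed, Persister persister,
                                List<R> pendingAccept, List<R> pendingRelease) {
        if (!isDue(rowCount, elapsed)) {
            return PersistOutcome.NOT_DUE;
        }
        try {
            persister.persist();
            return PersistOutcome.PERSISTED;
        } catch (IOException e) {
            // The rows are not known to be on disk: hand every pending ACCEPT back
            // as a RELEASE so the broker redelivers it; the task keeps running.
            pendingRelease.addAll(pendingAccept);
            pendingAccept.clear();
            return PersistOutcome.FAILED_RELEASED;
        }
    }
}
```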
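Phase 5's bounded retry can be sketched under the assumption that a push is any `Callable` that returns the pushed segments or throws. `PushRetry` and `PushFailedException` are hypothetical. `releasePending` represents moving `pendingAccept` into `pendingRelease` and committing the RELEASEs. The pseudocode does that on every failure before deciding whether to retry.

```java
import java.util.concurrent.Callable;

class PushFailedException extends RuntimeException {
    PushFailedException(int attempts, Exception cause) {
        super("push failed after " + attempts + " attempt(s)", cause);
    }
}

final class PushRetry {
    private PushRetry() {}

    // Mirrors "if retryCount < maxPushRetries: retry else: fail task":
    // at most 1 + maxPushRetries attempts in total.
    static <T> T pushWithRetries(Callable<T> push, int maxPushRetries, Runnable releasePending) {
        int retryCount = 0;
        while (true) {
            try {
                return push.call();
            } catch (Exception e) {
                // Every failure hands the pending ACCEPTs back as RELEASEs first.
                releasePending.run();
                if (retryCount >= maxPushRetries) {
                    throw new PushFailedException(retryCount + 1, e);
                }
                retryCount++;
            }
        }
    }
}
```

With `maxPushRetries = 2`, the push is attempted at most three times.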
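The `state` column of `kafka_share_pending_acks` moves strictly forward through the three values the pseudocode writes. Below is a small in-memory model, with hypothetical `PendingAckState` and `PendingAckRow` types standing in for the metadata-store row. It makes the legal transitions explicit and records which states already have queryable segments.

```java
// States of one kafka_share_pending_acks row, in the order the pseudocode writes them.
enum PendingAckState {
    SEGMENTS_PUSHED_PENDING_REGISTER,
    SEGMENTS_REGISTERED_PENDING_ACK,
    ACKED;

    // Only the forward steps Phase 6 -> Phase 7 -> Phase 8 are legal.
    boolean canAdvanceTo(PendingAckState next) {
        switch (this) {
            case SEGMENTS_PUSHED_PENDING_REGISTER:
                return next == SEGMENTS_REGISTERED_PENDING_ACK;
            case SEGMENTS_REGISTERED_PENDING_ACK:
                return next == ACKED;
            default:
                return false; // ACKED is terminal
        }
    }

    // Past Phase 7 the segments are registered and queryable, so a failed ACK
    // from here on means redelivery (possible duplicates), not data loss.
    boolean segmentsRegistered() {
        return this != SEGMENTS_PUSHED_PENDING_REGISTER;
    }
}

// Hypothetical in-memory model of one row; the design above would instead issue
// the INSERT/UPDATE statements against the metadata store.
final class PendingAckRow {
    private final String taskId;
    private PendingAckState state = PendingAckState.SEGMENTS_PUSHED_PENDING_REGISTER;

    PendingAckRow(String taskId) {
        this.taskId = taskId;
    }

    PendingAckState state() {
        return state;
    }

    void advance(PendingAckState next) {
        if (!state.canAdvanceTo(next)) {
            throw new IllegalStateException(
                "task " + taskId + ": illegal transition " + state + " -> " + next);
        }
        state = next;
    }
}
```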
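Phase 8's acknowledgement routing and per-partition result handling are sketched here against a hypothetical `AckingConsumer` interface rather than Kafka's share consumer. Partitions are plain strings instead of `TopicIdPartition`, and `commitSync()` takes no timeout. Only the overall shape follows the pseudocode: buffer one acknowledgement per record, commit, then inspect an `Optional` per partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Local mirror of the three acknowledgement types used in Phase 8.
enum AckType { ACCEPT, REJECT, RELEASE }

// Hypothetical stand-in for the share consumer.
interface AckingConsumer<R> {
    void acknowledge(R record, AckType type);

    Map<String, Optional<Exception>> commitSync();
}

final class AckPhase {
    private AckPhase() {}

    // Returns the partitions whose acknowledgement commit failed.
    static <R> List<String> acknowledgeAll(AckingConsumer<R> consumer, List<R> pendingAccept,
                                           List<R> pendingReject, List<R> pendingRelease) {
        for (R r : pendingAccept) consumer.acknowledge(r, AckType.ACCEPT);
        for (R r : pendingReject) consumer.acknowledge(r, AckType.REJECT);
        for (R r : pendingRelease) consumer.acknowledge(r, AckType.RELEASE);

        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Optional<Exception>> result : consumer.commitSync().entrySet()) {
            if (result.getValue().isPresent()) {
                // Segments are already registered: the broker will redeliver these
                // records, so the caller only warns and emits ingest/share/ackFailure.
                failed.add(result.getKey());
            }
        }
        return failed;
    }
}
```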