[ https://issues.apache.org/jira/browse/KAFKA-6119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219689#comment-16219689 ]

Apurva Mehta commented on KAFKA-6119:
-------------------------------------

Thanks Gary. This indeed looks like a bug. I downloaded your program and ran 
it. Here are the Kafka data logs for the two partitions:

{noformat}
amehta-macbook-pro:kafka-logs apurva$ 
~/workspace/confluent/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments 
--files producer-test-1/00000000000000000000.log  --deep-iteration
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/Users/apurva/workspace/confluent/kafka/core/build/dependant-libs-2.11.11/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/apurva/workspace/confluent/kafka/tools/build/dependant-libs-2.11.11/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/apurva/workspace/confluent/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/apurva/workspace/confluent/kafka/connect/transforms/build/dependant-libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/apurva/workspace/confluent/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/apurva/workspace/confluent/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/apurva/workspace/confluent/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Dumping producer-test-1/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 CreateTime: 1508970707070 isvalid: true keysize: -1 
valuesize: 23 magic: 2 compresscodec: NONE producerId: 0 sequence: 0 
isTransactional: true headerKeys: []
offset: 1 position: 91 CreateTime: 1508970707085 isvalid: true keysize: 4 
valuesize: 6 magic: 2 compresscodec: NONE producerId: 0 sequence: -1 
isTransactional: true headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
amehta-macbook-pro:kafka-logs apurva$ 
~/workspace/confluent/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments 
--files producer-test-0/00000000000000000000.log  --deep-iteration
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/Users/apurva/workspace/confluent/kafka/core/build/dependant-libs-2.11.11/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/apurva/workspace/confluent/kafka/tools/build/dependant-libs-2.11.11/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/apurva/workspace/confluent/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/apurva/workspace/confluent/kafka/connect/transforms/build/dependant-libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/apurva/workspace/confluent/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/apurva/workspace/confluent/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/apurva/workspace/confluent/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Dumping producer-test-0/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 CreateTime: 1508970637066 isvalid: true keysize: -1 
valuesize: 23 magic: 2 compresscodec: NONE producerId: 0 sequence: 0 
isTransactional: true headerKeys: []
offset: 1 position: 91 CreateTime: 1508970666872 isvalid: true keysize: 4 
valuesize: 6 magic: 2 compresscodec: NONE producerId: 0 sequence: -1 
isTransactional: true headerKeys: [] endTxnMarker: ABORT coordinatorEpoch: 0
amehta-macbook-pro:kafka-logs apurva$
{noformat}

And here is the trace level output from the kafka producer: 

{noformat}
15:30:36.041 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka 
version : 0.11.0.1
15:30:36.041 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka 
commitId : c2a0d5f9b1f45bf5
15:30:36.041 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - 
Kafka producer with client id producer-1 created
15:30:36.042 [main] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager - 
[TransactionalId test-producer-1508970635810] Transition from state 
UNINITIALIZED to INITIALIZING
15:30:36.042 [main] INFO 
org.apache.kafka.clients.producer.internals.TransactionManager - 
[TransactionalId test-producer-1508970635810] ProducerId set to -1 with epoch -1
15:30:36.048 [main] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager - 
[TransactionalId test-producer-1508970635810] Enqueuing transactional request 
(type=InitProducerIdRequest, transactionalId=test-producer-1508970635810, 
transactionTimeoutMs=100)
15:30:36.051 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager - 
[TransactionalId test-producer-1508970635810] Enqueuing transactional request 
(type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, 
coordinatorType=TRANSACTION)
15:30:36.051 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager - 
[TransactionalId test-producer-1508970635810] Enqueuing transactional request 
(type=InitProducerIdRequest, transactionalId=test-producer-1508970635810, 
transactionTimeoutMs=100)
15:30:36.154 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.NetworkClient - Initiating connection to node 
localhost:9092 (id: -1 rack: null)
15:30:36.211 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node--1.bytes-sent
15:30:36.211 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node--1.bytes-received
15:30:36.212 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
15:30:36.214 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.network.Selector - Created socket with SO_RCVBUF = 
326640, SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node -1
15:30:36.215 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.NetworkClient - Completed connection to node -1. 
Fetching API versions.
15:30:36.215 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.NetworkClient - Initiating API versions fetch from 
node -1.
15:30:36.286 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.NetworkClient - Recorded API versions for node -1: 
(Produce(0): 0 to 5 [usable: 3], Fetch(1): 0 to 6 [usable: 5], Offsets(2): 0 to 
2 [usable: 2], Metadata(3): 0 to 5 [usable: 4], LeaderAndIsr(4): 0 to 1 
[usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 4 [usable: 
3], ControlledShutdown(7): 0 to 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 
3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 
1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], 
LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], 
DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], 
SaslHandshake(17): 0 to 1 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], 
CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], 
DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], 
OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], 
AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], 
WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], 
DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 
0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 
0], UNKNOWN(34): 0, UNKNOWN(35): 0, UNKNOWN(36): 0, UNKNOWN(37): 0)
15:30:36.287 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.Sender - [TransactionalId 
test-producer-1508970635810] Sending transactional request 
(type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, 
coordinatorType=TRANSACTION) to node localhost:9092 (id: -1 rack: null)
15:30:36.323 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager - 
[TransactionalId test-producer-1508970635810] Enqueuing transactional request 
(type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, 
coordinatorType=TRANSACTION)
15:30:36.426 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.Sender - [TransactionalId 
test-producer-1508970635810] Sending transactional request 
(type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, 
coordinatorType=TRANSACTION) to node localhost:9092 (id: -1 rack: null)
15:30:36.430 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager - 
[TransactionalId test-producer-1508970635810] Enqueuing transactional request 
(type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, 
coordinatorType=TRANSACTION)
15:30:36.535 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.Sender - [TransactionalId 
test-producer-1508970635810] Sending transactional request 
(type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, 
coordinatorType=TRANSACTION) to node localhost:9092 (id: -1 rack: null)
15:30:36.538 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager - 
[TransactionalId test-producer-1508970635810] Enqueuing transactional request 
(type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, 
coordinatorType=TRANSACTION)
15:30:36.642 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.Sender - [TransactionalId 
test-producer-1508970635810] Sending transactional request 
(type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, 
coordinatorType=TRANSACTION) to node localhost:9092 (id: -1 rack: null)
15:30:36.648 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager - 
[TransactionalId test-producer-1508970635810] Enqueuing transactional request 
(type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, 
coordinatorType=TRANSACTION)
15:30:36.751 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.Sender - [TransactionalId 
test-producer-1508970635810] Sending transactional request 
(type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, 
coordinatorType=TRANSACTION) to node localhost:9092 (id: -1 rack: null)
15:30:36.755 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager - 
[TransactionalId test-producer-1508970635810] Enqueuing transactional request 
(type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, 
coordinatorType=TRANSACTION)
15:30:36.859 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.Sender - [TransactionalId 
test-producer-1508970635810] Sending transactional request 
(type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, 
coordinatorType=TRANSACTION) to node localhost:9092 (id: -1 rack: null)
15:30:36.871 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.NetworkClient - Initiating connection to node 
10.200.6.85:9092 (id: 0 rack: null)
15:30:36.871 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node-0.bytes-sent
15:30:36.872 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node-0.bytes-received
15:30:36.872 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name node-0.latency
15:30:36.872 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.network.Selector - Created socket with SO_RCVBUF = 
310308, SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node 0
15:30:36.872 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.NetworkClient - Completed connection to node 0. 
Fetching API versions.
15:30:36.872 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.NetworkClient - Initiating API versions fetch from 
node 0.
15:30:36.873 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.NetworkClient - Recorded API versions for node 0: 
(Produce(0): 0 to 5 [usable: 3], Fetch(1): 0 to 6 [usable: 5], Offsets(2): 0 to 
2 [usable: 2], Metadata(3): 0 to 5 [usable: 4], LeaderAndIsr(4): 0 to 1 
[usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 4 [usable: 
3], ControlledShutdown(7): 0 to 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 
3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 
1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], 
LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], 
DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], 
SaslHandshake(17): 0 to 1 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], 
CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], 
DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], 
OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], 
AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], 
WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], 
DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 
0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 
0], UNKNOWN(34): 0, UNKNOWN(35): 0, UNKNOWN(36): 0, UNKNOWN(37): 0)
15:30:36.978 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.Sender - [TransactionalId 
test-producer-1508970635810] Sending transactional request 
(type=InitProducerIdRequest, transactionalId=test-producer-1508970635810, 
transactionTimeoutMs=100) to node 10.200.6.85:9092 (id: 0 rack: null)
15:30:37.052 [kafka-producer-network-thread | producer-1] INFO 
org.apache.kafka.clients.producer.internals.TransactionManager - 
[TransactionalId test-producer-1508970635810] ProducerId set to 0 with epoch 0
15:30:37.052 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager - 
[TransactionalId test-producer-1508970635810] Transition from state 
INITIALIZING to READY
15:30:37.052 [main] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager - 
[TransactionalId test-producer-1508970635810] Transition from state READY to 
IN_TRANSACTION
15:30:37.053 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.NetworkClient - Sending metadata request 
(type=MetadataRequest, topics=producer-test) to node localhost:9092 (id: -1 
rack: null)
15:30:37.061 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to 
Cluster(id = MWSsYEsXSIKPeqPAE1C9rg, nodes = [10.200.6.85:9092 (id: 0 rack: 
null)], partitions = [Partition(topic = producer-test, partition = 1, leader = 
0, replicas = [0], isr = [0]), Partition(topic = producer-test, partition = 0, 
leader = 0, replicas = [0], isr = [0])])
15:30:37.066 [main] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager - 
[TransactionalId test-producer-1508970635810] Begin adding new partition 
producer-test-0 to transaction
15:30:37.071 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager - 
[TransactionalId test-producer-1508970635810] Enqueuing transactional request 
(type=AddPartitionsToTxnRequest, transactionalId=test-producer-1508970635810, 
producerId=0, producerEpoch=0, partitions=[producer-test-0])
15:30:37.072 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.Sender - [TransactionalId 
test-producer-1508970635810] Sending transactional request 
(type=AddPartitionsToTxnRequest, transactionalId=test-producer-1508970635810, 
producerId=0, producerEpoch=0, partitions=[producer-test-0]) to node 
10.200.6.85:9092 (id: 0 rack: null)
15:30:37.081 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager - 
[TransactionalId test-producer-1508970635810] Successfully added partitions 
[producer-test-0] to transaction
15:30:37.081 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.RecordAccumulator - Assigning 
sequence number 0 from producer (producerId=0, epoch=0) to dequeued batch from 
partition producer-test-0 bound for 10.200.6.85:9092 (id: 0 rack: null).
15:30:37.083 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
topic.producer-test.records-per-batch
15:30:37.084 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
topic.producer-test.bytes
15:30:37.084 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
topic.producer-test.compression-rate
15:30:37.084 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
topic.producer-test.record-retries
15:30:37.084 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
topic.producer-test.record-errors
15:30:37.110 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.Sender - Incremented sequence 
number for topic-partition producer-test-0 to 1
15:31:47.070 [main] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager - 
[TransactionalId test-producer-1508970635810] Begin adding new partition 
producer-test-1 to transaction
15:31:47.071 [main] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager - 
[TransactionalId test-producer-1508970635810] Transition from state 
IN_TRANSACTION to COMMITTING_TRANSACTION
15:31:47.072 [main] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager - 
[TransactionalId test-producer-1508970635810] Enqueuing transactional request 
(type=AddPartitionsToTxnRequest, transactionalId=test-producer-1508970635810, 
producerId=0, producerEpoch=0, partitions=[producer-test-1])
15:31:47.073 [main] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager - 
[TransactionalId test-producer-1508970635810] Enqueuing transactional request 
(type=EndTxnRequest, transactionalId=test-producer-1508970635810, producerId=0, 
producerEpoch=0, result=COMMIT)
15:31:47.073 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.Sender - [TransactionalId 
test-producer-1508970635810] Sending transactional request 
(type=AddPartitionsToTxnRequest, transactionalId=test-producer-1508970635810, 
producerId=0, producerEpoch=0, partitions=[producer-test-1]) to node 
10.200.6.85:9092 (id: 0 rack: null)
15:31:47.078 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager - 
[TransactionalId test-producer-1508970635810] Successfully added partitions 
[producer-test-1] to transaction
15:31:47.078 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.RecordAccumulator - Assigning 
sequence number 0 from producer (producerId=0, epoch=0) to dequeued batch from 
partition producer-test-1 bound for 10.200.6.85:9092 (id: 0 rack: null).
15:31:47.081 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.Sender - Incremented sequence 
number for topic-partition producer-test-1 to 1
15:31:47.082 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.Sender - [TransactionalId 
test-producer-1508970635810] Sending transactional request (type=EndTxnRequest, 
transactionalId=test-producer-1508970635810, producerId=0, producerEpoch=0, 
result=COMMIT) to node 10.200.6.85:9092 (id: 0 rack: null)
15:31:47.085 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager - 
[TransactionalId test-producer-1508970635810] Transition from state 
COMMITTING_TRANSACTION to READY
15:31:47.085 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - 
Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
15:31:47.085 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.Sender - Beginning shutdown of 
Kafka producer I/O thread, sending remaining records.
15:31:47.088 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Removed sensor with name 
connections-closed:
15:31:47.088 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Removed sensor with name 
connections-created:
15:31:47.088 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Removed sensor with name 
bytes-sent-received:
15:31:47.088 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent:
15:31:47.088 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Removed sensor with name 
bytes-received:
15:31:47.088 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Removed sensor with name select-time:
15:31:47.088 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Removed sensor with name io-time:
15:31:47.089 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Removed sensor with name 
node--1.bytes-sent
15:31:47.089 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Removed sensor with name 
node--1.bytes-received
15:31:47.089 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Removed sensor with name 
node--1.latency
15:31:47.089 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Removed sensor with name 
node-0.bytes-sent
15:31:47.089 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Removed sensor with name 
node-0.bytes-received
15:31:47.089 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics - Removed sensor with name 
node-0.latency
15:31:47.089 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.Sender - Shutdown of Kafka producer 
I/O thread has completed.
15:31:47.089 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - 
Kafka producer with client id producer-1 has been closed
{noformat}

As we can see, the message to partition 0 is aborted, but the second one is 
erroneously committed. Fencing isn't working as it should here: the producer 
should have been fenced after the timeout-driven abort. This can be seen from 
the dump of the transaction log: 

{noformat}
amehta-macbook-pro:__transaction_state-35 apurva$ 
~/workspace/confluent/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments 
--transaction-log-decoder --files 00000000000000000000.log --deep-iteration
Dumping 00000000000000000000.log
Starting offset: 0
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/Users/apurva/workspace/confluent/kafka/core/build/dependant-libs-2.11.11/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/apurva/workspace/confluent/kafka/tools/build/dependant-libs-2.11.11/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/apurva/workspace/confluent/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/apurva/workspace/confluent/kafka/connect/transforms/build/dependant-libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/apurva/workspace/confluent/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/apurva/workspace/confluent/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/apurva/workspace/confluent/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
offset: 0 position: 0 CreateTime: 1508970637004 isvalid: true keysize: 31 
valuesize: 37 magic: 2 compresscodec: NONE producerId: -1 sequence: -1 
isTransactional: false headerKeys: [] key: 
transactionalId=test-producer-1508970635810 payload: 
producerId:0,producerEpoch:0,state=Empty,partitions=Set(),txnLastUpdateTimestamp=1508970636999,txnTimeoutMs=100
offset: 1 position: 137 CreateTime: 1508970637077 isvalid: true keysize: 31 
valuesize: 60 magic: 2 compresscodec: NONE producerId: -1 sequence: -1 
isTransactional: false headerKeys: [] key: 
transactionalId=test-producer-1508970635810 payload: 
producerId:0,producerEpoch:0,state=Ongoing,partitions=Set(producer-test-0),txnLastUpdateTimestamp=1508970637076,txnTimeoutMs=100
offset: 2 position: 297 CreateTime: 1508970666849 isvalid: true keysize: 31 
valuesize: 60 magic: 2 compresscodec: NONE producerId: -1 sequence: -1 
isTransactional: false headerKeys: [] key: 
transactionalId=test-producer-1508970635810 payload: 
producerId:0,producerEpoch:0,state=PrepareAbort,partitions=Set(producer-test-0),txnLastUpdateTimestamp=1508970666848,txnTimeoutMs=100
offset: 3 position: 457 CreateTime: 1508970666884 isvalid: true keysize: 31 
valuesize: 37 magic: 2 compresscodec: NONE producerId: -1 sequence: -1 
isTransactional: false headerKeys: [] key: 
transactionalId=test-producer-1508970635810 payload: 
producerId:0,producerEpoch:0,state=CompleteAbort,partitions=Set(),txnLastUpdateTimestamp=1508970666852,txnTimeoutMs=100
offset: 4 position: 594 CreateTime: 1508970707075 isvalid: true keysize: 31 
valuesize: 60 magic: 2 compresscodec: NONE producerId: -1 sequence: -1 
isTransactional: false headerKeys: [] key: 
transactionalId=test-producer-1508970635810 payload: 
producerId:0,producerEpoch:0,state=Ongoing,partitions=Set(producer-test-1),txnLastUpdateTimestamp=1508970707074,txnTimeoutMs=100
offset: 5 position: 754 CreateTime: 1508970707083 isvalid: true keysize: 31 
valuesize: 60 magic: 2 compresscodec: NONE producerId: -1 sequence: -1 
isTransactional: false headerKeys: [] key: 
transactionalId=test-producer-1508970635810 payload: 
producerId:0,producerEpoch:0,state=PrepareCommit,partitions=Set(producer-test-1),txnLastUpdateTimestamp=1508970707083,txnTimeoutMs=100
offset: 6 position: 914 CreateTime: 1508970707086 isvalid: true keysize: 31 
valuesize: 37 magic: 2 compresscodec: NONE producerId: -1 sequence: -1 
isTransactional: false headerKeys: [] key: 
transactionalId=test-producer-1508970635810 payload: 
producerId:0,producerEpoch:0,state=CompleteCommit,partitions=Set(),txnLastUpdateTimestamp=1508970707084,txnTimeoutMs=100
amehta-macbook-pro:__transaction_state-35 apurva$

{noformat}

The transaction state shows no epoch bump after the timeout-driven abort 
(producerEpoch stays 0 throughout). Hence the second send was accepted instead 
of being fenced.
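The fix implied here is an epoch bump on the timeout-driven abort, so that the stale producer's next request is rejected as fenced. A minimal, self-contained sketch of that check (hypothetical names, not the actual coordinator code):

```java
// Simplified simulation of producer-epoch fencing on a transaction timeout.
// All names here are hypothetical; this is not the real coordinator code.
class EpochFencingSketch {
    // Coordinator-side view of one transactionalId.
    static class TxnState {
        short producerEpoch = 0;
        String state = "Empty";

        // Timeout-driven abort: bumping the epoch here is the missing step,
        // so the old producer gets fenced on its next request.
        void abortOnTimeout() {
            state = "CompleteAbort";
            producerEpoch++;
        }

        // Check applied to AddPartitionsToTxn / EndTxn requests.
        boolean accept(short requestEpoch) {
            return requestEpoch == producerEpoch;
        }
    }

    public static void main(String[] args) {
        TxnState txn = new TxnState();
        short clientEpoch = 0;   // epoch handed to the producer at init time

        txn.abortOnTimeout();    // transaction.timeout.ms elapsed

        // With the bump, the stale producer's next request is rejected
        // (it would surface as a ProducerFencedException on the client):
        System.out.println(txn.accept(clientEpoch)); // prints "false"
    }
}
```

Without the epoch bump, `accept(clientEpoch)` stays true after the abort, which matches the behavior in the transaction log dump above: the second send and the final COMMIT pass the epoch check and the data loss goes unnoticed.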

> Silent Data Loss in Kafka011 Transactional Producer
> ---------------------------------------------------
>
>                 Key: KAFKA-6119
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6119
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, producer 
>    Affects Versions: 0.11.0.0, 0.11.0.1
>         Environment: openjdk version "1.8.0_144"
> OpenJDK Runtime Environment (Zulu 8.23.0.3-macosx) (build 1.8.0_144-b01)
> OpenJDK 64-Bit Server VM (Zulu 8.23.0.3-macosx) (build 25.144-b01, mixed mode)
>            Reporter: Gary Y.
>            Priority: Blocker
>              Labels: reliability
>
> Kafka can lose data published by a transactional {{KafkaProducer}} under some 
> circumstances, i.e., data that should be committed atomically may not be 
> fully visible from a consumer with {{read_committed}} isolation level.
>  
> *Steps to reproduce:*
> # Set {{transaction.timeout.ms}} to a low value such as {{100}}
> # Publish two messages in one transaction to different partitions of a topic 
> with a sufficiently long time in-between the messages (e.g., 70 s).
> # Only the second message is visible with {{read_committed}} isolation level.
> See 
> https://github.com/GJL/kafka011-transactional-producer-bug-demo/blob/master/src/main/java/com/garyyao/App.java
>  for a full example. Detailed instructions can be found in the {{README.md}}: 
> https://github.com/GJL/kafka011-transactional-producer-bug-demo
> *Why is this possible?*
> Because the transaction timeout is set to a low value, the transaction will 
> be rolled back quickly after the first message is sent. Indeed, in the broker 
> the following logs could be found:
> {code}
> [2017-10-25 22:54:58,224] INFO [Transaction Coordinator 0]: Initialized 
> transactionalId test-producer-1508964897483 with producerId 5 and producer 
> epoch 0 on partition __transaction_state-10 
> (kafka.coordinator.transaction.TransactionCoordinator)
> [2017-10-25 22:55:24,260] INFO [Transaction Coordinator 0]: Completed 
> rollback ongoing transaction of transactionalId: test-producer-1508964897483 
> due to timeout (kafka.coordinator.transaction.TransactionCoordinator)
> {code}
> After rollback, the second message is sent to a different partition than the 
> first message. Upon transaction commit, 
> {{org.apache.kafka.clients.producer.internals.TransactionManager}} may 
> enqueue the request built by {{addPartitionsToTransactionHandler}}:
> {code}
> private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) {
>     if (!newPartitionsInTransaction.isEmpty())
>         enqueueRequest(addPartitionsToTransactionHandler());
>     EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId,
>             producerIdAndEpoch.epoch, transactionResult);
>     EndTxnHandler handler = new EndTxnHandler(builder);
>     enqueueRequest(handler);
>     return handler.result;
> }
> {code}
> As can be seen, {{addPartitionsToTransactionHandler}} is enqueued if 
> {{newPartitionsInTransaction}} is non-empty. I suspect that because the 
> second message goes to a different partition, this condition is satisfied.
> In {{KafkaApis.scala}}, I can see that {{handleAddPartitionToTxnRequest}} may 
> eventually call {{TransactionMetadata#prepareAddPartitions}}:
> {code}
> def prepareAddPartitions(addedTopicPartitions: immutable.Set[TopicPartition], updateTimestamp: Long): TxnTransitMetadata = {
>   val newTxnStartTimestamp = state match {
>     case Empty | CompleteAbort | CompleteCommit => updateTimestamp
>     case _ => txnStartTimestamp
>   }
>   prepareTransitionTo(Ongoing, producerId, producerEpoch, txnTimeoutMs, (topicPartitions ++ addedTopicPartitions).toSet,
>     newTxnStartTimestamp, updateTimestamp)
> }
> {code}
> Note that the first argument {{newState}} of {{prepareTransitionTo}} is 
> always *Ongoing* here. I suspect that this puts the transaction, which 
> should have been aborted, back into _Ongoing_.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
