[ 
https://issues.apache.org/jira/browse/KAFKA-5355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5355:
-----------------------------------
    Description: 
This issue is exposed by the new Streams EOS integration test.

Streams has two tasks (ie, two producers with {{pid}} 0 and 2000) both writing 
to output topic {{output}} with one partition (replication factor 1).

The test uses an transactional consumer with {{group.id=readCommitted}} to read 
the data from {{output}} topic. When it read the data, each producer has 
committed 10 records (one producer write messages with {{key=0}} and the other 
with {{key=1}}). Furthermore, each producer has an open transaction and 5 
uncommitted records written.

The test fails, as we expect to see 10 records per key, but we get 15 for key=1:
{noformat}
java.lang.AssertionError: 
Expected: <[KeyValue(1, 0), KeyValue(1, 1), KeyValue(1, 3), KeyValue(1, 6), 
KeyValue(1, 10), KeyValue(1, 15), KeyValue(1, 21), KeyValue(1, 28), KeyValue(1, 
36), KeyValue(1, 45)]>
     but: was <[KeyValue(1, 0), KeyValue(1, 1), KeyValue(1, 3), KeyValue(1, 6), 
KeyValue(1, 10), KeyValue(1, 15), KeyValue(1, 21), KeyValue(1, 28), KeyValue(1, 
36), KeyValue(1, 45), KeyValue(1, 55), KeyValue(1, 66), KeyValue(1, 78), 
KeyValue(1, 91), KeyValue(1, 105)]>
{noformat}

Dumping the segment shows, that there are two commit markers (one for each 
producer) for the first 10 messages written. Furthermore, there are 5 pending 
records. Thus, "latest stable offset" should be 21 (20 messages plus 2 commit 
markers) and not data should be returned beyond this offset.

Dumped Log Segment {{output-0}}
{noformat}
Starting offset: 0
baseOffset: 0 lastOffset: 9 baseSequence: 0 lastSequence: 9 producerId: 0 
producerEpoch: 6 partitionLeaderEpoch: 0 isTransactional: true position: 0 
CreateTime: 1496255947332 isvalid: true size: 291 magic: 2 compresscodec: NONE 
crc: 600535135
baseOffset: 10 lastOffset: 10 baseSequence: -1 lastSequence: -1 producerId: 0 
producerEpoch: 6 partitionLeaderEpoch: 0 isTransactional: true position: 291 
CreateTime: 1496256005429 isvalid: true size: 78 magic: 2 compresscodec: NONE 
crc: 3458060752
baseOffset: 11 lastOffset: 20 baseSequence: 0 lastSequence: 9 producerId: 2000 
producerEpoch: 2 partitionLeaderEpoch: 0 isTransactional: true position: 369 
CreateTime: 1496255947322 isvalid: true size: 291 magic: 2 compresscodec: NONE 
crc: 3392915713
baseOffset: 21 lastOffset: 25 baseSequence: 10 lastSequence: 14 producerId: 0 
producerEpoch: 6 partitionLeaderEpoch: 0 isTransactional: true position: 660 
CreateTime: 1496255947342 isvalid: true size: 176 magic: 2 compresscodec: NONE 
crc: 3513911368
baseOffset: 26 lastOffset: 26 baseSequence: -1 lastSequence: -1 producerId: 
2000 producerEpoch: 2 partitionLeaderEpoch: 0 isTransactional: true position: 
836 CreateTime: 1496256011784 isvalid: true size: 78 magic: 2 compresscodec: 
NONE crc: 1619151485
{noformat}

Dump with {{--deep-iteration}}
{noformat}
Starting offset: 0
offset: 0 position: 0 CreateTime: 1496255947323 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 0 
headerKeys: [] key: 1 payload: 0
offset: 1 position: 0 CreateTime: 1496255947324 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 1 
headerKeys: [] key: 1 payload: 1
offset: 2 position: 0 CreateTime: 1496255947325 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 2 
headerKeys: [] key: 1 payload: 3
offset: 3 position: 0 CreateTime: 1496255947326 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 3 
headerKeys: [] key: 1 payload: 6
offset: 4 position: 0 CreateTime: 1496255947327 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 4 
headerKeys: [] key: 1 payload: 10
offset: 5 position: 0 CreateTime: 1496255947328 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 5 
headerKeys: [] key: 1 payload: 15
offset: 6 position: 0 CreateTime: 1496255947329 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 6 
headerKeys: [] key: 1 payload: 21
offset: 7 position: 0 CreateTime: 1496255947330 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 7 
headerKeys: [] key: 1 payload: 28
offset: 8 position: 0 CreateTime: 1496255947331 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 8 
headerKeys: [] key: 1 payload: 36
offset: 9 position: 0 CreateTime: 1496255947332 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 9 
headerKeys: [] key: 1 payload: 45
offset: 10 position: 291 CreateTime: 1496256005429 isvalid: true keysize: 4 
valuesize: 6 magic: 2 compresscodec: NONE crc: 3458060752 sequence: -1 
headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
offset: 11 position: 369 CreateTime: 1496255947313 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 0 
headerKeys: [] key: 0 payload: 0
offset: 12 position: 369 CreateTime: 1496255947314 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 1 
headerKeys: [] key: 0 payload: 1
offset: 13 position: 369 CreateTime: 1496255947315 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 2 
headerKeys: [] key: 0 payload: 3
offset: 14 position: 369 CreateTime: 1496255947316 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 3 
headerKeys: [] key: 0 payload: 6
offset: 15 position: 369 CreateTime: 1496255947317 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 4 
headerKeys: [] key: 0 payload: 10
offset: 16 position: 369 CreateTime: 1496255947318 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 5 
headerKeys: [] key: 0 payload: 15
offset: 17 position: 369 CreateTime: 1496255947319 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 6 
headerKeys: [] key: 0 payload: 21
offset: 18 position: 369 CreateTime: 1496255947320 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 7 
headerKeys: [] key: 0 payload: 28
offset: 19 position: 369 CreateTime: 1496255947321 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 8 
headerKeys: [] key: 0 payload: 36
offset: 20 position: 369 CreateTime: 1496255947322 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 9 
headerKeys: [] key: 0 payload: 45
offset: 21 position: 660 CreateTime: 1496255947338 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 10 
headerKeys: [] key: 1 payload: 55
offset: 22 position: 660 CreateTime: 1496255947339 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 11 
headerKeys: [] key: 1 payload: 66
offset: 23 position: 660 CreateTime: 1496255947340 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 12 
headerKeys: [] key: 1 payload: 78
offset: 24 position: 660 CreateTime: 1496255947341 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 13 
headerKeys: [] key: 1 payload: 91
offset: 25 position: 660 CreateTime: 1496255947342 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 14 
headerKeys: [] key: 1 payload: 105
offset: 26 position: 836 CreateTime: 1496256011784 isvalid: true keysize: 4 
valuesize: 6 magic: 2 compresscodec: NONE crc: 1619151485 sequence: -1 
headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
{noformat}

The client log shows, that the reading and writing happen concurrently.
{noformat}
[2017-05-31 11:40:11,642] DEBUG Resetting offset for partition outputTopic-0 to 
offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:418)
[2017-05-31 11:40:11,642] DEBUG Added fetch request for partition outputTopic-0 
at offset 0 to node 127.0.0.1:64267 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher:793)
[2017-05-31 11:40:11,642] DEBUG Sending fetch for partitions [outputTopic-0] to 
broker 127.0.0.1:64267 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher:203)
[2017-05-31 11:40:11,643] DEBUG Added fetch request for partition outputTopic-0 
at offset 11 to node 127.0.0.1:64267 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher:793)
[2017-05-31 11:40:11,643] DEBUG Sending fetch for partitions [outputTopic-0] to 
broker 127.0.0.1:64267 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher:203)
[2017-05-31 11:40:11,673] DEBUG TransactionalId: appId-1-0_1 -- Sending 
transactional request (type=FindCoordinatorRequest, coordinatorKey=appId-1, 
coordinatorType=GROUP) (org.apache.kafka.clients.producer.internals.Sender:314)
[2017-05-31 11:40:11,674] DEBUG TransactionalId appId-1-0_1 -- Received 
FindCoordinator response with error NONE 
(org.apache.kafka.clients.producer.internals.TransactionManager:738)
[2017-05-31 11:40:11,674] DEBUG TransactionalId: appId-1-0_1 -- Sending 
transactional request (transactionalId=appId-1-0_1, producerId=2000, 
producerEpoch=2, consumerGroupId=appId-1, 
offsets={inputTopic-1=CommittedOffset(offset=10, metadata='')}) 
(org.apache.kafka.clients.producer.internals.Sender:314)
[2017-05-31 11:40:11,717] DEBUG Dest: 127.0.0.1:64267 (id: 0 rack: null) : 
producerId: 0, epoch: 6, Assigning sequence for outputTopic-0: 10 
(org.apache.kafka.clients.producer.internals.RecordAccumulator:488)
[2017-05-31 11:40:11,717] DEBUG Dest: 127.0.0.1:64273 (id: 2 rack: null) : 
producerId: 0, epoch: 6, Assigning sequence for appId-1-store-changelog-0: 10 
(org.apache.kafka.clients.producer.internals.RecordAccumulator:488)
[2017-05-31 11:40:11,718] DEBUG Incremented sequence number for topic-partition 
appId-1-store-changelog-0 to 15 
(org.apache.kafka.clients.producer.internals.Sender:555)
[2017-05-31 11:40:11,718] DEBUG Incremented sequence number for topic-partition 
outputTopic-0 to 15 (org.apache.kafka.clients.producer.internals.Sender:555)
[2017-05-31 11:40:11,780] DEBUG TransactionalId appId-1-0_1 -- Received 
TxnOffsetCommit response with errors {inputTopic-1=NONE} 
(org.apache.kafka.clients.producer.internals.TransactionManager:900)
[2017-05-31 11:40:11,780] DEBUG TransactionalId appId-1-0_1 -- Transition from 
state IN_TRANSACTION to COMMITTING_TRANSACTION 
(org.apache.kafka.clients.producer.internals.TransactionManager:427)
[2017-05-31 11:40:11,780] DEBUG TransactionalId: appId-1-0_1 -- Sending 
transactional request (transactionalId=appId-1-0_1, producerId=2000, 
producerEpoch=2, result=COMMIT) 
(org.apache.kafka.clients.producer.internals.Sender:314)
[2017-05-31 11:40:11,782] DEBUG TransactionalId appId-1-0_1 -- Received EndTxn 
response with error NONE 
(org.apache.kafka.clients.producer.internals.TransactionManager:792)
[2017-05-31 11:40:11,782] DEBUG TransactionalId appId-1-0_1 -- Transition from 
state COMMITTING_TRANSACTION to READY 
(org.apache.kafka.clients.producer.internals.TransactionManager:427)
[2017-05-31 11:40:11,782] DEBUG TransactionalId appId-1-0_1 -- Transition from 
state READY to IN_TRANSACTION 
(org.apache.kafka.clients.producer.internals.TransactionManager:427)
[2017-05-31 11:40:11,782] DEBUG Initiating connection to node 0 at 
127.0.0.1:64267. (org.apache.kafka.clients.NetworkClient:707)
[2017-05-31 11:40:11,782] DEBUG Added fetch request for partition inputTopic-1 
at offset 11 to node 127.0.0.1:64267 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher:793)
[2017-05-31 11:40:11,782] DEBUG Sending fetch for partitions [inputTopic-1] to 
broker 127.0.0.1:64267 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher:203)
[2017-05-31 11:40:11,783] DEBUG Completed connection to node 0.  Ready. 
(org.apache.kafka.clients.NetworkClient:672)
[2017-05-31 11:40:11,783] DEBUG TransactionalId: appId-1-0_1 -- Sending 
transactional request (transactionalId=appId-1-0_1, producerId=2000, 
producerEpoch=2, partitions=[outputTopic-0, appId-1-store-changelog-1]) 
(org.apache.kafka.clients.producer.internals.Sender:314)
[2017-05-31 11:40:11,784] DEBUG Added fetch request for partition inputTopic-1 
at offset 15 to node 127.0.0.1:64267 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher:793)
[2017-05-31 11:40:11,784] DEBUG Sending fetch for partitions [inputTopic-1] to 
broker 127.0.0.1:64267 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher:203)
[2017-05-31 11:40:11,784] DEBUG TransactionalId appId-1-0_1 -- Received 
AddPartitionsToTxn response with errors {outputTopic-0=CONCURRENT_TRANSACTIONS, 
appId-1-store-changelog-1=CONCURRENT_TRANSACTIONS} 
(org.apache.kafka.clients.producer.internals.TransactionManager:658)
[2017-05-31 11:40:11,784] DEBUG TransactionalId: appId-1-0_1 -- Sending 
transactional request (transactionalId=appId-1-0_1, producerId=2000, 
producerEpoch=2, partitions=[outputTopic-0, appId-1-store-changelog-1]) 
(org.apache.kafka.clients.producer.internals.Sender:314)
[2017-05-31 11:40:11,784] DEBUG Added fetch request for partition outputTopic-0 
at offset 27 to node 127.0.0.1:64267 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher:793)
[2017-05-31 11:40:11,785] DEBUG Sending fetch for partitions [outputTopic-0] to 
broker 127.0.0.1:64267 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher:203)
{noformat}

(full log attached)

  was:
This issue is exposed by the new (not yet committed) Streams EOS integration 
test.

Streams has two tasks (ie, two producers with {{pid}} 0 and 2000) both writing 
to output topic {{output}} with one partition (replication factor 1).

The test uses an transactional consumer with {{group.id=readCommitted}} to read 
the data from {{output}} topic. When it read the data, each producer has 
committed 10 records (one producer write messages with {{key=0}} and the other 
with {{key=1}}). Furthermore, each producer has an open transaction and 5 
uncommitted records written.

The test fails, as we expect to see 10 records per key, but we get 15 for key=1:
{noformat}
java.lang.AssertionError: 
Expected: <[KeyValue(1, 0), KeyValue(1, 1), KeyValue(1, 3), KeyValue(1, 6), 
KeyValue(1, 10), KeyValue(1, 15), KeyValue(1, 21), KeyValue(1, 28), KeyValue(1, 
36), KeyValue(1, 45)]>
     but: was <[KeyValue(1, 0), KeyValue(1, 1), KeyValue(1, 3), KeyValue(1, 6), 
KeyValue(1, 10), KeyValue(1, 15), KeyValue(1, 21), KeyValue(1, 28), KeyValue(1, 
36), KeyValue(1, 45), KeyValue(1, 55), KeyValue(1, 66), KeyValue(1, 78), 
KeyValue(1, 91), KeyValue(1, 105)]>
{noformat}

Dumping the segment shows, that there are two commit markers (one for each 
producer) for the first 10 messages written. Furthermore, there are 5 pending 
records. Thus, "latest stable offset" should be 21 (20 messages plus 2 commit 
markers) and not data should be returned beyond this offset.

Dumped Log Segment {{output-0}}
{noformat}
Starting offset: 0
baseOffset: 0 lastOffset: 9 baseSequence: 0 lastSequence: 9 producerId: 0 
producerEpoch: 6 partitionLeaderEpoch: 0 isTransactional: true position: 0 
CreateTime: 1496255947332 isvalid: true size: 291 magic: 2 compresscodec: NONE 
crc: 600535135
baseOffset: 10 lastOffset: 10 baseSequence: -1 lastSequence: -1 producerId: 0 
producerEpoch: 6 partitionLeaderEpoch: 0 isTransactional: true position: 291 
CreateTime: 1496256005429 isvalid: true size: 78 magic: 2 compresscodec: NONE 
crc: 3458060752
baseOffset: 11 lastOffset: 20 baseSequence: 0 lastSequence: 9 producerId: 2000 
producerEpoch: 2 partitionLeaderEpoch: 0 isTransactional: true position: 369 
CreateTime: 1496255947322 isvalid: true size: 291 magic: 2 compresscodec: NONE 
crc: 3392915713
baseOffset: 21 lastOffset: 25 baseSequence: 10 lastSequence: 14 producerId: 0 
producerEpoch: 6 partitionLeaderEpoch: 0 isTransactional: true position: 660 
CreateTime: 1496255947342 isvalid: true size: 176 magic: 2 compresscodec: NONE 
crc: 3513911368
baseOffset: 26 lastOffset: 26 baseSequence: -1 lastSequence: -1 producerId: 
2000 producerEpoch: 2 partitionLeaderEpoch: 0 isTransactional: true position: 
836 CreateTime: 1496256011784 isvalid: true size: 78 magic: 2 compresscodec: 
NONE crc: 1619151485
{noformat}

Dump with {{--deep-iteration}}
{noformat}
Starting offset: 0
offset: 0 position: 0 CreateTime: 1496255947323 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 0 
headerKeys: [] key: 1 payload: 0
offset: 1 position: 0 CreateTime: 1496255947324 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 1 
headerKeys: [] key: 1 payload: 1
offset: 2 position: 0 CreateTime: 1496255947325 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 2 
headerKeys: [] key: 1 payload: 3
offset: 3 position: 0 CreateTime: 1496255947326 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 3 
headerKeys: [] key: 1 payload: 6
offset: 4 position: 0 CreateTime: 1496255947327 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 4 
headerKeys: [] key: 1 payload: 10
offset: 5 position: 0 CreateTime: 1496255947328 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 5 
headerKeys: [] key: 1 payload: 15
offset: 6 position: 0 CreateTime: 1496255947329 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 6 
headerKeys: [] key: 1 payload: 21
offset: 7 position: 0 CreateTime: 1496255947330 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 7 
headerKeys: [] key: 1 payload: 28
offset: 8 position: 0 CreateTime: 1496255947331 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 8 
headerKeys: [] key: 1 payload: 36
offset: 9 position: 0 CreateTime: 1496255947332 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 9 
headerKeys: [] key: 1 payload: 45
offset: 10 position: 291 CreateTime: 1496256005429 isvalid: true keysize: 4 
valuesize: 6 magic: 2 compresscodec: NONE crc: 3458060752 sequence: -1 
headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
offset: 11 position: 369 CreateTime: 1496255947313 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 0 
headerKeys: [] key: 0 payload: 0
offset: 12 position: 369 CreateTime: 1496255947314 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 1 
headerKeys: [] key: 0 payload: 1
offset: 13 position: 369 CreateTime: 1496255947315 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 2 
headerKeys: [] key: 0 payload: 3
offset: 14 position: 369 CreateTime: 1496255947316 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 3 
headerKeys: [] key: 0 payload: 6
offset: 15 position: 369 CreateTime: 1496255947317 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 4 
headerKeys: [] key: 0 payload: 10
offset: 16 position: 369 CreateTime: 1496255947318 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 5 
headerKeys: [] key: 0 payload: 15
offset: 17 position: 369 CreateTime: 1496255947319 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 6 
headerKeys: [] key: 0 payload: 21
offset: 18 position: 369 CreateTime: 1496255947320 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 7 
headerKeys: [] key: 0 payload: 28
offset: 19 position: 369 CreateTime: 1496255947321 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 8 
headerKeys: [] key: 0 payload: 36
offset: 20 position: 369 CreateTime: 1496255947322 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 9 
headerKeys: [] key: 0 payload: 45
offset: 21 position: 660 CreateTime: 1496255947338 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 10 
headerKeys: [] key: 1 payload: 55
offset: 22 position: 660 CreateTime: 1496255947339 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 11 
headerKeys: [] key: 1 payload: 66
offset: 23 position: 660 CreateTime: 1496255947340 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 12 
headerKeys: [] key: 1 payload: 78
offset: 24 position: 660 CreateTime: 1496255947341 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 13 
headerKeys: [] key: 1 payload: 91
offset: 25 position: 660 CreateTime: 1496255947342 isvalid: true keysize: 8 
valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 14 
headerKeys: [] key: 1 payload: 105
offset: 26 position: 836 CreateTime: 1496256011784 isvalid: true keysize: 4 
valuesize: 6 magic: 2 compresscodec: NONE crc: 1619151485 sequence: -1 
headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
{noformat}

The client log shows, that the reading and writing happen concurrently.
{noformat}
[2017-05-31 11:40:11,642] DEBUG Resetting offset for partition outputTopic-0 to 
offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:418)
[2017-05-31 11:40:11,642] DEBUG Added fetch request for partition outputTopic-0 
at offset 0 to node 127.0.0.1:64267 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher:793)
[2017-05-31 11:40:11,642] DEBUG Sending fetch for partitions [outputTopic-0] to 
broker 127.0.0.1:64267 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher:203)
[2017-05-31 11:40:11,643] DEBUG Added fetch request for partition outputTopic-0 
at offset 11 to node 127.0.0.1:64267 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher:793)
[2017-05-31 11:40:11,643] DEBUG Sending fetch for partitions [outputTopic-0] to 
broker 127.0.0.1:64267 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher:203)
[2017-05-31 11:40:11,673] DEBUG TransactionalId: appId-1-0_1 -- Sending 
transactional request (type=FindCoordinatorRequest, coordinatorKey=appId-1, 
coordinatorType=GROUP) (org.apache.kafka.clients.producer.internals.Sender:314)
[2017-05-31 11:40:11,674] DEBUG TransactionalId appId-1-0_1 -- Received 
FindCoordinator response with error NONE 
(org.apache.kafka.clients.producer.internals.TransactionManager:738)
[2017-05-31 11:40:11,674] DEBUG TransactionalId: appId-1-0_1 -- Sending 
transactional request (transactionalId=appId-1-0_1, producerId=2000, 
producerEpoch=2, consumerGroupId=appId-1, 
offsets={inputTopic-1=CommittedOffset(offset=10, metadata='')}) 
(org.apache.kafka.clients.producer.internals.Sender:314)
[2017-05-31 11:40:11,717] DEBUG Dest: 127.0.0.1:64267 (id: 0 rack: null) : 
producerId: 0, epoch: 6, Assigning sequence for outputTopic-0: 10 
(org.apache.kafka.clients.producer.internals.RecordAccumulator:488)
[2017-05-31 11:40:11,717] DEBUG Dest: 127.0.0.1:64273 (id: 2 rack: null) : 
producerId: 0, epoch: 6, Assigning sequence for appId-1-store-changelog-0: 10 
(org.apache.kafka.clients.producer.internals.RecordAccumulator:488)
[2017-05-31 11:40:11,718] DEBUG Incremented sequence number for topic-partition 
appId-1-store-changelog-0 to 15 
(org.apache.kafka.clients.producer.internals.Sender:555)
[2017-05-31 11:40:11,718] DEBUG Incremented sequence number for topic-partition 
outputTopic-0 to 15 (org.apache.kafka.clients.producer.internals.Sender:555)
[2017-05-31 11:40:11,780] DEBUG TransactionalId appId-1-0_1 -- Received 
TxnOffsetCommit response with errors {inputTopic-1=NONE} 
(org.apache.kafka.clients.producer.internals.TransactionManager:900)
[2017-05-31 11:40:11,780] DEBUG TransactionalId appId-1-0_1 -- Transition from 
state IN_TRANSACTION to COMMITTING_TRANSACTION 
(org.apache.kafka.clients.producer.internals.TransactionManager:427)
[2017-05-31 11:40:11,780] DEBUG TransactionalId: appId-1-0_1 -- Sending 
transactional request (transactionalId=appId-1-0_1, producerId=2000, 
producerEpoch=2, result=COMMIT) 
(org.apache.kafka.clients.producer.internals.Sender:314)
[2017-05-31 11:40:11,782] DEBUG TransactionalId appId-1-0_1 -- Received EndTxn 
response with error NONE 
(org.apache.kafka.clients.producer.internals.TransactionManager:792)
[2017-05-31 11:40:11,782] DEBUG TransactionalId appId-1-0_1 -- Transition from 
state COMMITTING_TRANSACTION to READY 
(org.apache.kafka.clients.producer.internals.TransactionManager:427)
[2017-05-31 11:40:11,782] DEBUG TransactionalId appId-1-0_1 -- Transition from 
state READY to IN_TRANSACTION 
(org.apache.kafka.clients.producer.internals.TransactionManager:427)
[2017-05-31 11:40:11,782] DEBUG Initiating connection to node 0 at 
127.0.0.1:64267. (org.apache.kafka.clients.NetworkClient:707)
[2017-05-31 11:40:11,782] DEBUG Added fetch request for partition inputTopic-1 
at offset 11 to node 127.0.0.1:64267 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher:793)
[2017-05-31 11:40:11,782] DEBUG Sending fetch for partitions [inputTopic-1] to 
broker 127.0.0.1:64267 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher:203)
[2017-05-31 11:40:11,783] DEBUG Completed connection to node 0.  Ready. 
(org.apache.kafka.clients.NetworkClient:672)
[2017-05-31 11:40:11,783] DEBUG TransactionalId: appId-1-0_1 -- Sending 
transactional request (transactionalId=appId-1-0_1, producerId=2000, 
producerEpoch=2, partitions=[outputTopic-0, appId-1-store-changelog-1]) 
(org.apache.kafka.clients.producer.internals.Sender:314)
[2017-05-31 11:40:11,784] DEBUG Added fetch request for partition inputTopic-1 
at offset 15 to node 127.0.0.1:64267 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher:793)
[2017-05-31 11:40:11,784] DEBUG Sending fetch for partitions [inputTopic-1] to 
broker 127.0.0.1:64267 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher:203)
[2017-05-31 11:40:11,784] DEBUG TransactionalId appId-1-0_1 -- Received 
AddPartitionsToTxn response with errors {outputTopic-0=CONCURRENT_TRANSACTIONS, 
appId-1-store-changelog-1=CONCURRENT_TRANSACTIONS} 
(org.apache.kafka.clients.producer.internals.TransactionManager:658)
[2017-05-31 11:40:11,784] DEBUG TransactionalId: appId-1-0_1 -- Sending 
transactional request (transactionalId=appId-1-0_1, producerId=2000, 
producerEpoch=2, partitions=[outputTopic-0, appId-1-store-changelog-1]) 
(org.apache.kafka.clients.producer.internals.Sender:314)
[2017-05-31 11:40:11,784] DEBUG Added fetch request for partition outputTopic-0 
at offset 27 to node 127.0.0.1:64267 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher:793)
[2017-05-31 11:40:11,785] DEBUG Sending fetch for partitions [outputTopic-0] to 
broker 127.0.0.1:64267 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher:203)
{noformat}

(full log attached)


> Broker returns messages beyond "latest stable offset" to transactional 
> consumer in read_committed mode
> ------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5355
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5355
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>    Affects Versions: 0.11.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Jason Gustafson
>            Priority: Blocker
>              Labels: exactly-once
>             Fix For: 0.11.0.0
>
>         Attachments: test.log
>
>
> This issue is exposed by the new Streams EOS integration test.
> Streams has two tasks (ie, two producers with {{pid}} 0 and 2000) both 
> writing to output topic {{output}} with one partition (replication factor 1).
> The test uses an transactional consumer with {{group.id=readCommitted}} to 
> read the data from {{output}} topic. When it read the data, each producer has 
> committed 10 records (one producer write messages with {{key=0}} and the 
> other with {{key=1}}). Furthermore, each producer has an open transaction and 
> 5 uncommitted records written.
> The test fails, as we expect to see 10 records per key, but we get 15 for 
> key=1:
> {noformat}
> java.lang.AssertionError: 
> Expected: <[KeyValue(1, 0), KeyValue(1, 1), KeyValue(1, 3), KeyValue(1, 6), 
> KeyValue(1, 10), KeyValue(1, 15), KeyValue(1, 21), KeyValue(1, 28), 
> KeyValue(1, 36), KeyValue(1, 45)]>
>      but: was <[KeyValue(1, 0), KeyValue(1, 1), KeyValue(1, 3), KeyValue(1, 
> 6), KeyValue(1, 10), KeyValue(1, 15), KeyValue(1, 21), KeyValue(1, 28), 
> KeyValue(1, 36), KeyValue(1, 45), KeyValue(1, 55), KeyValue(1, 66), 
> KeyValue(1, 78), KeyValue(1, 91), KeyValue(1, 105)]>
> {noformat}
> Dumping the segment shows, that there are two commit markers (one for each 
> producer) for the first 10 messages written. Furthermore, there are 5 pending 
> records. Thus, "latest stable offset" should be 21 (20 messages plus 2 commit 
> markers) and not data should be returned beyond this offset.
> Dumped Log Segment {{output-0}}
> {noformat}
> Starting offset: 0
> baseOffset: 0 lastOffset: 9 baseSequence: 0 lastSequence: 9 producerId: 0 
> producerEpoch: 6 partitionLeaderEpoch: 0 isTransactional: true position: 0 
> CreateTime: 1496255947332 isvalid: true size: 291 magic: 2 compresscodec: 
> NONE crc: 600535135
> baseOffset: 10 lastOffset: 10 baseSequence: -1 lastSequence: -1 producerId: 0 
> producerEpoch: 6 partitionLeaderEpoch: 0 isTransactional: true position: 291 
> CreateTime: 1496256005429 isvalid: true size: 78 magic: 2 compresscodec: NONE 
> crc: 3458060752
> baseOffset: 11 lastOffset: 20 baseSequence: 0 lastSequence: 9 producerId: 
> 2000 producerEpoch: 2 partitionLeaderEpoch: 0 isTransactional: true position: 
> 369 CreateTime: 1496255947322 isvalid: true size: 291 magic: 2 compresscodec: 
> NONE crc: 3392915713
> baseOffset: 21 lastOffset: 25 baseSequence: 10 lastSequence: 14 producerId: 0 
> producerEpoch: 6 partitionLeaderEpoch: 0 isTransactional: true position: 660 
> CreateTime: 1496255947342 isvalid: true size: 176 magic: 2 compresscodec: 
> NONE crc: 3513911368
> baseOffset: 26 lastOffset: 26 baseSequence: -1 lastSequence: -1 producerId: 
> 2000 producerEpoch: 2 partitionLeaderEpoch: 0 isTransactional: true position: 
> 836 CreateTime: 1496256011784 isvalid: true size: 78 magic: 2 compresscodec: 
> NONE crc: 1619151485
> {noformat}
> Dump with {{--deep-iteration}}
> {noformat}
> Starting offset: 0
> offset: 0 position: 0 CreateTime: 1496255947323 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 0 
> headerKeys: [] key: 1 payload: 0
> offset: 1 position: 0 CreateTime: 1496255947324 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 1 
> headerKeys: [] key: 1 payload: 1
> offset: 2 position: 0 CreateTime: 1496255947325 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 2 
> headerKeys: [] key: 1 payload: 3
> offset: 3 position: 0 CreateTime: 1496255947326 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 3 
> headerKeys: [] key: 1 payload: 6
> offset: 4 position: 0 CreateTime: 1496255947327 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 4 
> headerKeys: [] key: 1 payload: 10
> offset: 5 position: 0 CreateTime: 1496255947328 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 5 
> headerKeys: [] key: 1 payload: 15
> offset: 6 position: 0 CreateTime: 1496255947329 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 6 
> headerKeys: [] key: 1 payload: 21
> offset: 7 position: 0 CreateTime: 1496255947330 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 7 
> headerKeys: [] key: 1 payload: 28
> offset: 8 position: 0 CreateTime: 1496255947331 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 8 
> headerKeys: [] key: 1 payload: 36
> offset: 9 position: 0 CreateTime: 1496255947332 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 9 
> headerKeys: [] key: 1 payload: 45
> offset: 10 position: 291 CreateTime: 1496256005429 isvalid: true keysize: 4 
> valuesize: 6 magic: 2 compresscodec: NONE crc: 3458060752 sequence: -1 
> headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
> offset: 11 position: 369 CreateTime: 1496255947313 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 0 
> headerKeys: [] key: 0 payload: 0
> offset: 12 position: 369 CreateTime: 1496255947314 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 1 
> headerKeys: [] key: 0 payload: 1
> offset: 13 position: 369 CreateTime: 1496255947315 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 2 
> headerKeys: [] key: 0 payload: 3
> offset: 14 position: 369 CreateTime: 1496255947316 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 3 
> headerKeys: [] key: 0 payload: 6
> offset: 15 position: 369 CreateTime: 1496255947317 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 4 
> headerKeys: [] key: 0 payload: 10
> offset: 16 position: 369 CreateTime: 1496255947318 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 5 
> headerKeys: [] key: 0 payload: 15
> offset: 17 position: 369 CreateTime: 1496255947319 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 6 
> headerKeys: [] key: 0 payload: 21
> offset: 18 position: 369 CreateTime: 1496255947320 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 7 
> headerKeys: [] key: 0 payload: 28
> offset: 19 position: 369 CreateTime: 1496255947321 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 8 
> headerKeys: [] key: 0 payload: 36
> offset: 20 position: 369 CreateTime: 1496255947322 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 9 
> headerKeys: [] key: 0 payload: 45
> offset: 21 position: 660 CreateTime: 1496255947338 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 10 
> headerKeys: [] key: 1 payload: 55
> offset: 22 position: 660 CreateTime: 1496255947339 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 11 
> headerKeys: [] key: 1 payload: 66
> offset: 23 position: 660 CreateTime: 1496255947340 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 12 
> headerKeys: [] key: 1 payload: 78
> offset: 24 position: 660 CreateTime: 1496255947341 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 13 
> headerKeys: [] key: 1 payload: 91
> offset: 25 position: 660 CreateTime: 1496255947342 isvalid: true keysize: 8 
> valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 14 
> headerKeys: [] key: 1 payload: 105
> offset: 26 position: 836 CreateTime: 1496256011784 isvalid: true keysize: 4 
> valuesize: 6 magic: 2 compresscodec: NONE crc: 1619151485 sequence: -1 
> headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
> {noformat}
> The client log shows, that the reading and writing happen concurrently.
> {noformat}
> [2017-05-31 11:40:11,642] DEBUG Resetting offset for partition outputTopic-0 
> to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:418)
> [2017-05-31 11:40:11,642] DEBUG Added fetch request for partition 
> outputTopic-0 at offset 0 to node 127.0.0.1:64267 (id: 0 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher:793)
> [2017-05-31 11:40:11,642] DEBUG Sending fetch for partitions [outputTopic-0] 
> to broker 127.0.0.1:64267 (id: 0 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher:203)
> [2017-05-31 11:40:11,643] DEBUG Added fetch request for partition 
> outputTopic-0 at offset 11 to node 127.0.0.1:64267 (id: 0 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher:793)
> [2017-05-31 11:40:11,643] DEBUG Sending fetch for partitions [outputTopic-0] 
> to broker 127.0.0.1:64267 (id: 0 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher:203)
> [2017-05-31 11:40:11,673] DEBUG TransactionalId: appId-1-0_1 -- Sending 
> transactional request (type=FindCoordinatorRequest, coordinatorKey=appId-1, 
> coordinatorType=GROUP) 
> (org.apache.kafka.clients.producer.internals.Sender:314)
> [2017-05-31 11:40:11,674] DEBUG TransactionalId appId-1-0_1 -- Received 
> FindCoordinator response with error NONE 
> (org.apache.kafka.clients.producer.internals.TransactionManager:738)
> [2017-05-31 11:40:11,674] DEBUG TransactionalId: appId-1-0_1 -- Sending 
> transactional request (transactionalId=appId-1-0_1, producerId=2000, 
> producerEpoch=2, consumerGroupId=appId-1, 
> offsets={inputTopic-1=CommittedOffset(offset=10, metadata='')}) 
> (org.apache.kafka.clients.producer.internals.Sender:314)
> [2017-05-31 11:40:11,717] DEBUG Dest: 127.0.0.1:64267 (id: 0 rack: null) : 
> producerId: 0, epoch: 6, Assigning sequence for outputTopic-0: 10 
> (org.apache.kafka.clients.producer.internals.RecordAccumulator:488)
> [2017-05-31 11:40:11,717] DEBUG Dest: 127.0.0.1:64273 (id: 2 rack: null) : 
> producerId: 0, epoch: 6, Assigning sequence for appId-1-store-changelog-0: 10 
> (org.apache.kafka.clients.producer.internals.RecordAccumulator:488)
> [2017-05-31 11:40:11,718] DEBUG Incremented sequence number for 
> topic-partition appId-1-store-changelog-0 to 15 
> (org.apache.kafka.clients.producer.internals.Sender:555)
> [2017-05-31 11:40:11,718] DEBUG Incremented sequence number for 
> topic-partition outputTopic-0 to 15 
> (org.apache.kafka.clients.producer.internals.Sender:555)
> [2017-05-31 11:40:11,780] DEBUG TransactionalId appId-1-0_1 -- Received 
> TxnOffsetCommit response with errors {inputTopic-1=NONE} 
> (org.apache.kafka.clients.producer.internals.TransactionManager:900)
> [2017-05-31 11:40:11,780] DEBUG TransactionalId appId-1-0_1 -- Transition 
> from state IN_TRANSACTION to COMMITTING_TRANSACTION 
> (org.apache.kafka.clients.producer.internals.TransactionManager:427)
> [2017-05-31 11:40:11,780] DEBUG TransactionalId: appId-1-0_1 -- Sending 
> transactional request (transactionalId=appId-1-0_1, producerId=2000, 
> producerEpoch=2, result=COMMIT) 
> (org.apache.kafka.clients.producer.internals.Sender:314)
> [2017-05-31 11:40:11,782] DEBUG TransactionalId appId-1-0_1 -- Received 
> EndTxn response with error NONE 
> (org.apache.kafka.clients.producer.internals.TransactionManager:792)
> [2017-05-31 11:40:11,782] DEBUG TransactionalId appId-1-0_1 -- Transition 
> from state COMMITTING_TRANSACTION to READY 
> (org.apache.kafka.clients.producer.internals.TransactionManager:427)
> [2017-05-31 11:40:11,782] DEBUG TransactionalId appId-1-0_1 -- Transition 
> from state READY to IN_TRANSACTION 
> (org.apache.kafka.clients.producer.internals.TransactionManager:427)
> [2017-05-31 11:40:11,782] DEBUG Initiating connection to node 0 at 
> 127.0.0.1:64267. (org.apache.kafka.clients.NetworkClient:707)
> [2017-05-31 11:40:11,782] DEBUG Added fetch request for partition 
> inputTopic-1 at offset 11 to node 127.0.0.1:64267 (id: 0 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher:793)
> [2017-05-31 11:40:11,782] DEBUG Sending fetch for partitions [inputTopic-1] 
> to broker 127.0.0.1:64267 (id: 0 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher:203)
> [2017-05-31 11:40:11,783] DEBUG Completed connection to node 0.  Ready. 
> (org.apache.kafka.clients.NetworkClient:672)
> [2017-05-31 11:40:11,783] DEBUG TransactionalId: appId-1-0_1 -- Sending 
> transactional request (transactionalId=appId-1-0_1, producerId=2000, 
> producerEpoch=2, partitions=[outputTopic-0, appId-1-store-changelog-1]) 
> (org.apache.kafka.clients.producer.internals.Sender:314)
> [2017-05-31 11:40:11,784] DEBUG Added fetch request for partition 
> inputTopic-1 at offset 15 to node 127.0.0.1:64267 (id: 0 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher:793)
> [2017-05-31 11:40:11,784] DEBUG Sending fetch for partitions [inputTopic-1] 
> to broker 127.0.0.1:64267 (id: 0 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher:203)
> [2017-05-31 11:40:11,784] DEBUG TransactionalId appId-1-0_1 -- Received 
> AddPartitionsToTxn response with errors 
> {outputTopic-0=CONCURRENT_TRANSACTIONS, 
> appId-1-store-changelog-1=CONCURRENT_TRANSACTIONS} 
> (org.apache.kafka.clients.producer.internals.TransactionManager:658)
> [2017-05-31 11:40:11,784] DEBUG TransactionalId: appId-1-0_1 -- Sending 
> transactional request (transactionalId=appId-1-0_1, producerId=2000, 
> producerEpoch=2, partitions=[outputTopic-0, appId-1-store-changelog-1]) 
> (org.apache.kafka.clients.producer.internals.Sender:314)
> [2017-05-31 11:40:11,784] DEBUG Added fetch request for partition 
> outputTopic-0 at offset 27 to node 127.0.0.1:64267 (id: 0 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher:793)
> [2017-05-31 11:40:11,785] DEBUG Sending fetch for partitions [outputTopic-0] 
> to broker 127.0.0.1:64267 (id: 0 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher:203)
> {noformat}
> (full log attached)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to