It was meaningless; it is just a copy of the first record.
In the real program, the key is of type bigint and is the primary key of a record.
I simplified the real program to make the code smaller.
If transactional messages had this problem, Kafka Streams, which is based on
them, would be affected too.
Has nobody else run into the problem I am seeing?
Or have I configured something wrong?
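
To make the retry behavior described in the quoted message below easier to discuss, here is a minimal broker-free sketch in plain Java. It is only a model, not the Kafka client code: ReenqueueLoopSketch, FakeCoordinator, drain, and maxSends are hypothetical names, and the send cap is an assumption added so the example terminates. It shows that a handler which re-enqueues itself on every CONCURRENT_TRANSACTIONS response finishes only once the coordinator stops returning that error.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class ReenqueueLoopSketch {
    // Simplified stand-in for the error codes seen in the client.
    enum Error { NONE, CONCURRENT_TRANSACTIONS }

    // Hypothetical coordinator: answers CONCURRENT_TRANSACTIONS for the
    // first `busyResponses` requests, then NONE.
    static final class FakeCoordinator {
        private int remainingBusy;

        FakeCoordinator(int busyResponses) {
            this.remainingBusy = busyResponses;
        }

        Error handleAddPartitions() {
            if (remainingBusy > 0) {
                remainingBusy--;
                return Error.CONCURRENT_TRANSACTIONS;
            }
            return Error.NONE;
        }
    }

    // Sends pending requests until the queue is empty.
    // Returns the number of sends, or -1 if the queue is still
    // non-empty after maxSends (assumed cap, not part of the real client).
    static int drain(FakeCoordinator coordinator, int maxSends) {
        Queue<String> pending = new ArrayDeque<>();
        pending.add("AddPartitionsToTxnRequest");
        int sends = 0;
        while (!pending.isEmpty()) {
            if (sends == maxSends) {
                return -1;
            }
            String request = pending.poll();
            sends++;
            if (coordinator.handleAddPartitions() == Error.CONCURRENT_TRANSACTIONS) {
                // Same idea as reenqueue(): put the request back and retry.
                pending.add(request);
            }
        }
        return sends;
    }

    public static void main(String[] args) {
        // Error clears after three responses: the request is sent four times.
        System.out.println(drain(new FakeCoordinator(3), 100));
        // Error never clears: only the cap stops the loop.
        System.out.println(drain(new FakeCoordinator(Integer.MAX_VALUE), 100));
    }
}
```

With three busy responses the request goes out four times. If the error never clears, the loop ends only because of the cap, which is consistent with the endless AddPartitionsToTxnRequest lines in the log.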

At 2017-12-20 23:43:07, "Ted Yu" <yuzhih...@gmail.com> wrote:
>bq.             record = new ProducerRecord<>("test", 0, (long)0,
>Long.toString(0));
>
>What was the rationale of passing 0 as the third parameter in the second
>transaction ?
>
>Cheers
>
>On Wed, Dec 20, 2017 at 5:08 AM, HKT <dushukuang...@163.com> wrote:
>
>> Hi, I have run the program for over 10 hours! It doesn't stop.
>> It generated 800+ MB of log, but almost all of it is
>> 2017-12-20 20:04:28 [kafka-producer-network-thread | producer-1] DEBUG
>> o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1,
>> transactionalId=hello] Sending transactional request
>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
>> producerEpoch=0, partitions=[test-0]) to node 192.168.245.1:9092 (id: 0
>> rack: null)
>> 2017-12-20 20:04:28 [kafka-producer-network-thread | producer-1] DEBUG
>> o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1,
>> transactionalId=hello] Enqueuing transactional request
>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
>> producerEpoch=0, partitions=[test-0])
>>
>> I debugged into the client library and found this code in
>> Sender.run(long now) at
>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:214):
>>
>> > 214 } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
>> > 215     // as long as there are outstanding transactional requests, we simply wait for them to return
>> > 216     client.poll(retryBackoffMs, now);
>> > 217     return;
>> > 218 }
>>
>> I found that Sender.run(long now) calls
>> Sender.maybeSendTransactionalRequest(), which calls
>> TransactionManager.nextRequestHandler().
>> The TransactionManager keeps a pendingRequests queue.
>> nextRequestHandler polls a
>> TransactionManager$AddPartitionsToTxnHandler
>> or a TransactionManager$FindCoordinatorHandler from it.
>> As a result, maybeSendTransactionalRequest returns true. Here
>> is the stack:
>>   at org.apache.kafka.clients.producer.internals.Sender.maybeSendTransactionalRequest(Sender.java:360)
>>   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:214)
>>   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
>>   at java.lang.Thread.run(Thread.java:745)
>>
>>
>> So it calls client.poll, and client.poll calls
>> NetworkClient.completeResponses, which eventually calls enqueueRequest:
>>   at org.apache.kafka.clients.producer.internals.TransactionManager.enqueueRequest(TransactionManager.java:799)
>>   at org.apache.kafka.clients.producer.internals.TransactionManager.access$700(TransactionManager.java:64)
>>   at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.reenqueue(TransactionManager.java:880)
>>   - locked <0x7d1> (a org.apache.kafka.clients.producer.internals.TransactionManager)
>>   at org.apache.kafka.clients.producer.internals.TransactionManager$AddPartitionsToTxnHandler.handleResponse(TransactionManager.java:1024)
>>   at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:905)
>>   at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
>>   at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
>>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
>>   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>> enqueueRequest then adds another TransactionManager$AddPartitionsToTxnHandler
>> or TransactionManager$FindCoordinatorHandler to pendingRequests,
>> which causes the infinite loop.
>>
>> I think the problem is in
>>   org.apache.kafka.clients.producer.internals.TransactionManager$AddPartitionsToTxnHandler.handleResponse(TransactionManager.java:1024).
>> The code is:
>> > 1022 } else if (error == Errors.CONCURRENT_TRANSACTIONS) {
>> > 1023     maybeOverrideRetryBackoffMs();
>> > 1024     reenqueue();
>> > 1025     return;
>> > 1026 }
>>
>> The error is Errors.CONCURRENT_TRANSACTIONS, but I don't have two
>> concurrent transactions.
>>
>> I think there is something I didn't configure correctly. Here is my complete
>> code:
>>
>> import org.apache.kafka.clients.producer.KafkaProducer;
>> import org.apache.kafka.clients.producer.Producer;
>> import org.apache.kafka.clients.producer.ProducerRecord;
>> import org.apache.kafka.clients.producer.internals.RecordAccumulator;
>> import org.apache.kafka.common.serialization.LongSerializer;
>> import org.apache.kafka.common.serialization.StringSerializer;
>> import org.apache.kafka.streams.kstream.KStream;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import java.util.HashMap;
>> import java.util.Properties;
>> import java.util.UUID;
>>
>> public class ApacheKafkaHello
>> {
>>     public static final String Home;
>>
>>     static
>>     {
>>         final String homeEnv = "APACHE_KAFKA_HELLO_HOME";
>>         String home = System.getenv(homeEnv);
>>         if (home == null) {
>>             System.err.printf("HOME ENVIRONMENT IS UNDEFINED: name=%s\n", homeEnv);
>>             System.exit(-1);
>>         }
>>         Home = home;
>>     }
>>
>>     private static final Logger logger = LoggerFactory.getLogger(ApacheKafkaHello.class);
>>
>>     public static void main(final String[] args)
>>     {
>>         logger.info("apache kafka hello starts: Home={}", Home);
>>         for (int i = 0;i != args.length; ++i) {
>>             logger.info("    args[{}]={}", i, args[i]);
>>         }
>>
>>         Properties kafkaProps = new Properties();
>>         kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
>>         kafkaProps.setProperty("key.serializer", LongSerializer.class.getName());
>>         kafkaProps.setProperty("value.serializer", StringSerializer.class.getName());
>>         kafkaProps.setProperty("transactional.id", "hello");
>>         try (KafkaProducer<Long, String> producer = new KafkaProducer<>(kafkaProps)) {
>>             producer.initTransactions();
>>             producer.beginTransaction();
>>             ProducerRecord<Long, String> record = new ProducerRecord<>("test", 0, (long) 0, Long.toString(0));
>>             producer.send(record);
>>             producer.commitTransaction();
>>             producer.beginTransaction();
>>             record = new ProducerRecord<>("test", 0, (long) 0, Long.toString(0));
>>             producer.send(record);
>>             producer.commitTransaction();
>>         }
>>     }
>> }
>> On 2017/12/19 20:54, HKT wrote:
>>
>>> Here is the server side DEBUG log.
>>>
>>> JDK Version is
>>> > java -version
>>> java version "1.8.0_121"
>>> Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
>>> Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)
>>>
>>> Thanks
>>>
>>> On 2017/12/19 11:13, Ted Yu wrote:
>>>
>>>> For the server log, is it possible to enable DEBUG logging ?
>>>>
>>>> Thanks
>>>>
>>>> On Mon, Dec 18, 2017 at 4:35 PM, HKT <dushukuang...@163.com> wrote:
>>>>
>>>> Thanks for the reply.
>>>>>
>>>>> Here is the client-side log:
>>>>>
>>>>> 2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager -
>>>>> [Producer clientId=producer-1, transactionalId=hello] Transition from
>>>>> state
>>>>> READY to IN_TRANSACTION
>>>>> 2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager -
>>>>> [Producer clientId=producer-1, transactionalId=hello] Begin adding new
>>>>> partition test-0 to transaction
>>>>> 2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager -
>>>>> [Producer clientId=producer-1, transactionalId=hello] Transition from
>>>>> state
>>>>> IN_TRANSACTION to COMMITTING_TRANSACTION
>>>>> 2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager -
>>>>> [Producer clientId=producer-1, transactionalId=hello] Enqueuing
>>>>> transactional request (type=AddPartitionsToTxnRequest,
>>>>> transactionalId=hello, producerId=0, producerEpoch=0,
>>>>> partitions=[test-0])
>>>>> 2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager -
>>>>> [Producer clientId=producer-1, transactionalId=hello] Enqueuing
>>>>> transactional request (type=EndTxnRequest, transactionalId=hello,
>>>>> producerId=0, producerEpoch=0, result=COMMIT)
>>>>> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
>>>>> o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1,
>>>>> transactionalId=hello] Sending transactional request
>>>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
>>>>> producerEpoch=0, partitions=[test-0]) to node HKT-PC:9092 (id: 0 rack:
>>>>> null)
>>>>> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
>>>>> o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1,
>>>>> transactionalId=hello] Enqueuing transactional request
>>>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
>>>>> producerEpoch=0, partitions=[test-0])
>>>>> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
>>>>> o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1,
>>>>> transactionalId=hello] Sending transactional request
>>>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
>>>>> producerEpoch=0, partitions=[test-0]) to node HKT-PC:9092 (id: 0 rack:
>>>>> null)
>>>>> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
>>>>> o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1,
>>>>> transactionalId=hello] Enqueuing transactional request
>>>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
>>>>> producerEpoch=0, partitions=[test-0])
>>>>> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
>>>>> o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1,
>>>>> transactionalId=hello] Sending transactional request
>>>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
>>>>> producerEpoch=0, partitions=[test-0]) to node HKT-PC:9092 (id: 0 rack:
>>>>> null)
>>>>> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
>>>>> o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1,
>>>>> transactionalId=hello] Enqueuing transactional request
>>>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
>>>>> producerEpoch=0, partitions=[test-0])
>>>>> ... // duplicate messages
>>>>> 2017-12-19 08:26:14 [kafka-producer-network-thread | producer-1] DEBUG
>>>>> o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1,
>>>>> transactionalId=hello] Enqueuing transactional request
>>>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
>>>>> producerEpoch=0, partitions=[test-0])
>>>>> 2017-12-19 08:26:14 [kafka-producer-network-thread | producer-1] DEBUG
>>>>> o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1,
>>>>> transactionalId=hello] Sending transactional request
>>>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
>>>>> producerEpoch=0, partitions=[test-0]) to node HKT-PC:9092 (id: 0 rack:
>>>>> null)
>>>>>
>>>>> and the server.log:
>>>>> [2017-12-19 08:26:08,408] INFO Completed load of log
>>>>> __transaction_state-9
>>>>> with 1 log segments, log start offset 0 and log end offset 0 in 15 ms
>>>>> (kafka.log.Log)
>>>>> [2017-12-19 08:26:08,408] INFO Created log for partition
>>>>> [__transaction_state,9] in D:\tmp\kafka-logs with properties
>>>>> {compression.type -> uncompressed, message.format.version -> 1.0-IV0,
>>>>> file.delete.delay.ms -> 60000, max.message.bytes -> 1000012,
>>>>> min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime,
>>>>> min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false,
>>>>> min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096,
>>>>> unclean.leader.election.enable -> false, retention.bytes -> -1,
>>>>> delete.retention.ms -> 86400000, cleanup.policy -> compact, flush.ms ->
>>>>> 9223372036854775807, segment.ms -> 604800000, segment.bytes ->
>>>>> 104857600,
>>>>> retention.ms -> 604800000, message.timestamp.difference.max.ms ->
>>>>> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages ->
>>>>> 9223372036854775807}. (kafka.log.LogManager)
>>>>> [2017-12-19 08:26:08,408] INFO [Partition __transaction_state-9
>>>>> broker=0]
>>>>> No checkpointed highwatermark is found for partition
>>>>> __transaction_state-9
>>>>> (kafka.cluster.Partition)
>>>>> [2017-12-19 08:26:08,408] INFO Replica loaded for partition
>>>>> __transaction_state-9 with initial high watermark 0
>>>>> (kafka.cluster.Replica)
>>>>> [2017-12-19 08:26:08,408] INFO [Partition __transaction_state-9
>>>>> broker=0]
>>>>> __transaction_state-9 starts at Leader Epoch 0 from offset 0. Previous
>>>>> Leader Epoch was: -1 (kafka.cluster.Partition)
>>>>> [2017-12-19 08:26:08,408] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-1
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-4
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-7
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-10
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-13
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-16
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-19
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-22
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-25
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-28
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-31
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-34
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-37
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-2
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-5
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-8
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-11
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-14
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-17
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-20
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-23
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-26
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-29
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-32
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-35
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-38
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-41
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-44
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-47
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-21
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-24
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-27
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-30
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-33
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-36
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-39
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-42
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-45
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-48
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-40
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-43
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-46
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-49
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-0
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-3
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-6
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-9
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-12
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,471] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-15
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,471] INFO [Transaction State Manager 0]: Loading
>>>>> transaction metadata from __transaction_state-18
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2017-12-19 08:26:08,627] INFO Updated PartitionLeaderEpoch. New:
>>>>> {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition:
>>>>> __transaction_state-22. Cache now contains 0 entries.
>>>>> (kafka.server.epoch.LeaderEpochFileCache)
>>>>> [2017-12-19 08:26:08,658] INFO [TransactionCoordinator id=0] Initialized
>>>>> transactionalId hello with producerId 0 and producer epoch 0 on
>>>>> partition
>>>>> __transaction_state-22 (kafka.coordinator.transaction
>>>>> .TransactionCoordinator)
>>>>> [2017-12-19 08:26:08,705] INFO Updated PartitionLeaderEpoch. New:
>>>>> {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition:
>>>>> test-0.
>>>>> Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
>>>>>
>>>>>
>>>>> On 2017/12/19 0:42, Ted Yu wrote:
>>>>>
>>>>> Can you capture stack trace on the broker and pastebin it ?
>>>>>>
>>>>>> Broker log may also provide some clue.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Mon, Dec 18, 2017 at 4:46 AM, HKT <dushukuang...@163.com> wrote:
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>>> I was testing transactional messages on Kafka,
>>>>>>> but I ran into a problem:
>>>>>>> the producer always blocks at the second commitTransaction.
>>>>>>> Here is my code:
>>>>>>>
>>>>>>>           Properties kafkaProps = new Properties();
>>>>>>>           kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
>>>>>>>           kafkaProps.setProperty("key.serializer", LongSerializer.class.getName());
>>>>>>>           kafkaProps.setProperty("value.serializer", StringSerializer.class.getName());
>>>>>>>           kafkaProps.setProperty("transactional.id", "hello");
>>>>>>>           try (KafkaProducer<Long, String> producer = new KafkaProducer<>(kafkaProps)) {
>>>>>>>               producer.initTransactions();
>>>>>>>               producer.beginTransaction();
>>>>>>>               ProducerRecord<Long, String> record = new ProducerRecord<>("test", 0, (long) 0, Long.toString(0));
>>>>>>>               producer.send(record);
>>>>>>>               producer.sendOffsetsToTransaction(new HashMap<>(), "");
>>>>>>>               producer.commitTransaction();
>>>>>>>               producer.beginTransaction();
>>>>>>>               record = new ProducerRecord<>("test", 0, (long) 0, Long.toString(0));
>>>>>>>               producer.send(record);
>>>>>>>               producer.commitTransaction(); // blocking here
>>>>>>>           }
>>>>>>>
>>>>>>> Environment:
>>>>>>> Kafka broker: 1.0.0
>>>>>>> broker count: 1
>>>>>>> Kafka Client: 1.0.0
>>>>>>> and I use the default server.properties in config/
>>>>>>>
>>>>>>> broker.id=0
>>>>>>> num.network.threads=3
>>>>>>> num.io.threads=8
>>>>>>> socket.send.buffer.bytes=102400
>>>>>>> socket.receive.buffer.bytes=102400
>>>>>>> socket.request.max.bytes=104857600
>>>>>>> log.dirs=/tmp/kafka-logs
>>>>>>> num.partitions=1
>>>>>>> num.recovery.threads.per.data.dir=1
>>>>>>> offsets.topic.replication.factor=1
>>>>>>> transaction.state.log.replication.factor=1
>>>>>>> transaction.state.log.min.isr=1
>>>>>>> log.retention.hours=168
>>>>>>> log.segment.bytes=1073741824
>>>>>>> log.retention.check.interval.ms=300000
>>>>>>> zookeeper.connect=localhost:2181
>>>>>>> zookeeper.connection.timeout.ms=6000
>>>>>>> group.initial.rebalance.delay.ms=0
>>>>>>>
>>>>>>> I have run the program on Windows 7 and CentOS 6.9,
>>>>>>> and on both it blocks in the second commitTransaction.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>
>>
>>
