Re: [DISCUSS] KIP-646 Serializer API should support ByteBuffer

2021-01-15 Thread Corentin Chary



On 2020/07/23 03:23:58, Chia-Ping Tsai wrote: 
> Thanks for the quick feedback, Ismael! 
> 
> > Are there options with lower impact that still help us achieve the goal for 
> > those who need it?
> > For example, it could be an opt-in thing instead of forcing the world to 
> > change.
> 
> It seems to me there are two alternatives.
> 
> 1. Introduce a new extended serializer (there is an existing but deprecated 
> one). If the serializer is of the extended type, we call the new method to get 
> a ByteBuffer. Users who have no interest in ByteBuffer keep using the standard 
> Serializer interface.
> 2. Don’t deprecate the existing serialize methods, so users are not under 
> pressure to migrate their APIs.

I guess the Deserializer could have both `byte[]` and `ByteBuffer` 
deserialization methods.

The code would call the `ByteBuffer` one; its default implementation would 
convert to `byte[]` with Utils.toArray and call the `byte[]` one, as the Fetcher 
does today.

This should stay compatible with existing deserializers while making it possible 
to deal with either `ByteBuffer` or `byte[]`.
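
Something like the following sketch, perhaps (illustrative only; the method shape 
and names are not meant as the final KIP-646 API):

{code:java}
import java.nio.ByteBuffer;

import org.apache.kafka.common.utils.Utils;

// Sketch of a Deserializer with both methods (configure/close omitted for brevity).
public interface Deserializer<T> {

    // Existing contract: every deserializer already implements this.
    T deserialize(String topic, byte[] data);

    // Hypothetical ByteBuffer overload: the Fetcher would call this one.
    // The default falls back to the byte[] method via Utils.toArray, so
    // existing deserializers keep working unchanged.
    default T deserialize(String topic, ByteBuffer data) {
        return deserialize(topic, data == null ? null : Utils.toArray(data));
    }
}
{code}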



[jira] [Created] (KAFKA-7679) With acks=all a single "stuck" non-leader replica can cause a timeout

2018-11-27 Thread Corentin Chary (JIRA)
Corentin Chary created KAFKA-7679:
-

 Summary: With acks=all a single "stuck" non-leader replica can 
cause a timeout
 Key: KAFKA-7679
 URL: https://issues.apache.org/jira/browse/KAFKA-7679
 Project: Kafka
  Issue Type: Bug
Reporter: Corentin Chary


From the documentation:
{code:java}
acks=all

This means the leader will wait for the full set of in-sync replicas to 
acknowledge the record. This guarantees that the record will not be lost as 
long as at least one in-sync replica remains alive. This is the strongest 
available guarantee. This is equivalent to the acks=-1 setting.{code}
{code:java}
min.insync.replicas

When a producer sets acks to "all" (or "-1"), min.insync.replicas specifies the 
minimum number of replicas that must acknowledge a write for the write to be 
considered successful. If this minimum cannot be met, then the producer will 
raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).
When used together, min.insync.replicas and acks allow you to enforce greater 
durability guarantees. A typical scenario would be to create a topic with a 
replication factor of 3, set min.insync.replicas to 2, and produce with acks of 
"all". This will ensure that the producer raises an exception if a majority of 
replicas do not receive a write.{code}
Given a replication factor of 3 and min.insync.replicas set to 2, I would 
expect the client to get an acknowledgment as soon as the write reaches the leader 
and at least one replica. This is what happens when, for example, one of the 
brokers in a 3-node cluster is down.
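
For reference, the producer-side setup behind that expectation looks roughly like 
this with the Java client (a minimal sketch; the class name is made up, and the 
broker address and topic match the repro below):
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AcksAllExpectation {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:32784"); // placeholder
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // equivalent to acks=-1
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // With the topic at replication factor 3 and min.insync.replicas=2, this send
        // should be acknowledged once the leader and at least one follower have the record.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("tests-irs2", "key", "value")).get();
        }
    }
}
{code}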

However, it looks like this is not the case when a broker is "stuck" (which 
happens when you have network "blips").

Here is how I reproduced this, but you can probably do the same with iptables 
on your own cluster:
{code:java}
# Start a cluster with 3 nodes
$ docker-compose up -d
$ docker-compose scale kafka=3
Starting kafka-docker_kafka_1_dbf4109a3095 ... done
Creating kafka-docker_kafka_2_973a373fa5b5 ... done
Creating kafka-docker_kafka_3_3d8fab2ac44a ... done

# Create topics with various settings
$ docker-compose exec kafka bash
$ kafka-topics.sh --create --topic tests-irs2 --config min.insync.replicas=2 
--zookeeper=${KAFKA_ZOOKEEPER_CONNECT} --partitions=1 --replication-factor=3
$ kafka-topics.sh --describe --zookeeper ${KAFKA_ZOOKEEPER_CONNECT}
Topic:tests-irs2 PartitionCount:1 ReplicationFactor:3 
Configs:min.insync.replicas=2
Topic: tests-irs2 Partition: 0 Leader: 1003 Replicas: 1003,1002,1001 Isr: 
1003,1002,1001{code}
 

Then start a small script that produces messages periodically:

{code:java}
# Start the latency to get an idea of the normal latency
$ KAFKA_BOOTSTRAP_SERVERS=localhost:32784 KAFKA_TOPIC=tests-irs2 KAFKA_ACKS=-1 
../test.py
localhost:32784 tests-irs2 0.068457s
localhost:32784 tests-irs2 0.016032s
localhost:32784 tests-irs2 0.015884s
localhost:32784 tests-irs2 0.018244s
localhost:32784 tests-irs2 0.008625s{code}
 

Run `docker pause` on broker 1002; the producer then starts timing out:

{code:java}
2018-11-27 14:07:47,608 - kafka-producer [ERROR] - KafkaTimeoutError: Timeout 
waiting for future
Traceback (most recent call last):
 File "../test.py", line 27, in send_message
 producer.flush(timeout=MESSAGE_INTERVAL_SECS)
 File 
"/Users/corentin.chary/dev/kafka-tests/kafka-docker/venv/lib/python2.7/site-packages/kafka/producer/kafka.py",
 line 577, in flush
 self._accumulator.await_flush_completion(timeout=timeout)
 File 
"/Users/corentin.chary/dev/kafka-tests/kafka-docker/venv/lib/python2.7/site-packages/kafka/producer/record_accumulator.py",
 line 530, in await_flush_completion
 raise Errors.KafkaTimeoutError('Timeout waiting for future')
KafkaTimeoutError: KafkaTimeoutError: Timeout waiting for future
2018-11-27 14:07:49,618 - kafka-producer [ERROR] - KafkaTimeoutError: Timeout 
waiting for future
Traceback (most recent call last):
 File "../test.py", line 27, in send_message
 producer.flush(timeout=MESSAGE_INTERVAL_SECS)
 File 
"/Users/corentin.chary/dev/kafka-tests/kafka-docker/venv/lib/python2.7/site-packages/kafka/producer/kafka.py",
 line 577, in flush
 self._accumulator.await_flush_completion(timeout=timeout)
 File 
"/Users/corentin.chary/dev/kafka-tests/kafka-docker/venv/lib/python2.7/site-packages/kafka/producer/record_accumulator.py",
 line 530, in await_flush_completion
 raise Errors.KafkaTimeoutError('Timeout waiting for future')
KafkaTimeoutError: KafkaTimeoutError: Timeout waiting for future
2018-11-27 14:07:51,628 - kafka-producer [ERROR] - KafkaTimeoutError: Timeout 
waiting for future
Traceback (most recent call last):
 File "../test.py", line 27, in send_message
 producer.flush(timeout=MESSAGE_INTERVAL_SECS)
 File 
"/Users/corentin.chary/dev/kafka-tests/kafka-docker/venv/lib/python2.7/