But how do we handle chunked messages? A chunked message is split
based on maxMessageSize (the max payload size), so the payload of a
full chunk is already at that limit. Once the header size is counted
too, this would seem to make
`op.getMessageHeaderAndPayloadSize() > ClientCnx.getMaxMessageSize()`
always true for such chunks.
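
To make the concern concrete, here is a minimal sketch of the arithmetic, not the actual client code. It assumes chunking splits only the payload into pieces of at most maxMessageSize and that every chunk carries its own header. `ChunkSizeSketch`, `chunkPayloadSizes`, `exceeds`, and the sizes used in `main` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkSizeSketch {
    // Split a payload into chunk payload sizes of at most maxMessageSize bytes each
    // (assumption: chunking looks only at the payload size).
    static List<Integer> chunkPayloadSizes(int payloadSize, int maxMessageSize) {
        List<Integer> sizes = new ArrayList<>();
        for (int offset = 0; offset < payloadSize; offset += maxMessageSize) {
            sizes.add(Math.min(maxMessageSize, payloadSize - offset));
        }
        return sizes;
    }

    // The proposed check: header plus payload must not exceed maxMessageSize.
    static boolean exceeds(int headerSize, int chunkPayloadSize, int maxMessageSize) {
        return headerSize + chunkPayloadSize > maxMessageSize;
    }

    public static void main(String[] args) {
        int maxMessageSize = 5 * 1024 * 1024; // hypothetical limit
        int headerSize = 200;                 // hypothetical per-chunk header size
        int payloadSize = 12 * 1024 * 1024;   // hypothetical large message

        for (int chunk : chunkPayloadSizes(payloadSize, maxMessageSize)) {
            System.out.println(chunk + " bytes -> rejected: "
                    + exceeds(headerSize, chunk, maxMessageSize));
        }
    }
}
```

Under these assumptions every full-size chunk fails the check as soon as the header is non-empty, and only a smaller final chunk can pass.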

Thanks,
Zike

On Fri, Dec 31, 2021 at 8:11 PM Haiting Jiang <jianghait...@apache.org> wrote:
>
> Sorry, it should be PIP-132.
>
> Thanks,
> Haiting Jiang
>
> On 2021/12/31 12:05:54 Haiting Jiang wrote:
> > https://github.com/apache/pulsar/issues/13591
> >
> > Pasted below for quoting convenience.
> >
> > ——
> >
> > ## Motivation
> >
> > Currently, the Pulsar client (Java) only checks the payload size for max
> > message size validation.
> >
> > The client throws a TimeoutException if we produce a message with too many
> > properties, see [1].
> > But the root cause is that it triggers a TooLongFrameException in the
> > broker server.
> >
> > In this PIP, I propose to include the message header size when checking
> > maxMessageSize for non-batch messages. This brings the following benefits:
> > 1. Clients can throw InvalidMessageException immediately if the properties
> > take too much storage space.
> > 2. This makes the behaviour consistent with the topic-level max message
> > size check in the broker.
> > 3. It strictly limits the entry size to less than maxMessageSize, avoiding
> > failures when sending messages to BookKeeper.
> >
> > ## Goal
> >
> > Include the message header size when checking maxMessageSize for non-batch
> > messages on the client side.
> >
> > ## Implementation
> >
> > ```
> > // Add a size check in
> > // org.apache.pulsar.client.impl.ProducerImpl#processOpSendMsg
> > if (op.msg != null // for non-batch messages only
> >     && op.getMessageHeaderAndPayloadSize() > ClientCnx.getMaxMessageSize()) {
> >     // finish send op with InvalidMessageException
> >     releaseSemaphoreForSendOp(op);
> >     op.sendComplete(new PulsarClientException.InvalidMessageException(
> >             "Message header and payload size exceeds maxMessageSize",
> >             op.sequenceId));
> >     return; // do not send the oversized message
> > }
> >
> >
> > // org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg#getMessageHeaderAndPayloadSize
> >
> > public int getMessageHeaderAndPayloadSize() {
> >     ByteBuf cmdHeader = cmd.getFirst();
> >     cmdHeader.markReaderIndex();
> >     int totalSize = cmdHeader.readInt(); // frame size after this 4-byte field
> >     int cmdSize = cmdHeader.readInt();   // size of the serialized command
> >     // Subtract the command and its 4-byte size field; the rest is
> >     // the message headers and payload.
> >     int msgHeadersAndPayloadSize = totalSize - cmdSize - 4;
> >     cmdHeader.resetReaderIndex(); // restore the reader index for sending
> >     return msgHeadersAndPayloadSize;
> > }
> > ```
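
For readers following the arithmetic in getMessageHeaderAndPayloadSize, here is a minimal standalone sketch that uses java.nio.ByteBuffer instead of Netty's ByteBuf. It assumes the frame starts with a 4-byte total size (not counting itself), followed by a 4-byte command size, the command, and then the message headers and payload. `FrameSizeSketch`, `buildFrame`, and `headersAndPayloadSize` are hypothetical names used only for illustration.

```java
import java.nio.ByteBuffer;

public class FrameSizeSketch {
    // Build a toy frame: [totalSize][cmdSize][command bytes][headers + payload bytes].
    // Assumption: totalSize counts everything after its own 4 bytes.
    static ByteBuffer buildFrame(int cmdSize, int headersAndPayloadSize) {
        int totalSize = 4 + cmdSize + headersAndPayloadSize;
        ByteBuffer frame = ByteBuffer.allocate(4 + totalSize);
        frame.putInt(totalSize);
        frame.putInt(cmdSize);
        frame.put(new byte[cmdSize]);               // placeholder command bytes
        frame.put(new byte[headersAndPayloadSize]); // placeholder headers and payload
        frame.flip();
        return frame;
    }

    // Same arithmetic as the proposal; mark/reset restores the read position.
    static int headersAndPayloadSize(ByteBuffer frame) {
        frame.mark();
        int totalSize = frame.getInt();
        int cmdSize = frame.getInt();
        frame.reset();
        return totalSize - cmdSize - 4;
    }

    public static void main(String[] args) {
        ByteBuffer frame = buildFrame(30, 1500);
        System.out.println("headers + payload: " + headersAndPayloadSize(frame));
        System.out.println("position after read: " + frame.position());
    }
}
```

The mark and reset calls play the role of markReaderIndex and resetReaderIndex, so the frame can still be written out from the beginning after its size has been inspected.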
> >
> > ## Rejected Alternatives
> > Add a new property like "maxPropertiesSize" or "maxHeaderSize" in
> > broker.conf and pass it to the client like maxMessageSize. But the
> > implementation is much more complex, and it does not provide benefits 2
> > and 3 mentioned in Motivation.
> >
> > ## Compatibility Issue
> > As a matter of fact, this PIP narrows down the sendable range. Previously,
> > when maxMessageSize is 1KB, it was OK to send a message with 1KB of
> > properties and a 1KB payload. But with this PIP, the send will fail with
> > InvalidMessageException.
> >
> > One conservative way is to add a boolean config "includeHeaderInSizeCheck"
> > to enable this feature. But I think it's OK to enable this directly as it's
> > more reasonable, and I don't see a good migration plan if we add a config
> > for this.
> >
> > The compatibility issue is worth discussing, and any suggestions are
> > appreciated.
> >
> > [1] https://github.com/apache/pulsar/issues/13560
> >
> > Thanks,
> > Haiting Jiang
> >



-- 
Zike Yang
