Good point. I think we should shrink the chunk size to
"ClientCnx.getMaxMessageSize() - chunkMessageHeaderSize", since the header
size is the same for each chunk message.
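
To make the arithmetic concrete, here is a rough sketch of the idea. It is not
the actual ProducerImpl code: the class and method names are hypothetical, the
sizes in main() are made-up numbers, and it assumes the serialized header size
is known up front and identical for every chunk.

```java
// Sketch only: ChunkSizeSketch and its methods are hypothetical names,
// not part of the Pulsar client API.
class ChunkSizeSketch {

    // Largest payload a single chunk may carry so that header + payload
    // still fits within maxMessageSize.
    static int maxChunkPayloadSize(int maxMessageSize, int chunkMessageHeaderSize) {
        int size = maxMessageSize - chunkMessageHeaderSize;
        if (size <= 0) {
            throw new IllegalArgumentException(
                "header (" + chunkMessageHeaderSize + " bytes) leaves no room for payload");
        }
        return size;
    }

    // Number of chunks needed for a payload, rounding up.
    // An empty payload is still sent as one message.
    static int totalChunks(int payloadSize, int chunkPayloadSize) {
        long chunks = ((long) payloadSize + chunkPayloadSize - 1) / chunkPayloadSize;
        return (int) Math.max(1, chunks);
    }

    public static void main(String[] args) {
        int maxMessageSize = 5 * 1024 * 1024;   // assumed broker limit
        int chunkMessageHeaderSize = 200;       // assumed serialized header size
        int payloadSize = 12 * 1024 * 1024;     // assumed large message

        int chunkPayload = maxChunkPayloadSize(maxMessageSize, chunkMessageHeaderSize);
        int chunks = totalChunks(payloadSize, chunkPayload);
        System.out.println("chunk payload size = " + chunkPayload + ", chunks = " + chunks);
    }
}
```

With the chunk payload capped this way, header plus payload of every chunk
stays within maxMessageSize, so the new check would not reject chunks.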

Thanks,
Haiting Jiang

On 2022/01/04 03:27:00 Zike Yang wrote:
> But how do we handle chunked messages? The chunked message is split
> based on the maxMessageSize(max payload size). This would seem to make
> `op.getMessageHeaderAndPayloadSize() > ClientCnx.getMaxMessageSize()`
> always true.
> 
> Thanks,
> Zike
> 
> On Fri, Dec 31, 2021 at 8:11 PM Haiting Jiang <jianghait...@apache.org> wrote:
> >
> > Sorry, it should be PIP-132.
> >
> > Thanks,
> > Haiting Jiang
> >
> > On 2021/12/31 12:05:54 Haiting Jiang wrote:
> > > https://github.com/apache/pulsar/issues/13591
> > >
> > > Pasted below for quoting convenience.
> > >
> > > ——
> > >
> > > ## Motivation
> > >
> > > > Currently, the Pulsar client (Java) checks only the payload size when
> > > > validating the max message size.
> > > >
> > > > The client throws a TimeoutException if we produce a message with too
> > > > many properties, see [1].
> > > > But the root cause is that the message triggers a TooLongFrameException
> > > > in the broker.
> > >
> > > > In this PIP, I propose to include the message header size when checking
> > > > maxMessageSize for non-batch messages. This brings the following
> > > > benefits:
> > > > 1. Clients can throw InvalidMessageException immediately if the
> > > > properties take up too much space.
> > > > 2. It makes the behaviour consistent with the topic-level max message
> > > > size check in the broker.
> > > > 3. It strictly limits the entry size to less than maxMessageSize, which
> > > > avoids failures when sending messages to BookKeeper.
> > >
> > > ## Goal
> > >
> > > > Include the message header size when checking maxMessageSize for
> > > > non-batch messages on the client side.
> > >
> > > ## Implementation
> > >
> > > ```
> > > > // Add a size check in org.apache.pulsar.client.impl.ProducerImpl#processOpSendMsg
> > > > if (op.msg != null // for non-batch messages only
> > > >     && op.getMessageHeaderAndPayloadSize() > ClientCnx.getMaxMessageSize()) {
> > > >     // finish send op with InvalidMessageException
> > > >     releaseSemaphoreForSendOp(op);
> > > >     op.sendComplete(new PulsarClientException.InvalidMessageException(
> > > >         "Message size exceeds maxMessageSize", op.sequenceId));
> > > >     return; // do not send the oversized message
> > > > }
> > >
> > >
> > > // 
> > > org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg#getMessageHeaderAndPayloadSize
> > >
> > > public int getMessageHeaderAndPayloadSize() {
> > >     ByteBuf cmdHeader = cmd.getFirst();
> > >     cmdHeader.markReaderIndex();
> > >     int totalSize = cmdHeader.readInt();
> > >     int cmdSize = cmdHeader.readInt();
> > >     int msgHeadersAndPayloadSize = totalSize - cmdSize - 4;
> > >     cmdHeader.resetReaderIndex();
> > >     return msgHeadersAndPayloadSize;
> > > }
> > > ```
> > >
> > > > ## Rejected Alternatives
> > > > Add a new property like "maxPropertiesSize" or "maxHeaderSize" in
> > > > broker.conf and pass it to the client like maxMessageSize. But the
> > > > implementation is much more complex, and it doesn't have
> > > > benefits 2 and 3 mentioned in Motivation.
> > >
> > > ## Compatibility Issue
> > > > As a matter of fact, this PIP narrows down the sendable range.
> > > > Previously, when maxMessageSize was 1KB, it was OK to send a message
> > > > with 1KB of properties and a 1KB payload. But with this PIP, the
> > > > send will fail with InvalidMessageException.
> > >
> > > > One conservative way is to add a boolean config
> > > > "includeHeaderInSizeCheck" to enable this feature. But I think it's OK
> > > > to enable this directly as it's more reasonable, and I don't see a good
> > > > migration plan if we add a config for this.
> > >
> > > The compatibility issue is worth discussing. And any suggestions are 
> > > appreciated.
> > >
> > > [1] https://github.com/apache/pulsar/issues/13560
> > >
> > > Thanks,
> > > Haiting Jiang
> > >
> 
> 
> 
> -- 
> Zike Yang
> 
