Good point, I think we should shrink chunk size to "ClientCnx.getMaxMessageSize() - chunkMessageHeaderSize", as we have the same header size for each chunk message.
Thanks, Haiting Jiang On 2022/01/04 03:27:00 Zike Yang wrote: > But how do we handle chunked messages? The chunked message is split > based on the maxMessageSize(max payload size). This would seem to make > `op.getMessageHeaderAndPayloadSize() > ClientCnx.getMaxMessageSize()` > always true. > > Thanks, > Zike > > On Fri, Dec 31, 2021 at 8:11 PM Haiting Jiang <jianghait...@apache.org> wrote: > > > > Sorry, it should be PIP-132. > > > > Thanks, > > Haiting Jiang > > > > On 2021/12/31 12:05:54 Haiting Jiang wrote: > > > https://github.com/apache/pulsar/issues/13591 > > > > > > Pasted below for quoting convenience. > > > > > > —— > > > > > > ## Motivation > > > > > > Currently, Pulsar client (Java) only checks payload size for max message > > > size validation. > > > > > > Client throws TimeoutException if we produce a message with too many > > > properties, see [1]. > > > But the root cause is that is trigged TooLongFrameException in broker > > > server. > > > > > > In this PIP, I propose to include message header size when check > > > maxMessageSize of non-batch > > > messages, this brings the following benefits. > > > 1. Clients can throw InvalidMessageException immediately if properties > > > takes too much storage space. > > > 2. This will make the behaviour consistent with topic level max message > > > size check in broker. > > > 3. Strictly limit the entry size less than maxMessageSize, avoid sending > > > message to bookkeeper failed. > > > > > > ## Goal > > > > > > Include message header size when check maxMessageSize for non-batch > > > message on the client side. > > > > > > ## Implementation > > > > > > ``` > > > // Add a size check in > > > org.apache.pulsar.client.impl.ProducerImpl#processOpSendMsg > > > if (op.msg != null // for non-batch messages only > > > && op.getMessageHeaderAndPayloadSize() > > > > ClientCnx.getMaxMessageSize()) { > > > // finish send op with InvalidMessageException > > > releaseSemaphoreForSendOp(op); > > > op.sendComplete(new PulsarClientException(new > > > InvalidMessageException, op.sequenceId)); > > > } > > > > > > > > > // > > > org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg#getMessageHeaderAndPayloadSize > > > > > > public int getMessageHeaderAndPayloadSize() { > > > ByteBuf cmdHeader = cmd.getFirst(); > > > cmdHeader.markReaderIndex(); > > > int totalSize = cmdHeader.readInt(); > > > int cmdSize = cmdHeader.readInt(); > > > int msgHeadersAndPayloadSize = totalSize - cmdSize - 4; > > > cmdHeader.resetReaderIndex(); > > > return msgHeadersAndPayloadSize; > > > } > > > ``` > > > > > > ## Reject Alternatives > > > Add a new property like "maxPropertiesSize" or "maxHeaderSize" in > > > broker.conf and pass it to > > > client like maxMessageSize. But the implementation is much more complex, > > > and don't have the > > > benefit 2 and 3 mentioned in Motivation. > > > > > > ## Compatibility Issue > > > As a matter of fact, this PIP narrows down the sendable range. > > > Previously, when maxMessageSize > > > is 1KB, it's ok to send message with 1KB properties and 1KB payload. But > > > with this PIP, the > > > sending will fail with InvalidMessageException. > > > > > > One conservative way is to add a boolean config > > > "includeHeaderInSizeCheck" to enable this > > > feature. But I think it's OK to enable this directly as it's more > > > reasonable, and I don't see good > > > migration plan if we add a config for this. > > > > > > The compatibility issue is worth discussing. And any suggestions are > > > appreciated. > > > > > > [1] https://github.com/apache/pulsar/issues/13560 > > > > > > Thanks, > > > Haiting Jiang > > > > > > > -- > Zike Yang >