Jun, the message format you proposed seems reasonable to me. I have a few
minor comments with regard to the user-facing API:

1. Do we want to expose the `close()` method in the Headers interface? It
seems that this method should only be called by the producer after the
headers have been passed to the interceptors, so it may make sense to keep
it as an internal method in the implementation class.

2. `header(String key)` returns the last header for that key. Maybe we
should make it explicit by calling the method `lastHeader(String key)`.

3. I agree with the change to throw an exception if we try to modify the
headers when they are in read-only mode. We should specify the exception in
the KIP. IllegalStateException, as suggested by Radai, seems reasonable.

4. How do users create a `Header` instance to pass to the `add` method? We
could add a static `create` method, taking both the key and the value, to
the `Header` interface (static interface methods require Java 8).

5. There's no method to replace all the headers with a given key, so one
would have to call `remove` and then `add`. Is the assumption that this is
rare? If so, that's probably OK; we can add another method later if it's
useful.
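
To make points 1 to 5 concrete, here is a hypothetical Java sketch of how the interfaces could look. Only `Header`, `Headers`, `add`, `remove`, `close()`, `lastHeader`, `create` and `IllegalStateException` come from the discussion; `SimpleHeader`, `RecordHeaders`, the chaining return types and the read-only flag are assumptions made for illustration, not the KIP's API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch; names beyond those in the email are assumptions.
interface Header {
    String key();
    byte[] value();

    // Point 4: static factory on the interface (needs Java 8).
    static Header create(String key, byte[] value) {
        return new SimpleHeader(key, value);
    }
}

final class SimpleHeader implements Header {
    private final String key;
    private final byte[] value;

    SimpleHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }

    public String key() { return key; }
    public byte[] value() { return value; }
}

// Point 1: the public interface does not expose close().
interface Headers extends Iterable<Header> {
    Headers add(Header header);
    Headers remove(String key);
    // Point 2: the name states which header wins when a key repeats.
    Header lastHeader(String key);
}

final class RecordHeaders implements Headers {
    private final List<Header> headers = new ArrayList<>();
    private boolean readOnly = false;

    public Headers add(Header header) {
        checkWritable();
        headers.add(header);
        return this;
    }

    // Removes every header with this key.
    public Headers remove(String key) {
        checkWritable();
        headers.removeIf(h -> h.key().equals(key));
        return this;
    }

    // Scans from the end so the most recently added header is returned.
    public Header lastHeader(String key) {
        for (int i = headers.size() - 1; i >= 0; i--) {
            if (headers.get(i).key().equals(key)) {
                return headers.get(i);
            }
        }
        return null;
    }

    // Unmodifiable view, so iterator().remove() cannot bypass read-only mode.
    public Iterator<Header> iterator() {
        return Collections.unmodifiableList(headers).iterator();
    }

    // Point 1: package-private, called by the producer after the interceptors run.
    void close() {
        readOnly = true;
    }

    // Point 3: any modification after close() throws IllegalStateException.
    private void checkWritable() {
        if (readOnly) {
            throw new IllegalStateException("headers are read-only");
        }
    }
}
```

With this sketch, point 5's replacement is `headers.remove("schema").add(Header.create("schema", bytes))`, which is short enough that a dedicated method could wait.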

Thanks,
Ismael

On Thu, Mar 16, 2017 at 4:44 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Everyone,
>
> Jason has been working on the new message format related to EOS
> (https://github.com/apache/kafka/pull/2614). He has included the header
> changes proposed in the KIP, which avoids the additional message format
> change that doing headers separately would require. Since the message
> format part of the header proposal seems less controversial and the
> consensus is that headers are needed, does anyone have objections to
> this? The following is the new record format with headers.
>
> * Record =>
> *   Length => Varint
> *   Attributes => Int8
> *   TimestampDelta => Varlong
> *   OffsetDelta => Varint
> *   Key => Bytes
> *   Value => Bytes
> *   Headers => [HeaderKey HeaderValue]
> *     HeaderKey => String
> *     HeaderValue => Bytes
> *
> * Note that in this schema, the Bytes and String types use a variable
> * length integer to represent the length of the field. The array type
> * used for the headers also uses a Varint for the number of headers.
>
> Thanks,
>
> Jun
>
>
> On Tue, Mar 14, 2017 at 10:49 AM, Ismael Juma <ism...@juma.me.uk> wrote:
>
> > Thanks Radai. Great to have a concrete example of the intended usage.
> >
> > Regarding performance, we would need to benchmark, as you said. But there
> > would be a lot of reuse (in essence, we are copying 5 references plus a
> > new object header), so I'd be surprised if that would be the bottleneck
> > compared to some of the other allocations that would be happening in that
> > path. In any case, I think we can leave this aside for now since people
> > also felt that the mutable API would be easier to use.
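
A hypothetical Java sketch of the copy described above: an immutable record where adding a header allocates one new record and one new header node, while the other five references (assumed here to be topic, partition, timestamp, key and value) are copied unchanged and the payload bytes are shared. The persistent linked list of headers is an assumption chosen to keep each copy cheap; it is not how `ProducerRecord` is implemented.

```java
// Hypothetical immutable record; the five copied fields are an assumption.
final class ImmutableRecord {
    // Persistent list: each new node points at the headers that already existed.
    static final class HeaderNode {
        final String key;
        final byte[] value;
        final HeaderNode previous;

        HeaderNode(String key, byte[] value, HeaderNode previous) {
            this.key = key;
            this.value = value;
            this.previous = previous;
        }
    }

    final String topic;
    final Integer partition;
    final Long timestamp;
    final byte[] key;
    final byte[] value;
    final HeaderNode headers; // null when the record has no headers

    ImmutableRecord(String topic, Integer partition, Long timestamp,
                    byte[] key, byte[] value, HeaderNode headers) {
        this.topic = topic;
        this.partition = partition;
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
        this.headers = headers;
    }

    // Two allocations per added header (one HeaderNode, one ImmutableRecord);
    // the five other references are copied, the byte arrays are not cloned.
    ImmutableRecord withHeader(String headerKey, byte[] headerValue) {
        return new ImmutableRecord(topic, partition, timestamp, key, value,
                new HeaderNode(headerKey, headerValue, headers));
    }

    int headerCount() {
        int n = 0;
        for (HeaderNode h = headers; h != null; h = h.previous) {
            n++;
        }
        return n;
    }
}
```

Four interceptors each calling `withHeader` would allocate four small records and four nodes, while the original record stays untouched and the payload array is shared by all of them.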
> >
> > About ProducerRecord reuse, my understanding is that people do sometimes
> > retry a failed request manually due to the fact that a large retry number
> > doesn't help if a batch is expired in the queue. I believe KIP-91 will
> > help:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer
> >
> > In addition, KIP-98 (Exactly-once) won't achieve its goal if people do
> > manual retries. So, it seems like it's OK to require people to create a
> > new ProducerRecord if they really need to do manual retries. But we
> > should add a note to the compatibility section of the KIP.
> >
> > I have a few minor API suggestions. I'll send a follow-up later today,
> > hopefully.
> >
> > Ismael
> >
> > On Mon, Mar 13, 2017 at 6:23 PM, radai <radai.rosenbl...@gmail.com> wrote:
> >
> > > The common "stack" we envision at LinkedIn would consist of (at least)
> > > the following components that add headers to every outgoing request:
> > >
> > > 1. Auditing/"lineage": appends a header containing "node" (hostname,
> > > etc.), time (UTC) and destination (cluster/topic). These accumulate as
> > > requests get mirrored between clusters.
> > > 2. Serialization: sets a header containing a schema identifier to allow
> > > deserialization.
> > > 3. Client-side encryption: would probably set a header identifying the
> > > key/scheme used.
> > > 4. Internal "billing".
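
A hypothetical Java sketch of the mutable approach for the stack listed above: each stage appends its header to the same record in place, so nothing is copied, and a later lineage stage (a mirroring hop) simply adds another `lineage` entry. `MutableRecord`, `HeaderStage`, `Stages`, the header keys and the value formats are all made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical mutable record: stages append headers in place, no copying.
final class MutableRecord {
    static final class Header {
        final String key;
        final byte[] value;

        Header(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    final String topic;
    final byte[] value;
    final List<Header> headers = new ArrayList<>();

    MutableRecord(String topic, byte[] value) {
        this.topic = topic;
        this.value = value;
    }

    void addHeader(String key, String value) {
        headers.add(new Header(key, value.getBytes(StandardCharsets.UTF_8)));
    }

    long count(String key) {
        return headers.stream().filter(h -> h.key.equals(key)).count();
    }
}

// One interceptor-like step in the chain (hypothetical interface).
interface HeaderStage {
    void apply(MutableRecord record);
}

final class Stages {
    // Auditing/lineage: node, UTC time and destination; one entry per hop.
    static HeaderStage lineage(String node, long utcMillis, String cluster) {
        return r -> r.addHeader("lineage", node + "," + utcMillis + "," + cluster + "/" + r.topic);
    }

    // Serialization: schema identifier needed for deserialization.
    static HeaderStage schema(int schemaId) {
        return r -> r.addHeader("schema.id", Integer.toString(schemaId));
    }

    // Client-side encryption: identifies the key/scheme used.
    static HeaderStage encryption(String scheme) {
        return r -> r.addHeader("encryption", scheme);
    }

    // Internal billing.
    static HeaderStage billing(String account) {
        return r -> r.addHeader("billing", account);
    }

    static void runAll(List<HeaderStage> stages, MutableRecord record) {
        for (HeaderStage stage : stages) {
            stage.apply(record);
        }
    }
}
```

Running the four stages leaves one record carrying four headers; applying a second `lineage` stage for a mirrored hop brings the `lineage` count to two without allocating a new record.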
> > >
> > > There are also several other teams at LinkedIn that would use headers
> > > (although it's unclear yet whether via interceptors or by directly
> > > manipulating requests).
> > >
> > > If headers are made completely immutable (as the entire request object
> > > currently is), we would end up copying (parts of) every message 4 times.
> > > I haven't benchmarked it, but this seems like it would have an impact.
> > >
> > > Looking elsewhere, RabbitMQ and HttpComponents both use mutable request
> > > objects (RabbitMQ's BasicProperties object, HttpComponents' addHeader
> > > method).
> > >
> > > How common is it right now for instances of ProducerRecord to actually
> > > be reused? Do people really have things like public static final
> > > ProducerRecord MY_FAVORITE_REQUEST = ... ?
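
A hypothetical Java sketch of why the reuse question matters once headers are mutable: if an interceptor appends to a shared record on every send, headers pile up across sends, whereas creating a fresh record per send (as suggested above for manual retries) avoids it. `Record`, `send` and the `audit` header are stand-ins, not the Kafka producer API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; not the Kafka producer API.
final class ReuseHazard {
    static final class Record {
        final String topic;
        final String value;
        final List<String> headerKeys = new ArrayList<>(); // mutable headers

        Record(String topic, String value) {
            this.topic = topic;
            this.value = value;
        }

        // A fresh record with the same topic and value and no headers.
        Record copyWithoutHeaders() {
            return new Record(topic, value);
        }
    }

    // The pattern asked about: one shared, reused record instance.
    static final Record MY_FAVORITE_REQUEST = new Record("events", "ping");

    // Stand-in for send(): an interceptor appends an audit header in place.
    static void send(Record record) {
        record.headerKeys.add("audit");
    }
}
```

Sending the shared instance twice leaves it with two `audit` headers, while a fresh copy per send carries exactly one.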
> > >
> >
>
