Re: [VOTE] KIP-331 Add default implementation to close() and configure() for Serializer, Deserializer and Serde

2018-07-07 Thread vito jeng
Thanks for the KIP!

+1 (non-binding)




---
Vito

On Sat, Jul 7, 2018 at 11:29 AM, Chia-Ping Tsai  wrote:

> hi Ismael
>
> > I think we should be thinking about
> > https://issues.apache.org/jira/browse/KAFKA-6923 at the same time.
>
> you are right. KAFKA-6923 and KAFKA-6161 are the keys to complete the
> Serializer and Deserializer. Let us add the default implementations first
> (KAFKA-6161), and then integrate ExtendedSerializer/ExtendedDeserializer
> into Serializer/Deserializer (KAFKA-6923).
>
> --
> Chia-Ping
>
> On 2018/07/06 15:33:18, Ismael Juma  wrote:
> > Thanks for the KIP. I think we should be thinking about
> > https://issues.apache.org/jira/browse/KAFKA-6923 at the same time.
> >
> > Ismael
> >
> > On Thu, 5 Jul 2018, 07:45 Chia-Ping Tsai,  wrote:
> >
> > > hi all,
> > >
> > > I would like to start voting on "KIP-331 Add default implementation to
> > > close() and configure() for Serializer, Deserializer and Serde"
> > >
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-331+Add+default+implementation+to+close%28%29+and+configure%28%29+for+Serializer%2C+Deserializer+and+Serde
> > >
> > > Cheers,
> > > Chia-Ping
> > >
> >
>
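For readers following along, the change KIP-331 proposes can be illustrated with a small self-contained sketch. The interface below is illustrative only (it is not the actual org.apache.kafka.common.serialization.Serializer): it shows how giving configure() and close() empty default bodies reduces a serializer to a single abstract method.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class Kip331Sketch {
    // Sketch of a serializer interface after KIP-331: configure() and
    // close() get no-op default bodies, so implementors only need serialize().
    interface Serializer<T> {
        default void configure(Map<String, ?> configs, boolean isKey) {
            // no-op by default
        }

        byte[] serialize(String topic, T data);

        default void close() {
            // no-op by default
        }
    }

    // With the defaults in place, a serializer is a one-method
    // implementation and can even be written as a lambda.
    static final Serializer<String> STRING_SERIALIZER =
            (topic, data) -> data.getBytes(StandardCharsets.UTF_8);

    public static void main(String[] args) {
        byte[] bytes = STRING_SERIALIZER.serialize("my-topic", "hello");
        System.out.println(bytes.length); // 5
    }
}
```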


Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-07 Thread Dong Lin
Hey Anna,

Thanks much for the thoughtful reply. It makes sense to differentiate between
"seeking to a message" and "seeking to a position". I have two questions
here:

- For the "seeking to a message" use-case, with the proposed approach the
user needs to call findOffset(offset, leaderEpoch) followed by seek(offset).
If message truncation and message append happen immediately after
findOffset(offset, leaderEpoch) but before seek(offset), it seems that the
user will seek to the wrong message without knowing that truncation has
happened. Would this be a problem?

- For the "seeking to a position" use-case, it seems that there can be two
positions, i.e. earliest and latest. These two cases can be fulfilled by
Consumer.seekToBeginning() and Consumer.seekToEnd(). Then it seems that the
user will only need to call position() and seek() for the "seeking to a
message" use-case?

Thanks,
Dong
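To make the race in the first question concrete, here is a toy simulation of the findOffsets()-then-seek() workflow discussed in this thread. findOffsets() and OffsetAndEpoch are hypothetical names from the proposal, not part of any released Kafka client; the "log" here is just a map from offset to the leader epoch of the message at that offset.

```java
import java.util.HashMap;
import java.util.Map;

public class SeekSketch {
    // Hypothetical stand-in for the {offset, leader epoch} pair in the thread.
    record OffsetAndEpoch(long offset, int leaderEpoch) {}

    // A toy log: offsets 0..2 were written in epoch 0; the log was then
    // truncated at offset 3 and offsets 3..4 rewritten in epoch 1.
    static final Map<Long, Integer> LOG = new HashMap<>();
    static {
        for (long o = 0; o < 3; o++) LOG.put(o, 0);
        for (long o = 3; o < 5; o++) LOG.put(o, 1);
    }

    // Sketch of the proposed findOffsets(): given the {offset, leaderEpoch}
    // last consumed, return the offset to resume from. If the message at the
    // saved offset still has the saved epoch, resume right after it;
    // otherwise walk back to the first offset that diverged.
    static long findOffsets(OffsetAndEpoch saved) {
        if (LOG.getOrDefault(saved.offset(), -1) == saved.leaderEpoch()) {
            return saved.offset() + 1; // nothing diverged
        }
        long o = saved.offset();
        while (o > 0 && LOG.getOrDefault(o - 1, -1) != saved.leaderEpoch()) {
            o--; // keep walking back past rewritten (newer-epoch) messages
        }
        return o; // truncation point: first offset to re-consume
    }

    public static void main(String[] args) {
        // Externally stored position: consumed up to offset 4 in epoch 0,
        // but the log was truncated and rewritten from offset 3 in epoch 1,
        // so the consumer should seek back to offset 3.
        System.out.println(findOffsets(new OffsetAndEpoch(4, 0))); // 3
    }
}
```

The caller would then pass the returned offset to seek(); as noted above, a truncation between findOffsets() and seek() would still go undetected in this scheme.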


On Wed, Jul 4, 2018 at 12:33 PM, Anna Povzner  wrote:

> Hi Jason and Dong,
>
>
> I’ve been thinking about your suggestions and discussion regarding
> position(), seek(), and new proposed API.
>
>
> Here is my thought process why we should keep position() and seek() API
> unchanged.
>
>
> I think we should separate {offset, leader epoch} that uniquely identifies
> a message from an offset that is a position. In some cases, offsets
> returned from position() correspond to messages actually consumed by this
> consumer, identified by {offset, leader epoch}. In other cases, position()
> returns an offset that was not actually consumed. Suppose the user calls
> position()
> for the last offset. Suppose we return {offset, leader epoch} of the
> message currently in the log. Then, the message gets truncated before
> consumer’s first poll(). It does not make sense for poll() to fail in this
> case, because the log truncation did not actually happen from the consumer
> perspective. On the other hand, as the KIP proposes, it makes sense for the
> committed() method to return {offset, leader epoch} because those offsets
> represent actual consumed messages.
>
>
> The same argument applies to the seek() method — we are not seeking to a
> message, we are seeking to a position.
>
>
> I like the proposal to add KafkaConsumer#findOffsets() API. I am assuming
> something like:
>
> Map<TopicPartition, Long> findOffsets(Map<TopicPartition, OffsetAndEpoch> offsetsToSearch)
>
> Similar to seek() and position(), I think findOffsets() should return an
> offset without a leader epoch, because what we want is the offset closest
> to the point of divergence for the given consumed message. Until the
> consumer actually fetches a message, we should not let the consumer store
> the leader epoch for a message it did not consume.
>
>
> So, the workflow will be:
>
> 1) The user gets LogTruncationException with {offset, leader epoch of the
> previous message} (whatever we send with new FetchRecords request).
>
> 2) offset = findOffsets(tp -> {offset, leader epoch})
>
> 3) seek(offset)
>
>
> For the use-case where the users store committed offsets externally:
>
> 1) Such users would have to track the leader epoch together with an offset.
> Otherwise, there is no way to detect later what leader epoch was associated
> with the message. I think it’s reasonable to ask that from users if they
> want to detect log truncation. Otherwise, they will get the current
> behavior.
>
>
> If the users currently get an offset to be stored using position(), I see
> two possibilities. First, they save the offset returned from a position()
> call made before poll(). In that case, it would not be correct to store
> {offset, leader epoch} if we had changed position() to return {offset,
> leader epoch}, since the actually fetched message could be different (as in
> the example I described earlier). So, it would be more correct to call
> position() after poll(). However, the user already has ConsumerRecords at
> this point, from which the user can extract {offset, leader epoch} of the
> last message.
>
>
> So, I like the idea of adding a helper method to ConsumerRecords, as Jason
> proposed, something like:
>
> public OffsetAndEpoch lastOffsetWithLeaderEpoch(), where OffsetAndEpoch is
> a data struct holding {offset, leader epoch}.
>
>
> In this case, we would advise the user to follow the workflow: poll(), get
> {offset, leader epoch} from ConsumerRecords#lastOffsetWithLeaderEpoch(),
> save offset and leader epoch, process records.
>
>
> 2) When the user needs to seek to the last committed offset, they call new
> findOffsets(saved offset, leader epoch), and then seek(offset).
>
>
> What do you think?
>
>
> Thanks,
>
> Anna
>
>
> On Tue, Jul 3, 2018 at 4:06 PM Dong Lin  wrote:
>
> > Hey Jason,
> >
> > Thanks much for your thoughtful explanation.
> >
> > Yes the solution using findOffsets(offset, leaderEpoch) also works. The
> > advantage of this solution is that it adds only one API instead of two.
> > The concern is that its usage seems a bit more clumsy for advanced users.
> > More specifically, advanced users who store offsets externally will alw
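Anna's suggested poll-then-save workflow, using the proposed ConsumerRecords#lastOffsetWithLeaderEpoch() helper, can be sketched as follows. All type names here (OffsetAndEpoch, Record, the helper itself) are hypothetical stand-ins from this thread, not released Kafka APIs.

```java
import java.util.List;

public class SaveOffsetsSketch {
    // Hypothetical stand-ins for the types discussed in the thread.
    record OffsetAndEpoch(long offset, int leaderEpoch) {}
    record Record(long offset, int leaderEpoch, String value) {}

    // Sketch of the proposed ConsumerRecords#lastOffsetWithLeaderEpoch():
    // the {offset, leader epoch} of the last record in the polled batch.
    static OffsetAndEpoch lastOffsetWithLeaderEpoch(List<Record> batch) {
        Record last = batch.get(batch.size() - 1);
        return new OffsetAndEpoch(last.offset(), last.leaderEpoch());
    }

    public static void main(String[] args) {
        // Workflow from the thread: poll(), extract {offset, leader epoch}
        // from the batch, save it externally, then process the records.
        List<Record> batch = List.of(
                new Record(10, 2, "a"), new Record(11, 2, "b"));
        OffsetAndEpoch toSave = lastOffsetWithLeaderEpoch(batch);
        System.out.println(toSave.offset() + "@" + toSave.leaderEpoch()); // 11@2
    }
}
```

On restart, the saved pair would be fed to the proposed findOffsets() before seeking.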

Old deprecated producer

2018-07-07 Thread jna

Hello,

I'm using the old producer API, and I have seen for a long time (several 
versions) that this API is deprecated. When will you decide to remove 
this old API? If you won't remove it, perhaps you could remove the 
deprecation. Will you provide a new way to produce a batch of records, 
or do transactions already replace this batch producer?


Thanks.



Re: Old deprecated producer

2018-07-07 Thread Ismael Juma
The old Scala producers were removed in 2.0.0. The Java producer supports
batching.

Ismael
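For reference, batching in the Java producer is controlled by the batch.size and linger.ms configs; no separate batch API is needed. A minimal configuration sketch (the broker address is a placeholder):

```java
import java.util.Properties;

public class ProducerBatchingConfig {
    // Builds a configuration for the Java producer
    // (org.apache.kafka.clients.producer.KafkaProducer). Records sent to
    // the same partition within linger.ms are grouped into batches of up
    // to batch.size bytes.
    static Properties batchingConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("batch.size", "16384"); // max bytes per batch (default 16 KB)
        props.put("linger.ms", "5");      // wait up to 5 ms to fill a batch
        return props;
    }

    public static void main(String[] args) {
        System.out.println(batchingConfig().getProperty("linger.ms")); // 5
    }
}
```

These properties would be passed to `new KafkaProducer<>(batchingConfig())`; send() then batches transparently.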


On Sat, 7 Jul 2018, 05:38 jna,  wrote:

> Hello,
>
> I'm using the old producer API, and I have seen for a long time (several
> versions) that this API is deprecated. When will you decide to remove
> this old API? If you won't remove it, perhaps you could remove the
> deprecation. Will you provide a new way to produce a batch of records,
> or do transactions already replace this batch producer?
>
> Thanks.
>
>