Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-01-29 Thread Jun Rao
Hi, Apurva,

Thanks for the update. My replies are inlined below.

On Wed, Jan 25, 2017 at 5:15 PM, Apurva Mehta  wrote:

> Hi Jun,
>
> Thanks again for the comments. More responses follow:
>
>
> > 101. Compatibility during upgrade: Suppose that the brokers are upgraded
> to
> > the new version, but the broker message format is still the old one. If a
> > new producer uses the transaction feature, should the producer get an
> error
> > in this case? A tricky case can be that the leader broker is on the new
> > message format, but the follower broker is still on the old message
> format.
> > In this case, the transactional info will be lost in the follower due to
> > down conversion. Should we fail the transactional requests when the
> > followers are still on the old message format?
> >
>
> This will only be an issue if applications are written to use transactions
> and are deployed with the new client before all the brokers are upgraded to
> the new message format.
>
> There are a variety of engineering solutions to this problem, one of which
> is for each broker to register itself as 'transaction ready' in zookeeper
> when it is on the right version of the message format. Once the controller
> detects that all brokers in the cluster are transaction ready, it will
> signal to each broker via the UpdateMetadataRequest that the cluster is
> ready for transactions. Any transactional requests received by brokers
> before this point will be rejected.
>
> A simpler way to solve this problem is through organizational policy: a
> cluster should not be advertised to application developers as 'transaction
> ready' until all brokers are on the new message format.
>
> I think the non-engineering solution is reasonable, and as such would
> prefer to not include engineering solutions in V1. It could be future work
> if necessary.
>
> We can make the problems that arise out of premature use of transactions
> clear in the release notes so that operators can take the necessary
> precautions. Is that reasonable?
>
>
Yes, in the first version, we can just document the impact in the upgrade
doc.



>
>
> > 102. When there is a correlated hard failure (e.g., power outage), it's
> > possible that an existing commit/abort marker is lost in all replicas.
> This
> > may not be fixed by the transaction coordinator automatically and the
> > consumer may get stuck on that incomplete transaction forever. Not sure
> > what's the best way to address this. Perhaps, one way is to run a tool to
> > add an abort marker for all pids in all affected partitions.
> >
> >
> This is a good point. With the abort index proposal, if a correlated hard
> failure causes us to lose the markers everywhere, the LSO on the broker
> would not advance and consumers would block (but not buffer). This would be
> a noticeable situation.
>
> A simple tool may make use of internal functions to effectively do an
> 'initPID', 'beginTransaction', 'AddTopicPartitiontoTransaction',
> 'commitTransaction'. This would ensure that the markers are rewritten to
> all partitions by the transaction coordinator, but would also fence the
> existing producer with the same AppId.
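For concreteness, a rough sketch of what such a tool could do, assuming the producer ends up exposing the KIP's transactional calls as initTransactions/beginTransaction/commitTransaction and that the affected AppId and partitions have already been identified from the logs; all names below are illustrative, not a finished interface:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TxnMarkerRecoveryTool {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");
        // The AppId of the stuck transaction; re-initializing with it fences the old producer.
        props.put("transaction.app.id", "stuck-app");   // config name as in the KIP-98 draft
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();    // the 'initPID' step: fences zombies, resolves pending state
            producer.beginTransaction();
            // Write a small record to every affected partition so that each one is added
            // to the transaction and receives a fresh commit marker from the coordinator.
            producer.send(new ProducerRecord<>("affected-topic", 0, null, new byte[0]));
            producer.commitTransaction();
        }
    }
}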
>
> To make this workable, we need to make sure that the transaction
> coordinator adds sufficient logging so that we know the AppID -> PID
> mapping as well as the partitions participating in each transaction. The
> broker should also probably log information so that we know which
> unfinished transaction (ie. which PID) is preventing the LSO from moving
> forward. Both these things will make it fairly easy to configure the tool.
>
> Of course, it is possible for the producer to continue onto another
> transaction before the tool is run, in which case the data will be corrupt
> since the second transaction will include messages from the first. But this
> is no worse than Kafka's existing durability semantics which this proposal
> relies on.
>
> I think such a tool can be a follow up work, and I have added it to the
> 'future work' section of the document.
>
>
There can be two types of tools, one for diagnosing the issue and another
for fixing the issue. I think having at least a diagnostic tool in the
first version could be helpful. For example, the tool can report things
like which producer id is preventing the LSO from being advanced. That way,
at least the users can try to fix this themselves.


>
> > 103. Currently, there is no check for producer liveness. This means that
> if
> > a producer has not been sending transactional requests for a long time,
> its
> > appId will be expired by the coordinator. Have we considered having
> > producers sending heartbeatRequest just like the consumer to keep it
> alive?
> >
> >
> In the current proposal, a producer whose AppId has expired is a Zombie
> which will get a Fatal 'ProducerFencedException' when it tries to make any
> new transactional requests. A bounce of the producer will reinitialize it,
> at which point it can continue.
>
> As such, while 

Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-01-29 Thread Jun Rao
Hi, Jason,

Thanks for the reply. They sound good to me.

Jun

On Fri, Jan 27, 2017 at 4:42 PM, Jason Gustafson  wrote:

> A few more responses:
>
>
> > 101. Compatibility during upgrade: Suppose that the brokers are upgraded
> to
> > the new version, but the broker message format is still the old one. If a
> > new producer uses the transaction feature, should the producer get an
> error
> > in this case? A tricky case can be that the leader broker is on the new
> > message format, but the follower broker is still on the old message
> format.
> > In this case, the transactional info will be lost in the follower due to
> > down conversion. Should we fail the transactional requests when the
> > followers are still on the old message format?
>
>
> We've added some more details to the document about migration. Please take
> a look. Two points worth mentioning:
>
> 1. Replicas currently take the message format used by the leader. As long
> as users do the usual procedure of two rolling bounces, it should be safe
> to upgrade the message format.
>
> 2. There is no way to support idempotent or transactional features if we
> downgrade the message format in the produce request handler. We've modified
> the design document to only permit message downgrades if the producer has
> disabled idempotence. Otherwise, we will return an
> UNSUPPORTED_FOR_MESSAGE_FORMAT error.
>
> 110. Transaction log:
> > 110.1 "Key => Version AppID Version" It seems that Version should really
> be
> > Type?
> > 110.2 "Value => Version Epoch Status ExpirationTime [Topic Partition]"
> > Should we store [Topic [Partition]] instead?
> > 110.3 To expire an AppId, do we need to insert a tombstone with the
> expired
> > AppID as the key to physically remove the existing AppID entries in the
> > transaction log?
>
>
> Fixed in the document. For 110.3, yes, we need to insert a tombstone after
> the AppID has expired. This will work in much the same way as the consumer
> coordinator expires offsets using a periodic task.
>
> 116. ProducerRequest: The existing format doesn't have "MessageSetSize" at
> > the partition level.
>
>
> This was intentional, but it is easy to overlook. The idea is to modify the
> ProduceRequest so that only one message set is included for each partition.
> Since the message set contains its own length field, it seemed unnecessary
> to have a separate field. The justification for this change was to make the
> produce request atomic. With only a single message set for each partition,
> either it will be written successfully or not, so an error in the response
> will be unambiguous. We are uncertain whether there are legitimate use
> cases that require producing smaller message sets in the ProduceRequest, so
> we would love to hear feedback on this.
>
> Thanks,
> Jason
>
> On Fri, Jan 27, 2017 at 4:21 PM, Apurva Mehta  wrote:
>
> > Hi again Jun,
> >
> > I have updated the document to address your comments below, but am including
> > the responses inline to make it easier for everyone to stay on top of the
> > conversation.
> >
> >
> >
> > > 106. Compacted topics.
> > > 106.1. When all messages in a transaction are removed, we could remove
> > the
> > > commit/abort marker for that transaction too. However, we have to be a
> > bit
> > > careful. If the marker is removed too quickly, it's possible for a
> > consumer
> > > to see a message in that transaction, but not to see the marker, and
> > > therefore will be stuck in that transaction forever. We have a similar
> > > issue when dealing with tombstones. The solution is to preserve the
> > > tombstone for at least a preconfigured amount of time after the
> cleaning
> > > has passed the tombstone. Then, as long as a consumer can finish
> reading
> > to
> > > the cleaning point within the configured amount of time, it's
> guaranteed
> > > not to miss the tombstone after it has seen a non-tombstone message on
> > the
> > > same key. I am wondering if we should do something similar here.
> > >
> >
> > This is a good point. As we discussed offline, the solution for the removal
> > of control messages will be the same as the solution for the problem of
> > tombstone removal documented in
> > https://issues.apache.org/jira/browse/KAFKA-4545.
> >
> > 106.2. "To address this problem, we propose to preserve the last epoch
> and
> > > sequence number written by each producer for a fixed amount of time as
> an
> > > empty message set. This is allowed by the new message format we are
> > > proposing in this document. The time to preserve the sequence number
> will
> > > be governed by the log retention settings. " Could you be a bit more
> > > specific on what retention time will be used since by default, there is
> > no
> > > retention time for compacted (but not delete) topic?
> > >
> >
> > We discussed this offline, and the consensus is that it is reasonable to use
> > the broker's global log.retention.* settings for these messages.
> >
> >
> > > 106.3 "As for control messages, if the broker does no

Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-01-29 Thread Guozhang Wang
Hello Jun,

Thanks for the comments!! Some responses below:

*> 100. Security: **We could also include AppId in produce request..*

On brokers only PIDs are maintained and they are unaware of the AppIds, so
I think it would be costly to prevent writes on the AppId level. On the
other hand, having security based on AppId for transactional requests
trying to write to the transaction logs seems sufficient to me, since
producers always need to talk to the transaction coordinator first in order
to send data to partition leaders.


*> 101. A tricky case can be that the leader broker is on the new message
format, but the follower broker is still on the old message format...*

The upgrade path has been updated in the wiki page.
Note that we will only let clients start using the idempotent /
transactional features after the whole cluster has completed upgrading
(both inter-broker protocol and message format). But to reduce temporary
performance degradation, we can consider letting clients upgrade without
using the new features, so that they will send / consume data following the
new message format, as indicated in step 3.


*> 102. When there is a correlated hard failure (e.g., power outage),
it's possible that an existing commit/abort marker is lost in all
replicas...*

As Apurva mentioned, we can provide an admin tool to let operators fix
such issues when a correlated hard failure happens.

Another potential solution is to let brokers fsync on transaction
boundaries (i.e. when the markers are being written), so that the
likelihood of such hard failures causing markers to be completely lost can
be reduced.


*> 105. When the transaction coordinator changes (due to leadership
changes), it's possible for both the old and the new coordinator sending
requests to a broker at the same time (for a short period of time)...*

This is a good question. We have updated the design doc to add a
coordinator epoch in the WriteTxnMarkerRequest, and have also added it to the
transaction message's value payload and the PID snapshot file (see here
for details).
The coordinator epoch corresponds to the transaction log's leader epoch.


*> 107. Could you include the default values for the newly introduced
configs?*

Have updated the design doc with the default values of the newly added configs;
see here, here, and here.


*> 117. UpdateTxnRequest: Could you explain the format of Marker?*

Note that we have renamed UpdateTxnRequest to WriteTxnMarkerRequest to be
more specific.
We have updated the doc to include its current possible values.


*> 118. TxnOffsetCommitRequest: How is retention time determined? Do we
need a new config in producer or just default it to -1 as the consumer?*

-1 will be used, as in the consumer. The corresponding section is updated.


*> 121. The ordering is important with idempotent producer, which
means that max.in.flight.requests.per.connection should be set to 1. Do we
want to enforce this?*

I think it is actually not necessary, since the brokers will "strictly"
check that the sequence number must be the current sequence + 1; so as long as
the first request fails, the rest are doomed to fail as well.
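A self-contained illustration of that check (the class and names below are made up for illustration; this is not the actual broker code):

public class SequenceCheckSketch {

    // The broker accepts a batch for a PID only if its first sequence number is
    // exactly lastPersistedSequence + 1; anything else is rejected as out of order.
    static boolean accept(long lastPersistedSequence, long incomingFirstSequence) {
        return incomingFirstSequence == lastPersistedSequence + 1;
    }

    public static void main(String[] args) {
        long lastPersisted = 41;
        System.out.println(accept(lastPersisted, 42));  // true: the next expected batch
        // If the batch with sequence 42 failed, the in-flight batch with sequence 43
        // has a gap and is rejected too, so no reordering slips through even with
        // max.in.flight.requests.per.connection > 1.
        System.out.println(accept(lastPersisted, 43));  // false
    }
}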


*> 122. Currently, since we don't know the number of messages in a
compressed set, to finish the iteration, we rely on catching EOF in the
decompressor, which adds a bit overhead in the consumer.*

The logic does not rely only on catching EOF, but also depends on the
offsetDelta to determine the upper bound on the number of messages. So
only if log compaction is triggered and the last message(s) are compacted
do we need to rely on catching EOFs, whose cost would be much less than
KAFKA-4293.


*> 123. I am wondering if the coordinator needs to add a "BEGIN
transaction message" on a BeginTxnRequest. **Could we just wait until an
AddPartitionsToTxnRequest?*

It is possible, though not likely, that a client sends an
AddOffsetsToTxnRequest right after a BeginTxnRequest; in this case we need
to make sure that there is already an on-going transaction.



Guozhang


On Wed, Jan 25, 2017 at 6:00 PM, Apurva Mehta  wrote:

> On Tue, Jan 17, 2017 at 6:17 PM, Apurva Mehta  

Rewind Kafka Stream consumer offset by timestamp

2017-01-29 Thread Jorge Esteban Quilcate Otoya
Hi everyone,

I was wondering if it's possible to rewind consumer offsets in Kafka Streams
using a timestamp, as with `offsetsForTimes(Map<TopicPartition, Long>
timestampsToSearch)` in KafkaConsumer.

I know it's possible to go back to the `earliest` offset in a topic or to
`latest`, but it would be useful to go back using a timestamp, as the Consumer
API allows.

Maybe there is already an option to do this and I'm missing something?

Thanks in advance for your feedback!

Jorge.


[GitHub] kafka pull request #2463: 4706

2017-01-29 Thread sharad-develop
GitHub user sharad-develop opened a pull request:

https://github.com/apache/kafka/pull/2463

4706

4706 - Unify StreamsKafkaClient instances

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apache/kafka 0.10.2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2463.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2463


commit 7946b2f35c0e3811ba7d706fc9c761d73ece47bb
Author: Damian Guy 
Date:   2017-01-16T19:40:47Z

MINOR: Remove unused constructor param from ProcessorStateManager

Remove applicationId parameter as it is no longer used.

Author: Damian Guy 

Reviewers: Guozhang Wang 

Closes #2385 from dguy/minor-remove-unused-param

commit 621dff22e79dc64b9a8748186dd985774044f91a
Author: Rajini Sivaram 
Date:   2017-01-17T11:16:29Z

KAFKA-4363; Documentation for sasl.jaas.config property

Author: Rajini Sivaram 

Reviewers: Ismael Juma 

Closes #2316 from rajinisivaram/KAFKA-4363

commit e3f4cdd0e249f78a7f4e8f064533bcd15eb11cbf
Author: Rajini Sivaram 
Date:   2017-01-17T12:55:07Z

KAFKA-4590; SASL/SCRAM system tests

Runs sanity test and one replication test using SASL/SCRAM.

Author: Rajini Sivaram 

Reviewers: Ewen Cheslack-Postava , Ismael Juma 


Closes #2355 from rajinisivaram/KAFKA-4590

commit 2b19ad9d8c47fb0f78a6e90d2f5711df6110bf1f
Author: Rajini Sivaram 
Date:   2017-01-17T18:42:55Z

KAFKA-4580; Use sasl.jaas.config for some system tests

Switched console_consumer, verifiable_consumer and verifiable_producer to 
use new sasl.jaas_config property instead of static JAAS configuration file 
when used with SASL_PLAINTEXT.

Author: Rajini Sivaram 

Reviewers: Ewen Cheslack-Postava , Ismael Juma 


Closes #2323 from rajinisivaram/KAFKA-4580

(cherry picked from commit 3f6c4f63c9c17424cf717ca76c74554bcf3b2e9a)
Signed-off-by: Ismael Juma 

commit 60d759a227087079e6fd270c68fd9e38441cb34a
Author: Jason Gustafson 
Date:   2017-01-17T18:42:05Z

MINOR: Some cleanups and additional testing for KIP-88

Author: Jason Gustafson 

Reviewers: Vahid Hashemian , Ismael Juma 


Closes #2383 from hachikuji/minor-cleanup-kip-88

commit c9b9acf6a8b542c2d0d825c17a4a20cf3fa5
Author: Damian Guy 
Date:   2017-01-17T20:33:11Z

KAFKA-4588: Wait for topics to be created in 
QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable

After debugging this i can see the times that it fails there is a race 
between when the topic is actually created/ready on the broker and when the 
assignment happens. When it fails `StreamPartitionAssignor.assign(..)` gets 
called with a `Cluster` with no topics. Hence the test hangs as no tasks get 
assigned. To fix this I added a `waitForTopics` method to 
`EmbeddedKafkaCluster`. This will wait until the topics have been created.

Author: Damian Guy 

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2371 from dguy/integration-test-fix

(cherry picked from commit 825f225bc5706b16af8ec44ca47ee1452c11e6f3)
Signed-off-by: Guozhang Wang 

commit 6f72a5a53c444278187fa6be58031168bcaffb26
Author: Damian Guy 
Date:   2017-01-17T22:13:46Z

KAFKA-3452 Follow-up: Refactoring StateStore hierarchies

This is a follow up of https://github.com/apache/kafka/pull/2166 - 
refactoring the store hierarchies as requested

Author: Damian Guy 

Reviewers: Guozhang Wang 

Closes #2360 from dguy/state-store-refactor

(cherry picked from commit 73b7ae0019d387407375f3865e263225c986a6ce)
Signed-off-by: Guozhang Wang 

commit eb62e5695506ae13bd37102c3c08e8a067eca0c8
Author: Ismael Juma 
Date:   2017-01-18T02:43:10Z

KAFKA-4591; Create Topic Policy follow-up

1. Added javadoc to public classes
2. Removed `s` from config name for consistency with interface name
3. The policy interface now implements Configurable and AutoCloseable as 
per the KIP
4. Use `null` instead of `-1` in `RequestMetadata`
5. Perform all broker validation before invoking the policy
6. Add tests

Author: Ismael Juma 

Reviewers: Jason Gustafson 

Closes #2388 from ijuma/create-topic-policy-docs-and-config-name-change

(cherry picked from commit fd6d7bcf335166a524dc9a29a50c96af8f1c1c02)
Signed-off-by: Ismael Juma 

commit e38794e020951adec5a5d0bbfe42c57294bf67bd
Author: Guozhang Wang 
Date:   2017-01-18T04:29:55Z

KAFKA-3502; move RocksDB options construction to init()

In RocksDBStore, options / wOptions / fOptions are constructed in the 
constructor, which needs to be dismissed in the close() call; however in some 
tests, the generated topology is no

Re: Rewind Kafka Stream consumer offset by timestamp

2017-01-29 Thread Eno Thereska
Hi Jorge,

This is currently not possible, but it is likely to be considered for
discussion. One challenge is that, if you have multiple topics, it is difficult
to rewind them all back to a consistent point in time. KIP-95, currently under
discussion, is handling the slightly different issue of stopping the consuming
at a point in time:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams

Thanks
Eno
> On 29 Jan 2017, at 19:29, Jorge Esteban Quilcate Otoya 
>  wrote:
> 
> Hi everyone,
> 
> I was wondering if its possible to rewind consumers offset in Kafka Stream
> using timestamp as with `offsetsForTimes(Map
> timestampsToSearch)` in KafkaConsumer.
> 
> I know its possible to go back to `earliest` offset in topic or `latest`,
> but would be useful to go back using timestamp as with Consumer API do.
> 
> Maybe is there an option to do this already and I'm missing something?
> 
> Thanks in advance for your feedback!
> 
> Jorge.



[GitHub] kafka pull request #2462: MINOR: JavaDoc markup cleanup

2017-01-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2462


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-01-29 Thread Eugen Dueck

Thanks Apurva - replies inline.

On 2017-01-27 15:19, Apurva Mehta wrote:

Eugen, moving your email to the main thread so that it doesn't get split.

The `transaction.app.id` is a prerequisite for using transactional APIs.
And only messages wrapped inside transactions will enjoy idempotent
guarantees across sessions, and that too only when they employ a
consume-process-produce pattern.


Say I have a producer, producing messages into a topic and I only want 
to guarantee the producer cannot insert duplicates. In other words, 
there's no downstream consumer/processor to be worried about - which, 
when considering the correctness of the data only, is all I need for 
idempotent producers, as every message has a unique id (offset), so 
downstream processes can take care of exactly once processing by any 
number of means. (If you need transactional all-or-none behavior, which 
KIP-98 also addresses, that's of course a more complex story)
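For concreteness, such a producer would be configured only for idempotence, roughly like this (enable.idempotence is the config name from the eventual KIP-98 implementation and should be treated as an assumption here, since the KIP is still under discussion):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class IdempotentOnlyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");
        props.put("acks", "all");
        props.put("enable.idempotence", "true");   // PID + sequence numbers, no transactions
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Retries plus broker-side sequence checks prevent duplicates within this
            // producer session; the question below is what happens across sessions.
            producer.send(new ProducerRecord<>("events", "key", "value"));
        }
    }
}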


I was under the impression that KIP-98 would fulfill the above requirement,
i.e. the prevention of duplicate inserts of the same message into a
topic per producer, without using transactions, and guaranteed across
TCP connections to handle producer/broker crashes and network problems.



In other words, producers where the `transaction.app.id` is specified will
not enjoy idempotence across sessions unless their messages are
transactional. ie. that the sends  are wrapped between `beginTransaction`,
`sendOffsets`, and `commitTransaction`.


From the KIP-98 wiki and the design document, I understand that AppIDs,
PIDs, and sequence numbers are enforced regardless of whether they are
wrapped in a transaction or not. Is that not so?


Cheers,
Eugen


The comment about the heartbeat was just a passing comment about the fact
that an AppId could be expired if a producer doesn't use transactions for a
long time. We don't plan to implement heartbeats in V1, though we might in
the future.

Hope this clarified things.

Regards,
Apurva


KIP-98 says

 > transaction.app.id: A unique and persistent way to identify a
producer. This is used to ensure idempotency and to enable transaction
recovery or rollback across producer sessions. This is optional: you will
lose cross-session guarantees if this is blank.
which might suggest that a producer that does not use the transactional
features, but does set the transaction.app.id, could get cross-session
idempotency. But the design document "Exactly Once Delivery and
Transactional Messaging in Kafka" rules that out:
 > For the idempotent producer (i.e., producer that do not use
transactional APIs), currently we do not make any cross-session guarantees
in any case. In the future, we can extend this guarantee by having the
producer to periodically send InitPIDRequest to the transaction coordinator
to keep the AppID from expiring, which preserves the producer's zombie
defence.
Until that point in the future, could my non-transactional producer send an
InitPIDRequest once and then heartbeat via BeginTxnRequest/EndTxnRequest(ABORT)
at intervals less than transaction.app.id.timeout.ms in order to
guarantee cross-session idempotency? Or is that not guaranteed because
"currently we do not make any cross-session guarantees in any case"? I know
this would be an ugly hack.
I guess that is also what the recently added "Producer HeartBeat" feature
proposal would address - although it is described to prevent idle
transactional producers from having their AppIds expired.

Related question: If KIP-98 does not make cross-session guarantees for
idempotent producers, is the only improvement over the current idempotency
situation the prevention of duplicate messages in case of a partition
leader migration? Because if a broker fails or the publisher fails, KIP-98
does not seem to change the risk of dupes for non-transactional producers.








Btw: Good job! Both in terms of Kafka in general, and KIP-98 in particular



Cheers

On Wed, Jan 25, 2017 at 6:00 PM, Apurva Mehta  wrote:




On Tue, Jan 17, 2017 at 6:17 PM, Apurva Mehta  wrote:


Hi Jun,

Some answers in line.


109. Could you describe when Producer.send() will receive an Unrecognized

MessageException?


This exception will be thrown if the producer sends a sequence number
which is greater than the sequence number expected by the broker (ie. more
than 1 greater than the previously sent sequence number). This can happen
in two cases:

a) If there is a bug in the producer where sequence numbers are
incremented more than once per message. So the producer itself will send
messages with gaps in sequence numbers.
b) The broker somehow lost a previous message. In a cluster configured
for durability (ie. no unclean leader elections, replication factor of 3,
min.isr of 2, acks=all, etc.), this should not happen.

So realistically, this exception will only be thrown in clusters
configured for high availability where brokers could lose messages.

Becket raised the question if we should throw this exception at all in
c

Re: Rewind Kafka Stream consumer offset by timestamp

2017-01-29 Thread Matthias J. Sax
You can always build your own little tool, similar to StreamsResetter.java,
to get this done. I.e., you set the committed offsets "manually" based on
timestamps before you start your application.
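A minimal sketch of such a tool with the plain consumer API (the group id must match the Streams application.id, the application must be stopped, and the topic and group names here are placeholders):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class ResetOffsetsToTimestamp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-streams-app");  // the Streams application.id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        long timestamp = 1485650000000L;  // rewind target (epoch millis)

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Ask for the earliest offset whose record timestamp is >= the target,
            // for every partition of the input topic.
            Map<TopicPartition, Long> query = new HashMap<>();
            for (PartitionInfo p : consumer.partitionsFor("input-topic")) {
                query.put(new TopicPartition(p.topic(), p.partition()), timestamp);
            }
            Map<TopicPartition, OffsetAndTimestamp> found = consumer.offsetsForTimes(query);

            // Commit those offsets for the group so the application resumes from there.
            Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> e : found.entrySet()) {
                if (e.getValue() != null) {
                    toCommit.put(e.getKey(), new OffsetAndMetadata(e.getValue().offset()));
                }
            }
            consumer.assign(toCommit.keySet());
            consumer.commitSync(toCommit);
        }
    }
}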

But as Eno mentioned, you need to think carefully about what a
consistent reset point would be because you cannot reset the
application's state...

If you start your application with an empty state, this might be less of
a concern though and seems reasonable.


-Matthias

On 1/29/17 12:55 PM, Eno Thereska wrote:
> Hi Jorge,
> 
> This is currently not possible, but it is likely to be considered for 
> discussion. One challenge is that, if you have multiple topics, it is 
> difficult to rewind them all back to a consistent point in time. KIP-95, 
> currently under discussion, is handling the slightly different issue, of 
> stopping the consuming at a point in time: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams
>  
> .
> 
> Thanks
> Eno
>> On 29 Jan 2017, at 19:29, Jorge Esteban Quilcate Otoya 
>>  wrote:
>>
>> Hi everyone,
>>
>> I was wondering if its possible to rewind consumers offset in Kafka Stream
>> using timestamp as with `offsetsForTimes(Map
>> timestampsToSearch)` in KafkaConsumer.
>>
>> I know its possible to go back to `earliest` offset in topic or `latest`,
>> but would be useful to go back using timestamp as with Consumer API do.
>>
>> Maybe is there an option to do this already and I'm missing something?
>>
>> Thanks in advance for your feedback!
>>
>> Jorge.
> 
> 





Jenkins build is back to normal : kafka-trunk-jdk8 #1233

2017-01-29 Thread Apache Jenkins Server
See 



[GitHub] kafka pull request #2464: KAFKA-4662: adding test coverage for addSource met...

2017-01-29 Thread bbejeck
GitHub user bbejeck opened a pull request:

https://github.com/apache/kafka/pull/2464

KAFKA-4662: adding test coverage for addSource methods with AutoOffsetReset

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bbejeck/kafka 
KAFKA-4662_improve_topology_builder_test_coverage

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2464.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2464


commit 52a90f521a701388aebf1eb92440aedeb351b45d
Author: bbejeck 
Date:   2017-01-30T02:50:26Z

KAFKA-4662: adding test coverage for addSource methods with AutoOffsetReset




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-114: KTable materialization and improved semantics

2017-01-29 Thread Guozhang Wang
Thinking out loud here about the API options (materialize() v.s. overloaded
functions) and their impact on IQ:

1. The first issue of the current DSL is that, there is inconsistency upon
whether / how KTables should be materialized:

a) in many cases the library HAS TO materialize KTables no matter what,
e.g. KTables resulting from KStream / KTable aggregations, and hence we force
users to provide store names and throw an RTE if it is null;
b) in some other cases, the KTable can be materialized or not; for
example in KStreamBuilder.table(), the store name can be null, in which
case the KTable would not be materialized;
c) in some other cases, the KTable will never be materialized, for
example KTables resulting from KTable.filter(), and users have no option to
force them to be materialized;
d) this is related to a), where some KTables are required to be
materialized, but we do not enforce users to provide a state store name,
e.g. KTables involved in joins; an RTE will be thrown, not immediately but
later, in this case.

2. The second issue is related to IQ, where state stores are accessed by
their names; so only those KTables that have user-specified state
store names will be queryable. But because of 1) above, many stores may not be
of interest to users for IQ, yet users still need to provide a (dummy?) state
store name for them; while on the other hand users cannot query some state
stores, e.g. the ones generated by KTable.filter(), as there is no API for
them to specify a state store name.

3. We are aware from user feedback that such backend details would
better be abstracted away from the DSL layer, where app developers should
just focus on processing logic, while state stores along with their
changelogs etc. would better be handled by a different mechanism; the same
arguments have been discussed for serdes / windowing triggers as well. For serdes
specifically, we had a very long discussion about it and concluded that, at
least in Java 7, we cannot completely abstract serdes away in the DSL, so we
chose the other extreme of forcing users to be completely aware of the
serde requirements when some KTables may need to be materialized via
overloaded API functions. As for the state store names, I feel it is a
different argument than serdes (details below).


So to me, for either the materialize() or the overloaded-functions direction,
the first thing I'd like to resolve is the inconsistency issue mentioned
above. So in either case: KTable materialization will not be affected by the user
providing a state store name or not, but will only be decided by the library
when it is necessary. More specifically, only KTables resulting from the join
operator and builder.table() are not always materialized, but they are still
likely to be materialized lazily (e.g. when participating in a join
operator).


For overloaded functions that would mean:

a) we have an overloaded function for ALL operators that could result
in a KTable, and allow it to be null (i.e. for the function without this
param it is null by default);
b) a null state store name does not indicate that the KTable will not be
materialized, but that it will not be used for IQ at all (internal state
store names will be generated when necessary).


For materialize() that would mean:

a) we will remove state store names from ALL operators that could
result in a KTable.
b) not calling materialize() on a KTable does not indicate that it will not
be materialized, but that it will not be used for IQ at all
(internal state store names will be generated when necessary).


Again, either way the API itself does not "hint" at anything about
materializing a KTable or not; it is still purely determined by the
library when parsing the DSL for now.

Following these thoughts, I feel that 1) we should probably change the name
"materialize", since it may mislead users about what actually happens
behind the scenes, to e.g. the "queryableStore(String storeName)" that Damian
suggested, which returns a QueryableStateStore and can replace the
`KafkaStreams.store` function; 2) comparing those two options, assuming we
get rid of the misleading function name, I personally favor not adding more
overloaded functions, as it keeps the API simpler.
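To make the comparison concrete, a purely hypothetical sketch of the two shapes (none of these method names exist today; queryableStore() is Damian's suggestion, and the overloads are only illustrative):

// Hypothetical API sketch only -- illustrating the two options, not existing Kafka Streams code.

// Option A: overloaded functions -- every KTable-producing operator takes an optional
// store name; null means "materialize only if the library must, and don't expose for IQ".
KTable<String, Long> counts    = stream.groupByKey().count("counts-store");   // queryable
KTable<String, Long> bigCounts = counts.filter((k, v) -> v > 100, null);      // not queryable

// Option B: no store names on operators; a single call marks a KTable as queryable
// and returns the query handle, replacing KafkaStreams.store().
QueryableStateStore<String, Long> store =
    counts.filter((k, v) -> v > 100).queryableStore("big-counts-store");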



Guozhang


On Sat, Jan 28, 2017 at 2:32 PM, Jan Filipiak 
wrote:

> Hi,
>
> thanks for your mail, felt like this can clarify some things! The thread
> unfortunately split but as all branches close in on what my suggestion was
> about Ill pick this to continue
>
> Of course only the table the user wants to query would be materialized.
> (retrieving the queryhandle implies materialisation). So In the example of
> KTable::filter if you call
> getIQHandle on both tables only the one source that is there would
> materialize and the QueryHandleabstraction would make sure it gets mapped
> and filtered and what not uppon read as usual.
>
> Of Course the Object you would retrieve would maybe only wrap the
> storeName / table unique identifier and a way to access t