You can do a conditional branching by using KStream.branch(Predicate...).
You can then merge multiple streams using KStreamBuilder.merge(KStream...).
-Yasuhiro
On Mon, Jun 20, 2016 at 4:45 AM, Jeyhun Karimov
wrote:
> Hi Guozhang,
>
> Thank you for your reply. Yes, it is correct. Your solution i
---
Thanks,
Yasuhiro Matsuda
enerated e-mail. To reply, visit:
https://reviews.apache.org/r/31893/#review76975
---
On April 1, 2015, 12:31 a.m., Yasuhiro Matsuda wrote:
>
> ---
> This is an automatically generated e-mail.
gt;
> >
> > Not quite sure what this is testing. It's not clear to me why the
> > sharedCounter won't increase after add. Perhaps, we can add some comments.
It is testing that reinserting the existing tasks doesn't change the task
count. I will add comments.
-CREATION
Diff: https://reviews.apache.org/r/31568/diff/
Testing
---
Thanks,
Yasuhiro Matsuda
n why that is better? It will trigger a lot more purge calls.
And the frequency of calls depends on how many keys each request has. When the
average number of keys per operation is large, it is possible to have a case
that the total number of watchers exceeds the threshold, but there are only a
-CREATION
Diff: https://reviews.apache.org/r/31568/diff/
Testing
---
Thanks,
Yasuhiro Matsuda
single null compare (no method
call, no synchronization), which is the cheapest.
- Yasuhiro
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review79257
------
only if it returns
> > true, run onExpiration().
>
> Yasuhiro Matsuda wrote:
> This came from the original ExpiredOperationReaper.expireNext(). Also the
> comment on onExpiration says, "Call-back to execute when a delayed operation
> expires, but before completion.
-
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review79269
---
On April 7, 2015, 9:59 p.m., Yasuhiro Matsuda wrote:
>
>
multiple executor threads in the pool.
I agree!
- Yasuhiro
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review79408
-------
-CREATION
Diff: https://reviews.apache.org/r/31568/diff/
Testing
---
Thanks,
Yasuhiro Matsuda
-CREATION
Diff: https://reviews.apache.org/r/31568/diff/
Testing
---
Thanks,
Yasuhiro Matsuda
/33028/diff/
Testing
---
Thanks,
Yasuhiro Matsuda
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33071/#review80342
---
Cool! It looks good to me.
- Yasuhiro Matsuda
On April 10, 2015
,
Yasuhiro Matsuda
apache.org/r/34734/#review85530
---
On May 27, 2015, 9 p.m., Yasuhiro Matsuda wrote:
>
> ---
> This is an automatically generated e-mail. To reply, vis
e-mail. To reply, visit:
https://reviews.apache.org/r/34734/#review85596
---
On May 27, 2015, 9 p.m., Yasuhiro Matsuda wrote:
>
> ---
> This is an automatically generated
> > 134 is called and the entry is not removed since list is now null, (4) line
> > 133 is tested again and since list is now null, we quit the loop, (5) the
> > reinsert process adds the entry to a new list.
> >
> > At this point, a completed entry still exi
> > 134 is called and the entry is not removed since list is now null, (4) line
> > 133 is tested again and since list is now null, we quit the loop, (5) the
> > reinsert process adds the entry to a new list.
> >
> > At this point, a completed entry still exi
/r/34734/diff/
Testing
---
Thanks,
Yasuhiro Matsuda
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34734/#review85665
---
On May 29, 2015, 12:19 a.m., Yasuhiro Matsuda wrote:
>
> --
/r/34734/diff/
Testing
---
Thanks,
Yasuhiro Matsuda
; remove the entry from the list.
I will remove this.
- Yasuhiro
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34734/#review85596
---------
ms that we can add
> > the entry either when it's created for the first time or when it's removed
> > from the current list and needs to be added to a new list during reinsert.
> > In both cases, the list in the entry will be null and there is no need to
> > remove
led" is a legitimate spelling in American
English.
- Yasuhiro
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34734/#review85764
---
/r/34734/diff/
Testing
---
Thanks,
Yasuhiro Matsuda
/r/34734/diff/
Testing
---
Thanks,
Yasuhiro Matsuda
/browse/KAFKA-1965
Repository: kafka
Description
---
leaner DelayedItem
Diffs
-
core/src/main/scala/kafka/utils/DelayedItem.scala
a4e0dabc858bc0081ba4fc0deea203bebd8bbf6b
Diff: https://reviews.apache.org/r/31199/diff/
Testing
---
Thanks,
Yasuhiro Matsuda
/
Testing
---
Thanks,
Yasuhiro Matsuda
/TimerTest.scala PRE-CREATION
Diff: https://reviews.apache.org/r/31568/diff/
Testing
---
Thanks,
Yasuhiro Matsuda
/message/MessageWriterTest.scala PRE-CREATION
Diff: https://reviews.apache.org/r/31742/diff/
Testing
---
Thanks,
Yasuhiro Matsuda
/browse/KAFKA-2013
Repository: kafka
Description
---
purgatory micro benchmark
Diffs
-
core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala PRE-CREATION
Diff: https://reviews.apache.org/r/31893/diff/
Testing
---
Thanks,
Yasuhiro Matsuda
.
Well, I don't think it is particulary better way to organize code, but if you
insist I can change it.
Kafka code base doesn't seem to follow that convention...
On March 13, 2015, 11:43 p.m., Yasuhiro Matsuda wrote:
> > The inheritance of MessageWriter from BufferingOutputStre
ee such
a timer works accurately in this test setting.
- Yasuhiro
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31893/#review76566
------
---
Thanks,
Yasuhiro Matsuda
---
Thanks,
Yasuhiro Matsuda
---
Thanks,
Yasuhiro Matsuda
we need to overwrite isCompleted()?
> > Typically, only tryComplete() and onComplete() need to be overwritten in a
> > subclass of DelayedOperation.
> >
> > Actually, I am not sure how we complete the requests before the timeout
> > is reached since there is
/MessageWriter.scala PRE-CREATION
core/src/test/scala/unit/kafka/message/MessageWriterTest.scala PRE-CREATION
Diff: https://reviews.apache.org/r/31742/diff/
Testing
---
Thanks,
Yasuhiro Matsuda
On March 13, 2015, 11:43 p.m., Yasuhiro Matsuda wrote:
> > The inheritance of MessageWriter from BufferingOutputStream is a bit
> > confusing, since it will always use itself in the writePayload function
> > parameter.
> >
> > I feel it is more clear t
t.scala, lines
> > 29-33
> > <https://reviews.apache.org/r/31568/diff/1/?file=881361#file881361line29>
> >
> > Could we just add an atomic integer recording the list size and size()
> > function to TimerTaskList?
We size the list only in this
> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 116
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line116>
> >
> > We need to make tickMs and wheelSize configur
-----
On Feb. 28, 2015, 12:14 a.m., Yasuhiro Matsuda wrote:
>
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31568/
> --
set once throughout
> > its life time; even when the task entry gets reinsurted its correspondence
> > to the task will not change, right?
> >
> > If that is true we can just set the entry for the task in the
> > constructor of the task entry.
>
>
e can rename off to initOffset and offset to currOffset?
This is consistent with OutputStream's parameter naming.
- Yasuhiro
-------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31742/#review76985
-
---
Thanks,
Yasuhiro Matsuda
sageWrite is just for formatting a message.
- Yasuhiro
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31742/#review76985
-------
ps://reviews.apache.org/r/31742/#review76985
---
On March 16, 2015, 10:19 p.m., Yasuhiro Matsuda wrote:
>
> ---
> This is an automatically generated e-mail.
PRE-CREATION
core/src/test/scala/unit/kafka/message/MessageWriterTest.scala PRE-CREATION
Diff: https://reviews.apache.org/r/31742/diff/
Testing
---
Thanks,
Yasuhiro Matsuda
-CREATION
Diff: https://reviews.apache.org/r/31568/diff/
Testing
---
Thanks,
Yasuhiro Matsuda
PRE-CREATION
core/src/test/scala/unit/kafka/message/MessageWriterTest.scala PRE-CREATION
Diff: https://reviews.apache.org/r/31742/diff/
Testing
---
Thanks,
Yasuhiro Matsuda
It may not be ideal, but there is a way to prioritize particular topics. It
is to set the record timestamps to zero. This can be done by using a custom
TimestampExtractor. Kafka Streams tries to synchronize multiple streams
using the extracted timestamps. So, records with the timestamp 0 have
great
n general, being able to get the offsets from the producer interface
> > does sound convenient.
> >
> > > We plan to have a KIP discussion meeting tomorrow at 11am PST. Perhaps
> > you
> > > can describe this KIP a bit then?
> >
> > Sure, happy to join
The goal of this KIP is to provide a lightweight/embeddable streaming
framework, and allows Kafka users to start using stream processing easily. DSL
is not covered in this KIP. But, DSL is a very attractive option to have.
> In the proposed KafkaProcessor API, there is no interface like Collector
Jay, I understand that. Context can provide more information without
breaking the compatibility if needed. Also I am not sure ConsumerRecord is
the right abstraction of data for stream processing. After transformation
or join, what is the topic and the offset? It is odd to use ConsumerRecord.
We ca
t; > >> >> > For 1, yes, when there is a transient leader change,
> it's
> > > >> > > guaranteed
> > > >> > > >> >> that a
> > > >> > > >> >> > prefix of the messages in a request will be co
A partitioning scheme should be a cluster wide thing. Letting each sink
have a different partitioning scheme does not make sense to me. A
partitioning scheme is not specific to a stream job, each task or a sink. I
think specifying it at sink level is more error prone.
If a user wants to customize
t; The bottom line is that *different* topics will likely need to be
> partitioned differently.
>
> On October 14, 2015 at 12:57:37 PM, Yasuhiro Matsuda (
> yasuhiro.mats...@gmail.com) wrote:
>
> A partitioning scheme should be a cluster wide thing. Letting each sink
> have a d
The group id is removed from the restore consumer config because the
restore consumer should not participate in the specified consumer group. I
don't know why it is failing.
On Fri, Nov 27, 2015 at 12:37 PM, Guozhang Wang wrote:
> Hello Bill,
>
> Thanks for reporting it, this is a valid issue, c
[
https://issues.apache.org/jira/browse/KAFKA-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14389764#comment-14389764
]
Yasuhiro Matsuda commented on KAFKA-2013:
-
Updated reviewboard h
[
https://issues.apache.org/jira/browse/KAFKA-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yasuhiro Matsuda updated KAFKA-2013:
Attachment: KAFKA-2013_2015-03-31_17:30:56.patch
> benchmark test for the purgat
[
https://issues.apache.org/jira/browse/KAFKA-1989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14391422#comment-14391422
]
Yasuhiro Matsuda commented on KAFKA-1989:
-
Updated reviewboard h
[
https://issues.apache.org/jira/browse/KAFKA-1989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yasuhiro Matsuda updated KAFKA-1989:
Attachment: KAFKA-1989_2015-04-01_13:49:58.patch
> New purgatory des
[
https://issues.apache.org/jira/browse/KAFKA-1989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yasuhiro Matsuda updated KAFKA-1989:
Attachment: KAFKA-1989_2015-04-07_14:59:33.patch
> New purgatory des
[
https://issues.apache.org/jira/browse/KAFKA-1989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14484170#comment-14484170
]
Yasuhiro Matsuda commented on KAFKA-1989:
-
Updated reviewboard h
[
https://issues.apache.org/jira/browse/KAFKA-1989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yasuhiro Matsuda updated KAFKA-1989:
Attachment: KAFKA-1989_2015-04-08_13:27:59.patch
> New purgatory des
[
https://issues.apache.org/jira/browse/KAFKA-1989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14485950#comment-14485950
]
Yasuhiro Matsuda commented on KAFKA-1989:
-
Updated reviewboard h
[
https://issues.apache.org/jira/browse/KAFKA-1989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yasuhiro Matsuda updated KAFKA-1989:
Attachment: KAFKA-1989_2015-04-08_14:29:51.patch
> New purgatory des
Yasuhiro Matsuda created KAFKA-2112:
---
Summary: make overflowWheel volatile
Key: KAFKA-2112
URL: https://issues.apache.org/jira/browse/KAFKA-2112
Project: Kafka
Issue Type: Bug
[
https://issues.apache.org/jira/browse/KAFKA-1989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14486099#comment-14486099
]
Yasuhiro Matsuda commented on KAFKA-1989:
-
Updated reviewboard h
[
https://issues.apache.org/jira/browse/KAFKA-2112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yasuhiro Matsuda updated KAFKA-2112:
Assignee: Yasuhiro Matsuda (was: Joel Koshy)
Status: Patch Available (was: Open
[
https://issues.apache.org/jira/browse/KAFKA-2112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14487830#comment-14487830
]
Yasuhiro Matsuda commented on KAFKA-2112:
-
Created reviewboard h
[
https://issues.apache.org/jira/browse/KAFKA-2112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yasuhiro Matsuda updated KAFKA-2112:
Attachment: KAFKA-2112.patch
> make overflowWheel volat
[
https://issues.apache.org/jira/browse/KAFKA-1989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14561470#comment-14561470
]
Yasuhiro Matsuda commented on KAFKA-1989:
-
Thanks. I am looking into it. W
[
https://issues.apache.org/jira/browse/KAFKA-2226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yasuhiro Matsuda updated KAFKA-2226:
Attachment: KAFKA-2226.patch
> NullPointerException in TestPurgatoryPerforma
[
https://issues.apache.org/jira/browse/KAFKA-2226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14561724#comment-14561724
]
Yasuhiro Matsuda commented on KAFKA-2226:
-
Created reviewboard h
[
https://issues.apache.org/jira/browse/KAFKA-2226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yasuhiro Matsuda updated KAFKA-2226:
Status: Patch Available (was: Open)
> NullPointerException in TestPurgatoryPerforma
[
https://issues.apache.org/jira/browse/KAFKA-2226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yasuhiro Matsuda updated KAFKA-2226:
Attachment: KAFKA-2226_2015-05-28_17:18:55.patch
> NullPointerException
[
https://issues.apache.org/jira/browse/KAFKA-2226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14563965#comment-14563965
]
Yasuhiro Matsuda commented on KAFKA-2226:
-
Updated reviewboard h
[
https://issues.apache.org/jira/browse/KAFKA-2226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yasuhiro Matsuda updated KAFKA-2226:
Attachment: KAFKA-2226_2015-05-29_10:49:34.patch
> NullPointerException
[
https://issues.apache.org/jira/browse/KAFKA-2226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14565133#comment-14565133
]
Yasuhiro Matsuda commented on KAFKA-2226:
-
Updated reviewboard h
[
https://issues.apache.org/jira/browse/KAFKA-2226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14565522#comment-14565522
]
Yasuhiro Matsuda commented on KAFKA-2226:
-
Updated reviewboard h
[
https://issues.apache.org/jira/browse/KAFKA-2226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yasuhiro Matsuda updated KAFKA-2226:
Attachment: KAFKA-2226_2015-05-29_15:04:35.patch
> NullPointerException
[
https://issues.apache.org/jira/browse/KAFKA-2226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14565538#comment-14565538
]
Yasuhiro Matsuda commented on KAFKA-2226:
-
Updated reviewboard h
[
https://issues.apache.org/jira/browse/KAFKA-2226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yasuhiro Matsuda updated KAFKA-2226:
Attachment: KAFKA-2226_2015-05-29_15:10:24.patch
> NullPointerException
Yasuhiro Matsuda created KAFKA-1965:
---
Summary: Leaner DelayedItem
Key: KAFKA-1965
URL: https://issues.apache.org/jira/browse/KAFKA-1965
Project: Kafka
Issue Type: Improvement
[
https://issues.apache.org/jira/browse/KAFKA-1965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yasuhiro Matsuda updated KAFKA-1965:
Description: In DelayedItem, which is a superclass of DelayedOperation,
both the creation
[
https://issues.apache.org/jira/browse/KAFKA-1965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yasuhiro Matsuda updated KAFKA-1965:
Status: Patch Available (was: Open)
> Leaner DelayedI
[
https://issues.apache.org/jira/browse/KAFKA-1965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yasuhiro Matsuda updated KAFKA-1965:
Status: Patch Available (was: Open)
> Leaner DelayedI
[
https://issues.apache.org/jira/browse/KAFKA-1965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yasuhiro Matsuda updated KAFKA-1965:
Attachment: KAFKA-1965.patch
> Leaner DelayedI
[
https://issues.apache.org/jira/browse/KAFKA-1965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yasuhiro Matsuda updated KAFKA-1965:
Description: In DelayedItem, which is a superclass of DelayedOperation,
both the creation
[
https://issues.apache.org/jira/browse/KAFKA-1965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yasuhiro Matsuda updated KAFKA-1965:
Status: Open (was: Patch Available)
> Leaner DelayedI
[
https://issues.apache.org/jira/browse/KAFKA-1965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yasuhiro Matsuda updated KAFKA-1965:
Attachment: KAFKA-1965.patch
> Leaner DelayedI
[
https://issues.apache.org/jira/browse/KAFKA-1965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14327795#comment-14327795
]
Yasuhiro Matsuda commented on KAFKA-1965:
-
Created reviewboard h
[
https://issues.apache.org/jira/browse/KAFKA-1965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14329725#comment-14329725
]
Yasuhiro Matsuda commented on KAFKA-1965:
-
Updated reviewboard h
[
https://issues.apache.org/jira/browse/KAFKA-1965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yasuhiro Matsuda updated KAFKA-1965:
Attachment: KAFKA-1965_2015-02-20_15:08:26.patch
> Leaner DelayedI
Yasuhiro Matsuda created KAFKA-1989:
---
Summary: New purgatory design
Key: KAFKA-1989
URL: https://issues.apache.org/jira/browse/KAFKA-1989
Project: Kafka
Issue Type: Improvement
[
https://issues.apache.org/jira/browse/KAFKA-1989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14341070#comment-14341070
]
Yasuhiro Matsuda commented on KAFKA-1989:
-
Created reviewboard h
[
https://issues.apache.org/jira/browse/KAFKA-1989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yasuhiro Matsuda updated KAFKA-1989:
Status: Patch Available (was: Open)
> New purgatory des
1 - 100 of 168 matches
Mail list logo