I was looking for KTable-KTable semantics where both trigger updates.
The result will be used to enrich a few KStreams.
I'll keep an eye on this jira.
Meanwhile, I'll use custom processor or like you said convert it to
KStream-KTable join and continue with my test.
Srikanth
On Thu, Jul 14, 201
Sorry that I can only point to a Jira right now :/
About your CustomProcessor. I doubt that this will work:
CustomProcessor#init() will be executed after you call KafkaStream.start();
Thus, your table2.foreach will be executed after the topology was built
and I guess, will just not have any effe
Ah, I was hoping for a magic solution not a jira!
Another thought was to embed table2 as KTable in custom processor and use
that for lookup.
class CustomProcesser(kStreamBuilder: KStreamBuilder) extends
Processor[Int, Int] {
private var kvStore: KeyValueStore[Int, Int] = _
private val table2
My bad... I should have considered this in the first place. You are
absolutely right.
Supporting this kind of a join is work in progress.
https://issues.apache.org/jira/browse/KAFKA-3705
Your custom solution (Option 1) might work... But as you mentioned, the
problem will be that the first table g
I should have mentioned that I tried this. It worked in other case but will
not work for this one.
I'm pasting a sample from table1 that I gave in my first email.
Table1
111 -> aaa
222 -> bbb
333 -> aaa
Here value is not unique(aaa). So, I can't just make it a key. 333 will
then override 11
You will need to set a new key before you do the re-partitioning. In
your case, it seems you want to switch key and value. This can be done
with a simple map
> table1.toStream()
> .map(new KeyValueMapper() {
> public KeyValue apply(K key, V value) {
>return new K
Michael,
Thanks! Looking forward to the update.
An interface like KTable is very conducive for joins. Hopefully, it will
get more flexible.
Srikanth
On Thu, Jul 14, 2016 at 4:35 AM, Michael Noll wrote:
> Srikant,
>
> > Its not a case for join() as the keys don't match. Its more a lookup
> tabl
Matthias,
With option 2, how would we perform join after re-partition. Although we
re-partitioned with value, the key doesn't change.
KTable joins always use keys and ValueJoiner get values from both table
when keys match.
Having data co-located will not be sufficient rt??
Srikanth
On Thu, Jul
On 7/14/16, 04:35, "Michael Noll" wrote:
> Also, a heads up: It turned out that user questions around joins in Kafka
> Streams are pretty common. We are currently working on improving the
> documentation for joins to make this more clear.
Excellent!
Srikant,
> Its not a case for join() as the keys don't match. Its more a lookup
table.
Yes, the semantics of streaming joins in Kafka Streams are bit different
from joins in traditional RDBMS.
See
http://docs.confluent.io/3.0.0/streams/developer-guide.html#joining-streams.
Also, a heads up: It
I would recommend re-partitioning as described in Option 2.
-Matthias
On 07/13/2016 10:53 PM, Srikanth wrote:
> Hello,
>
> I'm trying the following join using KTable. There are two change log tables.
> Table1
> 111 -> aaa
> 222 -> bbb
> 333 -> aaa
>
> Table2
> aaa -> 999
> bbb -> 888
Hello,
I'm trying the following join using KTable. There are two change log tables.
Table1
111 -> aaa
222 -> bbb
333 -> aaa
Table2
aaa -> 999
bbb -> 888
ccc -> 777
My result table should be
111 -> 999
222 -> 888
333 -> 999
Its not a case for join() as the keys don't match. Its
12 matches
Mail list logo