+1

I had the same thought and put it on my personal agenda already.

-Matthias

On 05/04/2016 06:37 PM, Jay Kreps wrote:
> Is it possible to make the error message give more of an explanation?
> 
> -Jay
> 
> On Wed, May 4, 2016 at 8:46 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Hi,
>>
>> I am still new to Kafka Streams myself, but from my understanding, if
>> you change the key, your partitioning changes, i.e., it is no longer valid.
>> Thus, the join (which assumes co-located data) cannot be performed
>> (this is the reason why the sources get set to null). You can write to an
>> intermediate topic via .through(...) to get a valid partitioning:
>>
>> KStream dataStream = builder.stream(...).map(...).through(...);
>>
>> Afterward, your join should work.
>>
>> -Matthias
>>
>>
>> On 05/04/2016 04:43 PM, Gaspar Muñoz wrote:
>>> Hi there,
>>>
>>> I am not able to perform a Left Join between a KStream and KTable in
>>> Kafka Streams.
>>>
>>> Exception in thread "main" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: KSTREAM-FILTER-0000000003 and KTABLE-SOURCE-0000000005 are not joinable
>>>     at org.apache.kafka.streams.kstream.internals.AbstractStream.ensureJoinableWith(AbstractStream.java:44)
>>>     at org.apache.kafka.streams.kstream.internals.KStreamImpl.leftJoin(KStreamImpl.java:383)
>>>
>>>
>>> My code looks like this:
>>>
>>> KStream<String, String> dataStream = builder
>>>         .stream(stringDeserializer, stringDeserializer, conf.getString(INPUT_TOPIC))
>>>         .map(App::keyByAap);
>>>
>>> KTable<String, String> aapTable = builder.table("lookup_topic");
>>>
>>> KStream<String, String> result = dataStream.leftJoin(aapTable, new ValueJoinerAap());
>>>
>>>
>>> My ValueJoiner simply concatenates two Strings. I think the problem is in
>>> map, where I change the key in order to join by this field. In the internals
>>> of Kafka Streams we can see that the map function sets the source nodes to
>>> null:
>>>
>>> @Override
>>> public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
>>>     String name = topology.newName(MAP_NAME);
>>>
>>>     topology.addProcessor(name, new KStreamMap<>(mapper), this.name);
>>>
>>>     return new KStreamImpl<>(topology, name, null);
>>> }
>>>
>>>
>>> After that, the left join precondition, in the call Set<String>
>>> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other), checks
>>> whether either set of source nodes is null. The KStream to which I applied
>>> the map has its source nodes set to null, so the exception is thrown:
>>>
>>> if (thisSourceNodes == null || otherSourceNodes == null)
>>>     throw new TopologyBuilderException(this.name + " and " + other.name + " are not joinable");
>>>
>>>
>>> The question is: how can I change the key without losing the parent data,
>>> in order to perform a join with the KTable afterwards?
>>>
>>> Thanks.
>>>
>>
>>
> 
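
The co-partitioning point from the quoted reply above can be sketched in
plain Java, with no Kafka dependency. This is only an illustration under
stated assumptions: the class name, the method names, the sample keys and
the partition count are all hypothetical, and hashCode() stands in for the
real partitioner, which hashes the serialized key bytes differently. It
shows that changing the key alone leaves a record in the partition chosen
from the old key, while writing to an intermediate topic places it by the
new key, which is where the table's row for that key lives.

```java
import java.util.Objects;

public class CoPartitioningSketch {

    // Simplified stand-in for a key-based partitioner.
    // Assumption: the real partitioner hashes serialized key bytes, not hashCode().
    public static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    // Re-keying with map() does not move the record: it stays in the
    // partition that was derived from the ORIGINAL key. The new key is
    // accepted only to make that point explicit; it is deliberately unused.
    public static int partitionAfterMap(String originalKey, String newKey, int numPartitions) {
        return partitionFor(originalKey, numPartitions);
    }

    // Writing to an intermediate topic re-partitions the record by its NEW key.
    public static int partitionAfterThrough(String newKey, int numPartitions) {
        return partitionFor(newKey, numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;          // hypothetical partition count
        String originalKey = "user-1";  // hypothetical key of the input record
        String joinKey = "aap-7";       // hypothetical key produced by map()

        // The table row for joinKey lives in the partition derived from joinKey.
        int tablePartition = partitionFor(joinKey, numPartitions);
        int afterMap = partitionAfterMap(originalKey, joinKey, numPartitions);
        int afterThrough = partitionAfterThrough(joinKey, numPartitions);

        System.out.println("table row partition:           " + tablePartition);
        System.out.println("stream record after map():     " + afterMap);
        System.out.println("stream record after through(): " + afterThrough);
        System.out.println("co-located after through():    " + (afterThrough == tablePartition));
    }
}
```

Whether the record happens to be co-located after map() alone depends on
the two keys and the partition count, so it cannot be relied on. After the
intermediate topic it holds by construction, provided both topics use the
same partitioner and the same number of partitions.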
