Is it possible to make the error message give more an explanation?

-Jay

On Wed, May 4, 2016 at 8:46 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Hi,
>
> I am still new to Kafka Streams by myself, but from my understanding if
> you change the key, your partitioning changes, ie, is not valid anymore.
> Thus, the joins (which assumes co-located data) cannot be performed
> (this is the reason why sources get set to null). You can write to an
> intermediate topic via .through(...) to get a valid partitioning:
>
> KStream dataStream = builder.stream(...).map(...).through(...);
>
> Afterward, your join should work.
>
> -Matthias
>
>
> On 05/04/2016 04:43 PM, Gaspar Muñoz wrote:
> > Hi there,
> >
> > I am not able to perform a Left Join between a KStream and KTable in
> Kafka
> > Streams.
> >
> > Exception in thread "main"
> > org.apache.kafka.streams.errors.TopologyBuilderException: Invalid
> topology
> > building: KSTREAM-FILTER-0000000003 and KTABLE-SOURCE-0000000005 are not
> > joinable
> > at
> >
> org.apache.kafka.streams.kstream.internals.AbstractStream.ensureJoinableWith(AbstractStream.java:44)
> > at
> >
> org.apache.kafka.streams.kstream.internals.KStreamImpl.leftJoin(KStreamImpl.java:383)
> >
> >
> > My code looks
> >
> > KStream<String, String> dataStream = builder
> >         .stream(stringDeserializer, stringDeserializer,
> > conf.getString(INPUT_TOPIC))
> >         .map(App::keyByAap)
> >
> > KTable<String, String> aapTable = builder.table("lookup_topic");
> >
> > KStream<String, String> result = dataStream.leftJoin(aapTable, new
> > ValueJoinerAap());
> >
> >
> > My ValueJoiner simply concat two Strings.  I think the problem is in map
> > where I change the key in order to join by this field. In the internals
> of
> > KafkaStreams we can see  that map function set to null source nodes
> >
> > @Override
> > public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1,
> > V1>> mapper) {
> >     String name = topology.newName(MAP_NAME);
> >
> >     topology.addProcessor(name, new KStreamMap<>(mapper), this.name);
> >
> >     return new KStreamImpl<>(topology, name, null);
> > }
> >
> >
> > And after that, in the left join precondition, in function Set<String>
> > allSourceNodes = ensureJoinableWith((AbstractStream<K>) other); the code
> > checks if any source nodes is null and, obviously, the KStream which I
> > applied the map has the source to null so throw the exception.
> >
> > if (thisSourceNodes == null || otherSourceNodes == null)
> >     throw new TopologyBuilderException(this.name + " and " +
> > other.name + " are not joinable");
> >
> >
> > The question is, how can I change the key without lose parent data in
> order
> > to perform a join with KTable after that?
> >
> > Thanks.
> >
>
>

Reply via email to