Hi Brian, Sorry for you headache. We are aware that current join semantics in Streams are not straight forward.
We did rework those already in trunk and this change will be included in next release 0.10.2. Please build from trunk and let us know if this resolves your issue. For details, see this wiki page explaining current and new join semantics: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics For more details see the KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-77%3A+Improve+Kafka+Streams+Join+Semantics Long story short: leftJoin only triggers a join computation for the left input while records of the right input only update the (right) input KTable but do not compute a join result. Thus, if your right input data arrives before you left input data it works -- however, if you left input data arrives first, it will not enrich the stream but join with "null". -Matthias On 12/7/16 9:25 AM, Brian Krahmer wrote: > Hey guys, > > I'm having a hell of a time here. I've worked for days trying to get > this joining pipeline working. I thought I had it working last week, > but my jubilation was premature. The point was to take data in from > five different topics and merge them together to obtain one enriched > event (output to compacted topic). Can anybody spot what I'm doing > wrong? The ordering makes no difference. For example, I've switched > the locationInput and the vehicleReservedInput inputs in the leftJoin > calls below, and I get the same results. The location part of the > enrichment works while the vehicleReserved part does not. I can't even > think of how to restructure the topology without resorting to building > my own lower-level topology. > > thanks, > brian > > > KTable<String, VehicleFinderData> fleetInput = > builder.table(Serdes.String(), > vehicleFinderDataSerde, FLEET_TOPIC, > VEHICLE_ENRICHER_FLEET_STORE); > ... > fleetInput.print("fleetInput"); > locationInput.print("locationInput"); > vehicleReservedInput.print("vehicleReservedInput"); > vehicleReleasedInput.print("vehicleReleasedInput"); > vehicleUsageEndedInput.print("vehicleUsageEndedInput"); > > KTable<String, VehicleFinderData> mergeStepOne = > fleetInput.leftJoin(locationInput, VehicleFinderData::merge); > mergeStepOne.print("mergeStepOne"); > KTable<String, VehicleFinderData> mergeStepTwo = > mergeStepOne.leftJoin(vehicleReleasedInput, VehicleFinderData::merge); > mergeStepTwo.print("mergeStepTwo"); > KTable<String, VehicleFinderData> mergeStepThree = > mergeStepTwo.leftJoin(vehicleUsageEndedInput, VehicleFinderData::merge); > mergeStepThree.print("mergeStepThree"); > KTable<String, VehicleFinderData> mergeStepFour = > mergeStepThree.leftJoin(vehicleReservedInput, VehicleFinderData::merge); > mergeStepFour.print("mergeStepFour"); > > ** Generate a location event ** > > [locationInput]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json > value}<-null) > Deserializing from topic VehicleEnricherFleetStore > Merge operation called > [mergeStepOne]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json > value}<-null) > Merge operation called > [mergeStepTwo]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json > value}<-null) > Merge operation called > [mergeStepThree]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json > value}<-null) > Merge operation called > [mergeStepFour]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json > value}<-null) > > ** New event correctly serialized ** > > ------------------------------------------------------- > > ** Generate a vehicleReserved event ** > > [vehicleReservedInput]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped > json value}<-null) > [mergeStepFour]: 93838671-e591-4849-ae12-6f30cb9ff7bd , (null<-null) > > ** NO EVENT ** > >
signature.asc
Description: OpenPGP digital signature