Hi, there are two things:
1) The aggregation operator produces an output record each time the aggregate is updated. Thus, you would get 6 records in your example. At the same time, we deduplicate consecutive outputs with an internal cache. The cache is flushed non-deterministically (either partly flushed on evict, or completely flushed on commit), so the records you actually see can differ between runs. See: http://docs.confluent.io/current/streams/developer-guide.html#memory-management

2) For the join, the synchronization of both streams based on timestamps is best effort. Thus, when the order event arrives, it might be the case that the corresponding click has not yet been processed. In that case you get a <key; value:null> result. Note that when the click is processed later, you will get the result you expect. See: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics

-Matthias

On 6/22/17 10:26 AM, Daniel Del Castillo Perez wrote:
> Hi all,
>
> I’m playing with Kafka Streams 0.10.2.1 and I’m having some issues here which
> I hope you can help me to clarify/understand.
>
> In a hypothetical scenario, I have 2 source streams – clicks and orders –
> which I’m trying to join to match determine from which page the purchase has
> been made. I also want to count the number of purchased items per user.
> This is what my code looks like – you can ignore annotation and any other
> Spring-related code:
>
> @Getter
> @ToString
> public class Order {
>
>     private long timestamp;
>     private String user;
>     private String pos;
>     private int totalItems;
>     private Double grandTotal;
>     private String country;
>     …
> }
>
> @Getter
> @ToString
> public class Click {
>
>     private long timestamp;
>     private String system;
>     private String user;
>     private String page;
>     private String action;
>     …
> }
>
> @Getter
> @ToString
> public class Purchase {
>
>     private long timestamp;
>     private String user;
>     private String page;
>     private String pos;
>     private String country;
>     …
> }
>
> @Getter
> @ToString
> public class PurchaseHistory {
>
>     private String user;
>     private int itemsBought;
>     …
> }
>
> @Component
> @Slf4j
> public class PurchaseStream implements StreamRunner {
>
>     private @Value("${spring.application.name}") String appName;
>     private final KStreamBuilder kStreamBuilder;
>     private KafkaStreams kafkaStreams;
>     private ApplicationProperties properties;
>
>     @VisibleForTesting
>     void setAppName(String appName) {
>         this.appName = appName;
>     }
>
>     private Properties buildProperties() {
>         Properties props = new Properties();
>         props.put("group.id", "purchases-stream");
>         props.put(StreamsConfig.CLIENT_ID_CONFIG, "purchases-stream");
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>             properties.getKafkaBroker());
>         props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG,
>             properties.getReplicationFactor());
>         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>             properties.getTimestampExtractor());
>         props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
>             properties.getCommitInterval());
>         return props;
>     }
>
>     public PurchaseStream(ApplicationProperties properties) {
>         this.properties = properties;
>
>         SerdeFactory serdeFactory = new JsonSerdeFactory();
>         Serde<String> stringSerde = Serdes.String();
>
>         kStreamBuilder = new KStreamBuilder();
>
>         KStream<String, Click> clickKStream = kStreamBuilder
>             .stream(stringSerde, serdeFactory.serdeFor(Click.class),
>                 properties.getClickStreamTopic())
>             .filter((k, click) -> "PAY".equals(click.getAction()))
>             .map((k, click) -> new KeyValue<>(click.getUser(), click));
>
>         KStream<String, Order> ordersKStream = kStreamBuilder.stream(stringSerde,
>             serdeFactory.serdeFor(Order.class),
>             properties.getOrderStreamTopic());
>
>         KStream<String, PurchasePattern> purchasesKStream = ordersKStream
>             .map((k, order) -> new KeyValue<>(order.getUser(),
>                 Purchase
>                     .builder()
>                     .timestamp(order.getTimestamp())
>                     .user(order.getUser())
>                     .pos(order.getPos())
>                     .country(order.getCountry())
>                     .build()))
>             .leftJoin(clickKStream,
>                 (purchase, click) -> Purchase
>                     .builder(purchase)
>                     .page(click == null ? "UNKNOWN" : click.getPage())
>                     .build(),
>                 JoinWindows.of(properties.getPurchasesJoinWindow()).until(
>                     2 * properties.getPurchasesJoinWindow() + 1),
>                 stringSerde, serdeFactory.serdeFor(Purchase.class),
>                 serdeFactory.serdeFor(Click.class));
>         purchasesKStream.to(stringSerde, serdeFactory.serdeFor(Purchase.class),
>             properties.getPurchasesTopic());
>
>         ordersKStream
>             .map((k, order) -> new KeyValue<>(order.getUser(),
>                 PurchaseHistory.builder().user(order.getUser()).itemsBought(order.getTotalItems()).build()))
>             .groupByKey(stringSerde, serdeFactory.serdeFor(PurchaseHistory.class))
>             .aggregate(PurchaseHistoryAggregator::new,
>                 (k, purchaseHistory, purchaseHistoryAggregator) ->
>                     purchaseHistoryAggregator.add(purchaseHistory),
>                 serdeFactory.serdeFor(PurchaseHistoryAggregator.class),
>                 "purchaseHistoryStore")
>             .to(stringSerde,
>                 serdeFactory.serdeFor(PurchaseHistoryAggregator.class),
>                 properties.getPurchaseHistoryTopic());
>     }
>
>     protected KafkaStreams connect() {
>         log.info("Creating PurchaseStreams");
>         StreamsConfig streamsConfig = new StreamsConfig(buildProperties());
>         return new KafkaStreams(builder(), streamsConfig);
>     }
>
>     @Override
>     public void run() {
>         log.info("Starting PurchaseStreams");
>         kafkaStreams = connect();
>         kafkaStreams.start();
>         log.info("Now started PurchaseStreams");
>     }
>
>     @Override
>     public void stop() {
>         kafkaStreams.close();
>         kafkaStreams.cleanUp();
>     }
>     …
> }
>
> This is my integration test:
>
> public class PurchaseStreamIntegrationTest {
>
>     private static final String CLICKS = "clicks";
>     private static final String ORDERS = "orders";
>     private static final String PURCHASES = "purchases";
>     private static final String HISTORY = "history";
>
>     public static @ClassRule EmbeddedKafkaCluster KAFKA_CLUSTER = new
>         EmbeddedKafkaCluster(1);
>     private static final Properties PRODUCER_CONFIG = new Properties();
>     private static final Properties CONSUMER_CONFIG = new Properties();
>
>     private PurchaseStream stream;
>
>     @BeforeClass
>     public static void setup() {
>         PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
>             KAFKA_CLUSTER.bootstrapServers());
>         PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
>         PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
>         PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>             StringSerializer.class);
>         PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>             StringSerializer.class);
>
>         CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
>             KAFKA_CLUSTER.bootstrapServers());
>         CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "results-consumer");
>         CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>         CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>             StringDeserializer.class);
>         CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>             StringDeserializer.class);
>     }
>
>     @Before
>     public void init() throws Exception {
>         Properties topicProperties = new Properties();
>         topicProperties.put("message.timestamp.type", "CreateTime");
>
>         KAFKA_CLUSTER.createTopic(CLICKS, 1, 1, topicProperties);
>         KAFKA_CLUSTER.createTopic(ORDERS, 1, 1, topicProperties);
>         KAFKA_CLUSTER.createTopic(PURCHASES, 1, 1, topicProperties);
>
>         topicProperties.put("cleanup.policy", "compact");
>         KAFKA_CLUSTER.createTopic(HISTORY, 1, 1, topicProperties);
>
>         ApplicationProperties props = ApplicationProperties
>             .builder()
>             .kafkaBroker(KAFKA_CLUSTER.bootstrapServers())
>             .clickStreamTopic(CLICKS)
>             .orderStreamTopic(ORDERS)
>             .purchasesTopic(PURCHASES)
>             .purchaseHistoryTopic(HISTORY)
>             .replicationFactor(1)
>             .timestampExtractor(FailOnInvalidTimestamp.class)
>             .purchasesJoinWindow(1000L)
>             .commitInterval(10L)
>             .build();
>         stream = new PurchaseStream(props);
>         stream.setAppName("appName");
>     }
>
>     @After
>     public void cleanup() throws Exception {
>         KAFKA_CLUSTER.deleteTopic(CLICKS);
>         KAFKA_CLUSTER.deleteTopic(ORDERS);
>         KAFKA_CLUSTER.deleteTopic(PURCHASES);
>         KAFKA_CLUSTER.deleteTopic(HISTORY);
>     }
>
>     private void checkResult(final String outputTopic, final List<String>
>             expectedResult) throws Exception {
>         if (expectedResult != null) {
>             final List<String> result =
>                 IntegrationTestUtils.waitUntilMinValuesRecordsReceived(CONSUMER_CONFIG,
>                     outputTopic,
>                     expectedResult.size(), 30 * 1000L);
>             assertThat(result, is(expectedResult));
>         }
>     }
>
>     private static String click(long timestamp, String user, String page,
>             String action) {
>         return new StringBuilder("{")
>             .append("\"timestamp\":\"" + timestamp + "\",")
>             .append("\"system\":\"ABC\",")
>             .append("\"user\":\"" + user + "\",")
>             .append("\"page\":\"" + page + "\",")
>             .append("\"action\":\"" + action + "\"")
>             .append("}")
>             .toString();
>     }
>
>     private static String order(long timestamp, String user, String pos, int
>             totalItems, String country) {
>         return new StringBuilder("{")
>             .append("\"timestamp\":\"" + timestamp + "\",")
>             .append("\"user\":\"" + user + "\",")
>             .append("\"pos\":\"" + pos + "\",")
>             .append("\"totalItems\":" + totalItems + ",")
>             .append("\"grandTotal\":50.55,")
>             .append("\"country\":\"" + country + "\"")
>             .append("}")
>             .toString();
>     }
>
>     private static String purchase(long timestamp, String user, String page,
>             String pos, String country) {
>         return new StringBuilder("{")
>             .append("\"timestamp\":" + timestamp + ",")
>             .append("\"user\":\"" + user + "\",")
>             .append("\"page\":\"" + page + "\",")
>             .append("\"pos\":\"" + pos + "\",")
>             .append("\"country\":\"" + country + "\"")
>             .append("}")
>             .toString();
>     }
>
>     private String purchaseHistory(String user, int itemsBought) {
>         return new StringBuilder("{")
>             .append("\"user\":\"" + user + "\",")
>             .append("\"itemsBought\":" + itemsBought)
>             .append("}")
>             .toString();
>     }
>
>     private static String toString(long l) {
>         return String.valueOf(l);
>     }
>
>     private static long longValue(String s) {
>         return Long.parseLong(s);
>     }
>
>     private static long playClickstream(long clock, String user) throws
>             Exception {
>         ImmutableList<KeyValue<String, String>> clicks = ImmutableList
>             .<KeyValue<String, String>> builder()
>             .add(new KeyValue<>(toString(++clock), click(0L, user, "LANDING",
>                 "LOGIN")))
>             .add(new KeyValue<>(toString(++clock), click(1L, user, "CART",
>                 "CHECKOUT")))
>             .add(new KeyValue<>(toString(++clock), click(2L, user, "SHOPPING",
>                 "CONFIRM")))
>             .add(new KeyValue<>(toString(++clock), click(3L, user, "SHOPPING",
>                 "PAY")))
>             .add(new KeyValue<>(toString(++clock), click(4L, user, "PROFILE",
>                 "LOGOUT")))
>             .build();
>         for (KeyValue<String, String> click : clicks) {
>             IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(CLICKS,
>                 Collections.singleton(click),
>                 PRODUCER_CONFIG, longValue(click.key));
>         }
>         return clock;
>     }
>
>     private static long playOrderstream(long clock, String user, int
>             totalItems, String country) throws Exception {
>         ImmutableList<KeyValue<String, String>> orders = ImmutableList
>             .<KeyValue<String, String>> builder()
>             .add(new KeyValue<>(toString(++clock), order(5L, user, "POS1",
>                 totalItems, country)))
>             .build();
>
>         for (KeyValue<String, String> order : orders) {
>             IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(ORDERS,
>                 Collections.singleton(order),
>                 PRODUCER_CONFIG, longValue(order.key));
>         }
>         return clock;
>     }
>
>     @Test
>     public void typical() throws Exception {
>         stream.run();
>
>         String user = "bob";
>
>         long clock = System.currentTimeMillis();
>         playClickstream(clock, user);
>         // Simulate delay
>         clock += 500L;
>         playOrderstream(clock, user, 2, "Japan");
>
>         checkResult(PURCHASES, ImmutableList.of(purchase(5L, user, "SHOPPING",
>             "POS1", "Japan")));
>         checkResult(HISTORY, ImmutableList.of(purchaseHistory(user, 2)));
>
>         stream.stop();
>         while (!stream.hasFinished()) {
>             Thread.sleep(1000);
>         }
>     }
>
>     @Test
>     public void lateOrder() throws Exception {
>         stream.run();
>
>         String user = "rob";
>
>         long clock = System.currentTimeMillis();
>         playClickstream(clock, user);
>         // Simulate long delay: order will fall after the join window
>         clock += 2000L;
>         playOrderstream(clock, user, 6, "Peru");
>
>         checkResult(PURCHASES, ImmutableList.of(purchase(5L, user, "UNKNOWN",
>             "POS1", "Peru")));
>         checkResult(REWARDS, ImmutableList.of(purchaseHistory(user, 6)));
>
>         stream.stop();
>         while (!stream.hasFinished()) {
>             Thread.sleep(1000);
>         }
>     }
>
> }
>
> Note that I’m using event-time semantics and FailOnInvalidTimestamp time
> extractor.
>
> My assertions in this test fail now and then – occasionally the whole test is
> green.
> Assertions fail with things like:
>
> java.lang.AssertionError:
> Expected: is <[{"user":"bob","itemsBought":2}]>
>      but: was <[{"user":"bob","itemsBought":6}]>
> …
>
> This total varies among runs. How come if this an non-windowed aggregation
> and there is only 1 order record? I also tried with a UnlimitedWindow which
> start at timestamp 0, i.e. a window that spans from epoch to infinite and the
> results are similar.
>
> Here’s another example for the typical test:
>
> java.lang.AssertionError:
> Expected: is
> <[{"timestamp":5,"user":"bob","page":"SHOPPING","pos":"POS1","country":"Japan"}]>
>      but: was
> <[{"timestamp":5,"user":"bob","page":"UNKNOWN","pos":"POS1","country":"Japan"}]>
> …
>
> Sometimes I get:
>
> java.lang.AssertionError:
> Expected: is
> <[{"timestamp":5,"user":"bob","page":"SHOPPING","pos":"POS1","country":"Japan"}]>
>      but: was
> <[{"timestamp":5,"user":"bob","page":"UNKNOWN","pos":"POS1","country":"Japan"},{"timestamp":5,"user":"bob","page":"SHOPPING","pos":"POS1","country":"Japan"}]>
> …
>
> I’ve been playing with different settings like the commit interval and the
> left-join window size but I still get the same results. I’ve also varied the
> delay time between clicks and order events (see the addition between the 2
> calls) but this doesn’t help either
>
> Please, can someone help me to understand what’s going on here?
>
> Thanks.
>
> Regards,
> Daniel
> --
> DdC
>
> PD: My apologies for the long email
>
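
Since both output topics can carry intermediate updates before the final one (a partial aggregate, or a join result with a null click), one option for the test is to compare only the last value seen per key instead of the raw list of records. The following is a minimal sketch in plain Java with no Kafka dependency. `LatestPerKey` and `collapse` are hypothetical names, not part of Kafka Streams, and the sketch assumes the records have already been read from the output topic in offset order.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical test helper: treats the output topic as a changelog and
// keeps only the most recent value for each key.
final class LatestPerKey {

    // Later records overwrite earlier ones with the same key; keys keep
    // the order in which they were first seen.
    static <K, V> Map<K, V> collapse(List<Map.Entry<K, V>> records) {
        Map<K, V> latest = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : records) {
            latest.put(record.getKey(), record.getValue());
        }
        return latest;
    }

    public static void main(String[] args) {
        // Assumed input: the order was joined before its click was processed,
        // then the join result was emitted again once the click arrived.
        List<Map.Entry<String, String>> records = Arrays.asList(
            new SimpleEntry<String, String>("bob", "UNKNOWN"),
            new SimpleEntry<String, String>("bob", "SHOPPING"));
        System.out.println(collapse(records)); // prints {bob=SHOPPING}
    }
}
```

An assertion written against the collapsed map would accept both the "UNKNOWN then SHOPPING" sequence and a single "SHOPPING" record, which are the two outcomes described above for the in-window case.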
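
If you would rather see every aggregate update in a test, the record cache mentioned in 1) can be switched off so that no deduplication happens. A minimal sketch, assuming the `cache.max.bytes.buffering` setting described in the memory-management section linked above is available in your Streams version. `StreamsCacheSettings` and `withoutRecordCache` are hypothetical names; the returned Properties would be passed to StreamsConfig in the same way as the result of buildProperties().

```java
import java.util.Properties;

// Hypothetical helper: copies an existing Streams configuration and
// disables the record cache by giving it a size of zero bytes.
final class StreamsCacheSettings {

    static Properties withoutRecordCache(Properties base) {
        Properties props = new Properties();
        props.putAll(base); // leave the caller's Properties untouched
        // Assumption: a cache size of 0 turns caching off, so each
        // aggregate update is forwarded downstream instead of deduplicated.
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("application.id", "appName");
        System.out.println(withoutRecordCache(base).get("cache.max.bytes.buffering"));
    }
}
```

With the cache off, the output topic receives more records, so this suits a test setup better than production. It does not change the join behaviour described in 2).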