Daniel, maybe I did not extract your test logic correctly from your code. It's a long piece... However, I do see two order events:
>>> playOrderstream(clock, user, 2, "Japan");
>>> playOrderstream(clock, user, 6, "Peru");

Maybe your test logic is somehow not correct (just a wild guess).

> However there is a single record in my source stream which means that even
> if the aggregate is updated 2+ times the output should still be the same.

If there is only one input, the aggregator should only be called once. Only in case of failure might a record be processed twice. In that case, your aggregate would most likely change (I am not sure what your custom PurchaseHistoryAggregator#add does).

Another possibility is that, if you run multiple tests one after another, Streams picks up the old state from the previous test and does not start with an empty state. You should make sure that all local state and changelog topics are deleted (or use a different application.id for each test run).

For the joins: you can partly influence the processing order by "shifting" the timestamps of one stream into the past (so it gets processed later). But this is no strong guarantee either. We are aware of this difficulty and we will for sure improve this in the future to give stricter/better guarantees.

-Matthias

On 6/23/17 2:42 AM, Daniel Del Castillo Perez wrote:
> Hi Matthias,
>
> Thanks for your reply. I have a few questions on your comments:
>
> 1) I understand the records are updated as KTables work as changelogs.
> However there is a single record in my source stream which means that even
> if the aggregate is updated 2+ times the output should still be the same.
> In my example a single source record <user, 2> should result in <user, 2>
> instead of <user, 6> no matter how many times the aggregate is updated or
> how many times the cache is flushed out - maybe I still misunderstand how
> these reduce operations work. By the way, the size of the cache should be set
> to the default value which should be 10 MB - more than what I need for
> such small records.
>
> 2) Is there a way to control this behavior somehow?
> Let's say I want to
> keep the order record in memory for some time before the left-join is
> applied? I know this sounds more like micro-batching but I'm just wondering
> whether something similar can be done in KStreams.
>
> Thanks.
>
> Regards,
> Daniel
> --
> DdC
>
> On 6/22/17, 10:03 PM, "Matthias J. Sax" <matth...@confluent.io> wrote:
>
>> Hi,
>>
>> there are two things:
>>
>> 1) The aggregation operator produces an output record each time the
>> aggregate is updated. Thus, you would get 6 records in your example. At the
>> same time, we deduplicate consecutive outputs with an internal cache. And the
>> cache is flushed non-deterministically (either partly flushed on evict, or
>> completely flushed on commit).
>>
>> see:
>> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
>>
>> 2) For the join, the synchronization of both streams based on timestamps
>> is best effort. Thus, when the order event arrived, it might be the
>> case that the corresponding click was not yet processed. Thus, you get
>> a <key; value:null> result. Note, when the click is processed later,
>> you will get the result you expect.
>>
>> see:
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics
>>
>> -Matthias
>>
>>
>> On 6/22/17 10:26 AM, Daniel Del Castillo Perez wrote:
>>> Hi all,
>>>
>>> I'm playing with Kafka Streams 0.10.2.1 and I'm having some issues here
>>> which I hope you can help me to clarify/understand.
>>>
>>> In a hypothetical scenario, I have 2 source streams, clicks and orders,
>>> which I'm trying to join to determine from which page the
>>> purchase has been made. I also want to count the number of purchased
>>> items per user.
>>> This is what my code looks like; you can ignore the annotations
>>> and any other Spring-related code:
>>>
>>> @Getter
>>> @ToString
>>> public class Order {
>>>
>>>     private long timestamp;
>>>     private String user;
>>>     private String pos;
>>>     private int totalItems;
>>>     private Double grandTotal;
>>>     private String country;
>>>     ...
>>> }
>>>
>>> @Getter
>>> @ToString
>>> public class Click {
>>>
>>>     private long timestamp;
>>>     private String system;
>>>     private String user;
>>>     private String page;
>>>     private String action;
>>>     ...
>>> }
>>>
>>> @Getter
>>> @ToString
>>> public class Purchase {
>>>
>>>     private long timestamp;
>>>     private String user;
>>>     private String page;
>>>     private String pos;
>>>     private String country;
>>>     ...
>>> }
>>>
>>> @Getter
>>> @ToString
>>> public class PurchaseHistory {
>>>
>>>     private String user;
>>>     private int itemsBought;
>>>     ...
>>> }
>>>
>>> @Component
>>> @Slf4j
>>> public class PurchaseStream implements StreamRunner {
>>>
>>>     private @Value("${spring.application.name}") String appName;
>>>     private final KStreamBuilder kStreamBuilder;
>>>     private KafkaStreams kafkaStreams;
>>>     private ApplicationProperties properties;
>>>
>>>     @VisibleForTesting
>>>     void setAppName(String appName) {
>>>         this.appName = appName;
>>>     }
>>>
>>>     private Properties buildProperties() {
>>>         Properties props = new Properties();
>>>         props.put("group.id", "purchases-stream");
>>>         props.put(StreamsConfig.CLIENT_ID_CONFIG, "purchases-stream");
>>>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
>>>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>>             properties.getKafkaBroker());
>>>         props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG,
>>>             properties.getReplicationFactor());
>>>         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>>>             properties.getTimestampExtractor());
>>>         props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
>>>             properties.getCommitInterval());
>>>         return props;
>>>     }
>>>
>>>     public PurchaseStream(ApplicationProperties properties) {
>>>         this.properties = properties;
>>>
>>>         SerdeFactory serdeFactory = new JsonSerdeFactory();
>>>         Serde<String> stringSerde = Serdes.String();
>>>
>>>         kStreamBuilder = new KStreamBuilder();
>>>
>>>         KStream<String, Click> clickKStream = kStreamBuilder
>>>             .stream(stringSerde, serdeFactory.serdeFor(Click.class),
>>>                 properties.getClickStreamTopic())
>>>             .filter((k, click) -> "PAY".equals(click.getAction()))
>>>             .map((k, click) -> new KeyValue<>(click.getUser(), click));
>>>
>>>         KStream<String, Order> ordersKStream =
>>>             kStreamBuilder.stream(stringSerde, serdeFactory.serdeFor(Order.class),
>>>                 properties.getOrderStreamTopic());
>>>
>>>         KStream<String, PurchasePattern> purchasesKStream = ordersKStream
>>>             .map((k, order) -> new KeyValue<>(order.getUser(),
>>>                 Purchase
>>>                     .builder()
>>>                     .timestamp(order.getTimestamp())
>>>                     .user(order.getUser())
>>>                     .pos(order.getPos())
>>>                     .country(order.getCountry())
>>>                     .build()))
>>>             .leftJoin(clickKStream,
>>>                 (purchase, click) -> Purchase
>>>                     .builder(purchase)
>>>                     .page(click == null ?
"UNKNOWN" : click.getPage()) >>> >>> .build(), >>> >>> JoinWindows.of(properties.getPurchasesJoinWindow()).until( >>> >>> 2 * properties.getPurchasesJoinWindow() + 1), >>> >>> stringSerde, serdeFactory.serdeFor(Purchase.class), >>> serdeFactory.serdeFor(Click.class)); >>> >>> purchasesKStream.to(stringSerde, >>> serdeFactory.serdeFor(Purchase.class), >>> >>> properties.getPurchasesTopic()); >>> >>> >>> ordersKStream >>> >>> .map((k, order) -> new KeyValue<>(order.getUser(), >>> >>> >>> PurchaseHistory.builder().user(order.getUser()).itemsBought(order.getTota >>> lItems()).build())) >>> >>> .groupByKey(stringSerde, >>> serdeFactory.serdeFor(PurchaseHistory.class)) >>> >>> .aggregate(PurchaseHistoryAggregator::new, >>> >>> (k, purchaseHistory, purchaseHistoryAggregator) -> >>> purchaseHistoryAggregator.add(purchaseHistory), >>> >>> serdeFactory.serdeFor(PurchaseHistoryAggregator.class), >>> ³purchaseHistoryStore") >>> >>> .to(stringSerde, >>> serdeFactory.serdeFor(PurchaseHistoryAggregator.class), >>> properties.getPurchaseHistoryTopic()); >>> >>> } >>> >>> >>> protected KafkaStreams connect() { >>> >>> log.info("Creating PurchaseStreams"); >>> >>> StreamsConfig streamsConfig = new StreamsConfig(buildProperties()); >>> >>> return new KafkaStreams(builder(), streamsConfig); >>> >>> } >>> >>> >>> @Override >>> >>> public void run() { >>> >>> log.info("Starting PurchaseStreams"); >>> >>> kafkaStreams = connect(); >>> >>> kafkaStreams.start(); >>> >>> log.info("Now started PurchaseStreams"); >>> >>> } >>> >>> >>> @Override >>> >>> public void stop() { >>> >>> kafkaStreams.close(); >>> >>> kafkaStreams.cleanUp(); >>> >>> } >>> >>> Š >>> >>> } >>> >>> >>> This is my integration test: >>> >>> >>> >>> public class PurchaseStreamIntegrationTest { >>> >>> >>> private static final String CLICKS = "clicks"; >>> >>> private static final String ORDERS = "orders"; >>> >>> private static final String PURCHASES = "purchases"; >>> >>> private static final String HISTORY = ³history"; 
>>>
>>>     public static @ClassRule EmbeddedKafkaCluster KAFKA_CLUSTER = new
>>>         EmbeddedKafkaCluster(1);
>>>     private static final Properties PRODUCER_CONFIG = new Properties();
>>>     private static final Properties CONSUMER_CONFIG = new Properties();
>>>
>>>     private PurchaseStream stream;
>>>
>>>     @BeforeClass
>>>     public static void setup() {
>>>         PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
>>>             KAFKA_CLUSTER.bootstrapServers());
>>>         PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
>>>         PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
>>>         PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>>>             StringSerializer.class);
>>>         PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>>>             StringSerializer.class);
>>>
>>>         CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
>>>             KAFKA_CLUSTER.bootstrapServers());
>>>         CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG,
>>>             "results-consumer");
>>>         CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
>>>             "earliest");
>>>         CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>>>             StringDeserializer.class);
>>>         CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>>>             StringDeserializer.class);
>>>     }
>>>
>>>     @Before
>>>     public void init() throws Exception {
>>>         Properties topicProperties = new Properties();
>>>         topicProperties.put("message.timestamp.type", "CreateTime");
>>>
>>>         KAFKA_CLUSTER.createTopic(CLICKS, 1, 1, topicProperties);
>>>         KAFKA_CLUSTER.createTopic(ORDERS, 1, 1, topicProperties);
>>>         KAFKA_CLUSTER.createTopic(PURCHASES, 1, 1, topicProperties);
>>>
>>>         topicProperties.put("cleanup.policy", "compact");
>>>         KAFKA_CLUSTER.createTopic(HISTORY, 1, 1, topicProperties);
>>>
>>>         ApplicationProperties props = ApplicationProperties
>>>             .builder()
>>>             .kafkaBroker(KAFKA_CLUSTER.bootstrapServers())
>>>             .clickStreamTopic(CLICKS)
>>>             .orderStreamTopic(ORDERS)
>>>             .purchasesTopic(PURCHASES)
>>>             .purchaseHistoryTopic(HISTORY)
>>>             .replicationFactor(1)
>>>             .timestampExtractor(FailOnInvalidTimestamp.class)
>>>             .purchasesJoinWindow(1000L)
>>>             .commitInterval(10L)
>>>             .build();
>>>         stream = new PurchaseStream(props);
>>>         stream.setAppName("appName");
>>>     }
>>>
>>>     @After
>>>     public void cleanup() throws Exception {
>>>         KAFKA_CLUSTER.deleteTopic(CLICKS);
>>>         KAFKA_CLUSTER.deleteTopic(ORDERS);
>>>         KAFKA_CLUSTER.deleteTopic(PURCHASES);
>>>         KAFKA_CLUSTER.deleteTopic(HISTORY);
>>>     }
>>>
>>>     private void checkResult(final String outputTopic, final List<String>
>>>             expectedResult) throws Exception {
>>>         if (expectedResult != null) {
>>>             final List<String> result =
>>>                 IntegrationTestUtils.waitUntilMinValuesRecordsReceived(CONSUMER_CONFIG,
>>>                     outputTopic,
>>>                     expectedResult.size(), 30 * 1000L);
>>>             assertThat(result, is(expectedResult));
>>>         }
>>>     }
>>>
>>>     private static String click(long timestamp, String user, String page,
>>>             String action) {
>>>         return new StringBuilder("{")
>>>             .append("\"timestamp\":\"" + timestamp + "\",")
>>>             .append("\"system\":\"ABC\",")
>>>             .append("\"user\":\"" + user + "\",")
>>>             .append("\"page\":\"" + page + "\",")
>>>             .append("\"action\":\"" + action + "\"")
>>>             .append("}")
>>>             .toString();
>>>     }
>>>
>>>     private static String order(long timestamp, String user, String pos,
>>>             int totalItems, String country) {
>>>         return new StringBuilder("{")
>>>             .append("\"timestamp\":\"" + timestamp + "\",")
>>>             .append("\"user\":\"" + user + "\",")
>>>             .append("\"pos\":\"" + pos + "\",")
>>>             .append("\"totalItems\":" + totalItems + ",")
>>>             .append("\"grandTotal\":50.55,")
>>>             .append("\"country\":\"" + country + "\"")
>>>             .append("}")
>>>             .toString();
>>>     }
>>>
>>>     private static String purchase(long timestamp, String user, String
>>>             page, String pos, String country) {
>>>         return new StringBuilder("{")
>>>             .append("\"timestamp\":" + timestamp + ",")
>>>             .append("\"user\":\"" + user + "\",")
>>>             .append("\"page\":\"" + page + "\",")
>>>             .append("\"pos\":\"" + pos + "\",")
>>>             .append("\"country\":\"" + country + "\"")
>>>             .append("}")
>>>             .toString();
>>>     }
>>>
>>>     private String purchaseHistory(String user, int itemsBought) {
>>>         return new StringBuilder("{")
>>>             .append("\"user\":\"" + user + "\",")
>>>             .append("\"itemsBought\":" + itemsBought)
>>>             .append("}")
>>>             .toString();
>>>     }
>>>
>>>     private static String toString(long l) {
>>>         return String.valueOf(l);
>>>     }
>>>
>>>     private static long longValue(String s) {
>>>         return Long.parseLong(s);
>>>     }
>>>
>>>     private static long playClickstream(long clock, String user) throws
>>>             Exception {
>>>         ImmutableList<KeyValue<String, String>> clicks = ImmutableList
>>>             .<KeyValue<String, String>> builder()
>>>             .add(new KeyValue<>(toString(++clock), click(0L, user,
>>>                 "LANDING", "LOGIN")))
>>>             .add(new KeyValue<>(toString(++clock), click(1L, user, "CART",
>>>                 "CHECKOUT")))
>>>             .add(new KeyValue<>(toString(++clock), click(2L, user,
>>>                 "SHOPPING", "CONFIRM")))
>>>             .add(new KeyValue<>(toString(++clock), click(3L, user,
>>>                 "SHOPPING", "PAY")))
>>>             .add(new KeyValue<>(toString(++clock), click(4L, user,
>>>                 "PROFILE", "LOGOUT")))
>>>             .build();
>>>
>>>         for (KeyValue<String, String> click : clicks) {
>>>             IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(CLICKS,
>>>                 Collections.singleton(click),
>>>                 PRODUCER_CONFIG, longValue(click.key));
>>>         }
>>>         return clock;
>>>     }
>>>
>>>     private static long playOrderstream(long clock, String user, int
>>>             totalItems, String country) throws Exception {
>>>         ImmutableList<KeyValue<String, String>> orders = ImmutableList
>>>             .<KeyValue<String, String>> builder()
>>>             .add(new KeyValue<>(toString(++clock), order(5L, user, "POS1",
>>>                 totalItems, country)))
>>>             .build();
>>>
>>>         for (KeyValue<String, String> order : orders) {
>>>             IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(ORDERS,
>>>                 Collections.singleton(order),
>>>                 PRODUCER_CONFIG, longValue(order.key));
>>>         }
>>>         return clock;
>>>     }
>>>
>>>     @Test
>>>     public void typical() throws Exception {
>>>         stream.run();
>>>
>>>         String user = "bob";
>>>
>>>         long clock = System.currentTimeMillis();
>>>         playClickstream(clock, user);
>>>         // Simulate delay
>>>         clock += 500L;
>>>         playOrderstream(clock, user, 2, "Japan");
>>>
>>>         checkResult(PURCHASES, ImmutableList.of(purchase(5L, user,
>>>             "SHOPPING", "POS1", "Japan")));
>>>         checkResult(HISTORY, ImmutableList.of(purchaseHistory(user, 2)));
>>>
>>>         stream.stop();
>>>         while (!stream.hasFinished()) {
>>>             Thread.sleep(1000);
>>>         }
>>>     }
>>>
>>>     @Test
>>>     public void lateOrder() throws Exception {
>>>         stream.run();
>>>
>>>         String user = "rob";
>>>
>>>         long clock = System.currentTimeMillis();
>>>         playClickstream(clock, user);
>>>         // Simulate long delay: order will fall after the join window
>>>         clock += 2000L;
>>>         playOrderstream(clock, user, 6, "Peru");
>>>
>>>         checkResult(PURCHASES, ImmutableList.of(purchase(5L, user,
>>>             "UNKNOWN", "POS1", "Peru")));
>>>         checkResult(REWARDS, ImmutableList.of(purchaseHistory(user, 6)));
>>>
>>>         stream.stop();
>>>         while (!stream.hasFinished()) {
>>>             Thread.sleep(1000);
>>>         }
>>>     }
>>>
>>> }
>>>
>>> Note that I'm using event-time semantics and the FailOnInvalidTimestamp
>>> timestamp extractor.
>>>
>>> My assertions in this test fail now and then; occasionally the whole
>>> test is green.
>>> Assertions fail with things like:
>>>
>>> java.lang.AssertionError:
>>> Expected: is <[{"user":"bob","itemsBought":2}]>
>>>      but: was <[{"user":"bob","itemsBought":6}]>
>>> ...
>>>
>>> This total varies among runs. How come, if this is a non-windowed
>>> aggregation and there is only 1 order record? I also tried with an
>>> UnlimitedWindow which starts at timestamp 0, i.e. a window that spans
>>> from epoch to infinity, and the results are similar.
>>>
>>> Here's another example for the typical test:
>>>
>>> java.lang.AssertionError:
>>> Expected: is
>>> <[{"timestamp":5,"user":"bob","page":"SHOPPING","pos":"POS1","country":"Japan"}]>
>>>      but: was
>>> <[{"timestamp":5,"user":"bob","page":"UNKNOWN","pos":"POS1","country":"Japan"}]>
>>> ...
>>>
>>> Sometimes I get:
>>>
>>> java.lang.AssertionError:
>>> Expected: is
>>> <[{"timestamp":5,"user":"bob","page":"SHOPPING","pos":"POS1","country":"Japan"}]>
>>>      but: was
>>> <[{"timestamp":5,"user":"bob","page":"UNKNOWN","pos":"POS1","country":"Japan"},{"timestamp":5,"user":"bob","page":"SHOPPING","pos":"POS1","country":"Japan"}]>
>>> ...
>>>
>>> I've been playing with different settings like the commit interval and
>>> the left-join window size but I still get the same results. I've also
>>> varied the delay time between click and order events (see the addition
>>> between the 2 calls) but this doesn't help either.
>>>
>>> Please, can someone help me to understand what's going on here?
>>>
>>> Thanks.
>>>
>>> Regards,
>>> Daniel
>>> --
>>> DdC
>>>
>>> PS: My apologies for the long email
>>>
>>
>
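
On the suggestion above to use a different application.id for each test run: a minimal sketch in plain Java of how that could look. The class and method names (UniqueAppId, streamsProps) and the "purchases-test" prefix are hypothetical and not part of Kafka Streams or of the code in this thread; only the config keys application.id and bootstrap.servers are real settings. The assumption is that a fresh application.id leaves Streams with no earlier local state directory or changelog topics to pick up.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper for illustration only.
final class UniqueAppId {

    // Builds Streams properties whose application.id differs on every call,
    // so each test run starts without state left over from a previous run.
    static Properties streamsProps(String prefix, String bootstrapServers) {
        Properties props = new Properties();
        // "application.id" is the string value of StreamsConfig.APPLICATION_ID_CONFIG.
        props.put("application.id", prefix + "-" + UUID.randomUUID());
        props.put("bootstrap.servers", bootstrapServers);
        return props;
    }

    public static void main(String[] args) {
        // Two calls yield two distinct application ids.
        Properties first = streamsProps("purchases-test", "localhost:9092");
        Properties second = streamsProps("purchases-test", "localhost:9092");
        System.out.println(first.getProperty("application.id"));
        System.out.println(second.getProperty("application.id"));
    }
}
```

In the test quoted above, the generated id would take the place of the fixed value passed to stream.setAppName("appName"), so that typical() and lateOrder() no longer share an application.id.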