No worries Jose ;-) So there are a few ways you could do this, but I think it’s important that you manage a single “stock level” state store, backed by a changelog. Use this for validation, and keep it up to date at the same time. You should also ensure the input topic(s) are partitioned by productId so any update to, or validation of, the same product will be sequenced. This effectively ensures the mutations of the quantities in stock will be atomic.
So say we have two inputs: OrderRequests, StockUpdates Order requests need to validate that there is sufficient stock, via the product store, then decrement the stock value in that store: public Event validateInventory(OrderRequestEvent order, KeyValueStore<> store){ Long stockCount = store.get(order.product); if (stockCount - order.quantity >= 0) { //decrement the value in the store store.put(order.product, stockCount - order.amount); return new OrderValidatedEvent(Validation.Passed); } else return new OrderValidatedEvent(Validation.Failed); } Stock updates need to increase the stock value in the product store as new stock arrives. public void updateStockStore(StockUpdateEvent update, KeyValueStore<> store){ Long current = update.get(update.product); store.put(update.product, current + update.amount); } To do the processing we merge input streams, then push this into a transfomer, that uses a single state store to manage the mapping between products and their stock levels. KStream<byte[], String> unvalidatedOrdersStream = builder.stream(orderTopic); KStream<byte[], String> stockStream = builder.stream(stockUpdateTopic); StateStoreSupplier productStore = Stores.create(productStoreName)...build() KStream<byte[], String> orderOutputs = unvalidatedOrdersStream.outerJoin(stockStream, ...) .transform(StockCheckTransformer::new, productStoreName) .filter((key, value) -> value != ""); orderOutputs.to(validatedOrdersStream); With the transformer both managing and validating against the stock levels. StockCountTransformer { …. public KeyValue<byte[], Event> transform(ProductId key, Event event) if (event.isStockUpdate()) { Stock update = parseStock(value); return KeyValue.pair(key, updateStockStore(parseStockUpdate(event), productStore)) } else if (event.isOrderRequest()) { return KeyValue.pair(key, validateInventory(parseOrderReq(event), productStore)) } } } Now the stock levels will be held in the changelog topic which backs the ProductStore which we can reuse if we wish. I think we could also optimise this code a bit by splitting into two transformers via streams.branch(..). Regarding EoS. This doesn’t add any magic to your processing logic. It just guarantees that your stock count will be accurate in the face of failure (i.e. you don’t need to manage idempotence yourself). B On Sat, Jul 22, 2017 at 12:52 PM José Antonio Iñigo < joseantonio.in...@gmail.com> wrote: > Hi Garret, > > At the moment, to simplify the problem I only have one topic, orders, where > I add products and decrement them based on ProductAdded and ProductReserved > events. > > Yeaterday I was reading about EoS but I don't know if it'll solve the > problem. Dividing the query-update in two steps means that the event > ordering could be: > > OrderPlaced (query stock ok) > OrderPlaced (query stock ok) > ProductReserved (update stock) > ProductReserved (update stock) > > Regarding EoS this sequence is correct, the messages are delivered once in > the order in which they were generated. The problem is the order itself: if > there were a way to query-update-store-generate-event in one step to > produce instead the following sequence of events there wouldn't be any > problem: > > OrderPlaced->ProductReserved (query stock ok + Update stock store + > reserved event) > OrderPlaced->ProductNoStock (query stock fail so no update and out-of-stock > event) > > Regards > > On Sat, 22 Jul 2017 at 05:35, Garrett Barton <garrett.bar...@gmail.com> > wrote: > > > Could you take in both topics via the same stream? Meaning don't do a > kafka > > streams join, literally just read both streams. If KStream cant do this, > > dunno haven't tried, then simple upstream merge job to throw them into 1 > > topic with same partitioning scheme. > > > > I'd assume you would have the products stream that would be some kind of > > incrementer on state (within the local state store). The Orders stream > > would act as a decrement to the same stream task. Exactly once semantics > > and you skirt the issue of having to wait for the update to come back > > around. > > > > Thoughts? > > > > On Fri, Jul 21, 2017 at 6:15 PM, José Antonio Iñigo < > > joseantonio.in...@gmail.com> wrote: > > > > > Hi Chris, > > > > > > > > > > > > *"if I understand your problem correctly, the issue is that you need > > > todecrement the stock count when you reserve it, rather than splitting > > it* > > > *into a second phase."* > > > > > > That's exactly the problem, I would need to: > > > > > > 1) Read the OrderPlaced event from Kafka in ProductService... > > > 2) ...query the ProductsStock store to check availability... > > > 3) ...update the Product in the same phase (OrderPlacedEvent > processing) > > > 4) ...publish a ProductReserved message > > > > > > // 1) Read the OrderPlaced event... > > > @StreamListener(OrderProcessor.INPUT) > > > public void handleOrder(Map<String, Object> event){ > > > logger.info("Event {}", event); > > > if("OrderPlaced".equals(event.get("name"))){ > > > Order order = new Order(); > > > order.setId((String)event.get("orderId")); > > > order.setProductId((Integer)(event.get("productId"))); > > > order.setUid(event.get("uid").toString()); > > > ... > > > // 2) Query the ProductsStockStore... > > > Integer productStock = > > > getProductStock(order.getProductId()); > > > if(productStock != null && productStock > 0) { > > > // 3) Update the ProductsStockStore > > > ??? > > > > > > // 4) Publish a new message. No problem > here > > > > > > } > > > > > > @Override > > > public Integer getProductStock(Integer id) { > > > KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams(); > > > ReadOnlyKeyValueStore<Integer, Integer> keyValueStore = > > > streams.store("ProductsStock", QueryableStoreTypes.keyValueStore()); > > > return keyValueStore.get(id); > > > } > > > > > > However the only way I know of updating the store is publishing a new > > event > > > ProductReserved that will be processed by the KStream as a separated > step > > > (new Kafka message): > > > > > > Map<String, Object> event = new HashMap<>(); > > > event.put("name", "ProductReserved"); > > > event.put("orderId", order.getId()); > > > event.put("productId", order.getProductId()); > > > event.put("quantity", -1); > > > // 3) Update the ProductStore > > > orderProcessor.output().send(MessageBuilder.withPayload( > > > event).build(), > > > 500); > > > > > > This is the separated KStream config notice // 3) where the update > takes > > > place: > > > > > > @Configuration > > > public class KStreamsConfig { > > > > > > @Bean > > > public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder, > > > KStreamBuilderFactoryBean kStreamBuilderFactoryBean) { > > > > > > Serde<Integer> integerSerde = Serdes.Integer(); > > > final Serializer<JsonNode> jsonSerializer = new > > > JsonSerializer(); > > > final Deserializer<JsonNode> jsonDeserializer = new > > > JsonDeserializer(); > > > final Serde<JsonNode> jsonSerde = > > > Serdes.serdeFrom(jsonSerializer, jsonDeserializer); > > > KStream<Integer, JsonNode> stream = > > kStreamBuilder.stream(integerSerde, > > > jsonSerde, "orders"); > > > > > > // 3) Update the ProductStore > > > stream.filter( (key, value) -> value != null && > > > value.get("name").asText().equals("ProductReserved")) > > > .map( (key, value) -> { > > > return new KeyValue<>(value.get("productId").asInt(), > > > value.get("quantity").asInt()); > > > }).groupByKey().reduce( (v1, v2) -> v1 + v2, "ProductsStock"); > > > return stream; > > > } > > > } > > > > > > I've had a look at the StateStoresInTheDSLIntegrationTest.java > > > <https://github.com/confluentinc/examples/blob/ > > > master/kafka-streams/src/test/java/io/confluent/examples/streams/ > > > StateStoresInTheDSLIntegrationTest.java> > > > but > > > I still don't get how to integrate the update step in // 2). No idea > how > > I > > > can do all this in the same phase: > > > > > > - Consume a message > > > - Query a KStreams store > > > - Update the KStreams store > > > - Publish a ProductReserved message. > > > > > > Could you please outline the necessary code to do it? > > > > > > Thank you so much. > > > Jose > > > > > >