My previous mail was in fact addressed to Ben, not Chris. Sorry for the mistake.
Regards

On Sat, 22 Jul 2017 at 00:15, José Antonio Iñigo <joseantonio.in...@gmail.com> wrote:

> Hi Chris,
>
> *"if I understand your problem correctly, the issue is that you need to*
> *decrement the stock count when you reserve it, rather than splitting it*
> *into a second phase."*
>
> That's exactly the problem, I would need to:
>
> 1) Read the OrderPlaced event from Kafka in ProductService...
> 2) ...query the ProductsStock store to check availability...
> 3) ...update the Product in the same phase (OrderPlacedEvent processing)
> 4) ...publish a ProductReserved message
>
>     // 1) Read the OrderPlaced event...
>     @StreamListener(OrderProcessor.INPUT)
>     public void handleOrder(Map<String, Object> event){
>         logger.info("Event {}", event);
>         if("OrderPlaced".equals(event.get("name"))){
>             Order order = new Order();
>             order.setId((String)event.get("orderId"));
>             order.setProductId((Integer)(event.get("productId")));
>             order.setUid(event.get("uid").toString());
>             ...
>             // 2) Query the ProductsStockStore...
>             Integer productStock = getProductStock(order.getProductId());
>             if(productStock != null && productStock > 0) {
>                 // 3) Update the ProductsStockStore
>                 ???
>
>                 // 4) Publish a new message. No problem here
>
>             }
>
>     @Override
>     public Integer getProductStock(Integer id) {
>         KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
>         ReadOnlyKeyValueStore<Integer, Integer> keyValueStore =
>             streams.store("ProductsStock", QueryableStoreTypes.keyValueStore());
>         return keyValueStore.get(id);
>     }
>
> However the only way I know of updating the store is publishing a new
> event ProductReserved that will be processed by the KStream as a separated
> step (new Kafka message):
>
>     Map<String, Object> event = new HashMap<>();
>     event.put("name", "ProductReserved");
>     event.put("orderId", order.getId());
>     event.put("productId", order.getProductId());
>     event.put("quantity", -1);
>     // 3) Update the ProductStore
>     orderProcessor.output().send(MessageBuilder.withPayload(event).build(), 500);
>
> This is the separated KStream config notice // 3) where the update takes
> place:
>
>     @Configuration
>     public class KStreamsConfig {
>
>         @Bean
>         public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder,
>                 KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {
>
>             Serde<Integer> integerSerde = Serdes.Integer();
>             final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
>             final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
>             final Serde<JsonNode> jsonSerde =
>                 Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
>             KStream<Integer, JsonNode> stream =
>                 kStreamBuilder.stream(integerSerde, jsonSerde, "orders");
>
>             // 3) Update the ProductStore
>             stream.filter( (key, value) -> value != null &&
>                     value.get("name").asText().equals("ProductReserved"))
>                 .map( (key, value) -> {
>                     return new KeyValue<>(value.get("productId").asInt(),
>                         value.get("quantity").asInt());
>                 }).groupByKey().reduce( (v1, v2) -> v1 + v2, "ProductsStock");
>             return stream;
>         }
>     }
>
> I've had a look at the StateStoresInTheDSLIntegrationTest.java
> <https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java>
> but I still don't get how to integrate the update step in // 2). No idea how
> I can do all this in the same phase:
>
> - Consume a message
> - Query a KStreams store
> - Update the KStreams store
> - Publish a ProductReserved message.
>
> Could you please outline the necessary code to do it?
>
> Thank you so much.
> Jose
>
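For reference, the single-phase flow asked about in the quoted mail (consume, query, update, publish) can be modelled in plain Java as below. This is only a sketch of the logic, not Kafka Streams code: the HashMap is a stand-in for the "ProductsStock" store, and the class name, the method names and the "ProductOutOfStock" event name are all hypothetical. In Kafka Streams, one way to get the same shape would be to put this logic inside a Transformer attached to a writable state store through KStream#transform, using KeyValueStore#get and KeyValueStore#put. That assumes the store is registered on the builder and attached to the processor rather than being created by reduce(...).

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the single-phase flow. The HashMap stands in for the
// "ProductsStock" state store; this is NOT the Kafka Streams API.
final class StockReservationSketch {

    // Hypothetical stand-in for the state store: productId -> units in stock.
    private final Map<Integer, Integer> productsStock = new HashMap<>();

    void setStock(int productId, int units) {
        productsStock.put(productId, units);
    }

    Integer getStock(int productId) {
        return productsStock.get(productId);
    }

    // Handles one OrderPlaced event and returns the event to publish.
    // Query and update happen in the same call, so no second message is
    // needed to decrement the stock.
    Map<String, Object> handleOrderPlaced(String orderId, int productId) {
        // 2) Query the store
        Integer stock = productsStock.get(productId);

        Map<String, Object> out = new HashMap<>();
        out.put("orderId", orderId);
        out.put("productId", productId);

        if (stock != null && stock > 0) {
            // 3) Update the store in the same step
            productsStock.put(productId, stock - 1);
            out.put("name", "ProductReserved");
            out.put("quantity", -1);
        } else {
            // Hypothetical event name for the rejection case
            out.put("name", "ProductOutOfStock");
        }
        // 4) The caller publishes the returned event
        return out;
    }

    public static void main(String[] args) {
        StockReservationSketch service = new StockReservationSketch();
        service.setStock(42, 1);
        System.out.println(service.handleOrderPlaced("order-1", 42).get("name"));
        System.out.println(service.handleOrderPlaced("order-2", 42).get("name"));
        System.out.println(service.getStock(42));
    }
}
```

With one unit in stock, the first order is reserved and the second is rejected, because the decrement is visible to the very next event for the same product. In a real topology this relies on all events for a given product being handled by the same task, which would mean keying the input by productId.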