Hi Garret, At the moment, to simplify the problem I only have one topic, orders, where I add products and decrement them based on ProductAdded and ProductReserved events.
Yeaterday I was reading about EoS but I don't know if it'll solve the problem. Dividing the query-update in two steps means that the event ordering could be: OrderPlaced (query stock ok) OrderPlaced (query stock ok) ProductReserved (update stock) ProductReserved (update stock) Regarding EoS this sequence is correct, the messages are delivered once in the order in which they were generated. The problem is the order itself: if there were a way to query-update-store-generate-event in one step to produce instead the following sequence of events there wouldn't be any problem: OrderPlaced->ProductReserved (query stock ok + Update stock store + reserved event) OrderPlaced->ProductNoStock (query stock fail so no update and out-of-stock event) Regards On Sat, 22 Jul 2017 at 05:35, Garrett Barton <garrett.bar...@gmail.com> wrote: > Could you take in both topics via the same stream? Meaning don't do a kafka > streams join, literally just read both streams. If KStream cant do this, > dunno haven't tried, then simple upstream merge job to throw them into 1 > topic with same partitioning scheme. > > I'd assume you would have the products stream that would be some kind of > incrementer on state (within the local state store). The Orders stream > would act as a decrement to the same stream task. Exactly once semantics > and you skirt the issue of having to wait for the update to come back > around. > > Thoughts? > > On Fri, Jul 21, 2017 at 6:15 PM, José Antonio Iñigo < > joseantonio.in...@gmail.com> wrote: > > > Hi Chris, > > > > > > > > *"if I understand your problem correctly, the issue is that you need > > todecrement the stock count when you reserve it, rather than splitting > it* > > *into a second phase."* > > > > That's exactly the problem, I would need to: > > > > 1) Read the OrderPlaced event from Kafka in ProductService... > > 2) ...query the ProductsStock store to check availability... > > 3) ...update the Product in the same phase (OrderPlacedEvent processing) > > 4) ...publish a ProductReserved message > > > > // 1) Read the OrderPlaced event... > > @StreamListener(OrderProcessor.INPUT) > > public void handleOrder(Map<String, Object> event){ > > logger.info("Event {}", event); > > if("OrderPlaced".equals(event.get("name"))){ > > Order order = new Order(); > > order.setId((String)event.get("orderId")); > > order.setProductId((Integer)(event.get("productId"))); > > order.setUid(event.get("uid").toString()); > > ... > > // 2) Query the ProductsStockStore... > > Integer productStock = > > getProductStock(order.getProductId()); > > if(productStock != null && productStock > 0) { > > // 3) Update the ProductsStockStore > > ??? > > > > // 4) Publish a new message. No problem here > > > > } > > > > @Override > > public Integer getProductStock(Integer id) { > > KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams(); > > ReadOnlyKeyValueStore<Integer, Integer> keyValueStore = > > streams.store("ProductsStock", QueryableStoreTypes.keyValueStore()); > > return keyValueStore.get(id); > > } > > > > However the only way I know of updating the store is publishing a new > event > > ProductReserved that will be processed by the KStream as a separated step > > (new Kafka message): > > > > Map<String, Object> event = new HashMap<>(); > > event.put("name", "ProductReserved"); > > event.put("orderId", order.getId()); > > event.put("productId", order.getProductId()); > > event.put("quantity", -1); > > // 3) Update the ProductStore > > orderProcessor.output().send(MessageBuilder.withPayload( > > event).build(), > > 500); > > > > This is the separated KStream config notice // 3) where the update takes > > place: > > > > @Configuration > > public class KStreamsConfig { > > > > @Bean > > public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder, > > KStreamBuilderFactoryBean kStreamBuilderFactoryBean) { > > > > Serde<Integer> integerSerde = Serdes.Integer(); > > final Serializer<JsonNode> jsonSerializer = new > > JsonSerializer(); > > final Deserializer<JsonNode> jsonDeserializer = new > > JsonDeserializer(); > > final Serde<JsonNode> jsonSerde = > > Serdes.serdeFrom(jsonSerializer, jsonDeserializer); > > KStream<Integer, JsonNode> stream = > kStreamBuilder.stream(integerSerde, > > jsonSerde, "orders"); > > > > // 3) Update the ProductStore > > stream.filter( (key, value) -> value != null && > > value.get("name").asText().equals("ProductReserved")) > > .map( (key, value) -> { > > return new KeyValue<>(value.get("productId").asInt(), > > value.get("quantity").asInt()); > > }).groupByKey().reduce( (v1, v2) -> v1 + v2, "ProductsStock"); > > return stream; > > } > > } > > > > I've had a look at the StateStoresInTheDSLIntegrationTest.java > > <https://github.com/confluentinc/examples/blob/ > > master/kafka-streams/src/test/java/io/confluent/examples/streams/ > > StateStoresInTheDSLIntegrationTest.java> > > but > > I still don't get how to integrate the update step in // 2). No idea how > I > > can do all this in the same phase: > > > > - Consume a message > > - Query a KStreams store > > - Update the KStreams store > > - Publish a ProductReserved message. > > > > Could you please outline the necessary code to do it? > > > > Thank you so much. > > Jose > > >