I didn't say anything... my producer wasn't working properly, messages are arriving now.
Finally I got it working :-) Thanks so much!! 2017-07-25 12:23 GMT+02:00 José Antonio Iñigo <joseantonio.in...@gmail.com>: > Hi Ben, > > now I can see what you meant previously about using a Transformer. I was > following a wrong approach dividing the processing between a Listener and a > Stream processor. > > There's only one thing left that I don't know how to work out, this a > draft of my code based on yours: > > @Bean > @SuppressWarnings("unchecked") > public KStream<?, ?> kStream2(KStreamBuilder builder, > KStreamBuilderFactoryBean kStreamBuilderFactoryBean) { > final Serde<Integer> integerSerde = Serdes.Integer(); > final Serializer<JsonNode> jsonSerializer = new JsonSerializer(); > final Deserializer<JsonNode> jsonDeserializer = new > JsonDeserializer(); > final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, > jsonDeserializer); > KStream<String, JsonNode> unvalidatedOrdersStream = > builder.stream(ORDERS_TOPIC); > KStream<String, JsonNode> stockStream = builder.stream(PRODUCTS_TOPIC); > > StateStoreSupplier<StateStore> productStore = Stores.create(PRODUCTS_STORE) > .withKeys(integerSerde) > .withValues(jsonSerde) > .persistent() > .build(); > builder.addStateStore(productStore); > ValueJoiner<JsonNode, JsonNode, Map<String, String>> valueJoiner = > (JsonNode value1, JsonNode value2) -> new HashMap<>(); > stockStream.branch(predicates) > KStream<String, Map<String, String>> orderOutputs = > unvalidatedOrdersStream.<JsonNode, Map<String, > String>>outerJoin(stockStream, valueJoiner, JoinWindows.of(1000)); > orderOutputs.<String, Map<String, String>>transform(() -> new > StockCountTransformer(), PRODUCTS_STORE) > .filter((key, value) -> { > return value != null; > }).to(ORDERS_TOPIC); > > return orderOutputs; > } > > There are two ways of updating the product store: > - ProductService has a REST endpoint that publishes ProductAdded events to > product topic > - OrderService sends a OrderPlaced event to the orders topic. > > The problem now is that, if I understand it right, in order to update the > PRODUCTS_STORE there must be a join of an OrderPlaced event and a > ProductAdded event *in a certain join window*. If there aren't Order and > Product events that happen within a time window nothing will be updated in > the store. What's more, ProductService shoud be able to update its store > without having anything to do with the orders, shouldn't it? I have tried > publishing ProductAdded events and nothing happens. Could you give me a > hint about how to deal with this? > > Thanks again for your time!! > > 2017-07-24 15:23 GMT+02:00 Ben Stopford <b...@confluent.io>: > >> No worries Jose ;-) >> >> So there are a few ways you could do this, but I think it’s important that >> you manage a single “stock level” state store, backed by a changelog. Use >> this for validation, and keep it up to date at the same time. You should >> also ensure the input topic(s) are partitioned by productId so any update >> to, or validation of, the same product will be sequenced. This effectively >> ensures the mutations of the quantities in stock will be atomic. >> >> So say we have two inputs: OrderRequests, StockUpdates >> >> Order requests need to validate that there is sufficient stock, via the >> product store, then decrement the stock value in that store: >> >> public Event validateInventory(OrderRequestEvent order, KeyValueStore<> >> store){ >> >> Long stockCount = store.get(order.product); >> >> if (stockCount - order.quantity >= 0) { >> >> //decrement the value in the store >> >> store.put(order.product, stockCount - order.amount); >> >> return new OrderValidatedEvent(Validation.Passed); >> >> } else >> >> return new OrderValidatedEvent(Validation.Failed); >> >> } >> >> Stock updates need to increase the stock value in the product store as new >> stock arrives. >> >> public void updateStockStore(StockUpdateEvent update, KeyValueStore<> >> store){ >> >> Long current = update.get(update.product); >> >> store.put(update.product, current + update.amount); >> >> } >> >> To do the processing we merge input streams, then push this into a >> transfomer, that uses a single state store to manage the mapping between >> products and their stock levels. >> >> KStream<byte[], String> unvalidatedOrdersStream = >> builder.stream(orderTopic); >> >> KStream<byte[], String> stockStream = builder.stream(stockUpdateTopic); >> >> StateStoreSupplier productStore = Stores.create(productStoreName >> )...build() >> >> KStream<byte[], String> orderOutputs = >> >> unvalidatedOrdersStream.outerJoin(stockStream, ...) >> >> .transform(StockCheckTransformer::new, productStoreName) >> >> .filter((key, value) -> value != ""); >> >> orderOutputs.to(validatedOrdersStream); >> >> >> With the transformer both managing and validating against the stock >> levels. >> >> StockCountTransformer { …. >> >> public KeyValue<byte[], Event> transform(ProductId key, Event event) >> >> if (event.isStockUpdate()) { >> >> Stock update = parseStock(value); >> >> return KeyValue.pair(key, >> >> updateStockStore(parseStockUpdate(event), productStore)) >> >> } else if (event.isOrderRequest()) { >> >> return KeyValue.pair(key, >> >> validateInventory(parseOrderReq(event), productStore)) >> >> } >> >> } >> >> } >> >> Now the stock levels will be held in the changelog topic which backs the >> ProductStore which we can reuse if we wish. >> >> I think we could also optimise this code a bit by splitting into two >> transformers via streams.branch(..). >> >> Regarding EoS. This doesn’t add any magic to your processing logic. It >> just >> guarantees that your stock count will be accurate in the face of failure >> (i.e. you don’t need to manage idempotence yourself). >> >> B >> >> >> On Sat, Jul 22, 2017 at 12:52 PM José Antonio Iñigo < >> joseantonio.in...@gmail.com> wrote: >> >> > Hi Garret, >> > >> > At the moment, to simplify the problem I only have one topic, orders, >> where >> > I add products and decrement them based on ProductAdded and >> ProductReserved >> > events. >> > >> > Yeaterday I was reading about EoS but I don't know if it'll solve the >> > problem. Dividing the query-update in two steps means that the event >> > ordering could be: >> > >> > OrderPlaced (query stock ok) >> > OrderPlaced (query stock ok) >> > ProductReserved (update stock) >> > ProductReserved (update stock) >> > >> > Regarding EoS this sequence is correct, the messages are delivered once >> in >> > the order in which they were generated. The problem is the order >> itself: if >> > there were a way to query-update-store-generate-event in one step to >> > produce instead the following sequence of events there wouldn't be any >> > problem: >> > >> > OrderPlaced->ProductReserved (query stock ok + Update stock store + >> > reserved event) >> > OrderPlaced->ProductNoStock (query stock fail so no update and >> out-of-stock >> > event) >> > >> > Regards >> > >> > On Sat, 22 Jul 2017 at 05:35, Garrett Barton <garrett.bar...@gmail.com> >> > wrote: >> > >> > > Could you take in both topics via the same stream? Meaning don't do a >> > kafka >> > > streams join, literally just read both streams. If KStream cant do >> this, >> > > dunno haven't tried, then simple upstream merge job to throw them >> into 1 >> > > topic with same partitioning scheme. >> > > >> > > I'd assume you would have the products stream that would be some kind >> of >> > > incrementer on state (within the local state store). The Orders >> stream >> > > would act as a decrement to the same stream task. Exactly once >> semantics >> > > and you skirt the issue of having to wait for the update to come back >> > > around. >> > > >> > > Thoughts? >> > > >> > > On Fri, Jul 21, 2017 at 6:15 PM, José Antonio Iñigo < >> > > joseantonio.in...@gmail.com> wrote: >> > > >> > > > Hi Chris, >> > > > >> > > > >> > > > >> > > > *"if I understand your problem correctly, the issue is that you need >> > > > todecrement the stock count when you reserve it, rather than >> splitting >> > > it* >> > > > *into a second phase."* >> > > > >> > > > That's exactly the problem, I would need to: >> > > > >> > > > 1) Read the OrderPlaced event from Kafka in ProductService... >> > > > 2) ...query the ProductsStock store to check availability... >> > > > 3) ...update the Product in the same phase (OrderPlacedEvent >> > processing) >> > > > 4) ...publish a ProductReserved message >> > > > >> > > > // 1) Read the OrderPlaced event... >> > > > @StreamListener(OrderProcessor.INPUT) >> > > > public void handleOrder(Map<String, Object> event){ >> > > > logger.info("Event {}", event); >> > > > if("OrderPlaced".equals(event.get("name"))){ >> > > > Order order = new Order(); >> > > > order.setId((String)event.get("orderId")); >> > > > order.setProductId((Integer)(event.get("productId"))); >> > > > order.setUid(event.get("uid").toString()); >> > > > ... >> > > > // 2) Query the ProductsStockStore... >> > > > Integer productStock = >> > > > getProductStock(order.getProductId()); >> > > > if(productStock != null && productStock > 0) { >> > > > // 3) Update the ProductsStockStore >> > > > ??? >> > > > >> > > > // 4) Publish a new message. No problem >> > here >> > > > >> > > > } >> > > > >> > > > @Override >> > > > public Integer getProductStock(Integer id) { >> > > > KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams(); >> > > > ReadOnlyKeyValueStore<Integer, Integer> keyValueStore = >> > > > streams.store("ProductsStock", QueryableStoreTypes.keyValueSt >> ore()); >> > > > return keyValueStore.get(id); >> > > > } >> > > > >> > > > However the only way I know of updating the store is publishing a >> new >> > > event >> > > > ProductReserved that will be processed by the KStream as a separated >> > step >> > > > (new Kafka message): >> > > > >> > > > Map<String, Object> event = new HashMap<>(); >> > > > event.put("name", "ProductReserved"); >> > > > event.put("orderId", order.getId()); >> > > > event.put("productId", order.getProductId()); >> > > > event.put("quantity", -1); >> > > > // 3) Update the ProductStore >> > > > orderProcessor.output().send(MessageBuilder.withPayload( >> > > > event).build(), >> > > > 500); >> > > > >> > > > This is the separated KStream config notice // 3) where the update >> > takes >> > > > place: >> > > > >> > > > @Configuration >> > > > public class KStreamsConfig { >> > > > >> > > > @Bean >> > > > public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder, >> > > > KStreamBuilderFactoryBean kStreamBuilderFactoryBean) { >> > > > >> > > > Serde<Integer> integerSerde = Serdes.Integer(); >> > > > final Serializer<JsonNode> jsonSerializer = new >> > > > JsonSerializer(); >> > > > final Deserializer<JsonNode> jsonDeserializer = new >> > > > JsonDeserializer(); >> > > > final Serde<JsonNode> jsonSerde = >> > > > Serdes.serdeFrom(jsonSerializer, jsonDeserializer); >> > > > KStream<Integer, JsonNode> stream = >> > > kStreamBuilder.stream(integerSerde, >> > > > jsonSerde, "orders"); >> > > > >> > > > // 3) Update the ProductStore >> > > > stream.filter( (key, value) -> value != null && >> > > > value.get("name").asText().equals("ProductReserved")) >> > > > .map( (key, value) -> { >> > > > return new KeyValue<>(value.get("productId").asInt(), >> > > > value.get("quantity").asInt()); >> > > > }).groupByKey().reduce( (v1, v2) -> v1 + v2, "ProductsStock"); >> > > > return stream; >> > > > } >> > > > } >> > > > >> > > > I've had a look at the StateStoresInTheDSLIntegrationTest.java >> > > > <https://github.com/confluentinc/examples/blob/ >> > > > master/kafka-streams/src/test/java/io/confluent/examples/streams/ >> > > > StateStoresInTheDSLIntegrationTest.java> >> > > > but >> > > > I still don't get how to integrate the update step in // 2). No idea >> > how >> > > I >> > > > can do all this in the same phase: >> > > > >> > > > - Consume a message >> > > > - Query a KStreams store >> > > > - Update the KStreams store >> > > > - Publish a ProductReserved message. >> > > > >> > > > Could you please outline the necessary code to do it? >> > > > >> > > > Thank you so much. >> > > > Jose >> > > > >> > > >> > >> > >