Never mind my last message... my producer wasn't working properly; messages
are arriving now.

Finally I got it working :-) Thanks so much!!

2017-07-25 12:23 GMT+02:00 José Antonio Iñigo <joseantonio.in...@gmail.com>:

> Hi Ben,
>
> now I can see what you meant previously about using a Transformer. I was
> following a wrong approach dividing the processing between a Listener and a
> Stream processor.
>
> There's only one thing left that I don't know how to work out. This is a
> draft of my code based on yours:
>
> @Bean
> @SuppressWarnings("unchecked")
> public KStream<?, ?> kStream2(KStreamBuilder builder,
> KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {
> final Serde<Integer> integerSerde = Serdes.Integer();
> final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
>         final Deserializer<JsonNode> jsonDeserializer = new
> JsonDeserializer();
>         final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer,
> jsonDeserializer);
> KStream<String, JsonNode> unvalidatedOrdersStream =
> builder.stream(ORDERS_TOPIC);
> KStream<String, JsonNode> stockStream = builder.stream(PRODUCTS_TOPIC);
>
> StateStoreSupplier<StateStore> productStore = Stores.create(PRODUCTS_STORE)
> .withKeys(integerSerde)
> .withValues(jsonSerde)
> .persistent()
> .build();
> builder.addStateStore(productStore);
> ValueJoiner<JsonNode, JsonNode, Map<String, String>> valueJoiner =
> (JsonNode value1, JsonNode value2) -> new HashMap<>();
> stockStream.branch(predicates)
> KStream<String, Map<String, String>> orderOutputs =
> unvalidatedOrdersStream.<JsonNode, Map<String,
> String>>outerJoin(stockStream, valueJoiner,  JoinWindows.of(1000));
> orderOutputs.<String, Map<String, String>>transform(() -> new
> StockCountTransformer(), PRODUCTS_STORE)
> .filter((key, value) -> {
> return value != null;
> }).to(ORDERS_TOPIC);
>
> return orderOutputs;
> }
>
> There are two ways of updating the product store:
> - ProductService has a REST endpoint that publishes ProductAdded events to
> the product topic
> - OrderService sends an OrderPlaced event to the orders topic.
>
> The problem now is that, if I understand it right, in order to update the
> PRODUCTS_STORE there must be a join of an OrderPlaced event and a
> ProductAdded event *in a certain join window*. If there aren't Order and
> Product events that happen within a time window nothing will be updated in
> the store. What's more, ProductService should be able to update its store
> without having anything to do with the orders, shouldn't it? I have tried
> publishing ProductAdded events and nothing happens. Could you give me a
> hint about how to deal with this?
>
> Thanks again for your time!!
>
> 2017-07-24 15:23 GMT+02:00 Ben Stopford <b...@confluent.io>:
>
>> No worries Jose ;-)
>>
>> So there are a few ways you could do this, but I think it’s important that
>> you manage a single “stock level” state store, backed by a changelog. Use
>> this for validation, and keep it up to date at the same time. You should
>> also ensure the input topic(s) are partitioned by productId so any update
>> to, or validation of, the same product will be sequenced. This effectively
>> ensures the mutations of the quantities in stock will be atomic.
>>
>> So say we have two inputs: OrderRequests, StockUpdates
>>
>> Order requests need to validate that there is sufficient stock, via the
>> product store, then decrement the stock value in that store:
>>
>> public Event validateInventory(OrderRequestEvent order, KeyValueStore<>
>> store){
>>
>> Long stockCount = store.get(order.product);
>>
>> if (stockCount - order.quantity >= 0) {
>>
>> //decrement the value in the store
>>
>> store.put(order.product, stockCount - order.quantity);
>>
>> return new OrderValidatedEvent(Validation.Passed);
>>
>> } else
>>
>>            return new OrderValidatedEvent(Validation.Failed);
>>
>> }
>>
>> Stock updates need to increase the stock value in the product store as new
>> stock arrives.
>>
>> public void updateStockStore(StockUpdateEvent update, KeyValueStore<>
>> store){
>>
>> Long current = store.get(update.product);
>>
>> store.put(update.product, current + update.amount);
>>
>> }
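For anyone reading along, here is a minimal self-contained sketch of the two operations above. It is only an illustration under assumptions: a plain HashMap stands in for the Kafka Streams key-value store, product ids are Strings, and a product that is not in the store yet is treated as having zero stock. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in: a plain Map plays the role of the key-value state store.
class StockStoreSketch {

    /** Decrements the stock and returns true if enough is available; otherwise leaves the store untouched. */
    static boolean validateInventory(Map<String, Long> store, String product, long quantity) {
        // Assumption: an unknown product counts as zero stock instead of failing on null.
        long stockCount = store.getOrDefault(product, 0L);
        if (stockCount - quantity >= 0) {
            store.put(product, stockCount - quantity);
            return true;
        }
        return false;
    }

    /** Adds newly arrived stock, creating the entry the first time a product is seen. */
    static void updateStockStore(Map<String, Long> store, String product, long amount) {
        store.merge(product, amount, Long::sum);
    }

    public static void main(String[] args) {
        Map<String, Long> store = new HashMap<>();
        updateStockStore(store, "p1", 2L);
        System.out.println(validateInventory(store, "p1", 2L)); // true
        System.out.println(validateInventory(store, "p1", 1L)); // false
        System.out.println(store.get("p1"));                    // 0
    }
}
```

The null handling is the main difference from the snippets above: the first event for a product may well be an order, so the read has to cope with a missing key.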
>>
>> To do the processing we merge the input streams, then push the result into a
>> transformer that uses a single state store to manage the mapping between
>> products and their stock levels.
>>
>> KStream<byte[], String> unvalidatedOrdersStream =
>> builder.stream(orderTopic);
>>
>> KStream<byte[], String> stockStream = builder.stream(stockUpdateTopic);
>>
>> StateStoreSupplier productStore = Stores.create(productStoreName
>> )...build()
>>
>> KStream<byte[], String> orderOutputs =
>>
>> unvalidatedOrdersStream.outerJoin(stockStream, ...)
>>
>> .transform(StockCountTransformer::new, productStoreName)
>>
>> .filter((key, value) -> value != null && !value.isEmpty());
>>
>> orderOutputs.to(validatedOrdersStream);
>>
>>
>> With the transformer both managing and validating against the stock
>> levels.
>>
>> StockCountTransformer { ….
>>
>> public KeyValue<byte[], Event> transform(ProductId key, Event event) {
>>
>> if (event.isStockUpdate()) {
>>
>>                 return KeyValue.pair(key,
>>
>> updateStockStore(parseStockUpdate(event), productStore));
>>
>>   } else if (event.isOrderRequest()) {
>>
>>                 return KeyValue.pair(key,
>>
>> validateInventory(parseOrderReq(event), productStore));
>>
>>             }
>>
>> }
>>
>> }
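The dispatch idea in the transformer can be tried out without Kafka at all. The sketch below is only an approximation under assumptions: the merged input is a plain List, the store is a HashMap, and the Event class, the Type enum and the "PASSED"/"FAILED" results are hypothetical names, not part of any Kafka API. The point it shows is that a stock update on its own changes the store, with no order needed in any time window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MergedStockProcessorSketch {

    enum Type { STOCK_UPDATE, ORDER_REQUEST }

    // Hypothetical event shape: one class for both inputs, told apart by type.
    static final class Event {
        final Type type;
        final String product;
        final long quantity;

        Event(Type type, String product, long quantity) {
            this.type = type;
            this.product = product;
            this.quantity = quantity;
        }
    }

    // Single store shared by both kinds of event (a Map stands in for the state store).
    private final Map<String, Long> store = new HashMap<>();

    /** Handles one event; returns a validation result for orders, or null for stock updates. */
    String transform(Event e) {
        long current = store.getOrDefault(e.product, 0L);
        if (e.type == Type.STOCK_UPDATE) {
            store.put(e.product, current + e.quantity);
            return null; // nothing to emit downstream
        }
        if (current >= e.quantity) {
            store.put(e.product, current - e.quantity);
            return "PASSED";
        }
        return "FAILED";
    }

    /** Processes the merged events strictly one at a time, dropping null results. */
    List<String> process(List<Event> merged) {
        List<String> out = new ArrayList<>();
        for (Event e : merged) {
            String result = transform(e);
            if (result != null) {
                out.add(result);
            }
        }
        return out;
    }

    long stockOf(String product) {
        return store.getOrDefault(product, 0L);
    }

    public static void main(String[] args) {
        MergedStockProcessorSketch p = new MergedStockProcessorSketch();
        List<String> out = p.process(Arrays.asList(
                new Event(Type.STOCK_UPDATE, "p1", 1),
                new Event(Type.ORDER_REQUEST, "p1", 1),
                new Event(Type.ORDER_REQUEST, "p1", 1)));
        System.out.println(out); // [PASSED, FAILED]
    }
}
```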
>>
>> Now the stock levels will be held in the changelog topic which backs the
>> ProductStore which we can reuse if we wish.
>>
>> I think we could also optimise this code a bit by splitting into two
>> transformers via streams.branch(..).
>>
>> Regarding EoS. This doesn’t add any magic to your processing logic. It
>> just
>> guarantees that your stock count will be accurate in the face of failure
>> (i.e. you don’t need to manage idempotence yourself).
>>
>> B
>>
>>
>> On Sat, Jul 22, 2017 at 12:52 PM José Antonio Iñigo <
>> joseantonio.in...@gmail.com> wrote:
>>
>> > Hi Garret,
>> >
>> > At the moment, to simplify the problem I only have one topic, orders,
>> where
>> > I add products and decrement them based on ProductAdded and
>> ProductReserved
>> > events.
>> >
>> > Yesterday I was reading about EoS but I don't know if it'll solve the
>> > problem. Dividing the query-update in two steps means that the event
>> > ordering could be:
>> >
>> > OrderPlaced (query stock ok)
>> > OrderPlaced (query stock ok)
>> > ProductReserved (update stock)
>> > ProductReserved (update stock)
>> >
>> > Regarding EoS this sequence is correct, the messages are delivered once
>> in
>> > the order in which they were generated. The problem is the order
>> itself: if
>> > there were a way to query-update-store-generate-event in one step to
>> > produce instead the following sequence of events there wouldn't be any
>> > problem:
>> >
>> > OrderPlaced->ProductReserved (query stock ok + Update stock store +
>> > reserved event)
>> > OrderPlaced->ProductNoStock (query stock fail so no update and
>> out-of-stock
>> > event)
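The two orderings above can be compared with a small simulation. This is a sketch under assumptions, not real Kafka code: stock is a single counter for one product, every order asks for one unit, and the method names are hypothetical. In the two-phase variant all queries run before any update is applied, which is the interleaving described above.

```java
class TwoPhaseVsSingleStepSketch {

    /** Two-phase: every order is checked against the same stale stock, updates are applied afterwards. */
    static long twoPhase(long initialStock, int orders) {
        long stock = initialStock;
        int accepted = 0;
        for (int i = 0; i < orders; i++) {
            if (stock > 0) {   // query phase: stock has not been decremented yet
                accepted++;
            }
        }
        stock -= accepted;     // update phase: one ProductReserved per accepted order
        return stock;
    }

    /** Single step: each order is checked and, if accepted, decremented before the next one is read. */
    static long singleStep(long initialStock, int orders) {
        long stock = initialStock;
        for (int i = 0; i < orders; i++) {
            if (stock > 0) {
                stock--;
            }
        }
        return stock;
    }

    public static void main(String[] args) {
        System.out.println(twoPhase(1, 2));   // -1: both orders were accepted, stock is oversold
        System.out.println(singleStep(1, 2)); // 0: the second order is rejected
    }
}
```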
>> >
>> > Regards
>> >
>> > On Sat, 22 Jul 2017 at 05:35, Garrett Barton <garrett.bar...@gmail.com>
>> > wrote:
>> >
>> > > Could you take in both topics via the same stream? Meaning don't do a
>> > kafka
>> > > streams join, literally just read both streams. If KStream can't do
>> this,
>> > > dunno haven't tried, then simple upstream merge job to throw them
>> into 1
>> > > topic with same partitioning scheme.
>> > >
>> > > I'd assume you would have the products stream that would be some kind
>> of
>> > > incrementer on state (within the local state store).  The Orders
>> stream
>> > > would act as a decrement to the same stream task.  Exactly once
>> semantics
>> > > and you skirt the issue of having to wait for the update to come back
>> > > around.
>> > >
>> > > Thoughts?
>> > >
>> > > On Fri, Jul 21, 2017 at 6:15 PM, José Antonio Iñigo <
>> > > joseantonio.in...@gmail.com> wrote:
>> > >
>> > > > Hi Chris,
>> > > >
>> > > >
>> > > >
>> > > > *"if I understand your problem correctly, the issue is that you need
>> > > > to decrement the stock count when you reserve it, rather than
>> splitting
>> > > it*
>> > > > *into a second phase."*
>> > > >
>> > > > That's exactly the problem, I would need to:
>> > > >
>> > > > 1) Read the OrderPlaced event from Kafka in ProductService...
>> > > > 2) ...query the ProductsStock store to check availability...
>> > > > 3) ...update the Product in the same phase (OrderPlacedEvent
>> > processing)
>> > > > 4) ...publish a ProductReserved message
>> > > >
>> > > >         // 1) Read the OrderPlaced event...
>> > > > @StreamListener(OrderProcessor.INPUT)
>> > > > public void handleOrder(Map<String, Object> event){
>> > > > logger.info("Event {}", event);
>> > > > if("OrderPlaced".equals(event.get("name"))){
>> > > > Order order = new Order();
>> > > > order.setId((String)event.get("orderId"));
>> > > > order.setProductId((Integer)(event.get("productId")));
>> > > > order.setUid(event.get("uid").toString());
>> > > > ...
>> > > >                         // 2) Query the ProductsStockStore...
>> > > >                         Integer productStock =
>> > > > getProductStock(order.getProductId());
>> > > >         if(productStock != null && productStock > 0) {
>> > > >                             // 3) Update the ProductsStockStore
>> > > >     ???
>> > > >
>> > > >                             // 4) Publish a new message. No problem
>> > here
>> > > >
>> > > >         }
>> > > >
>> > > > @Override
>> > > > public Integer getProductStock(Integer id) {
>> > > > KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
>> > > > ReadOnlyKeyValueStore<Integer, Integer> keyValueStore =
>> > > >    streams.store("ProductsStock", QueryableStoreTypes.keyValueStore());
>> > > > return keyValueStore.get(id);
>> > > > }
>> > > >
>> > > > However the only way I know of updating the store is publishing a
>> new
>> > > event
>> > > > ProductReserved that will be processed by the KStream as a separate
>> > step
>> > > > (new Kafka message):
>> > > >
>> > > >     Map<String, Object> event = new HashMap<>();
>> > > >     event.put("name", "ProductReserved");
>> > > >     event.put("orderId", order.getId());
>> > > >     event.put("productId", order.getProductId());
>> > > >                             event.put("quantity", -1);
>> > > >                             // 3) Update the ProductStore
>> > > >     orderProcessor.output().send(MessageBuilder.withPayload(
>> > > > event).build(),
>> > > > 500);
>> > > >
>> > > > This is the separate KStream config; notice // 3), where the update
>> > > > takes place:
>> > > >
>> > > > @Configuration
>> > > > public class KStreamsConfig {
>> > > >
>> > > > @Bean
>> > > > public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder,
>> > > > KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {
>> > > >
>> > > >     Serde<Integer> integerSerde = Serdes.Integer();
>> > > >             final Serializer<JsonNode> jsonSerializer = new
>> > > > JsonSerializer();
>> > > >             final Deserializer<JsonNode> jsonDeserializer = new
>> > > > JsonDeserializer();
>> > > >             final Serde<JsonNode> jsonSerde =
>> > > > Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
>> > > >    KStream<Integer, JsonNode> stream =
>> > > kStreamBuilder.stream(integerSerde,
>> > > > jsonSerde, "orders");
>> > > >
>> > > >             // 3) Update the ProductStore
>> > > >             stream.filter( (key, value) -> value != null &&
>> > > > value.get("name").asText().equals("ProductReserved"))
>> > > > .map( (key, value) -> {
>> > > >     return new KeyValue<>(value.get("productId").asInt(),
>> > > > value.get("quantity").asInt());
>> > > > }).groupByKey().reduce( (v1, v2) -> v1 + v2, "ProductsStock");
>> > > >    return stream;
>> > > > }
>> > > > }
>> > > >
>> > > > I've had a look at the StateStoresInTheDSLIntegrationTest.java
>> > > > <https://github.com/confluentinc/examples/blob/
>> > > > master/kafka-streams/src/test/java/io/confluent/examples/streams/
>> > > > StateStoresInTheDSLIntegrationTest.java>
>> > > > but
>> > > > I still don't get how to integrate the update step in // 2). No idea
>> > how
>> > > I
>> > > > can do all this in the same phase:
>> > > >
>> > > > - Consume a message
>> > > > - Query a KStreams store
>> > > > - Update the KStreams store
>> > > > - Publish a ProductReserved message.
>> > > >
>> > > > Could you please outline the necessary code to do it?
>> > > >
>> > > > Thank you so much.
>> > > > Jose
>> > > >
>> > >
>> >
>>
>
>
