Hi everybody,

I have been struggling with this problem for quite a while now, resorting
to stackoverflow
<https://stackoverflow.com/questions/45144429/event-sourcing-apache-kafka-kafka-streams-how-to-assure-atomicity-transa>
for some help, with no success. I am hoping that here I'll get a more
authoritative answer from experienced Kafka users.

This is the summary of my problem:

- I am developing an application based on Spring Boot Microservices for a
shopping domain.
- I want to use Event Sourcing, using Kafka to register the events and
Kafka Streams API stores to materialize the views.
- To simplify the scenario we'll consider only two domains: Orders and
Products.
- The conflicting part:
   1) OrderService publishes an OrderPlaced event indicating a productId
and the quantity.
   2) ProductService consumes the event and queries (with an interactive
query) its Kafka Streams Store (ProductsStore) to check the availability of
the product. If there is availability it publishes a ProductReserved event
with productId and quantity:

if ("OrderPlaced".equals(event.get("eventType"))) {

    Order order = new Order();
    order.setId((String) event.get("orderId"));
    order.setProductId((Integer) event.get("productId"));
    order.setUid(event.get("uid").toString());

    // QUERY PRODUCTSTOCK TO CHECK AVAILABILITY
    Integer productStock = getProductStock(order.getProductId());

    if (productStock > 0) {
        // Renamed to avoid shadowing the consumed "event" above,
        // which would not compile
        Map<String, Object> reservedEvent = new HashMap<>();
        reservedEvent.put("name", "ProductReserved");
        reservedEvent.put("orderId", order.getId());
        reservedEvent.put("productId", order.getProductId());

        // WRITES A PRODUCT RESERVED EVENT TO orders topic
        orderProcessor.output()
                .send(MessageBuilder.withPayload(reservedEvent).build(), 500);
    } else {
        // XXX CANCEL ORDER
    }
}
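
For completeness, getProductStock is not shown above; a minimal sketch of how
such a lookup might be done with a Kafka Streams interactive query could look
like this (assuming a running KafkaStreams instance named streams and the
store name "ProductsStock" used further below -- both names are assumptions):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Hypothetical helper: reads the current stock for a product from the
// local Kafka Streams state store via an interactive query. Assumes the
// store holds Integer stock keyed by productId and the local instance
// hosts the partition for this key.
private Integer getProductStock(Integer productId) {
    ReadOnlyKeyValueStore<Integer, Integer> store =
            streams.store("ProductsStock", QueryableStoreTypes.keyValueStore());
    Integer stock = store.get(productId);
    // Treat a missing key as zero stock
    return stock != null ? stock : 0;
}

Note that in a multi-instance deployment the key may live on another
instance, in which case the query would have to be routed there.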

   Then ProductService consumes its own event in a Kafka Streams processor
to update the stock of the product in the ProductsStore.

KStream<Integer, JsonNode> stream =
        kStreamBuilder.stream(integerSerde, jsonSerde, "orders");
stream.filter(...).groupByKey().reduce((...) -> {...}, "ProductsStock");
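
Spelled out, the elided processor might look roughly like the following
sketch. The field names ("name", "quantity") are taken from the snippets
above; since the store value (a running stock count) has a different type
than the event, this sketch uses aggregate instead of the reduce shown, and
it assumes the initial stock is seeded elsewhere (e.g. from product-creation
events), which is elided here:

// Consume ProductReserved events from "orders" and decrement the
// per-product stock held in the "ProductsStock" store.
KStream<Integer, JsonNode> stream =
        kStreamBuilder.stream(integerSerde, jsonSerde, "orders");

stream.filter((productId, event) ->
            "ProductReserved".equals(event.get("name").asText()))
      .groupByKey(integerSerde, jsonSerde)
      .aggregate(
          () -> 0,  // placeholder; real initial stock must come from elsewhere
          (productId, event, stock) -> stock - event.get("quantity").asInt(),
          integerSerde,
          "ProductsStock");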

   3.1) Suppose that in 1) two orders were placed simultaneously for the
same product and there is only stock for one of them
   3.2) ProductService would process the first one, the stock is ok and
would publish the ProductReserved event.
   3.3) We can't guarantee that the Kafka Streams processor in
ProductService will have processed the order1 ProductReserved event
(updating ProductsStore) before the order2 OrderPlaced event is handled. In
that case ProductService will incorrectly generate a ProductReserved event
for order2, creating an inconsistency.
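
The race in 3.1-3.3 can be illustrated without Kafka at all. In this minimal
plain-Java sketch (all names hypothetical), a HashMap stands in for the
ProductsStock store, and the decrement is applied only after both
availability checks, mirroring the lag between the interactive query and the
Streams processor:

```java
import java.util.HashMap;
import java.util.Map;

public class ReservationRace {
    public static void main(String[] args) {
        // Hypothetical in-memory stand-in for the ProductsStock store:
        // product 42 has stock for exactly one order.
        Map<Integer, Integer> productsStock = new HashMap<>();
        productsStock.put(42, 1);

        // Step 3.2/3.3: both OrderPlaced events are checked before the
        // Streams processor has applied the first ProductReserved event.
        boolean order1Reserved = productsStock.get(42) > 0; // true
        boolean order2Reserved = productsStock.get(42) > 0; // true, but wrong

        // Only now does the processor decrement the stock for order1.
        productsStock.merge(42, -1, Integer::sum);

        // Both orders got a ProductReserved event even though only one
        // unit was in stock: the inconsistency described above.
        System.out.println(order1Reserved + " " + order2Reserved
                + " stock=" + productsStock.get(42));
        // prints "true true stock=0"
    }
}
```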

IMPORTANT: You can find the detailed description, with code and the events
that are published and consumed in the previously referenced stackoverflow
question.

After much thinking and searching online, I haven't found a single place
that gives a clear way to do Event Sourcing with Kafka + Kafka Streams while
solving this atomicity problem.

I'd really appreciate it if someone could propose a solution for this.

Regards
Jose
