Could you take in both topics via the same stream? Meaning don't do a Kafka
Streams join; literally just read both topics with one stream. If KStreams
can't do this (dunno, haven't tried), then a simple upstream merge job could
throw them into one topic with the same partitioning scheme.

I'd assume the products stream would act as some kind of incrementer on
state (within the local state store), and the orders stream would act as a
decrementer against the same stream task. You get exactly-once semantics,
and you skirt the issue of having to wait for the update to come back
around.
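Roughly this, sketched as plain Java with a HashMap standing in for the task's local state store (the event names like "ProductAdded" and the field layout are made up for illustration; real code would live in a Streams Processor/Transformer):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a single stream task reading the merged topic:
// "ProductAdded" events increment the stock in the local state store,
// "OrderPlaced" events decrement it and emit the result downstream.
class StockTaskSketch {

    // Stands in for the "ProductsStock" local state store.
    private final Map<Integer, Integer> store = new HashMap<>();

    // Returns the event we would forward downstream, or null.
    String handle(Map<String, Object> event) {
        int productId = (Integer) event.get("productId");
        int stock = store.getOrDefault(productId, 0);
        if ("ProductAdded".equals(event.get("name"))) {
            store.put(productId, stock + (Integer) event.get("quantity"));
            return null;
        }
        if ("OrderPlaced".equals(event.get("name"))) {
            if (stock > 0) {
                store.put(productId, stock - 1); // update the store...
                return "ProductReserved";        // ...and publish, same phase
            }
            return "OutOfStock";
        }
        return null;
    }

    public static void main(String[] args) {
        StockTaskSketch task = new StockTaskSketch();
        task.handle(Map.<String, Object>of("name", "ProductAdded", "productId", 1, "quantity", 2));
        System.out.println(task.handle(Map.<String, Object>of("name", "OrderPlaced", "productId", 1))); // ProductReserved
        System.out.println(task.handle(Map.<String, Object>of("name", "OrderPlaced", "productId", 1))); // ProductReserved
        System.out.println(task.handle(Map.<String, Object>of("name", "OrderPlaced", "productId", 1))); // OutOfStock
    }
}
```

In actual Streams code the handler would be a Transformer/Processor over the merged stream backed by a persistent KeyValueStore, so the read-check-decrement-forward happens inside one task.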

Thoughts?

On Fri, Jul 21, 2017 at 6:15 PM, José Antonio Iñigo <
joseantonio.in...@gmail.com> wrote:

> Hi Chris,
>
>
>
> *"if I understand your problem correctly, the issue is that you need to
> decrement the stock count when you reserve it, rather than splitting it
> into a second phase."*
>
> That's exactly the problem, I would need to:
>
> 1) Read the OrderPlaced event from Kafka in ProductService...
> 2) ...query the ProductsStock store to check availability...
> 3) ...update the Product in the same phase (OrderPlacedEvent processing)
> 4) ...publish a ProductReserved message
>
>     // 1) Read the OrderPlaced event...
>     @StreamListener(OrderProcessor.INPUT)
>     public void handleOrder(Map<String, Object> event) {
>         logger.info("Event {}", event);
>         if ("OrderPlaced".equals(event.get("name"))) {
>             Order order = new Order();
>             order.setId((String) event.get("orderId"));
>             order.setProductId((Integer) event.get("productId"));
>             order.setUid(event.get("uid").toString());
>             ...
>             // 2) Query the ProductsStock store
>             Integer productStock = getProductStock(order.getProductId());
>             if (productStock != null && productStock > 0) {
>                 // 3) Update the ProductsStock store
>                 ???
>
>                 // 4) Publish a new message. No problem here
>             }
>         }
>     }
>
>     @Override
>     public Integer getProductStock(Integer id) {
>         KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
>         ReadOnlyKeyValueStore<Integer, Integer> keyValueStore =
>             streams.store("ProductsStock", QueryableStoreTypes.keyValueStore());
>         return keyValueStore.get(id);
>     }
>
> However, the only way I know of updating the store is publishing a new
> ProductReserved event that the KStream processes as a separate step (a new
> Kafka message):
>
>     Map<String, Object> event = new HashMap<>();
>     event.put("name", "ProductReserved");
>     event.put("orderId", order.getId());
>     event.put("productId", order.getProductId());
>     event.put("quantity", -1);
>     // 3) Update the ProductsStock store
>     orderProcessor.output().send(MessageBuilder.withPayload(event).build(), 500);
>
> This is the separate KStream config; notice // 3), where the update takes
> place:
>
> @Configuration
> public class KStreamsConfig {
>
>     @Bean
>     public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder,
>             KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {
>
>         Serde<Integer> integerSerde = Serdes.Integer();
>         final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
>         final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
>         final Serde<JsonNode> jsonSerde =
>             Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
>         KStream<Integer, JsonNode> stream =
>             kStreamBuilder.stream(integerSerde, jsonSerde, "orders");
>
>         // 3) Update the ProductsStock store
>         stream.filter((key, value) -> value != null
>                 && value.get("name").asText().equals("ProductReserved"))
>             .map((key, value) -> new KeyValue<>(value.get("productId").asInt(),
>                 value.get("quantity").asInt()))
>             .groupByKey()
>             .reduce((v1, v2) -> v1 + v2, "ProductsStock");
>
>         return stream;
>     }
> }
>
> I've had a look at StateStoresInTheDSLIntegrationTest.java
> <https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java>
> but I still don't get how to integrate the update step into // 2). No idea
> how I can do all this in the same phase:
>
> - Consume a message
> - Query a KStreams store
> - Update the KStreams store
> - Publish a ProductReserved message.
>
> Could you please outline the necessary code to do it?
>
> Thank you so much.
> Jose
>
