elenacliu commented on PR #297: URL: https://github.com/apache/rocketmq-streams/pull/297#issuecomment-1626406733
在代码中,我构建了两个 stream,分别处理不同的任务。 ### 任务1:每隔 10s 输出过去 30s 的饮料和食物点单数 由于 RocketMQ 对 broker 写入的消息是字符串,所以应该首先将其转换为 Order 对象,这里用到了 json 的解析函数。 由于需要对饮料和食物点单分别记数,所以应该使用 keyby 算子进行分类,按照 Order 类型的 type 域进行分类。 其次考虑【每隔 10s 输出过去 30s】应该选择什么 window 类型。 在 RocketMQ Streams 框架中,window 分为 3 种类型:tumbling, sliding, session。每种 window 有不同的滑动规则。 (以下图源微软:https://learn.microsoft.com/en-us/azure/stream-analytics/stream-analytics-window-functions) **Tumbling window:** Tumbling window 最容易理解,窗口的大小和步长一致。 <img src="https://learn.microsoft.com/en-us/azure/stream-analytics/media/stream-analytics-window-functions/stream-analytics-window-functions-tumbling-intro.png" alt="Stream Analytics tumbling window" style="zoom:50%;" /> **Sliding window:(在 azure stream 中叫做 hopping window)** 可以看到 sliding window 需要给定两个参数,一个是窗口的大小,一个是窗口滑动的步长。这正好符合本任务的要求:每 10s(步长)汇报过去 30s(窗口大小)的指标。 <img src="https://learn.microsoft.com/en-us/azure/stream-analytics/media/stream-analytics-window-functions/stream-analytics-window-functions-hopping-intro.png" alt="Stream Analytics hopping window" style="zoom:50%;" /> **Session window:** <img src="https://learn.microsoft.com/en-us/azure/stream-analytics/media/stream-analytics-window-functions/stream-analytics-window-functions-session-intro.png" alt="Stream Analytics session window" style="zoom:48%;" /> > A session window begins when the first event occurs. If another event occurs within the specified timeout from the last ingested event, then the window extends to include the new event. Otherwise if no events occur within the timeout, then the window is closed at the timeout. session window 在第一个事件发生时就开始了,如果另一个事件在 timeout 时限内发生,则 window 会包括该事件,如果在 timeout 实现内没有新的事件发生,则窗口关闭。 最后因为要统计订单的总数,所以使用了一个 count 算子进行统计。 前面使用了 keyBy 算子,其返回值是 `GroupedStream`,并不支持打印。因此首先调用 `.toRStream` 进行转换,再调用 `.print` 打印内容,方便测试。 ### 任务2:每隔 100s 输出过去 100s 每个人花在购买 food/drink 上的钱的总量 首先,因为涉及到每个人(customer)和不同的点单类型(food/drink),因此 keyBy 需要支持多个 key 的操作。 ```java builder.source("order", source -> { Order order = JSON.parseObject(source, Order.class); System.out.println(order.toString()); return new Pair<>(null, order); }) .keyBy(new SelectAction<String, Order>() { @Override public String select(Order order) { return order.getCustomer() + "@" + order.getType(); } }) .window(WindowBuilder.tumblingWindow(Time.seconds(100))) .sum(Order::getPrice) .toRStream() .print(); ``` 为了支持多个 key,需要实现 `SelectionAction` 的 interface,将多个 key 拼接起来。 另外,因为是每隔 100s 就输出过去 100s 的总量,因此使用最基本的 tumbling window 即可。 为了计算钱的总量,所以使用了 sum 算子计算总和。 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
