I think Joe left a clue in the idea of using Pulsar Functions.
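To make the client-side idea a bit more concrete, here is a minimal sketch that does not use the Pulsar API at all: a relay holds messages in a `java.util.concurrent.DelayQueue` and releases them once they are due. The names `DelayedRelay`, `submit`, and `drainDue` are hypothetical, and the in-memory queue is an assumption for illustration only (nothing here survives a restart). A Pulsar Function or a plain consumer would have to play the relay role and republish the released messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical client-side relay: holds payloads until their delay expires.
final class DelayedRelay {

    // A payload plus the wall-clock time (ms) at which it becomes deliverable.
    private static final class Pending implements Delayed {
        final String payload;
        final long deliverAtMs;

        Pending(String payload, long deliverAtMs) {
            this.payload = payload;
            this.deliverAtMs = deliverAtMs;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            long remainingMs = deliverAtMs - System.currentTimeMillis();
            return unit.convert(remainingMs, TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            if (other instanceof Pending) {
                return Long.compare(deliverAtMs, ((Pending) other).deliverAtMs);
            }
            return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                                other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    private final DelayQueue<Pending> queue = new DelayQueue<>();

    // Accept a message that should not be released before the delay has passed.
    void submit(String payload, long delay, TimeUnit unit) {
        queue.put(new Pending(payload, System.currentTimeMillis() + unit.toMillis(delay)));
    }

    // Return every payload whose delay has expired, earliest first.
    // DelayQueue.drainTo only removes elements that are already expired.
    List<String> drainDue() {
        List<Pending> due = new ArrayList<>();
        queue.drainTo(due);
        List<String> payloads = new ArrayList<>();
        for (Pending p : due) {
            payloads.add(p.payload);
        }
        return payloads;
    }

    // Number of messages still being held back.
    int pending() {
        return queue.size();
    }
}
```

The relay would call `drainDue()` periodically and publish whatever comes back to the real topic, so the broker dispatch path stays untouched.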
> On Nov 10, 2018, at 6:26 PM, Jia Zhai <zhaiji...@gmail.com> wrote:
>
> Thanks Joe for the comments.
> Implementing this feature on the client side is great, but in my view,
> to achieve "Delayed Message" we may have to touch the dispatch path.
> Is there any idea or suggestion of how to do it from the client side?
>
>> On Sat, Nov 10, 2018 at 3:41 PM wenfeng wang <sxian.w...@gmail.com> wrote:
>>
>> Joe +1
>>
>> Joe F <joefranc...@gmail.com> wrote on Sat, Nov 10, 2018 at 12:47 PM:
>>
>>> I am not a fan of adding complexity to the dispatch path, and I will
>>> always have serious concerns about proposals that do so, including
>>> this one.
>>>
>>> In general, I would prefer Pulsar to keep the dispatch path simple and
>>> efficient, and avoid server-side implementations of business logic.
>>> Streaming at scale, at low latency, is what I think Pulsar should do.
>>> I am biased here, because that is one of the reasons Pulsar got
>>> created originally, at a time when there were many other message
>>> brokers out there (and many under the Apache umbrella too).
>>>
>>> All those other message brokers do all kinds of server-side logic:
>>> filtering, transforming, scheduling, and so on. All of those systems
>>> have more or less ended up with bottlenecks and complexity. And this
>>> is not without reason. Message queues are queues, and most of the
>>> server-side logic implementations are attempts to make a queue into a
>>> database. A system that is optimized for flow as a queue will not be
>>> good as a database, and vice versa.
>>>
>>> I think the right way to do this kind of business logic is in the
>>> client or by leveraging Pulsar Functions, and the core broker dispatch
>>> path and process space should just deal with performance and flow at
>>> scale.
>>>
>>> Joe
>>>
>>>> On Thu, Nov 8, 2018 at 1:39 PM 李鹏辉gmail <codelipeng...@gmail.com> wrote:
>>>>
>>>> Dear all
>>>>
>>>> This is a PIP to add a delayed message delivery feature.
>>>>
>>>> ## Motivation
>>>>
>>>> Scheduled and delayed message delivery is a very common feature to
>>>> support in a messaging system. Basically, an individual message can
>>>> carry a header that is set by the publisher, and based on the header
>>>> value the broker should hold off delivering the message until the
>>>> configured delay has elapsed or the scheduled time is reached.
>>>>
>>>> ## Usage
>>>>
>>>> The delayed message delivery feature is enabled per message on the
>>>> producer side.
>>>>
>>>> Example of publishing delayed messages from the client side:
>>>>
>>>> ```java
>>>> // Message to be delivered after the configured delay interval.
>>>> producer.newMessage().delayedAt(3L, TimeUnit.MINUTES).value("Hello Pulsar!").send();
>>>>
>>>> // Message to be delivered at the configured time (2018-10-31 23:00:00, local time).
>>>> // The java.util.Date(year, month, ...) constructor is deprecated and offsets the
>>>> // year by 1900, so the date is built with java.time instead.
>>>> Date when = Date.from(LocalDateTime.of(2018, 10, 31, 23, 0, 0)
>>>>         .atZone(ZoneId.systemDefault()).toInstant());
>>>> producer.newMessage().scheduledAt(when).value("Hello Pulsar!").send();
>>>> ```
>>>>
>>>> To enable or disable the delayed message feature:
>>>>
>>>> ```shell
>>>> pulsar-admin namespaces
>>>>
>>>>     enable-delayed-message      Enable delayed message for all topics of the namespace
>>>>       Usage: enable-delayed-message [options] tenant/namespace
>>>>
>>>>       Options:
>>>>         -p --time-partition-granularity
>>>>           Granularity by which time is partitioned. Every time
>>>>           partition is stored in ledgers, and the current time
>>>>           partition is loaded into memory and organized in a
>>>>           TimeWheel (e.g. 30s, 5m, 1h, 3d, 2w).
>>>>           Default: 5m
>>>>         -t --tick-duration
>>>>           The duration between ticks in the TimeWheel. Ticks per
>>>>           wheel are calculated as
>>>>           time-partition-granularity / tick-duration before a time
>>>>           partition is loaded into a TimeWheel (e.g. 500ms, 1s, 5m).
>>>>           Default: 1s
>>>>
>>>>     disable-delayed-message     Disable delayed message for all topics of the namespace
>>>>       Usage: disable-delayed-message tenant/namespace
>>>> ```
>>>>
>>>> ## Design
>>>>
>>>> ### Delayed Message Index
>>>>
>>>> The “DelayedMessageIndex” will be implemented using a
>>>> [TimeWheel approach](http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf).
>>>> We will maintain a delayed index that indexes each delayed message
>>>> by its time and actual message id.
>>>>
>>>> The index is partitioned by the delayed time. Each time partition
>>>> will be stored using one (or a few) ledger(s). For example, if we
>>>> configure the index to be partitioned by 5 minutes, we will store
>>>> the index data for every 5 minutes by its delayed time. The latest
>>>> time partition will be loaded into memory and organized in a
>>>> TimeWheel.
>>>>
>>>> The TimeWheel is indexed by ticks. For example, if we configure the
>>>> tick to be 1 second, we will maintain 300 ticks for a 5-minute
>>>> index. A timer task is scheduled every tick; it picks the indexed
>>>> messages from the TimeWheel and dispatches them to the real
>>>> consumers.
>>>>
>>>> After it has finished dispatching the messages in the current
>>>> TimeWheel, it will load the TimeWheel from the next time partition.
>>>>
>>>> The delayed message options `time-partition-granularity` and
>>>> `tick-duration` can be reset to adapt to changes in delayed message
>>>> throughput. `time-partition-granularity` cannot be shrunk.
>>>> For example, suppose the existing configuration is
>>>> time-partition-granularity = 5m and tick-duration = 1s, so the
>>>> delayed message index is stored in 300 slots. If
>>>> time-partition-granularity is increased to 10m, the TimeWheel is
>>>> initialized with 600 slots when the next time partition is loaded,
>>>> which is enough to hold an already existing time partition (5m).
>>>> But if time-partition-granularity is decreased to 2m, the TimeWheel
>>>> cannot load an already existing time partition (5m) into 120 slots.
>>>> So shrinking time-partition-granularity is left out at first; it
>>>> can be improved later by splitting a time partition when it is
>>>> loaded. For time-partition-granularity, an increase or decrease only
>>>> affects the precision of the delay time.
>>>>
>>>> The delayed message feature conflicts with TTL and Backlog Quota
>>>> (consumer_backlog_eviction), so it is necessary to write
>>>> documentation that explains the resulting behavior. Later, if a
>>>> ledger compaction feature is supported, it could improve this:
>>>> unacknowledged messages would be relocated to a new ledger so that
>>>> the old ledger can be safely deleted.
>>>>
>>>> ### Delayed Message Index Cursor
>>>>
>>>> We will maintain a `cursor` to ensure that messages are indexed
>>>> correctly in the `DelayedMessageIndex`, so we don’t miss dispatching
>>>> any messages.
>>>>
>>>> ### Recovery
>>>>
>>>> We organize the delayed message index using ledgers and have a
>>>> cursor to ensure messages are correctly indexed. So when a broker
>>>> fails for any reason and the topic is recovered, we can figure out
>>>> the latest time partition and load the index from the ledgers.
>>>>
>>>> ## Changes
>>>>
>>>> ### Protocol Changes
>>>>
>>>> In order to support `delayed` and `scheduled` messages, we need to
>>>> add the following field to `MessageMetadata`.
>>>>
>>>> ```protobuf
>>>> message MessageMetadata {
>>>>
>>>>     // the message will be delayed at delivery by `delayed_ms` milliseconds.
>>>>     optional int64 delayed_ms = 18;
>>>>
>>>> }
>>>> ```
>>>>
>>>> ### Broker Changes
>>>>
>>>> The publish path will not be changed. All messages will still first
>>>> be appended to the managed ledger as normal. The delivery-related
>>>> message attributes will be ignored for `failover` and `exclusive`
>>>> subscriptions to ensure FIFO ordering.
>>>>
>>>> The change happens in the `MultipleConsumers` dispatcher. At
>>>> dispatch time, the dispatcher checks the delivery attributes in the
>>>> message header. If a message qualifies for delivery (the configured
>>>> delayed/scheduled time has passed), it is dispatched as normal;
>>>> otherwise it is added to the “delayed message index”. Those delayed
>>>> messages are dispatched later from the “delayed message index”.
>>>>
>>>> ### Client Changes
>>>>
>>>> #### TypedMessageBuilder
>>>>
>>>> ```java
>>>> public interface TypedMessageBuilder<T> {
>>>>
>>>>     /**
>>>>      * Build a message to be delivered after the configured delay
>>>>      * interval: <tt>delayTime</tt>.
>>>>      *
>>>>      * @return typed message builder.
>>>>      */
>>>>     TypedMessageBuilder<T> delayedAt(long delayTime, TimeUnit timeUnit);
>>>>
>>>>     /**
>>>>      * Build a message to be delivered at the configured time.
>>>>      *
>>>>      * @return typed message builder.
>>>>      */
>>>>     TypedMessageBuilder<T> scheduledAt(Date date);
>>>> }
>>>> ```
>>>>
>>>> #### Producer
>>>>
>>>> Delayed messages cannot be sent in bulk, because the delay property
>>>> is defined in `MessageMetadata`.
>>>>
>>>> #### Consumer
>>>>
>>>> Both `delayed` and `scheduled` messages will be appended to the
>>>> topic. The `delayed` and `scheduled` message attributes are only
>>>> applied to `shared` subscriptions. This means both exclusive and
>>>> failover subscriptions will ignore these message delivery
>>>> attributes to ensure FIFO delivery.
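The dispatch-time check described under Broker Changes and Consumer above can be sketched roughly as follows. This is only an illustration under assumptions: the class, enum, and method names are hypothetical, and the delay is assumed to be measured from the message's publish time, which the proposal does not state explicitly. `delayedMs` mirrors the proposed `delayed_ms` field.

```java
// Hypothetical sketch of the dispatcher's decision for a single message.
final class DispatchDecision {

    enum SubscriptionType { EXCLUSIVE, FAILOVER, SHARED }

    enum Action { DISPATCH_NOW, ADD_TO_DELAYED_INDEX }

    // Decide what to do with a message at dispatch time.
    // Assumption: the delay counts from publishTimeMs.
    static Action decide(SubscriptionType type, long publishTimeMs, long delayedMs, long nowMs) {
        // Exclusive and failover subscriptions ignore the delay to keep FIFO ordering.
        if (type != SubscriptionType.SHARED) {
            return Action.DISPATCH_NOW;
        }
        long deliverAtMs = publishTimeMs + delayedMs;
        // Due messages go out as normal; the rest are parked in the delayed index.
        return nowMs >= deliverAtMs ? Action.DISPATCH_NOW : Action.ADD_TO_DELAYED_INDEX;
    }
}
```

Keeping the decision a pure function of the subscription type and three timestamps makes the FIFO rule for exclusive and failover subscriptions easy to unit test on its own.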
>>>>
>>>> ## Test Plan
>>>>
>>>> Unit tests + Integration Tests
>>>>
>>>> https://docs.google.com/document/d/1yZYPJR6ZO9mJdeMgzbi4MJlp_KmKnU2cjbJmmDYyDI0/edit?usp=sharing
>>>>
>>>> --
>>>> Regards,
>>>> Penghui Li
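For reference, the TimeWheel from the Design section can be sketched in a few lines. This is a minimal in-memory illustration, not the proposed implementation: the class and method names are hypothetical, message ids are plain `long` values, and the ledger-backed time partitions are left out entirely. It shows the tick arithmetic (time-partition-granularity / tick-duration slots) and how a message lands in the slot for its delivery time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory wheel covering one time partition.
final class TimeWheelSketch {
    private final long tickMs;               // duration of one tick
    private final long startMs;              // start of the time partition
    private final List<List<Long>> slots;    // one bucket of message ids per tick
    private int cursor = 0;                  // next slot to fire

    TimeWheelSketch(long partitionMs, long tickMs, long startMs) {
        if (tickMs <= 0 || partitionMs < tickMs) {
            throw new IllegalArgumentException("need 0 < tickMs <= partitionMs");
        }
        this.tickMs = tickMs;
        this.startMs = startMs;
        int ticks = (int) (partitionMs / tickMs);   // e.g. 5m / 1s = 300
        this.slots = new ArrayList<>(ticks);
        for (int i = 0; i < ticks; i++) {
            slots.add(new ArrayList<>());
        }
    }

    int ticksPerWheel() {
        return slots.size();
    }

    // Index a message by delivery time. Returns false when the time falls
    // outside this partition or its slot has already fired.
    boolean add(long messageId, long deliverAtMs) {
        long offset = deliverAtMs - startMs;
        if (offset < 0 || offset >= tickMs * slots.size()) {
            return false;
        }
        int slot = (int) (offset / tickMs);
        if (slot < cursor) {
            return false;
        }
        slots.get(slot).add(messageId);
        return true;
    }

    // Fire the next slot and return its message ids; empty once exhausted.
    // The caller is expected to invoke this once per tick duration.
    List<Long> tick() {
        if (cursor >= slots.size()) {
            return new ArrayList<>();
        }
        List<Long> due = slots.get(cursor);
        slots.set(cursor, new ArrayList<>());
        cursor++;
        return due;
    }

    // True when every slot has fired and the next partition should be loaded.
    boolean exhausted() {
        return cursor >= slots.size();
    }
}
```

With a 5m partition and a 1s tick this gives the 300 slots from the proposal. A message is released together with the rest of its slot, so the delivery precision is bounded by the tick duration.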