Thanks Joe for the comments. Implementing this feature on the client side would be great, but in my view, to achieve "Delayed Message" we may have to touch the dispatch path. Are there any ideas or suggestions for how to do it from the client side?
On Sat, Nov 10, 2018 at 3:41 PM wenfeng wang <sxian.w...@gmail.com> wrote:

> Joe +1
>
> Joe F <joefranc...@gmail.com> wrote on Sat, Nov 10, 2018 at 12:47 PM:
>
> > I am not a fan of adding complexity to the dispatch path, and I will
> > always have serious concerns about proposals that do so, including this
> > one.
> >
> > In general, I would prefer Pulsar to keep the dispatch path simple and
> > efficient, and avoid server side implementations of business logic.
> > Streaming at scale, at low latency is what I think Pulsar should do. I am
> > biased here, because that is one of the reasons Pulsar got created
> > originally, at a time when there were many other message brokers out
> > there (and many under the Apache umbrella too).
> >
> > All those other message brokers do all kinds of server-side logic:
> > filtering, transforming, scheduling, and so on. All of those systems have
> > more or less ended up with bottlenecks and complexity. And this is not
> > without reason. Message queues are queues, and most of the server side
> > logic implementations are attempts to make a queue into a database. A
> > system that is optimized for flow as a queue will not be good as a
> > database, and vice versa.
> >
> > I think the right way to do this kind of business logic is in the client
> > or to leverage Pulsar functions, and the core broker dispatch path and
> > process space should just deal with performance and flow at scale.
> >
> > Joe
> >
> > On Thu, Nov 8, 2018 at 1:39 PM 李鹏辉gmail <codelipeng...@gmail.com> wrote:
> >
> > > Dear all
> > >
> > > This is a PIP to add the feature of delayed message delivery.
> > >
> > > ## Motivation
> > > Scheduled and delayed message delivery is a very common feature to
> > > support in a message system. Basically, an individual message can have
> > > a header which is set by the publisher, and based on the header value
> > > the broker should hold off delivering the message until the configured
> > > delay or the scheduled time is met.
> > >
> > > ## Usage
> > > The delayed message delivery feature is enabled per message on the
> > > producer side.
> > >
> > > Example of publishing delayed messages from the client:
> > >
> > > ```java
> > > // message to be delivered after the configured delay interval
> > > producer.newMessage().delayedAt(3L, TimeUnit.MINUTES).value("Hello Pulsar!").send();
> > >
> > > // message to be delivered at the configured time (2018-10-31 23:00:00 local time)
> > > Date when = Date.from(LocalDateTime.of(2018, 10, 31, 23, 0, 0).atZone(ZoneId.systemDefault()).toInstant());
> > > producer.newMessage().scheduledAt(when).value("Hello Pulsar!").send();
> > > ```
> > >
> > > To enable or disable the delayed message feature:
> > >
> > > ```shell
> > > pulsar-admin namespaces
> > >
> > >     enable-delayed-message      Enable delayed message for all topics of the namespace
> > >       Usage: enable-delayed-message [options] tenant/namespace
> > >
> > >       Options:
> > >         -p --time-partition-granularity
> > >            Granularity of the time partitions. Every time partition is
> > >            stored in ledgers, and the current time partition is loaded in
> > >            memory and organized in a TimeWheel (e.g. 30s, 5m, 1h, 3d, 2w).
> > >            Default: 5m
> > >         -t --tick-duration
> > >            The duration between ticks in the TimeWheel. Ticks per wheel are
> > >            calculated as time-partition-granularity / tick-duration before
> > >            a time partition is loaded into a TimeWheel (e.g. 500ms, 1s, 5m).
> > >            Default: 1s
> > >
> > >     disable-delayed-message     Disable delayed message for all topics of the namespace
> > >       Usage: disable-delayed-message tenant/namespace
> > > ```
> > >
> > > ## Design
> > >
> > > ### Delayed Message Index
> > >
> > > The “DelayedMessageIndex” will be implemented using a [TimeWheel
> > > approach](http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf).
> > > We will maintain a delayed index, indexing each delayed message by its
> > > time and actual message id.
> > >
> > > The index is partitioned by the delayed time. Each time partition will
> > > be stored using one (or a few) ledger(s).
> > > For example, if we configure the index to be partitioned by 5 minutes,
> > > we will store the index data for every 5 minutes by its delayed time.
> > > The latest time partition will be loaded in memory and organized in a
> > > TimeWheel.
> > >
> > > The TimeWheel is indexed by ticks. For example, if we configure the
> > > tick to be 1 second, we will maintain 300 ticks for a 5-minute index. A
> > > timer task is scheduled every tick, and it will pick the indexed
> > > messages from the TimeWheel and dispatch them to the real consumers.
> > >
> > > After it finishes dispatching the messages in the current TimeWheel, it
> > > will load the TimeWheel from the next time partition.
> > >
> > > The delayed message options `time-partition-granularity` and
> > > `tick-duration` can be reset to adapt to changes in delayed message
> > > throughput, but `time-partition-granularity` can't be shrunk. For
> > > example, suppose the existing config is time-partition-granularity = 5m
> > > and tick-duration = 1s, so the delayed message index is stored in 300
> > > slots. If we increase time-partition-granularity to 10m, the TimeWheel
> > > will be initialized with 600 slots when the next time partition is
> > > loaded, which is enough to hold an already existing time partition
> > > (5m). But if we decrease time-partition-granularity to 2m, the
> > > TimeWheel can't load an already existing time partition (5m) into 120
> > > slots. So shrinking time-partition-granularity will not be supported at
> > > first; this can be improved later by splitting a time partition when it
> > > is loaded. For time-partition-granularity, an increase or decrease only
> > > affects the precision of the delay time.
> > >
> > > The delayed message feature conflicts with TTL and Backlog Quota
> > > (consumer_backlog_eviction), so it is necessary to write a doc
> > > explaining the result of the conflict. Later, if a ledger compaction
> > > feature is supported, this can be improved: un-acked messages are
> > > relocated to a new ledger, and the old ledger can be safely deleted.
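
The slot arithmetic and the per-tick dispatch described above can be sketched roughly as follows. This is only an illustration under assumptions, not Pulsar code: the class `TimeWheelSketch`, its methods, and the use of plain strings as message ids are all hypothetical, and a real index would be backed by ledgers rather than in-memory lists.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical in-memory wheel for ONE time partition; names are illustrative only. */
final class TimeWheelSketch {
    private final long partitionStartMs;
    private final long tickMs;
    // One bucket of message ids per tick: granularity / tick slots in total.
    private final List<List<String>> slots = new ArrayList<List<String>>();
    private int nextSlot = 0;

    TimeWheelSketch(long partitionStartMs, long granularityMs, long tickMs) {
        if (tickMs <= 0 || granularityMs <= 0 || granularityMs % tickMs != 0) {
            throw new IllegalArgumentException("granularity must be a positive multiple of tick");
        }
        this.partitionStartMs = partitionStartMs;
        this.tickMs = tickMs;
        long slotCount = granularityMs / tickMs; // e.g. 5m / 1s = 300
        for (long i = 0; i < slotCount; i++) {
            slots.add(new ArrayList<String>());
        }
    }

    int slotCount() {
        return slots.size();
    }

    /** Index a message id under the tick that contains its delivery time. */
    boolean add(String messageId, long deliverAtMs) {
        long offsetMs = deliverAtMs - partitionStartMs;
        if (offsetMs < 0 || offsetMs >= tickMs * slots.size()) {
            return false; // belongs to a different time partition
        }
        slots.get((int) (offsetMs / tickMs)).add(messageId);
        return true;
    }

    /** Called once per tick by a timer: returns the ids in the next slot and advances. */
    List<String> tick() {
        if (nextSlot >= slots.size()) {
            return Collections.emptyList(); // wheel drained; the next partition would be loaded here
        }
        List<String> due = slots.get(nextSlot);
        slots.set(nextSlot, new ArrayList<String>());
        nextSlot++;
        return due;
    }
}
```

With a 5m granularity and a 1s tick, `new TimeWheelSketch(0L, 300_000L, 1_000L).slotCount()` is 300; a 10m granularity gives 600 slots and a 2m granularity gives 120, which matches the shrink problem described in the proposal.
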
> > >
> > > ### Delayed Message Index Cursor
> > >
> > > We will maintain a `cursor` to ensure that messages are indexed
> > > correctly in the `DelayedMessageIndex`, so we don’t miss dispatching
> > > any messages.
> > >
> > > ### Recovery
> > >
> > > Since we organize the delayed message index using ledgers and have a
> > > cursor to ensure messages are correctly indexed, when a broker fails
> > > for any reason and the topic is recovered, we can figure out the latest
> > > time partition and load the index from the ledgers.
> > >
> > > ## Changes
> > >
> > > ### Protocol Changes
> > >
> > > In order to support `delayed` and `scheduled` messages, we need to add
> > > the following field to `MessageMetadata`.
> > >
> > > ```protobuf
> > > message MessageMetadata {
> > >
> > >     // the message will be delayed at delivery by `delayed_ms` milliseconds.
> > >     optional int64 delayed_ms = 18;
> > >
> > > }
> > > ```
> > >
> > > ### Broker Changes
> > >
> > > The publish path will not be changed. All messages will still be first
> > > appended to the managed ledger as normal. The delivery-related message
> > > attributes will be ignored for `failover` and `exclusive` subscriptions
> > > to ensure FIFO ordering.
> > >
> > > The change happens in the `MultipleConsumers` dispatcher. At dispatch
> > > time, the dispatcher will check the delivery attributes in the message
> > > header. If a message is qualified for delivery (exceeding the
> > > configured delayed/scheduled time), the message will be dispatched as
> > > normal; otherwise it will be added back to the “delayed message index”.
> > > Those delayed messages will be dispatched later from the “delayed
> > > message index”.
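
The dispatch-time check described above might look roughly like this. It is a sketch under stated assumptions rather than the broker's code: the class, enums, and method are hypothetical, the delay is assumed to be measured from the publish time (the proposal does not say which base time is used), and a message whose delivery time has just been reached is treated as qualified.

```java
/** Hypothetical dispatch decision; not part of the Pulsar broker. */
final class DelayedDispatchSketch {
    enum SubscriptionType { EXCLUSIVE, FAILOVER, SHARED }
    enum Action { DISPATCH_NOW, ADD_TO_DELAYED_INDEX }

    static Action decide(SubscriptionType type, long publishTimeMs, long delayedMs, long nowMs) {
        // Exclusive and failover subscriptions ignore the delay to keep FIFO ordering.
        if (type != SubscriptionType.SHARED) {
            return Action.DISPATCH_NOW;
        }
        // Assumption: the delay counts from the publish time.
        long deliverAtMs = publishTimeMs + delayedMs;
        return nowMs >= deliverAtMs ? Action.DISPATCH_NOW : Action.ADD_TO_DELAYED_INDEX;
    }
}
```

For example, a message published at t = 1000 ms with `delayed_ms = 5000` is added to the delayed index when checked at t = 2000 ms on a shared subscription, and dispatched immediately on an exclusive one.
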
> > >
> > > ### Client Changes
> > >
> > > #### TypedMessageBuilder
> > >
> > > ```java
> > > import java.util.Date;
> > > import java.util.concurrent.TimeUnit;
> > >
> > > public interface TypedMessageBuilder<T> {
> > >
> > >     /**
> > >      * Build a message to be delivered at the configured delay interval:
> > >      * <tt>delayTime</tt>.
> > >      * @return typed message builder.
> > >      */
> > >     TypedMessageBuilder<T> delayedAt(long delayTime, TimeUnit timeUnit);
> > >
> > >     /**
> > >      * Build a message to be delivered at the configured time.
> > >      *
> > >      * @return typed message builder.
> > >      */
> > >     TypedMessageBuilder<T> scheduledAt(Date date);
> > > }
> > > ```
> > >
> > > #### Producer
> > >
> > > Delayed messages cannot be sent in batches, because the delay property
> > > is defined in `MessageMetadata`.
> > >
> > > #### Consumer
> > >
> > > Both the `delayed` and `scheduled` messages will be appended to the
> > > topic. The `delayed` and `scheduled` message attributes are only
> > > applied to `shared` subscriptions. This means both exclusive and
> > > failover subscriptions will ignore these message delivery attributes to
> > > ensure FIFO delivery.
> > >
> > > ## Test Plan
> > >
> > > Unit tests + Integration Tests
> > >
> > > https://docs.google.com/document/d/1yZYPJR6ZO9mJdeMgzbi4MJlp_KmKnU2cjbJmmDYyDI0/edit?usp=sharing
> > >
> > > --
> > > Regards,
> > > Penghui Li
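
Since the protocol change only adds `delayed_ms`, one possible way for the client to fill it from both builder methods is sketched below. This mapping is an assumption, not something the proposal states: the class `DelayMetadataSketch` and its methods are hypothetical, and a scheduled time in the past is assumed to mean "no delay".

```java
import java.util.Date;
import java.util.concurrent.TimeUnit;

/** Hypothetical client-side helper; illustrates one possible mapping to delayed_ms. */
final class DelayMetadataSketch {

    /** delayedAt(delayTime, timeUnit): convert the interval to milliseconds. */
    static long delayedMsFor(long delayTime, TimeUnit timeUnit) {
        if (delayTime < 0) {
            throw new IllegalArgumentException("delayTime must not be negative");
        }
        return timeUnit.toMillis(delayTime);
    }

    /** scheduledAt(date): assumed to become the remaining time until the date, never negative. */
    static long delayedMsFor(Date date, long nowMs) {
        return Math.max(0L, date.getTime() - nowMs);
    }
}
```

Under these assumptions, `delayedMsFor(3L, TimeUnit.MINUTES)` yields 180000, and a date 6 seconds in the future yields 6000.
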