Thanks a lot. +1 for the Pulsar Function approach.

On Sun, Nov 11, 2018 at 3:51 PM Sijie Guo <guosi...@gmail.com> wrote:
Jia,

I think what Joe suggested is to implement this outside of the broker. The current design doesn't have to change, but the logic is implemented in a Pulsar function; we can call it the "Delayed Dispatch Function". It takes one topic as its input topic and dispatches the messages to the output topic based on the 'delayed_at' property in the message header. We can still maintain the timer wheels using BookKeeper ledgers (or Pulsar topics, if that makes things easier). All the core designs remain the same as in the design doc, and the implementation is decoupled from the brokers.

The only downside of this approach is that it will multiply the network bandwidth usage. But that is probably okay, since the traffic of delayed messages is probably smaller than the usual traffic.

- Sijie

On Sat, Nov 10, 2018 at 6:27 PM Jia Zhai <zhaiji...@gmail.com> wrote:

Thanks Joe for the comments.
Implementing this feature on the client side would be great, but from my view, to achieve "Delayed Message" we may have to touch the dispatch path.
Is there any idea or suggestion for how to do it from the client side?

On Sat, Nov 10, 2018 at 3:41 PM wenfeng wang <sxian.w...@gmail.com> wrote:

Joe +1

On Sat, Nov 10, 2018 at 12:47 PM Joe F <joefranc...@gmail.com> wrote:

I am not a fan of adding complexity to the dispatch path, and I will always have serious concerns about proposals that do so, including this one.

In general, I would prefer Pulsar to keep the dispatch path simple and efficient, and avoid server-side implementations of business logic. Streaming at scale, at low latency, is what I think Pulsar should do.
I am biased here, because that is one of the reasons Pulsar was created originally, at a time when there were many other message brokers out there (and many under the Apache umbrella too).

All those other message brokers do all kinds of server-side logic: filtering, transforming, scheduling, and so on. All of those systems have more or less ended up with bottlenecks and complexity, and not without reason. Message queues are queues, and most server-side logic implementations are attempts to make a queue into a database. A system that is optimized for flow as a queue will not be good as a database, and vice versa.

I think the right way to do this kind of business logic is in the client or by leveraging Pulsar Functions. The core broker dispatch path and process space should deal only with performance and flow at scale.

Joe

On Thu, Nov 8, 2018 at 1:39 PM 李鹏辉gmail <codelipeng...@gmail.com> wrote:

Dear all,

This is a PIP to add the delayed message delivery feature.

## Motivation

Scheduled and delayed message delivery is a very common feature for a messaging system to support. An individual message can carry a header that is set by the publisher. Based on the header value, the broker should hold off delivering the message until the configured delay or the scheduled time is met.

## Usage

The delayed message delivery feature is enabled per message on the producer side.

Example of publishing delayed messages on the client side:

```java
// message to be delivered after the configured delay interval
producer.newMessage().delayedAt(3L, TimeUnit.MINUTES).value("Hello Pulsar!").send();

// message to be delivered at the configured time (2018-10-31 23:00 in the JVM's default time zone)
Date deliverAt = Date.from(LocalDateTime.of(2018, 10, 31, 23, 0).atZone(ZoneId.systemDefault()).toInstant());
producer.newMessage().scheduledAt(deliverAt).value("Hello Pulsar!").send();
```

To enable or disable the delayed message feature:

```shell
pulsar-admin namespaces

  enable-delayed-message      Enable delayed messages for all topics of the namespace
    Usage: enable-delayed-message [options] tenant/namespace

      Options:
        -p --time-partition-granularity
           Granularity of the time partitions. Every time partition is
           stored in ledgers, and the current time partition is loaded in
           memory and organized in a TimeWheel. (e.g. 30s, 5m, 1h, 3d, 2w)
           Default: 5m
        -t --tick-duration
           The duration between ticks in the TimeWheel. The number of ticks
           per wheel is calculated as time-partition-granularity / tick-duration
           before a time partition is loaded into a TimeWheel. (e.g. 500ms, 1s, 5m)
           Default: 1s

  disable-delayed-message     Disable delayed messages for all topics of the namespace
    Usage: disable-delayed-message tenant/namespace
```

## Design

### Delayed Message Index

The "DelayedMessageIndex" will be implemented using a [TimeWheel approach](http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf). We will maintain a delayed index that indexes each delayed message by its delivery time and its actual message ID.
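The following minimal in-memory sketch shows what the index has to store: message IDs keyed by delivery time, drained in time order. It deliberately uses a `java.util.TreeMap` in place of the timing wheel and ignores persistence to ledgers. The class name `DelayedMessageIndexSketch`, the `MsgId` (ledgerId, entryId) pair, and the `add`/`pollDue` methods are hypothetical and are not part of Pulsar or of this proposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the proposed DelayedMessageIndex:
// message ids grouped by delivery time and kept in time order.
public class DelayedMessageIndexSketch {

    // Hypothetical message id: (ledgerId, entryId), like a managed-ledger position.
    static final class MsgId {
        final long ledgerId;
        final long entryId;

        MsgId(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    // deliverAtMs -> ids of the messages due at that time
    private final TreeMap<Long, List<MsgId>> index = new TreeMap<>();

    public void add(long deliverAtMs, MsgId id) {
        index.computeIfAbsent(deliverAtMs, k -> new ArrayList<>()).add(id);
    }

    // Removes and returns every id whose delivery time is <= nowMs, earliest first.
    public List<MsgId> pollDue(long nowMs) {
        List<MsgId> due = new ArrayList<>();
        Map<Long, List<MsgId>> head = index.headMap(nowMs, true);
        for (List<MsgId> ids : head.values()) {
            due.addAll(ids);
        }
        head.clear(); // headMap is a view, so this also removes the entries from the index
        return due;
    }

    // Number of message ids still waiting in the index.
    public int size() {
        int n = 0;
        for (List<MsgId> ids : index.values()) {
            n += ids.size();
        }
        return n;
    }

    public static void main(String[] args) {
        DelayedMessageIndexSketch idx = new DelayedMessageIndexSketch();
        idx.add(3_000, new MsgId(1, 2));
        idx.add(1_000, new MsgId(1, 0));
        idx.add(2_000, new MsgId(1, 1));
        System.out.println(idx.pollDue(2_000)); // [1:0, 1:1]
        System.out.println(idx.size());         // 1
    }
}
```

A sorted map keeps the example short. The timing wheel trades this general ordering for constant-time placement into per-tick slots.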
The index is partitioned by delivery time. Each time partition will be stored using one (or a few) ledger(s). For example, if the index is configured to be partitioned by 5 minutes, the index data is stored in one partition for every 5 minutes of delivery time. The latest time partition will be loaded in memory and organized in a TimeWheel.

The TimeWheel is indexed by ticks. For example, if the tick is configured to be 1 second, we will maintain 300 ticks for a 5-minute index. A timer task is scheduled every tick. It picks the indexed messages from the TimeWheel and dispatches them to the real consumers.

After it finishes dispatching the messages in the current TimeWheel, it loads the TimeWheel from the next time partition.

The delayed message options `time-partition-granularity` and `tick-duration` can be reset to adapt to changes in delayed message throughput, with one restriction: `time-partition-granularity` can't be shrunk. For example, suppose the existing config is time-partition-granularity = 5m and tick-duration = 1s, so the delayed message index is stored in 300 slots. If time-partition-granularity is increased to 10m, the TimeWheel for the next time partition will be initialized with 600 slots. That is enough to hold an already existing time partition (5m). But if time-partition-granularity is decreased to 2m, the TimeWheel can't load an already existing time partition (5m) into 120 slots. So shrinking time-partition-granularity is not supported for now. This could be improved by splitting a time partition when it is loaded. For tick-duration, increasing or decreasing it only affects the precision of the delay time.
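The arithmetic above can be sketched with a single-level timing wheel for one partition. It covers two rules: ticks per wheel = time-partition-granularity / tick-duration, and an existing partition only fits into a wheel with at least as many slots. `PartitionTimeWheel`, `tick()` and `canLoad()` are hypothetical names. `canLoad()` assumes tick-duration is unchanged, and message IDs are plain strings for brevity. A real implementation would invoke `tick()` from a timer scheduled every tick-duration rather than calling it directly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-level timing wheel holding one time partition of the delayed index.
public class PartitionTimeWheel {
    private final long partitionStartMs;
    private final long tickMs;
    private final List<List<String>> slots; // one bucket of message ids per tick
    private int currentTick = 0;

    public PartitionTimeWheel(long partitionStartMs, long granularityMs, long tickMs) {
        if (tickMs <= 0 || granularityMs % tickMs != 0) {
            throw new IllegalArgumentException("granularity must be a positive multiple of the tick");
        }
        this.partitionStartMs = partitionStartMs;
        this.tickMs = tickMs;
        int ticks = Math.toIntExact(granularityMs / tickMs); // ticks per wheel
        this.slots = new ArrayList<>(ticks);
        for (int i = 0; i < ticks; i++) {
            slots.add(new ArrayList<>());
        }
    }

    public int ticksPerWheel() {
        return slots.size();
    }

    // Places a message id into the slot that covers its delivery time.
    public void add(long deliverAtMs, String messageId) {
        long offset = deliverAtMs - partitionStartMs;
        if (offset < 0 || offset / tickMs >= slots.size()) {
            throw new IllegalArgumentException("delivery time is outside this partition");
        }
        slots.get((int) (offset / tickMs)).add(messageId);
    }

    // One timer firing: returns the ids in the current slot and advances the wheel.
    public List<String> tick() {
        if (exhausted()) {
            return new ArrayList<>(); // caller should load the next time partition
        }
        List<String> due = slots.get(currentTick);
        slots.set(currentTick, new ArrayList<>());
        currentTick++;
        return due;
    }

    public boolean exhausted() {
        return currentTick >= slots.size();
    }

    // An existing partition fits only if the new wheel has at least as many slots (same tick).
    public static boolean canLoad(long existingGranularityMs, long newGranularityMs, long tickMs) {
        return existingGranularityMs / tickMs <= newGranularityMs / tickMs;
    }

    public static void main(String[] args) {
        long sec = 1_000, min = 60 * sec;
        System.out.println(new PartitionTimeWheel(0, 5 * min, sec).ticksPerWheel()); // 300
        System.out.println(canLoad(5 * min, 10 * min, sec)); // true  (600 slots)
        System.out.println(canLoad(5 * min, 2 * min, sec));  // false (120 slots)

        PartitionTimeWheel wheel = new PartitionTimeWheel(0, 5 * min, sec);
        wheel.add(1_500, "m1");           // offset 1.5s -> slot 1
        System.out.println(wheel.tick()); // [] (slot 0)
        System.out.println(wheel.tick()); // [m1] (slot 1)
    }
}
```

Keeping one bucket per tick means each timer firing finds its messages directly by slot index, without searching a sorted structure. This is the main appeal of the timing-wheel approach.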
The delayed message feature conflicts with TTL and backlog quota (consumer_backlog_eviction), so a doc needs to be written to explain the outcome of these conflicts. If a ledger compaction feature is supported later, it could improve this: un-acked messages would be relocated to a new ledger, so the old ledger can be safely deleted.

### Delayed Message Index Cursor

We will maintain a `cursor` to ensure that messages are indexed correctly in the `DelayedMessageIndex`, so we don't miss dispatching any messages.

### Recovery

The delayed message index is organized using ledgers, and a cursor ensures that messages are correctly indexed. So when a broker fails for any reason and the topic is recovered, we can figure out the latest time partition and load the index from the ledgers.

## Changes

### Protocol Changes

In order to support `delayed` and `scheduled` messages, we need to add the following field to `MessageMetadata`.

```protobuf
message MessageMetadata {

    // the message will be delayed at delivery by `delayed_ms` milliseconds.
    optional int64 delayed_ms = 18;

}
```

### Broker Changes

The publish path will not be changed. All messages will still be appended to the managed ledger first, as normal. The delivery-related message attributes will be ignored for `failover` and `exclusive` subscriptions to ensure FIFO ordering.

The change happens in the `MultipleConsumers` dispatcher.
At dispatching time, the dispatcher will check the delivery attributes in the message header. If a message qualifies for delivery (its configured delay or scheduled time has been reached), it will be dispatched as normal. Otherwise it will be added to the "delayed message index", and it will be dispatched later from the "delayed message index" once it is due.

### Client Changes

#### TypedMessageBuilder

```java
public interface TypedMessageBuilder<T> {

    /**
     * Build a message to be delivered after the configured delay interval: <tt>delayTime</tt>.
     *
     * @param delayTime the delay before the message is delivered
     * @param timeUnit the unit of <tt>delayTime</tt>
     * @return typed message builder.
     */
    TypedMessageBuilder<T> delayedAt(long delayTime, TimeUnit timeUnit);

    /**
     * Build a message to be delivered at the configured time.
     *
     * @param date the time at which the message should be delivered
     * @return typed message builder.
     */
    TypedMessageBuilder<T> scheduledAt(Date date);
}
```

#### Producer

Delayed messages cannot be sent in batches, because the delay property is defined in `MessageMetadata`.

#### Consumer

Both `delayed` and `scheduled` messages will be appended to the topic. The `delayed` and `scheduled` message attributes are only applied to `shared` subscriptions. Both exclusive and failover subscriptions will ignore these message delivery attributes to ensure FIFO delivery.
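The dispatcher rule from the Broker Changes and Consumer sections can be summarized as a small decision function. The sketch below pairs it with one way the client might fill in `delayed_ms`. It rests on a few assumptions. The delay is measured from the message's publish time. `scheduledAt` is converted into a relative `delayed_ms`, because the protocol change adds only that field. `DelayedDispatchDecision`, `SubType`, `Action`, `delayedMsFor` and `decide` are hypothetical names, not Pulsar APIs.

```java
import java.util.Date;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the per-message delivery decision for delayed/scheduled messages.
public class DelayedDispatchDecision {

    enum SubType { EXCLUSIVE, FAILOVER, SHARED }

    enum Action { DISPATCH, ADD_TO_DELAYED_INDEX }

    // Client side: the value delayedAt(delayTime, unit) would store in delayed_ms.
    static long delayedMsFor(long delayTime, TimeUnit unit) {
        return unit.toMillis(delayTime);
    }

    // Client side (assumption): scheduledAt(date) expressed relative to the publish time.
    static long delayedMsFor(Date scheduledAt, long publishTimeMs) {
        return Math.max(0, scheduledAt.getTime() - publishTimeMs);
    }

    // Broker side: what the dispatcher does with an entry read from the managed ledger.
    static Action decide(SubType sub, long publishTimeMs, long delayedMs, long nowMs) {
        if (sub != SubType.SHARED || delayedMs <= 0) {
            // exclusive/failover ignore delivery attributes to keep FIFO order
            return Action.DISPATCH;
        }
        return nowMs >= publishTimeMs + delayedMs ? Action.DISPATCH : Action.ADD_TO_DELAYED_INDEX;
    }

    public static void main(String[] args) {
        long publish = 10_000;
        long delay = delayedMsFor(3, TimeUnit.MINUTES); // 180000
        System.out.println(decide(SubType.SHARED, publish, delay, 20_000));          // ADD_TO_DELAYED_INDEX
        System.out.println(decide(SubType.SHARED, publish, delay, publish + delay)); // DISPATCH
        System.out.println(decide(SubType.FAILOVER, publish, delay, 20_000));        // DISPATCH
        System.out.println(delayedMsFor(new Date(70_000), publish));                 // 60000
    }
}
```

Because the decision needs only the entry's metadata and the current time, the publish path stays unchanged. All the extra work happens when a shared-subscription dispatcher reads the entry.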

## Test Plan

Unit tests + integration tests

https://docs.google.com/document/d/1yZYPJR6ZO9mJdeMgzbi4MJlp_KmKnU2cjbJmmDYyDI0/edit?usp=sharing

--
Regards,
Penghui Li