Joe +1

Joe F <joefranc...@gmail.com> wrote on Sat, Nov 10, 2018 at 12:47 PM:

> I am not a fan of adding complexity to the dispatch path, and I will always
> have serious concerns about proposals that do so, including this one.
>
> In general, I would prefer Pulsar to keep the dispatch path simple and
> efficient, and to avoid server-side implementations of business logic.
> Streaming at scale and at low latency is what I think Pulsar should do. I am
> biased here, because that is one of the reasons Pulsar was created
> originally, at a time when there were many other message brokers out there
> (many of them under the Apache umbrella too).
>
> All those other message brokers do all kinds of server-side logic:
> filtering, transforming, scheduling, and so on. All of those systems have
> more or less ended up with bottlenecks and complexity, and this is not
> without reason. Message queues are queues, and most server-side logic
> implementations are attempts to make a queue into a database. A system
> that is optimized for flow as a queue will not be good as a database, and
> vice versa.
>
> I think the right way to do this kind of business logic is in the client, or
> by leveraging Pulsar Functions; the core broker dispatch path and process
> space should just deal with performance and flow at scale.
>
> Joe
>
> On Thu, Nov 8, 2018 at 1:39 PM 李鹏辉gmail <codelipeng...@gmail.com> wrote:
>
> > Dear all,
> >
> > This is a PIP to add a delayed message delivery feature.
> >
> > ## Motivation
> > Scheduled and delayed message delivery is a very common feature to support
> > in a messaging system. Basically, an individual message can carry a header
> > that is set by the publisher; based on the header value, the broker should
> > hold off delivering the message until the configured delay has elapsed or
> > the scheduled time is reached.
> >
> > ## Usage
> > The delayed message delivery feature is enabled per message on the producer
> > side.
> >
> > Example of publishing delayed messages on the client side:
> >
> > ```java
> > // message to be delivered after the configured delay interval
> > producer.newMessage().delayedAt(3L, TimeUnit.MINUTES).value("Hello Pulsar!").send();
> >
> > // message to be delivered at the configured time
> > // (assumed here to be 2018-10-31 23:00:00 local time)
> > Date deliverAt = new GregorianCalendar(2018, Calendar.OCTOBER, 31, 23, 0, 0).getTime();
> > producer.newMessage().scheduledAt(deliverAt).value("Hello Pulsar!").send();
> > ```
> >
> > To enable or disable the delayed message feature:
> >
> > ```shell
> > pulsar-admin namespaces
> >
> >     enable-delayed-message    Enable delayed message for all topics of the namespace
> >       Usage: enable-delayed-message [options] tenant/namespace
> >
> >       Options:
> >         -p --time-partition-granularity
> >            Granularity by which time is partitioned. Every time partition
> >            is stored in ledgers, and the current time partition is loaded
> >            in memory and organized in a TimeWheel. (eg: 30s, 5m, 1h, 3d, 2w)
> >            Default: 5m
> >         -t --tick-duration
> >            The duration between ticks in the TimeWheel. Ticks per wheel are
> >            calculated as time-partition-granularity / tick-duration before
> >            a time partition is loaded into a TimeWheel. (eg: 500ms, 1s, 5m)
> >            Default: 1s
> >
> >     disable-delayed-message   Disable delayed message for all topics of the namespace
> >       Usage: disable-delayed-message tenant/namespace
> > ```
> >
> > ## Design
> >
> > ### Delayed Message Index
> >
> > The “DelayedMessageIndex” will be implemented using a [TimeWheel approach](
> > http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf).
> > We will maintain a delayed index that indexes each delayed message by its
> > delivery time and actual message id.
> >
> > The index is partitioned by the delayed time. Each time partition will be
> > stored using one (or a few) ledger(s). For example, if we configure the
> > index to be partitioned by 5 minutes, we will store the index data for
> > every 5 minutes by its delayed time. The latest time partition will be
> > loaded in memory and organized in a TimeWheel.
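To make the partitioning rule concrete, here is a minimal sketch of how a delivery time could be mapped to its time partition. It assumes partitions are aligned to the Unix epoch, which the PIP does not specify; `TimePartitionSketch` and `partitionStart` are hypothetical names for illustration, not part of Pulsar.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper (not a Pulsar class): maps a delivery time to its time partition. */
final class TimePartitionSketch {
    private TimePartitionSketch() {}

    /**
     * Floors deliverAt to a multiple of the partition granularity.
     * Assumption: partitions are aligned to the Unix epoch.
     */
    static Instant partitionStart(Instant deliverAt, Duration granularity) {
        long g = granularity.toMillis();
        long t = deliverAt.toEpochMilli();
        return Instant.ofEpochMilli(Math.floorDiv(t, g) * g);
    }

    public static void main(String[] args) {
        Duration fiveMinutes = Duration.ofMinutes(5);
        // Two delivery times inside the same 5-minute window share a partition.
        System.out.println(partitionStart(Instant.parse("2018-11-08T13:36:00Z"), fiveMinutes));
        System.out.println(partitionStart(Instant.parse("2018-11-08T13:39:10Z"), fiveMinutes));
        // A delivery time exactly on a boundary starts the next partition.
        System.out.println(partitionStart(Instant.parse("2018-11-08T13:40:00Z"), fiveMinutes));
    }
}
```

With a 5m granularity, the first two delivery times map to the partition starting at 13:35:00Z and the third to the one starting at 13:40:00Z, so index entries for each window could be written to that partition's ledger(s).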
> >
> > The TimeWheel is indexed by ticks. For example, if we configure the tick
> > to be 1 second, we will maintain 300 ticks for a 5-minute index. A timer
> > task is scheduled every tick; it picks the indexed messages from the
> > TimeWheel and dispatches them to the real consumers.
> >
> > After all the messages in the current TimeWheel have been dispatched, the
> > TimeWheel is loaded from the next time partition.
> >
> > The delayed message options `time-partition-granularity` and
> > `tick-duration` can be reset to adapt to changes in delayed message
> > throughput, but `time-partition-granularity` cannot be shrunk. For
> > example, with an existing config of time-partition-granularity = 5m and
> > tick-duration = 1s, the delayed message index is stored in 300 slots. If
> > time-partition-granularity is increased to 10m, the TimeWheel is
> > initialized with 600 slots when the next time partition is loaded, which
> > is enough to hold an already existing time partition (5m). If
> > time-partition-granularity is decreased to 2m, the TimeWheel cannot load
> > an already existing time partition (5m) into 120 slots. So shrinking
> > time-partition-granularity is not supported at first; this can be improved
> > later by splitting a time partition when it is loaded. For tick-duration,
> > increasing or decreasing it only affects the precision of the delay time.
> >
> > The delayed message feature conflicts with TTL and Backlog Quota
> > (consumer_backlog_eviction), so it is necessary to write documentation
> > explaining the result of the conflict. Later, if a ledger compaction
> > feature is supported, this can be improved by relocating unacked messages
> > to a new ledger so that the old ledger can be safely deleted.
> >
> > ### Delayed Message Index Cursor
> >
> > We will maintain a `cursor` to ensure that messages are indexed correctly
> > in the `DelayedMessageIndex`, so we don’t miss dispatching any messages.
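The tick arithmetic in the TimeWheel description above (a 5m partition with a 1s tick gives 300 slots) can be sketched roughly as follows. This is only an in-memory illustration under my own assumptions: `TimeWheelSketch` is a hypothetical class, message ids are plain strings, and the timer is assumed to call `tick()` at the end of each slot interval so nothing is dispatched early. Ledger storage and the cursor are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical in-memory wheel for one time partition; not Pulsar's implementation. */
final class TimeWheelSketch {
    private final long partitionStartMs;
    private final long tickMs;
    private final List<List<String>> slots; // one list of message ids per tick
    private int nextTick = 0;

    TimeWheelSketch(long partitionStartMs, long granularityMs, long tickMs) {
        if (tickMs <= 0 || granularityMs <= 0 || granularityMs % tickMs != 0) {
            throw new IllegalArgumentException("granularity must be a positive multiple of tick");
        }
        this.partitionStartMs = partitionStartMs;
        this.tickMs = tickMs;
        // ticks per wheel = time-partition-granularity / tick-duration
        int ticksPerWheel = (int) (granularityMs / tickMs);
        this.slots = new ArrayList<>(ticksPerWheel);
        for (int i = 0; i < ticksPerWheel; i++) {
            slots.add(new ArrayList<>());
        }
    }

    int ticksPerWheel() {
        return slots.size();
    }

    /** Indexes a message whose delivery time falls inside this partition. */
    void add(String messageId, long deliverAtMs) {
        long offset = deliverAtMs - partitionStartMs;
        if (offset < 0 || offset >= tickMs * slots.size()) {
            throw new IllegalArgumentException("delivery time is outside this partition");
        }
        slots.get((int) (offset / tickMs)).add(messageId);
    }

    /** Called once per tick by a timer; returns the message ids indexed in that tick. */
    List<String> tick() {
        if (exhausted()) {
            return Collections.emptyList(); // caller would load the next time partition
        }
        return slots.get(nextTick++);
    }

    boolean exhausted() {
        return nextTick >= slots.size();
    }
}
```

For example, `new TimeWheelSketch(0, 300_000, 1_000).ticksPerWheel()` is 300, while a 10m granularity gives 600 and a 2m granularity gives 120. That is the reason a 5m partition fits into a wheel sized for 10m but not into one sized for 2m.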
> >
> > ### Recovery
> >
> > Since we organize the delayed message index using ledgers and have a
> > cursor to ensure messages are correctly indexed, when a broker fails for
> > any reason and the topic is recovered, we can figure out the latest time
> > partition and load the index from the ledgers.
> >
> > ## Changes
> >
> > ### Protocol Changes
> >
> > In order to support `delayed` and `scheduled` messages, we need to add the
> > following field to `MessageMetadata`.
> >
> > ```protobuf
> > message MessageMetadata {
> >
> >     // the message will be delayed at delivery by `delayed_ms` milliseconds.
> >     optional int64 delayed_ms = 18;
> >
> > }
> > ```
> >
> > ### Broker Changes
> >
> > The publish path will not be changed. All messages will still be first
> > appended to the managed ledger as normal. The delivery-related message
> > attributes will be ignored for `failover` and `exclusive` subscriptions to
> > ensure FIFO ordering.
> >
> > The change happens in the `MultipleConsumers` dispatcher. At dispatch
> > time, the dispatcher checks the delivery attributes in the message header.
> > If a message qualifies for delivery (the configured delayed/scheduled time
> > has been reached), it is dispatched as normal; otherwise it is added to
> > the “delayed message index”. Those delayed messages are later dispatched
> > from the “delayed message index”.
> >
> > ### Client Changes
> >
> > #### TypedMessageBuilder
> >
> > ```java
> > public interface TypedMessageBuilder<T> {
> >
> >     /**
> >      * Build a message to be delivered after the configured delay interval:
> >      * <tt>delayTime</tt>.
> >      *
> >      * @return typed message builder.
> >      */
> >     TypedMessageBuilder<T> delayedAt(long delayTime, TimeUnit timeUnit);
> >
> >     /**
> >      * Build a message to be delivered at the configured time.
> >      *
> >      * @return typed message builder.
> >      */
> >     TypedMessageBuilder<T> scheduledAt(Date date);
> > }
> > ```
> >
> > #### Producer
> >
> > Delayed messages cannot be sent in a batch, because the delay property is
> > defined in MessageMetadata.
> >
> > #### Consumer
> >
> > Both `delayed` and `scheduled` messages will be appended to the topic. The
> > `delayed` and `scheduled` message attributes are applied only to `shared`
> > subscriptions. This means both exclusive and failover subscriptions will
> > ignore these message delivery attributes to ensure FIFO delivery.
> >
> > ## Test Plan
> >
> > Unit tests + Integration Tests
> >
> > https://docs.google.com/document/d/1yZYPJR6ZO9mJdeMgzbi4MJlp_KmKnU2cjbJmmDYyDI0/edit?usp=sharing
> >
> > --
> > Regards,
> > Penghui Li
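For reference, the dispatch-time check described under Broker Changes and Consumer could look roughly like the sketch below. It assumes the delay is measured from the message's publish time, which the PIP does not state; all names (`DelayedDispatchSketch`, `decide`, the enums) are hypothetical and only model the decision, not the actual dispatcher code.

```java
/** Hypothetical model of the dispatch-time decision; names are illustrative only. */
final class DelayedDispatchSketch {
    enum SubscriptionType { EXCLUSIVE, FAILOVER, SHARED }

    enum Action { DISPATCH_NOW, ADD_TO_DELAYED_INDEX }

    private DelayedDispatchSketch() {}

    /**
     * Decides what to do with one message at dispatch time.
     * Assumption: the delivery time is publishTimeMs + delayedMs.
     */
    static Action decide(SubscriptionType type, long publishTimeMs, long delayedMs, long nowMs) {
        // Exclusive and failover subscriptions ignore the delay to keep FIFO ordering.
        if (type != SubscriptionType.SHARED) {
            return Action.DISPATCH_NOW;
        }
        // No delay configured: dispatch as normal.
        if (delayedMs <= 0) {
            return Action.DISPATCH_NOW;
        }
        long deliverAtMs = publishTimeMs + delayedMs;
        return nowMs >= deliverAtMs ? Action.DISPATCH_NOW : Action.ADD_TO_DELAYED_INDEX;
    }
}
```

A message published at t=1000 with `delayed_ms = 500` is held back on a shared subscription until t=1500, but goes out immediately on an exclusive or failover subscription.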