Thanks a lot.
+1 for the Pulsar Function approach.

On Sun, Nov 11, 2018 at 3:51 PM Sijie Guo <guosi...@gmail.com> wrote:

> Jia,
>
> I think what Joe suggested is to implement this outside of the broker. The
> current design doesn't have to change, but the logic is implemented in a
> Pulsar Function; we can call it the “Delayed Dispatch Function”. It takes
> one topic as its input topic and dispatches the messages to the output topic
> based on the ‘delayed_at’ property in the message header. We can still
> maintain the timer wheels using BookKeeper ledgers (or using Pulsar topics if
> that makes things easier). All the core designs in the design doc remain the
> same, and the implementation is decoupled from the brokers.
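A minimal, in-memory sketch of the forwarding logic such a function could run. It is plain Java rather than the Pulsar Functions API, the class and method names are hypothetical, and it assumes `delayed_at` holds an absolute epoch-millisecond timestamp. Unlike the suggestion above, which keeps the timer state in BookKeeper ledgers or topics, this version would lose pending messages on restart.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical in-memory stand-in for the "Delayed Dispatch Function".
// Messages read from the input topic are held until their 'delayed_at'
// property (assumed to be epoch millis) is reached, then forwarded.
class DelayedDispatchFunction {

    private static final class Pending {
        final long deliverAt;
        final long seq;
        final String payload;

        Pending(long deliverAt, long seq, String payload) {
            this.deliverAt = deliverAt;
            this.seq = seq;
            this.payload = payload;
        }
    }

    // Ordered by delivery time, then by arrival order so ties stay FIFO.
    private final PriorityQueue<Pending> pending = new PriorityQueue<>(
            Comparator.comparingLong((Pending p) -> p.deliverAt).thenComparingLong(p -> p.seq));
    private long nextSeq = 0;

    // Called for each input message; returns the payloads to publish to the output topic now.
    List<String> onMessage(String payload, Map<String, String> properties, long nowMs) {
        String delayedAt = properties.get("delayed_at");
        long deliverAt = (delayedAt == null) ? nowMs : Long.parseLong(delayedAt);
        pending.add(new Pending(deliverAt, nextSeq++, payload));
        return drainDue(nowMs);
    }

    // Called on every timer tick; returns all payloads whose delivery time has been reached.
    List<String> drainDue(long nowMs) {
        List<String> out = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek().deliverAt <= nowMs) {
            out.add(pending.poll().payload);
        }
        return out;
    }
}
```

The priority queue keeps the next-due message at its head, so each tick only touches messages that are actually due.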
>
> The only downside of this approach is that it multiplies network bandwidth
> usage. But that is probably acceptable, since the traffic of delayed
> messages is likely smaller than normal traffic.
>
> - Sijie
>
> On Sat, Nov 10, 2018 at 6:27 PM Jia Zhai <zhaiji...@gmail.com> wrote:
>
> > Thanks Joe for the comments.
> > Implementing this feature on the client side would be great, but in my
> > view, achieving "Delayed Message" may require touching the dispatch path.
> > Is there any idea or suggestion for how to do it on the client side?
> >
> >
> > On Sat, Nov 10, 2018 at 3:41 PM wenfeng wang <sxian.w...@gmail.com> wrote:
> >
> > > Joe +1
> > >
> > > On Sat, Nov 10, 2018 at 12:47 PM Joe F <joefranc...@gmail.com> wrote:
> > >
> > > > I am not a fan of adding complexity to the dispatch path, and I will always
> > > > have serious concerns about proposals that do so, including this one.
> > > >
> > > > In general, I would prefer Pulsar to keep the dispatch path simple and
> > > > efficient, and avoid server-side implementations of business logic.
> > > > Streaming at scale, at low latency, is what I think Pulsar should do. I am
> > > > biased here, because that is one of the reasons Pulsar was created
> > > > originally, at a time when there were many other message brokers out there
> > > > (and many under the Apache umbrella too).
> > > >
> > > > All those other message brokers do all kinds of server-side logic:
> > > > filtering, transforming, scheduling, and so on. All of those systems have
> > > > more or less ended up with bottlenecks and complexity. And this is not
> > > > without reason. Message queues are queues, and most server-side logic
> > > > implementations are attempts to make a queue into a database. A system
> > > > that is optimized for flow as a queue will not be good as a database, and
> > > > vice versa.
> > > >
> > > > I think the right way to do this kind of business logic is in the client,
> > > > or to leverage Pulsar Functions; the core broker dispatch path and process
> > > > space should just deal with performance and flow at scale.
> > > >
> > > > Joe
> > > >
> > > > On Thu, Nov 8, 2018 at 1:39 PM 李鹏辉gmail <codelipeng...@gmail.com> wrote:
> > > >
> > > > > Dear all,
> > > > >
> > > > > This is a PIP to add a delayed message delivery feature.
> > > > >
> > > > > ## Motivation
> > > > > Scheduled and delayed message delivery is a very common feature for a
> > > > > messaging system to support. Each individual message can carry a header
> > > > > set by the publisher, and based on the header value the broker holds off
> > > > > delivering the message until the configured delay has elapsed or the
> > > > > scheduled time is reached.
> > > > >
> > > > > ## Usage
> > > > > The delayed message delivery feature is enabled per message on the
> > > > > producer side.
> > > > >
> > > > > Example of publishing delayed messages on the client side:
> > > > >
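> > > > > ```java
> > > > > import java.time.LocalDateTime;
> > > > > import java.time.ZoneId;
> > > > > import java.util.Date;
> > > > > import java.util.concurrent.TimeUnit;
> > > > >
> > > > > // message to be delivered after the configured delay interval
> > > > > producer.newMessage().delayedAt(3L, TimeUnit.MINUTES).value("Hello Pulsar!").send();
> > > > >
> > > > > // message to be delivered at the configured time (2018-10-31 23:00, local time)
> > > > > Date deliveryTime = Date.from(LocalDateTime.of(2018, 10, 31, 23, 0, 0)
> > > > >         .atZone(ZoneId.systemDefault()).toInstant());
> > > > > producer.newMessage().scheduledAt(deliveryTime).value("Hello Pulsar!").send();
> > > > > ```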
> > > > >
> > > > > To enable or disable the delayed message feature:
> > > > >
> > > > > ```shell
> > > > > pulsar-admin namespaces
> > > > >
> > > > > enable-delayed-message      Enable delayed message for all topics of the namespace
> > > > >   Usage: enable-delayed-message [options] tenant/namespace
> > > > >
> > > > >     Options:
> > > > >       -p --time-partition-granularity
> > > > >           Granularity by which time is partitioned. Every time partition is
> > > > >           stored in ledgers, and the current time partition is loaded into
> > > > >           memory and organized in a TimeWheel (e.g. 30s, 5m, 1h, 3d, 2w).
> > > > >           Default: 5m
> > > > >       -t --tick-duration
> > > > >           The duration between ticks in the TimeWheel. The number of ticks
> > > > >           per wheel is calculated as time-partition-granularity /
> > > > >           tick-duration before a time partition is loaded into a TimeWheel
> > > > >           (e.g. 500ms, 1s, 5m).
> > > > >           Default: 1s
> > > > >
> > > > > disable-delayed-message     Disable delayed message for all topics of the namespace
> > > > >   Usage: disable-delayed-message tenant/namespace
> > > > > ```
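The tick arithmetic in this help text can be checked with a small helper. This is a sketch: the class and method names are hypothetical, the accepted suffixes are the ones listed in the examples (ms, s, m, h, d, w), and rejecting a granularity that is not a whole multiple of the tick duration is an added assumption.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for the proposed -p/--time-partition-granularity and
// -t/--tick-duration values, e.g. "500ms", "30s", "5m", "1h", "3d", "2w".
final class DelayedMessageConfig {

    static long parseMillis(String value) {
        String v = value.trim().toLowerCase();
        if (v.endsWith("ms")) {
            return Long.parseLong(v.substring(0, v.length() - 2));
        }
        long amount = Long.parseLong(v.substring(0, v.length() - 1));
        switch (v.charAt(v.length() - 1)) {
            case 's': return TimeUnit.SECONDS.toMillis(amount);
            case 'm': return TimeUnit.MINUTES.toMillis(amount);
            case 'h': return TimeUnit.HOURS.toMillis(amount);
            case 'd': return TimeUnit.DAYS.toMillis(amount);
            case 'w': return TimeUnit.DAYS.toMillis(7 * amount);
            default: throw new IllegalArgumentException("Unknown unit in " + value);
        }
    }

    // ticks per wheel = time-partition-granularity / tick-duration
    static long ticksPerWheel(String granularity, String tickDuration) {
        long g = parseMillis(granularity);
        long t = parseMillis(tickDuration);
        // Assumption: the granularity must be a positive whole number of ticks.
        if (t <= 0 || g % t != 0) {
            throw new IllegalArgumentException("granularity must be a multiple of the tick duration");
        }
        return g / t;
    }
}
```

With the defaults (5m and 1s) this yields the 300 ticks used in the design section below.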
> > > > >
> > > > > ## Design
> > > > >
> > > > > ### Delayed Message Index
> > > > >
> > > > > The “DelayedMessageIndex” will be implemented using a [TimeWheel approach](http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf).
> > > > > We will maintain a delayed index that indexes each delayed message by its
> > > > > delivery time and its actual message ID.
> > > > >
> > > > > The index is partitioned by delivery time. Each time partition will be
> > > > > stored in one (or a few) ledger(s). For example, if the index is
> > > > > configured to be partitioned by 5 minutes, the index data will be stored
> > > > > per 5-minute window of delivery time. The latest time partition will be
> > > > > loaded into memory and organized in a TimeWheel.
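A sketch of how delivery times could map onto time partitions, assuming partitions are aligned to multiples of the granularity since the epoch. Names are hypothetical; each partition is an in-memory list here rather than one or more ledgers, and message positions are shown as `ledgerId`/`entryId` pairs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a delayed message index partitioned by delivery time.
final class PartitionedDelayedIndex {

    // One index entry: when the message is due and where it is stored.
    static final class Entry {
        final long deliverAt;
        final long ledgerId;
        final long entryId;

        Entry(long deliverAt, long ledgerId, long entryId) {
            this.deliverAt = deliverAt;
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    private final long granularityMs;
    // partition start time -> entries in that partition, ordered by start time
    private final TreeMap<Long, List<Entry>> partitions = new TreeMap<>();

    PartitionedDelayedIndex(long granularityMs) {
        this.granularityMs = granularityMs;
    }

    // With 5-minute partitions, a message due at minute 7.5 belongs to the partition starting at minute 5.
    long partitionOf(long deliverAt) {
        return Math.floorDiv(deliverAt, granularityMs) * granularityMs;
    }

    void add(Entry e) {
        partitions.computeIfAbsent(partitionOf(e.deliverAt), k -> new ArrayList<>()).add(e);
    }

    // Removes and returns the partition whose delivery times come first,
    // i.e. the one to load into the in-memory TimeWheel next.
    Map.Entry<Long, List<Entry>> pollEarliestPartition() {
        return partitions.pollFirstEntry();
    }
}
```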
> > > > >
> > > > > The TimeWheel is indexed by ticks. For example, if the tick is configured
> > > > > to be 1 second, we will maintain 300 ticks for a 5-minute index. A timer
> > > > > task is scheduled every tick; it picks the indexed messages from the
> > > > > TimeWheel and dispatches them to the real consumers.
> > > > >
> > > > > After it finishes dispatching the messages in the current TimeWheel, it
> > > > > loads the TimeWheel for the next time partition.
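A single-level TimeWheel for one time partition might look like the following. The names are hypothetical and message IDs are plain strings; with a 5-minute partition and a 1-second tick it allocates 300 slots, and each `tick()` returns the slot whose messages are due.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-level TimeWheel holding one time partition. Slot i holds
// messages due in [partitionStart + i * tick, partitionStart + (i + 1) * tick).
final class TimeWheel {
    private final long partitionStart;
    private final long tickMs;
    private final List<List<String>> slots;
    private int cursor = 0; // next slot the timer task will fire

    TimeWheel(long partitionStart, long granularityMs, long tickMs) {
        this.partitionStart = partitionStart;
        this.tickMs = tickMs;
        int ticks = (int) (granularityMs / tickMs); // e.g. 5m / 1s = 300 slots
        this.slots = new ArrayList<>(ticks);
        for (int i = 0; i < ticks; i++) {
            slots.add(new ArrayList<>());
        }
    }

    void add(long deliverAt, String messageId) {
        long slot = Math.floorDiv(deliverAt - partitionStart, tickMs);
        if (slot < 0 || slot >= slots.size()) {
            throw new IllegalArgumentException("delivery time outside this time partition");
        }
        slots.get((int) slot).add(messageId);
    }

    // Invoked once per tick: returns the messages to dispatch and frees the slot.
    List<String> tick() {
        if (exhausted()) {
            throw new IllegalStateException("wheel exhausted; load the next time partition");
        }
        List<String> due = slots.get(cursor);
        slots.set(cursor, new ArrayList<>());
        cursor++;
        return due;
    }

    // True once every slot has fired; the caller then loads the next partition.
    boolean exhausted() {
        return cursor >= slots.size();
    }
}
```

In a broker, the timer task could call `tick()` every `tickMs` through `ScheduledExecutorService.scheduleAtFixedRate(...)`, and build a fresh wheel from the next time partition once `exhausted()` returns true.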
> > > > >
> > > > > The delayed message options `time-partition-granularity` and
> > > > > `tick-duration` can be reset to adapt to changes in delayed message
> > > > > throughput, but `time-partition-granularity` cannot be shrunk. For
> > > > > example, if the existing config is time-partition-granularity = 5m and
> > > > > tick-duration = 1s, the delayed message index is stored in 300 slots. If
> > > > > time-partition-granularity is increased to 10m, the TimeWheel for the next
> > > > > time partition is initialized with 600 slots, which is enough to hold an
> > > > > already existing 5m time partition. But if time-partition-granularity is
> > > > > decreased to 2m, the TimeWheel cannot load an existing 5m time partition
> > > > > into 120 slots. So shrinking time-partition-granularity is not supported
> > > > > at first; this can be improved later by splitting a time partition when it
> > > > > is loaded. Increasing or decreasing `tick-duration` only affects the
> > > > > precision of the delay time.
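The granularity constraint and the split-on-load improvement can be sketched as below. Both helpers are hypothetical, work on bare delivery timestamps, and assume partitions aligned to multiples of the granularity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical helpers for changing time-partition-granularity.
final class GranularityChange {

    // An existing partition can be loaded as-is only if the new wheel spans it:
    // a 5m partition fits a 10m wheel (600 slots at 1s ticks) but not a 2m wheel (120 slots).
    static boolean fitsWithoutSplit(long oldGranularityMs, long newGranularityMs) {
        return newGranularityMs >= oldGranularityMs;
    }

    // Split-on-load: re-bucket an old partition's delivery times into partitions
    // of the new, smaller granularity, keyed by partition start time.
    static TreeMap<Long, List<Long>> split(List<Long> deliverTimes, long newGranularityMs) {
        TreeMap<Long, List<Long>> result = new TreeMap<>();
        for (long t : deliverTimes) {
            long start = Math.floorDiv(t, newGranularityMs) * newGranularityMs;
            result.computeIfAbsent(start, k -> new ArrayList<>()).add(t);
        }
        return result;
    }
}
```

Splitting a 5m partition into 2m pieces yields up to three smaller partitions, each of which fits a 120-slot wheel.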
> > > > >
> > > > > The delayed message feature conflicts with TTL and the backlog quota
> > > > > (`consumer_backlog_eviction`), so documentation is needed to explain the
> > > > > outcome of these conflicts. Later, if a ledger compaction feature is
> > > > > supported, this can be improved by relocating unacknowledged messages to a
> > > > > new ledger so that the old ledger can be safely deleted.
> > > > >
> > > > > ### Delayed Message Index Cursor
> > > > >
> > > > > We will maintain a `cursor` to ensure that messages are indexed correctly
> > > > > in the `DelayedMessageIndex`, so that we don't miss dispatching any
> > > > > messages.
> > > > >
> > > > > ### Recovery
> > > > >
> > > > > Since the delayed message index is organized in ledgers, and a cursor
> > > > > ensures that messages are correctly indexed, when a broker fails for any
> > > > > reason and the topic is recovered, we can figure out the latest time
> > > > > partition and load the index from the ledgers.
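A sketch of how the index cursor and recovery could interact, with hypothetical names and positions simplified to a single `long` (a managed-ledger position is really a ledger ID plus an entry ID). It assumes the index entry is written before the cursor moves, so a crash in between causes re-indexing rather than a gap, and it keeps the index in a set so that re-indexing is idempotent.

```java
import java.util.List;
import java.util.TreeSet;

// Hypothetical sketch of the delayed message index cursor and recovery.
final class DelayedIndexRecovery {

    long indexedUpTo = -1;                       // cursor: last position known to be indexed
    final TreeSet<Long> index = new TreeSet<>(); // positions of indexed delayed messages

    // Write the index entry first (a ledger append in the proposal), then move the cursor.
    void indexMessage(long position) {
        index.add(position);
        indexedUpTo = Math.max(indexedUpTo, position);
    }

    // After a broker failure: reload the persisted index, then replay the topic
    // after the cursor so that nothing written after it is missed.
    static DelayedIndexRecovery recover(List<Long> persistedIndex, long cursor, List<Long> topicPositions) {
        DelayedIndexRecovery r = new DelayedIndexRecovery();
        r.index.addAll(persistedIndex);
        r.indexedUpTo = cursor;
        for (long p : topicPositions) {
            if (p > cursor) {
                r.indexMessage(p); // re-adding an already persisted entry is a no-op
            }
        }
        return r;
    }
}
```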
> > > > >
> > > > > ## Changes
> > > > >
> > > > > ### Protocol Changes
> > > > >
> > > > > In order to support `delayed` and `scheduled` messages, we need to add the
> > > > > following field to `MessageMetadata`.
> > > > >
> > > > > ```protobuf
> > > > > message MessageMetadata {
> > > > >     // ... existing fields ...
> > > > >
> > > > >     // the message will be delayed at delivery by `delayed_ms` milliseconds.
> > > > >     optional int64 delayed_ms = 18;
> > > > >
> > > > > }
> > > > > ```
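The protocol change only adds a relative `delayed_ms`, so the broker needs a base time to turn it into an absolute delivery time. The sketch below assumes that base is the existing `publish_time` field of `MessageMetadata`; that choice and the helper names are assumptions, not part of the proposal.

```java
// Hypothetical broker-side helper: derives an absolute delivery time from the
// proposed delayed_ms field, assuming it is relative to publish_time.
final class DeliveryTime {

    // hasDelayedMs mirrors the has-accessor generated for an optional proto2 field.
    static long deliverAt(long publishTime, boolean hasDelayedMs, long delayedMs) {
        if (!hasDelayedMs || delayedMs <= 0) {
            return publishTime; // not delayed: deliverable as soon as it is published
        }
        return publishTime + delayedMs;
    }

    static boolean isDue(long deliverAt, long nowMs) {
        return nowMs >= deliverAt;
    }
}
```

Because `publish_time` is stamped by the producer, clock skew between producer and broker would shift the effective delay under this assumption.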
> > > > >
> > > > > ### Broker Changes
> > > > >
> > > > > The publish path will not be changed. All messages will still be appended
> > > > > to the managed ledger first, as normal. The delivery-related message
> > > > > attributes will be ignored for `failover` and `exclusive` subscriptions to
> > > > > ensure FIFO ordering.
> > > > >
> > > > > The change happens in the `MultipleConsumers` dispatcher. At dispatch
> > > > > time, the dispatcher checks the delivery attributes in the message header.
> > > > > If a message is eligible for delivery (its configured delay or scheduled
> > > > > time has passed), it is dispatched as normal; otherwise it is added to the
> > > > > “delayed message index”, from which it will be dispatched once it is due.
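The dispatch-time check reduces to a filter over each batch of entries read for the subscription. The sketch uses hypothetical types (a simplified subscription-type enum and message class) and a plain list in place of the delayed message index.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: only Shared subscriptions honor the delay; Exclusive and
// Failover dispatch in FIFO order and ignore delivery attributes.
final class DelayAwareDispatch {

    enum SubType { EXCLUSIVE, FAILOVER, SHARED }

    static final class Msg {
        final String id;
        final long deliverAt; // absolute delivery time in epoch millis

        Msg(String id, long deliverAt) {
            this.id = id;
            this.deliverAt = deliverAt;
        }
    }

    // Stand-in for the "delayed message index".
    final List<Msg> delayedIndex = new ArrayList<>();

    // Returns the messages to send to consumers now; defers the rest to the index.
    List<Msg> filter(List<Msg> readBatch, SubType type, long nowMs) {
        if (type != SubType.SHARED) {
            return readBatch;
        }
        List<Msg> toDispatch = new ArrayList<>();
        for (Msg m : readBatch) {
            if (m.deliverAt <= nowMs) {
                toDispatch.add(m);
            } else {
                delayedIndex.add(m);
            }
        }
        return toDispatch;
    }
}
```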
> > > > >
> > > > > ### Client Changes
> > > > >
> > > > > #### TypedMessageBuilder
> > > > >
> > > > > ```java
> > > > > import java.util.Date;
> > > > > import java.util.concurrent.TimeUnit;
> > > > >
> > > > > public interface TypedMessageBuilder<T> {
> > > > >
> > > > >     /**
> > > > >      * Build a message to be delivered after the configured delay interval.
> > > > >      *
> > > > >      * @param delayTime the delay before the message is delivered
> > > > >      * @param timeUnit the time unit of <tt>delayTime</tt>
> > > > >      * @return typed message builder.
> > > > >      */
> > > > >     TypedMessageBuilder<T> delayedAt(long delayTime, TimeUnit timeUnit);
> > > > >
> > > > >     /**
> > > > >      * Build a message to be delivered at the configured time.
> > > > >      *
> > > > >      * @param date the time at which the message should be delivered
> > > > >      * @return typed message builder.
> > > > >      */
> > > > >     TypedMessageBuilder<T> scheduledAt(Date date);
> > > > > }
> > > > > ```
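Both builder methods can map onto the single `delayed_ms` field. In this hypothetical sketch `scheduledAt(Date)` is converted into a relative delay on the client, which is an assumption (the proposal does not say where that conversion happens), and the current time is injected to keep the example deterministic.

```java
import java.util.Date;
import java.util.concurrent.TimeUnit;

// Hypothetical minimal builder showing how both proposed methods could
// populate a single delayed_ms value.
final class DelayedMessageBuilderSketch {
    private final long nowMs;
    private Long delayedMs; // null means the message is not delayed

    DelayedMessageBuilderSketch(long nowMs) {
        this.nowMs = nowMs;
    }

    DelayedMessageBuilderSketch delayedAt(long delayTime, TimeUnit timeUnit) {
        this.delayedMs = timeUnit.toMillis(delayTime);
        return this;
    }

    DelayedMessageBuilderSketch scheduledAt(Date date) {
        // Assumption: a time in the past means "deliver immediately".
        this.delayedMs = Math.max(0L, date.getTime() - nowMs);
        return this;
    }

    Long delayedMs() {
        return delayedMs;
    }
}
```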
> > > > >
> > > > > #### Producer
> > > > >
> > > > > Delayed messages cannot be sent in batches, because the delay property is
> > > > > defined in `MessageMetadata`, which is shared by all messages in a batch.
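One way a producer could honor this rule while keeping publish order is to close the current batch whenever a delayed message arrives and send that message on its own. The planner below is a hypothetical sketch, not the Pulsar client's batching code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: groups outgoing messages into send units in publish order.
// Consecutive non-delayed messages share a batch; each delayed message is sent alone.
final class DelayedBatchingPlanner {

    static final class OutMsg {
        final String payload;
        final long delayedMs;

        OutMsg(String payload, long delayedMs) {
            this.payload = payload;
            this.delayedMs = delayedMs;
        }
    }

    static List<List<OutMsg>> plan(List<OutMsg> msgs, int maxBatchSize) {
        List<List<OutMsg>> units = new ArrayList<>();
        List<OutMsg> batch = new ArrayList<>();
        for (OutMsg m : msgs) {
            if (m.delayedMs > 0) {
                if (!batch.isEmpty()) { // flush first so publish order is preserved
                    units.add(batch);
                    batch = new ArrayList<>();
                }
                units.add(List.of(m));
            } else {
                batch.add(m);
                if (batch.size() == maxBatchSize) {
                    units.add(batch);
                    batch = new ArrayList<>();
                }
            }
        }
        if (!batch.isEmpty()) {
            units.add(batch);
        }
        return units;
    }
}
```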
> > > > >
> > > > > #### Consumer
> > > > >
> > > > > Both `delayed` and `scheduled` messages will be appended to the topic.
> > > > > The `delayed` and `scheduled` message attributes are only applied to
> > > > > `shared` subscriptions. This means both exclusive and failover
> > > > > subscriptions will ignore these message delivery attributes to ensure FIFO
> > > > > delivery.
> > > > >
> > > > > ## Test Plan
> > > > >
> > > > > Unit tests + Integration Tests
> > > > >
> > > > > https://docs.google.com/document/d/1yZYPJR6ZO9mJdeMgzbi4MJlp_KmKnU2cjbJmmDYyDI0/edit?usp=sharing
> > > > >
> > > > > —
> > > > > Regards,
> > > > > Penghui Li
> > > >
> > >
> >
>
