I think Joe left a clue in the idea of using Pulsar Functions.


> On Nov 10, 2018, at 6:26 PM, Jia Zhai <zhaiji...@gmail.com> wrote:
> 
> Thanks Joe for the comments.
> Implementing this feature on the client side would be great, but in my view,
> to achieve "Delayed Message" we may have to touch the dispatch path.
> Is there any idea or suggestion for how to do it on the client side?
> 
> 
>> On Sat, Nov 10, 2018 at 3:41 PM wenfeng wang <sxian.w...@gmail.com> wrote:
>> 
>> Joe +1
>> 
>> On Sat, Nov 10, 2018 at 12:47 PM Joe F <joefranc...@gmail.com> wrote:
>> 
>>> I am not a fan of adding complexity to the dispatch path, and I will always
>>> have serious concerns about proposals that do so, including this one.
>>> 
>>> In general, I would prefer Pulsar to keep the dispatch path simple and
>>> efficient, and avoid server side implementations of business logic.
>>> Streaming at scale, at low latency, is what I think Pulsar should do. I am
>>> biased here, because that is one of the reasons Pulsar got created
>>> originally, at a time when there were many other message brokers out there
>>> (and many under the Apache umbrella too).
>>> 
>>> All those other message brokers do all kinds of server-side logic -
>>> filtering, transforming, scheduling, and so on. All of those systems have
>>> more or less ended up with bottlenecks and complexity. And this is not
>>> without reason. Message queues are queues, and most of the server side
>>> logic implementations are attempts to make a queue into a database. A
>>> system that is optimized for flow as a queue, will not be good as a
>>> database, and vice-versa.
>>> 
>>> I think the right way to do this kind of business logic is in the client, or
>>> to leverage Pulsar Functions, and the core broker dispatch path and process
>>> space should just deal with performance and flow at scale.
>>> 
>>> Joe
>>> 
>>> 
>>> 
>>> 
>>>> On Thu, Nov 8, 2018 at 1:39 PM 李鹏辉gmail <codelipeng...@gmail.com> wrote:
>>>> 
>>>> Dear all
>>>> 
>>>> This is a PIP to add the delayed message delivery feature.
>>>> 
>>>> ## Motivation
>>>> Scheduled and delayed message delivery is a very common feature to support
>>>> in a messaging system. Basically, an individual message can have a header,
>>>> set by the publisher, and based on the header value the broker should
>>>> hold off delivering the message until the configured delay has elapsed or
>>>> the scheduled time is reached.
>>>> 
>>>> ## Usage
>>>> The delayed message delivery feature is enabled per message on the producer
>>>> side.
>>>> 
>>>> Example of publishing delayed messages from the client:
>>>> 
>>>> ```java
>>>> // message to be delivered after the configured delay interval
>>>> producer.newMessage().delayedAt(3L, TimeUnit.MINUTES).value("Hello Pulsar!").send();
>>>> 
>>>> // message to be delivered at the configured time (2018-10-31 23:00:00)
>>>> Date deliverAt = new GregorianCalendar(2018, Calendar.OCTOBER, 31, 23, 0, 0).getTime();
>>>> producer.newMessage().scheduledAt(deliverAt).value("Hello Pulsar!").send();
>>>> ```
>>>> 
>>>> To enable or disable the delayed message feature:
>>>> 
>>>> ```shell
>>>> pulsar-admin namespaces
>>>> 
>>>> enable-delayed-message       Enable delayed messages for all topics of the
>>>>                              namespace
>>>>  Usage: enable-delayed-message [options] tenant/namespace
>>>> 
>>>>        Options:
>>>>          -p --time-partition-granularity
>>>>                Granularity of the time partitions. Each time partition is
>>>>                stored in ledgers, and the current time partition is loaded
>>>>                into memory and organized in a TimeWheel (e.g. 30s, 5m, 1h,
>>>>                3d, 2w).
>>>>                Default: 5m
>>>>          -t --tick-duration
>>>>                The duration between ticks in the TimeWheel. The number of
>>>>                ticks per wheel is calculated as
>>>>                time-partition-granularity / tick-duration before a time
>>>>                partition is loaded into a TimeWheel (e.g. 500ms, 1s, 5m).
>>>>                Default: 1s
>>>> 
>>>> disable-delayed-message      Disable delayed messages for all topics of the
>>>>                              namespace
>>>>  Usage: disable-delayed-message tenant/namespace
>>>> ```
>>>> 
>>>> ## Design
>>>> 
>>>> ### Delayed Message Index
>>>> 
>>>> The “DelayedMessageIndex” will be implemented using a [TimeWheel approach](http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf).
>>>> We will maintain a delayed index that indexes each delayed message by its
>>>> delayed time and actual message id.
>>>> 
>>>> The index is partitioned by the delayed time. Each time partition will be
>>>> stored using one (or a few) ledger(s). For example, if we configure
>>>> the index to be partitioned by 5 minutes, we will store the index data for
>>>> every 5 minutes by its delayed time. The latest time partition will be
>>>> loaded in memory and organized in a TimeWheel.
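
A minimal sketch of how a delayed time could be mapped to its time partition. The class and method names are hypothetical and not part of Pulsar, and it assumes partitions are aligned to the epoch, which the proposal does not specify.

```java
/** Hypothetical helper, not Pulsar code: maps a delayed time to its time partition. */
final class TimePartitionSketch {
    private TimePartitionSketch() {}

    /**
     * Returns the start (epoch millis) of the partition containing deliverAtMs.
     * Assumption: partitions are aligned to multiples of the granularity.
     */
    static long partitionStartMs(long deliverAtMs, long granularityMs) {
        return Math.floorDiv(deliverAtMs, granularityMs) * granularityMs;
    }
}
```

With a 5-minute granularity (300,000 ms), every delayed time inside the same 5-minute window maps to the same partition start, so its index entries would land in the same ledger(s).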
>>>> 
>>>> The TimeWheel is indexed by ticks. For example, if we configure the tick
>>>> to be 1 second, we will maintain 300 ticks for a 5-minute index. A
>>>> timer task is scheduled every tick, and it will pick the indexed messages
>>>> from the TimeWheel and dispatch them to the real consumers.
>>>> 
>>>> After dispatching all the messages in the current TimeWheel, it will
>>>> load the TimeWheel from the next time partition.
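
To make the tick indexing concrete, here is a rough single-partition time wheel. It is a sketch under assumptions, not the proposed implementation: the class name, the use of `String` message ids, and the `advance()` method (standing in for the per-tick timer task) are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of a time wheel covering one time partition; not Pulsar code. */
final class TimeWheelSketch {
    private final long partitionStartMs;
    private final long tickMs;
    private final List<List<String>> slots;
    private int nextTick = 0;

    TimeWheelSketch(long partitionStartMs, long granularityMs, long tickMs) {
        this.partitionStartMs = partitionStartMs;
        this.tickMs = tickMs;
        // ticks per wheel = time-partition-granularity / tick-duration
        int ticks = (int) (granularityMs / tickMs);
        this.slots = new ArrayList<>(ticks);
        for (int i = 0; i < ticks; i++) {
            slots.add(new ArrayList<>());
        }
    }

    int slotCount() {
        return slots.size();
    }

    /** Indexes a message id under the tick that covers its delayed time. */
    void add(String messageId, long deliverAtMs) {
        long offset = deliverAtMs - partitionStartMs;
        if (offset < 0 || offset >= (long) slots.size() * tickMs) {
            throw new IllegalArgumentException("delayed time is outside this partition");
        }
        slots.get((int) (offset / tickMs)).add(messageId);
    }

    /** Stand-in for the per-tick timer task: returns the next slot's message ids. */
    List<String> advance() {
        if (isExhausted()) {
            return Collections.emptyList();
        }
        return slots.get(nextTick++);
    }

    /** True once every slot has been visited, i.e. the next partition should be loaded. */
    boolean isExhausted() {
        return nextTick >= slots.size();
    }
}
```

A wheel built with a 5-minute granularity and a 1-second tick has 300 slots, so delivery precision in this sketch is one tick.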
>>>> 
>>>> The delayed message options `time-partition-granularity` and `tick-duration`
>>>> can be reset to adapt to changes in delayed message throughput, but
>>>> `time-partition-granularity` can't be shrunk. For example, suppose the
>>>> existing config is time-partition-granularity = 5m and tick-duration = 1s,
>>>> so the delayed message index is stored in 300 slots. If
>>>> time-partition-granularity is increased to 10m, the TimeWheel is initialized
>>>> with 600 slots when the next time partition is loaded, which is enough to
>>>> hold an already existing time partition (5m). But if
>>>> time-partition-granularity is decreased to 2m, the TimeWheel can't load an
>>>> already existing time partition (5m) into 120 slots. So shrinking
>>>> time-partition-granularity is not supported at first; this can be improved
>>>> later by splitting a time partition when it is loaded. For tick-duration,
>>>> an increase or decrease just affects the precision of the delay time.
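
The slot arithmetic in this example can be checked with a few lines. This is only an illustration; `GranularitySketch` and its methods are hypothetical names, not part of the proposal.

```java
import java.time.Duration;

/** Hypothetical helper, not Pulsar code: slot arithmetic for granularity changes. */
final class GranularitySketch {
    private GranularitySketch() {}

    /** Number of TimeWheel slots = time-partition-granularity / tick-duration. */
    static long slots(Duration granularity, Duration tick) {
        return granularity.toMillis() / tick.toMillis();
    }

    /** True if a partition stored with the old granularity fits a wheel sized for the new one. */
    static boolean fitsInWheel(Duration storedPartition, Duration newGranularity, Duration tick) {
        return slots(storedPartition, tick) <= slots(newGranularity, tick);
    }
}
```

With a 1s tick, a stored 5m partition needs 300 slots: it fits a 10m wheel (600 slots) but not a 2m wheel (120 slots), which is why shrinking would require splitting the stored partition first.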
>>>> 
>>>> The delayed message feature conflicts with TTL and Backlog
>>>> Quota (consumer_backlog_eviction), so it is necessary to write a doc that
>>>> explains the result of the conflict. Later, if a ledger compaction feature
>>>> is supported, this can be improved: un-acked messages are relocated to a
>>>> new ledger, and the old ledger can be safely deleted.
>>>> 
>>>> ### Delayed Message Index Cursor
>>>> 
>>>> We will maintain a `cursor` to ensure that messages are indexed
>>>> correctly in the `DelayedMessageIndex`, so we don’t miss dispatching any
>>>> messages.
>>>> 
>>>> ### Recovery
>>>> 
>>>> Since we are organizing the delayed message index using ledgers, and have
>>>> a cursor to ensure messages are correctly indexed, when a broker fails
>>>> for any reason and the topic is recovered, we can figure out the latest
>>>> time partition and load the index from the ledgers.
>>>> 
>>>> ## Changes
>>>> 
>>>> ### Protocol Changes
>>>> 
>>>> In order to support `delayed` and `scheduled` messages, we need to add
>>>> the following field to `MessageMetadata`.
>>>> 
>>>> ```protobuf
>>>> message MessageMetadata {
>>>> 
>>>>    // the message will be delayed at delivery by `delayed_ms` milliseconds.
>>>>    optional int64 delayed_ms   = 18;
>>>> 
>>>> }
>>>> ```
>>>> 
>>>> ### Broker Changes
>>>> 
>>>> The publish path will not be changed. All the messages will still be first
>>>> appended to the managed ledger as normal. The delivery-related message
>>>> attributes will be ignored for `failover` and `exclusive` subscriptions to
>>>> ensure FIFO ordering.
>>>> 
>>>> The change happens at the `MultipleConsumers` dispatcher. At dispatch
>>>> time, the dispatcher will check the delivery attributes in the message
>>>> header. If a message qualifies for delivery (the configured
>>>> delayed/scheduled time has passed), the message will be dispatched as normal;
>>>> otherwise it will be added to the “delayed message index”. Those
>>>> delayed messages will later be dispatched from the “delayed message
>>>> index”.
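
The dispatcher check described here might look roughly like the following. This is a sketch under assumptions: the names are hypothetical, and it assumes `delayed_ms` is relative to the message's publish time, which the proposal does not spell out.

```java
/** Hypothetical sketch of the dispatch-time check; not the actual broker code. */
final class DispatchDecisionSketch {
    enum Action { DISPATCH, INDEX_AS_DELAYED }

    private DispatchDecisionSketch() {}

    /**
     * @param publishTimeMs      publish time of the message (epoch millis)
     * @param delayedMs          value of the delayed_ms field, or null if unset
     * @param nowMs              current time (epoch millis)
     * @param sharedSubscription false for exclusive/failover, which ignore the delay
     */
    static Action decide(long publishTimeMs, Long delayedMs, long nowMs, boolean sharedSubscription) {
        if (!sharedSubscription || delayedMs == null) {
            return Action.DISPATCH; // keep FIFO delivery; nothing to delay
        }
        // Assumption: the delay counts from the publish time.
        long deliverAtMs = publishTimeMs + delayedMs;
        return nowMs >= deliverAtMs ? Action.DISPATCH : Action.INDEX_AS_DELAYED;
    }
}
```

The publish path is untouched in this sketch; only the decision made at dispatch time changes.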
>>>> 
>>>> ### Client Changes
>>>> 
>>>> #### TypedMessageBuilder
>>>> 
>>>> ```java
>>>> import java.util.Date;
>>>> import java.util.concurrent.TimeUnit;
>>>> 
>>>> public interface TypedMessageBuilder<T> {
>>>> 
>>>>    /**
>>>>     * Build a message to be delivered after the configured delay interval:
>>>>     * <tt>delayTime</tt>.
>>>>     *
>>>>     * @return typed message builder.
>>>>     */
>>>>    TypedMessageBuilder<T> delayedAt(long delayTime, TimeUnit timeUnit);
>>>> 
>>>>    /**
>>>>     * Build a message to be delivered at the configured time.
>>>>     *
>>>>     * @return typed message builder.
>>>>     */
>>>>    TypedMessageBuilder<T> scheduledAt(Date date);
>>>> }
>>>> ```
>>>> 
>>>> #### Producer
>>>> 
>>>> Delayed messages cannot be sent in bulk, because the delay property is
>>>> defined in MessageMetadata.
>>>> 
>>>> #### Consumer
>>>> 
>>>> Both the `delayed` and `scheduled` messages will be appended to the topic.
>>>> The `delayed` and `scheduled` message attributes are only applied to
>>>> `shared` subscriptions. This means both exclusive and failover subscriptions
>>>> will ignore these message delivery attributes to ensure FIFO delivery.
>>>> 
>>>> ## Test Plan
>>>> 
>>>> Unit tests + Integration Tests
>>>> 
>>>> 
>>>> https://docs.google.com/document/d/1yZYPJR6ZO9mJdeMgzbi4MJlp_KmKnU2cjbJmmDYyDI0/edit?usp=sharing
>>>> 
>>>> —
>>>> Regards,
>>>> Penghui Li
>>> 
>> 
