On Thu, Jan 24, 2019 at 8:13 AM Joe F <joefranc...@gmail.com> wrote:

> To me this discussion presupposes that a streaming system should provide
> a service like a database. Before we discuss about how to implement this,
> we should look at whether this is something that fits into what is the core
> of Pulsar. I still have the same concerns against doing this in the broker
> dispatch side.
>
> What exactly is the delayed delivery use case? Random insertion, dynamic
> sorting, and deletion from the top of the sort. That is a priority queue.
> It is best implemented as a heap. For larger sets it's some sort of tree
> structure. You can simulate that on a database with an index.
>
> Random insertion and deletion is not what FIFO queues like Pulsar are
> designed for. The closest thing I can think of with Pulsar is to build an
> in-mem priority queue in a Pulsar function, feed it from an input topic and
> publish the top of the queue into a separate output topic.
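A minimal sketch of the in-memory priority queue described above, assuming each message carries a target delivery time in epoch milliseconds. `DelayedBuffer`, `add` and `pollDue` are hypothetical names used only for illustration, not Pulsar or Pulsar Functions APIs; the wiring to input and output topics is left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper (not a Pulsar API): buffers messages until they are due.
final class DelayedBuffer {
    // One buffered message and the epoch-millis time at which it becomes deliverable.
    private static final class Entry {
        final String payload;
        final long deliverAtMillis;

        Entry(String payload, long deliverAtMillis) {
            this.payload = payload;
            this.deliverAtMillis = deliverAtMillis;
        }
    }

    // Min-heap keyed on deliverAtMillis: the earliest deadline is always on top.
    private final PriorityQueue<Entry> heap =
            new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.deliverAtMillis));

    // Random insertion, O(log n). Would be called for each message read from the input topic.
    void add(String payload, long deliverAtMillis) {
        heap.add(new Entry(payload, deliverAtMillis));
    }

    // Removes every entry due at or before nowMillis, earliest deadline first.
    // The caller would publish the returned payloads to the output topic.
    List<String> pollDue(long nowMillis) {
        List<String> due = new ArrayList<>();
        while (!heap.isEmpty() && heap.peek().deliverAtMillis <= nowMillis) {
            due.add(heap.poll().payload);
        }
        return due;
    }

    int size() {
        return heap.size();
    }
}
```

The heap lives only in memory, so anything buffered is lost if the process restarts; a real function would need some way to rebuild it from the input topic.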
> In fact the entire logic proposed in PIP-26 can be done outside the broker
> in a Pulsar function.
>

Correct. PIP-26 can be implemented in Functions. I believe the last
discussion in PIP-26 thread kind of agree on functions approach. If the
community is okay with PIP-26 in functions, I think that is probably a good
approach to start.

>
> For a small scale setup, these distinctions do not matter - you can use a
> database as a queue and a queue as a database. But at any larger scale, a
> streaming system is not the correct solution for a priority queue use case,
> whether it's Pulsar or some other streaming system. So far I have not seen
> any mention of the target scale for the design, or the specific use case
> requirements
>
> -joe
>
>
> On Sat, Jan 19, 2019 at 6:43 PM PengHui Li <codelipeng...@gmail.com> wrote:
>
> > Hi All,
> >
> > Actually, I also prefer to simplify at broker side.
> >
> > If pulsar support set arbitrary timeout on each message, if not cluster
> > failure or consumer failure, it needs to behave normally(on time).
> > Otherwise, user need to understand how pulsar dispatching messages and
> > how limit of unacked messages change the delay message behavior. This may
> > lead users to hesitate, this feature may be misused when the user does not
> > fully understand how it works.
> >
> > When user depends arbitrary timeout message feature, users just need to
> > keep producer and consumer is work well, and administrator of pulsar need
> > to keep pulsar cluster work well.
> >
> > I don't think pulsar is very necessary to support this feature(arbitrary
> > timeout message), In most scenarios, #3155 can work well, In a few cases,
> > even if support arbitrary timeout message in client side, i believe that
> > still can not meet the requirement of all delayed messages.
> >
> > To me, i’m not against support arbitrary timeout on each message on client
> > side, maybe this is useful for other users.
> > In some of our scenarios, we also need a more functional
> > alternative (a task service).
> >
> > Of course, If we can integrate a task service, we can use pulsar to
> > guaranteed delivery of messages, task service guaranteed send message to
> > pulsar success. Or pulsar broker support filter server.
> > This way users can implement their own task services.
> >
> > On Sun, Jan 20, 2019 at 12:28 AM Ezequiel Lovelle
> > <ezequiellove...@gmail.com> wrote:
> >
> > > > If the goal is to minimize the amount of redeliveries from broker ->
> > > > client, there are multiple ways to achieve that with the client based
> > > > approach (eg. send message id and delay time instead of the full
> > > > payload to consumers as Ivan proposed).
> > >
> > > But the main reason to put this logic on client side was not adding delay
> > > related logic on broker side, in order to do this optimisations the broker
> > > must be aware of delayed message and only send message id and delay time
> > > without payload.
> > >
> > > > I don't necessarily agree with that. NTP is widely available
> > > > and understood. Any application that's doing anything time-related
> > > > would have to make sure the clocks are reasonably synced.
> > >
> > > Yep, that's true, but from my point of view a system that depends on
> > > client side clock is weaker than a system that does this kind of
> > > calculation at a more controlled environment aka backend. This adds one
> > > more factor that depends on the user doing things right, which is not
> > > always the case.
> > >
> > > One possible solution might be the broker send periodically its current
> > > epoch time and the client do the calculations with this data, or send
> > > epoch time initially at subscription and do the rest of calculations
> > > doing delta of time using the initial time from broker as a base (time
> > > flows equally for both, the important thing is which one is positioned
> > > at the very present time).
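A minimal sketch of the second variant described above (broker epoch time sent once at subscription, client tracks elapsed time locally). It assumes the broker would supply such a timestamp, which is a hypothetical protocol addition; `BrokerClockEstimate` and its methods are illustrative names only. Elapsed time is taken from a monotonic source such as `System.nanoTime()` so that changes to the client's wall clock do not matter. Network latency between broker and client is ignored here.

```java
// Hypothetical sketch: estimates the broker's clock from one broker-supplied timestamp.
final class BrokerClockEstimate {
    private final long brokerEpochMillis; // broker time reported at subscription (assumed field)
    private final long localNanosAtSync;  // local monotonic reading taken when it arrived

    BrokerClockEstimate(long brokerEpochMillis, long localNanosAtSync) {
        this.brokerEpochMillis = brokerEpochMillis;
        this.localNanosAtSync = localNanosAtSync;
    }

    // Broker "now" = broker time at sync + locally measured elapsed time.
    long brokerNowMillis(long localNanosNow) {
        return brokerEpochMillis + (localNanosNow - localNanosAtSync) / 1_000_000L;
    }

    // Remaining wait for a message due at deliverAtMillis on the broker clock; never negative.
    long remainingDelayMillis(long deliverAtMillis, long localNanosNow) {
        return Math.max(0L, deliverAtMillis - brokerNowMillis(localNanosNow));
    }
}
```

In use, the client would construct this once with `System.nanoTime()` at the moment the broker timestamp arrives, then pass a fresh `System.nanoTime()` on every later call.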
> > >
> > > Anyway this mentioned approach sounds like a hack just from the fact of
> > > not doing the time calculations in the backend.
> > >
> > > > Lastly, i do agree client side approaches have better scalability than
> > > > server side approaches in most cases. However I don’t believe that it
> > > > is the case here. And I don’t see anyone have a clear explanation on
> > > > why a broker approach is less scalable than the client side approach.
> > >
> > > Yes, I agree with this. At least for fixed time delay at pr #3155.
> > >
> > > The only remaining concern to me would be GC usage of stored positions
> > > next to be expired, anyway, since the nature of a fixed delay and
> > > from the fact that processing a ledger tends to be in a sequential manner,
> > > we could store a range of positions id for some delta when intensive
> > > traffic is going on, I believe I did mention this on the pr.
> > >
> > > > Again, in general I'm more concerned of stuff that happens in broker
> > > > because it will have to be scaled up 10s of thousands of times in a
> > > > single process, while in client typically the requirements are much
> > > > simpler.
> > >
> > > I agree that adding logic to broker should be considered with deep care,
> > > but in this specific scenario at worst case we will only have one and only
> > > one scheduled task per consumer which will take all expired positions
> > > from a DelayQueue.
> > >
> > > --
> > > *Ezequiel Lovelle*
> > >
> > >
> > > On Sat, 19 Jan 2019 at 01:02, Matteo Merli <matteo.me...@gmail.com> wrote:
> > >
> > > > Just a quick correction:
> > > >
> > > > > And I don’t see anyone have a clear explanation on why a
> > > > > broker approach is less scalable than the client side approach.
> > > >
> > > > I haven't said it less or more scalable.
> > > > I was meaning that it's "easier" to scale, in that we don't have to do
> > > > lots of fancy stuff and add more and more control to make sure that the
> > > > implementation will not become a concern point at scale (eg: limit the
> > > > overall amount of memory used in broker, across all topics, and the
> > > > impact on GC of these long-living objects).
> > > >
> > > > > However, clock skew in a brokerside approach
> > > > > is easier to manage and more predictable, but clock skew in a
> > > > > clientside approach is much harder to manage and more unpredictable
> > > >
> > > > I don't necessarily agree with that. NTP is widely available
> > > > and understood.
> > > > Any application that's doing anything time-related would have
> > > > to make sure the clocks are reasonably synced.
> > > >
> > > > --
> > > > Matteo Merli
> > > > <matteo.me...@gmail.com>
> > > >
> > > > On Fri, Jan 18, 2019 at 7:46 PM Sijie Guo <guosi...@gmail.com> wrote:
> > > >
> > > > > On Sat, Jan 19, 2019 at 9:45 AM Matteo Merli <matteo.me...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Trying to group and compress responses here.
> > > > > >
> > > > > > > If consumer control the delayed message specific execution time we
> > > > > > > must trust clock of consumer, this can cause delayed message process
> > > > > > > ahead of time, some applications cannot tolerate this condition.
> > > > > >
> > > > > > This is a problem that cannot be solved.
> > > > > > Even assuming the timestamps are assigned by brokers and are guaranteed
> > > > > > to be monotonic, this won't prevent 2 brokers from having clock skews.
> > > > > > That would result in different delivery delays.
> > > > > >
> > > > > > Similarly, the broker timestamp might be assigned later compared to
> > > > > > when a publisher was "intending" to start the clock.
> > > > > >
> > > > > > Barring super-precise clock synchronization techniques (which are way
> > > > > > out of the scope of this discussion), the only reasonable way to think
> > > > > > about this is that delays need to be orders of magnitude bigger than
> > > > > > the average clock skew experienced with common techniques (eg: NTP).
> > > > > > NTP clock skew will generally be in the 10s of millis. Any delay > 1
> > > > > > second will hardly be noticeably affected by these skews.
> > > > > >
> > > > > > Additionally, any optimization on the timeouts handling (like the
> > > > > > hash-wheel timer proposed in PIP-26) will trade off precision for
> > > > > > efficiency. In that case, the delays are managed in buckets, and can
> > > > > > result in higher delays than what was requested.
> > > > > >
> > > > > > > 1. Fixed timeout, e.g.(with 10s, 30s, 10min delayed), this is the
> > > > > > > largest proportion in throughput of delayed message . A subscription
> > > > > > > with a fixed delayed time can approach to this scene.
> > > > > >
> > > > > > I don't think that for fixed delays, any server-side implementation
> > > > > > would provide any advantage compared to doing:
> > > > > >
> > > > > > ```
> > > > > > while (true) {
> > > > > >     Message msg = consumer.receive();
> > > > > >     // calculateDelay() is left undefined here: remaining wait in millis
> > > > > >     long delayMillis = calculateDelay(msg);
> > > > > >     if (delayMillis > 0) {
> > > > > >         Thread.sleep(delayMillis);
> > > > > >     }
> > > > > >
> > > > > >     // Do something
> > > > > >     consumer.acknowledge(msg);
> > > > > > }
> > > > > > ```
> > > > > >
> > > > > > This will not need any support from broker. Also, there will be no
> > > > > > redeliveries.
> > > > > >
> > > > > > It could be wrapped in the client API, although I don't see that as
> > > > > > big of a problem.
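The loop above leaves `calculateDelay` undefined. One possible definition for the fixed-delay case, as a sketch only: the remaining wait is the publish time plus the fixed delay, minus the current time, clamped at zero. `FixedDelay` and its parameter names are hypothetical, and all three values are assumed to be epoch milliseconds from reasonably synced clocks.

```java
// Hypothetical helper for the fixed-delay case; not part of the Pulsar client.
final class FixedDelay {
    // Millis still to wait before a message published at publishTimeMillis may be
    // processed, given a fixed delay. Returns 0 if the delay has already elapsed.
    static long calculateDelay(long publishTimeMillis, long fixedDelayMillis, long nowMillis) {
        return Math.max(0L, publishTimeMillis + fixedDelayMillis - nowMillis);
    }
}
```

In a real consumer the publish time would come from the message itself (for example `msg.getPublishTime()` in the Pulsar Java client) and `nowMillis` from `System.currentTimeMillis()`.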
> > > > > >
> > > > > > > My concern of this category of approaches is "bandwidth" usage. It is
> > > > > > > basically trading bandwidth for complexity.
> > > > > >
> > > > > > With mixed delays on a single topic, in any case there has to be some
> > > > > > kind of time-based sorting of the messages that needs to happen either
> > > > > > at broker or at client.
> > > > > >
> > > > > > Functionally, I believe that either place is equivalent (from a user
> > > > > > point of view), barring the different implementation requirements.
> > > > > >
> > > > > > In my view, the bigger cost here is not bandwidth but rather the disk
> > > > > > IO, that will happen exactly in the same way in both cases. Messages
> > > > > > can be cached, up to a certain point, either in broker or in client
> > > > > > library. After that, in both cases, the messages will have to be
> > > > > > fetched from bookies.
> > > > > >
> > > > > > Also, when implementing the delay feature in the client, the existing
> > > > > > flow control mechanism is naturally applied to limit the overall amount
> > > > > > of information that we have to keep track (the "currently tracked"
> > > > > > messages). Some other mechanism would have to be done in the broker as
> > > > > > well.
> > > > > >
> > > > > > Again, in general I'm more concerned of stuff that happens in broker
> > > > > > because it will have to be scaled up 10s of thousands of times in a
> > > > > > single process, while in client typically the requirements are much
> > > > > > simpler.
> > > > > >
> > > > > > If the goal is to minimize the amount of redeliveries from broker ->
> > > > > > client, there are multiple ways to achieve that with the client based
> > > > > > approach (eg.
> > > > > > send message id and delay time instead of the full payload to
> > > > > > consumers as Ivan proposed).
> > > > > >
> > > > > > This seems to be simpler and with less overhead than having to persist
> > > > > > the whole hash-wheel timer state into a ledger.
> > > > >
> > > > >
> > > > > I agree that there are many optimizations that can be applied in a
> > > > > client side approach. In a stable world, these approaches are
> > > > > technically equivalent.
> > > > >
> > > > > However I do not agree with a few points:
> > > > >
> > > > > First, based on my past production experiences, network bandwidth on
> > > > > broker is the bigger cost than io cost in a multi subscription case.
> > > > > Also, I have heard a few production users have experienced latency
> > > > > issues where broker network bandwidth is saturated. So any mechanisms
> > > > > that rely on redeliveries are a big red flag to me.
> > > > >
> > > > > Secondly, currently pulsar is using more bandwidth on brokers, than
> > > > > bandwidth on bookies. It is not a balanced state. I am more leaning
> > > > > towards an approach that can leverage bookies’ idle bandwidth, rather
> > > > > than potentially using more bandwidth on brokers.
> > > > >
> > > > > Thirdly, in my view, clock skew concern is not a technical issue, but a
> > > > > management issue. As what Ivan and you have pointed out, there are many
> > > > > ways on addressing clock skew. However, clock skew in a brokerside
> > > > > approach is easier to manage and more predictable, but clock skew in a
> > > > > clientside approach is much harder to manage and more unpredictable.
> > > > > This unpredictability can significantly change the io or network
> > > > > pattern when things go bad.
> > > > > When such unpredictability happens, it can cause bad things
> > > > > and saturate the broker network in a redeliver-ish approach. If we are
> > > > > building a distributed system that can handle this unpredictability, a
> > > > > broker-side approach is much more friendly to manageability and incident
> > > > > management.
> > > > >
> > > > > Lastly, i do agree client side approaches have better scalability than
> > > > > server side approaches in most cases. However I don’t believe that it is
> > > > > the case here. And I don’t see anyone have a clear explanation on why a
> > > > > broker approach is less scalable than the client side approach.
> > > > >
> > > > > Anyway, for manageability, bandwidth usage, client simplicity, I am more
> > > > > in favor of a broker side approach, or at least an approach that is not
> > > > > redelivery based. However since the feature is requested by Penghui
> > > > > and Ezequiel, I am also fine with this client side approach if they are
> > > > > okay with that.
> > > > >
> > > > > - Sijie
> > > > >
> > > > > >
> > > > > > --
> > > > > > Matteo Merli
> > > > > > <matteo.me...@gmail.com>
> > > > > >
> > > > > >
> > > > > > On Fri, Jan 18, 2019 at 6:35 AM Ezequiel Lovelle
> > > > > > <ezequiellove...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi All! and sorry for delay :)
> > > > > > >
> > > > > > > Probably I'm going to say some things already said, so sorry
> > > > > > > beforehand.
> > > > > > >
> > > > > > > The two main needed features I think are the proposed:
> > > > > > > A. Producer delay PIP-26.
> > > > > > > B. Consumers delay PR #3155
> > > > > > >
> > > > > > > Of course PIP-26 would result in consumers receiving delayed messages
> > > > > > > but the important thing here is one of them made the decision about
> > > > > > > delay.
> > > > > > >
> > > > > > > First, the easy one, PR #3155.
> > > > > > > Consumers delay:
> > > > > > >
> > > > > > > As others have stated before, this is a more trivial approach because
> > > > > > > of the nature of having the exactly same period of delay for each
> > > > > > > message which is predictable.
> > > > > > >
> > > > > > > I agree that adding logic at broker should be avoided, but, for this
> > > > > > > specific feature #3155 which I don't think is complex I believe there
> > > > > > > are other serious advantages:
> > > > > > >
> > > > > > > 1. Simplicity at client side, we don't need to add any code which is
> > > > > > >    less error prone.
> > > > > > > 2. Clock issues from client side being outdated and causing headache
> > > > > > >    to users detecting this.
> > > > > > > 3. Avoids huge overhead delivering non expired messages across the
> > > > > > >    network unnecessarily.
> > > > > > > 4. Consumers are free to decide to consume messages with delay
> > > > > > >    regardless of the producer.
> > > > > > > 5. Delay is uniform for all messages, which sometimes is the solution
> > > > > > >    to the problem rather than arbitrary delays.
> > > > > > >
> > > > > > > I think that would be great if pulsar can provide this kind of
> > > > > > > features without relying on users needing to know heavy details about
> > > > > > > the mechanism.
> > > > > > >
> > > > > > > For PIP-26:
> > > > > > >
> > > > > > > I think we can offer this with the purpose of messages with a more
> > > > > > > long delay in terms of time? hours / days?
> > > > > > >
> > > > > > > So, if this is the case, we can assume a small granularity of time
> > > > > > > like 1 minute, making ledgers representing 1 minute of time and
> > > > > > > truncating each time of message for its corresponding minute and
> > > > > > > storing in that special ledger.
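The truncation step described above can be sketched as follows, assuming delivery times are epoch milliseconds and the granularity is fixed at one minute. `MinuteBucket` and `bucketStart` are hypothetical names; how a bucket would map to a ledger is not specified in the thread and is not shown.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: maps a delivery time to the start of its one-minute bucket.
final class MinuteBucket {
    private static final long MINUTE_MILLIS = TimeUnit.MINUTES.toMillis(1);

    // Start (epoch millis) of the minute containing deliverAtMillis.
    // Math.floorMod keeps the result correct even for negative inputs.
    static long bucketStart(long deliverAtMillis) {
        return deliverAtMillis - Math.floorMod(deliverAtMillis, MINUTE_MILLIS);
    }
}
```

Every message whose delivery time falls inside the same minute gets the same bucket key, which is where the up-to-one-minute margin of error comes from.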
> > > > > > > Users wanting to receive messages scheduled for some days in the
> > > > > > > future would rarely care about a margin of error of 1 minute.
> > > > > > >
> > > > > > > Of course we need somehow make the broker aware of this in order to
> > > > > > > only process ledgers for current corresponding minute and consume it.
> > > > > > > And the broker would be the one subject to close current minute
> > > > > > > truncated processed ledger.
> > > > > > >
> > > > > > > One problem I can think about this approach, is it painful for
> > > > > > > Bookkeeper to having a lot of opened ledgers? (one for each minute per
> > > > > > > topic)
> > > > > > >
> > > > > > > Another problem here might be what happen if consumer was not started?
> > > > > > > At startup time the broker should looking for potentially older
> > > > > > > ledgers than its current time and this might be expensive.
> > > > > > >
> > > > > > > Other more trivial issue, we might need to refactor current mechanism
> > > > > > > which deletes closed ledgers older than the configured time on
> > > > > > > namespace.
> > > > > > >
> > > > > > > As a final note I think that would be great to have both features in
> > > > > > > pulsar but sometimes not everything desired is achievable.
> > > > > > > And please correct me if I said something senseless.
> > > > > > >
> > > > > > > --
> > > > > > > *Ezequiel Lovelle*
> > > > > > >
> > > > > > >
> > > > > > > On Fri, 18 Jan 2019 at 05:51, PengHui Li <codelipeng...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > > So rather than specifying the absolute timestamp that the message
> > > > > > > > > should appear to the user, the dispatcher can specify the relative
> > > > > > > > > delay after dispatch that it should appear to the user.
> > > > > > > >
> > > > > > > > As matteo said the worst case would be that the applied delay to be
> > > > > > > > higher for some of the messages, if specify the relative delay to
> > > > > > > > consumer, if consumer offline for a period of time, consumer will
> > > > > > > > receive many delayed messages after connect to broker again will
> > > > > > > > cause the worst case more serious. It's difficult to keep consumers
> > > > > > > > always online.
> > > > > > > >
> > > > > > > > In my personal perspective, i prefer to use `delay level topic` to
> > > > > > > > approach smaller delays scene, e.g. (10s-topic, 30s-topic), this will
> > > > > > > > not be too much topic. And we are using dead letter topic to simulate
> > > > > > > > delay message feature, delayed topics has different delay level.
> > > > > > > >
> > > > > > > > For very long delays scene, in our practice, user may cancel it or
> > > > > > > > restart it.
> > > > > > > > After previous discussions, i agree that PIP-26 will make broker
> > > > > > > > more complexity.
> > > > > > > > So I had the idea to consider as a separate mechanism.
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Jan 18, 2019 at 3:22 PM Sijie Guo <guosi...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > On Fri, Jan 18, 2019 at 2:51 PM Ivan Kelly <iv...@apache.org>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > One thing missing from this discussion is details on the
> > > > > > > > > > motivating use-case. How many delayed messages per second are we
> > > > > > > > > > expecting? And what is the payload size?
> > > > > > > > > >
> > > > > > > > > > > If consumer control the delayed message specific execution time
> > > > > > > > > > > we must trust clock of consumer, this can cause delayed message
> > > > > > > > > > > process ahead of time, some applications cannot tolerate this
> > > > > > > > > > > condition.
> > > > > > > > > >
> > > > > > > > > > This can be handled in a number of ways. Consumer clocks can be
> > > > > > > > > > skewed with regard to other clocks, but it is generally safe to
> > > > > > > > > > assume that clocks advance at the same rate, especially at the
> > > > > > > > > > granularity of a couple of hours.
> > > > > > > > > > So rather than specifying the absolute timestamp that the message
> > > > > > > > > > should appear to the user, the dispatcher can specify the relative
> > > > > > > > > > delay after dispatch that it should appear to the user.
> > > > > > > > > >
> > > > > > > > > > > > My concern of this category of approaches is "bandwidth"
> > > > > > > > > > > > usage. It is basically trading bandwidth for complexity.
> > > > > > > > > > >
> > > > > > > > > > > @Sijie Guo <si...@apache.org> Agree with you, such a trade can
> > > > > > > > > > > cause the broker's outgoing network usage to be more serious.
> > > > > > > > > >
> > > > > > > > > > I don't think PIP-26's approach will use less bandwidth in this
> > > > > > > > > > regard. With PIP-26, the msg ids are stored in a ledger, and when
> > > > > > > > > > the timeout triggers it dispatches? Are all the delayed messages
> > > > > > > > > > being cached at the broker?
> > > > > > > > > > If so, that is using a lot of memory, and it's
> > > > > > > > > > exactly the kind of memory usage pattern that is very bad for JVM
> > > > > > > > > > garbage collection. If not, then you have to read the message back
> > > > > > > > > > in from bookkeeper, so the bandwidth usage is the same, though on
> > > > > > > > > > a different path.
> > > > > > > > > >
> > > > > > > > > > In the client side approach, the message could be cached to avoid
> > > > > > > > > > a redispatch. When I was discussing with Matteo, we discussed
> > > > > > > > > > this. The redelivery logic has to be there in any case, as any
> > > > > > > > > > cache (broker or client side) must have a limited size.
> > > > > > > > > > Another option would be to skip sending the payload for delayed
> > > > > > > > > > messages, and only send it when the client request redelivery, but
> > > > > > > > > > this has the same issue with regard to the entry likely falling
> > > > > > > > > > out the cache at the broker-side.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > There are bandwidth usage at either approaches for sure. The main
> > > > > > > > > difference between broker-side and client-side approaches is which
> > > > > > > > > part of the bandwidth is used.
> > > > > > > > >
> > > > > > > > > In the broker-side approach, it is using the bookies egress and
> > > > > > > > > broker ingress bandwidth. In a typical pulsar deployment, bookies
> > > > > > > > > egress is mostly idle unless there are consumers falling behind.
> > > > > > > > >
> > > > > > > > > In the client-side approach, it is using broker’s egress bandwidth
> > > > > > > > > and potentially bookies’ egress bandwidth.
> > > > > > > > > Brokers’ egress is critical since it
> > > > > > > > > is shared across consumers. So if the broker egress is doubled, it
> > > > > > > > > is a red flag.
> > > > > > > > >
> > > > > > > > > Although I agree the bandwidth usage depends on workloads. But in
> > > > > > > > > theory, broker-side approach is more friendly to resource usage and
> > > > > > > > > a better approach to use the resources in a multi layered
> > > > > > > > > architecture. Because it uses less bandwidth at broker side. A
> > > > > > > > > client side can cause more bandwidth usage at broker side.
> > > > > > > > >
> > > > > > > > > Also as what penghui pointed out, clock skew can be another factor
> > > > > > > > > causing more traffic in a fanout case. In a broker-side approach,
> > > > > > > > > the deferred is done in a central point, so when the deferred time
> > > > > > > > > point kicks in, broker just need to read the data one time from
> > > > > > > > > bookies. However in a client-side approach, the messages are asked
> > > > > > > > > by different subscriptions, different subscription can ask the
> > > > > > > > > deferred message at any time based on their clocks.
> > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > -Ivan