2020-03-06 15:57:14 UTC - Tamer: I would also second `pulsarctl`. I use it in 
Jenkins CI/CD to create and update Pulsar functions, and it works pretty well. 
If you use Jenkins, I can share my Jenkinsfile with you.

I tried the REST APIs with `curl`, but it was tricky to get the request 
formatted properly. It worked with a REST API tool, `Insomnia`, but when I copied 
the exact `curl` command I was getting a 4xx error, so I gave up on it and used 
`pulsarctl`, which seems cleaner and faster than pulsar-admin for CI/CD 
(single binary, no JVM initialization).

With `pulsarctl` I use S3 to deploy the function jar: I get a signed S3 HTTP URL, 
then check whether the function exists. If it does, I update it; otherwise I 
create a new function.
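
The create-or-update step described above could be sketched roughly as follows. `FunctionAdmin` is a hypothetical interface standing in for whatever actually invokes `pulsarctl`; none of its method names are real `pulsarctl` or Pulsar APIs, and the in-memory implementation exists only so the sketch can run without a cluster.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical abstraction over the CLI calls; not a real pulsarctl or Pulsar API. */
interface FunctionAdmin {
    boolean exists(String functionName);
    void create(String functionName, String jarUrl);
    void update(String functionName, String jarUrl);
}

final class FunctionDeployer {
    private FunctionDeployer() {}

    /** Updates the function if it is already registered, otherwise creates it. Returns the action taken. */
    static String deploy(FunctionAdmin admin, String functionName, String signedJarUrl) {
        if (admin.exists(functionName)) {
            admin.update(functionName, signedJarUrl);
            return "update";
        }
        admin.create(functionName, signedJarUrl);
        return "create";
    }
}

/** In-memory stand-in that records which calls were made. */
final class InMemoryFunctionAdmin implements FunctionAdmin {
    final Set<String> registered = new HashSet<>();
    final List<String> calls = new ArrayList<>();

    @Override public boolean exists(String functionName) { return registered.contains(functionName); }

    @Override public void create(String functionName, String jarUrl) {
        registered.add(functionName);
        calls.add("create " + functionName);
    }

    @Override public void update(String functionName, String jarUrl) {
        calls.add("update " + functionName);
    }
}
```

In a CI job, the real implementation of `FunctionAdmin` would wrap the CLI invocations, and `signedJarUrl` would be the signed S3 URL mentioned above.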
----
2020-03-06 17:54:06 UTC - Luke Lu: Hey guys, while setting backlog quotas, we 
hit “Backlog Quota exceeds configured retention quota for namespace. Please 
increase retention quota and retry” when setting the backlog quota to the same 
value as retention. This seems counterintuitive, because the default backlog 
quota is 10G, which is way bigger than the retention policy. Now that we need to 
set a backlog quota, how come it needs to be smaller than the retention limit? 
This especially doesn’t make sense if the backlog policy is 
consumer_backlog_eviction, which will silently drop older messages. IMO, 
backlog quota should always be _greater than or equal to_ the retention limit. 
<https://github.com/apache/pulsar/blob/a3e1efc229f4839c4e8edec690f8ca52ddb5894a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java#L1708>
----
2020-03-06 17:57:58 UTC - Luke Lu: the above condition returns false in 
`checkQuotas`. IMO, the logic should return false only when `quota.getLimit() 
< …`
----
2020-03-06 18:16:09 UTC - Sijie Guo: @Luke Lu: retention is used for 
limiting the storage for a topic, and the backlog is the messages that have not 
been “processed” by consumers yet. So the idea is that you can’t have a backlog 
larger than your storage limit, which is defined by retention.
----
2020-03-06 18:20:39 UTC - Luke Lu: Understood the difference, but we’re 
conflating storage limit vs typical goals of retention policy in enterprises, 
which is to make sure we have enough data retained (e.g. for compliance 
reasons).  Storage limit should really be a separate configuration.
----
2020-03-06 18:22:17 UTC - Luke Lu: It especially doesn’t make sense when the 
default backlog quota of 10G works fine with a retention policy of 5M, but 
things suddenly stop working when we want to set the backlog quota to 1G.
----
2020-03-06 18:29:45 UTC - Sijie Guo: retention policy indicates the storage 
usage. I don’t think they are completely separated. the confusion here is more 
about the inconsistency you mentioned below.

> It especially doesn’t make sense, when the behavior is the default 
backlog quota is 10G, which works fine with retention policy of 5M.
this seems to be an inconsistent behavior regarding “default” retention vs 
retention set in namespace. we need to make them consistent. Can you create a 
github issue?
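
To make the reported inconsistency concrete, here is a toy model of the check as it behaves in this thread. It is not the broker's code: `QuotaCheckModel` and its parameters are made up for illustration, the rule that a missing namespace-level backlog quota skips the check is inferred from the behavior reported above, and "5M" is assumed to mean 5 MB of retention size.

```java
final class QuotaCheckModel {
    private QuotaCheckModel() {}

    static final long MB = 1024L * 1024L;
    static final long GB = 1024L * MB;

    /**
     * Toy model of the validation discussed in this thread.
     *
     * @param namespaceBacklogLimitBytes backlog quota set on the namespace,
     *                                   or null if only the broker default applies
     * @param retentionSizeBytes         retention size set on the namespace
     * @return true if the combination is accepted
     */
    static boolean accepts(Long namespaceBacklogLimitBytes, long retentionSizeBytes) {
        if (namespaceBacklogLimitBytes == null) {
            // Assumption: the broker default (10G) is not validated against retention.
            return true;
        }
        // Reported behavior: a backlog quota equal to or larger than retention is rejected.
        return namespaceBacklogLimitBytes < retentionSizeBytes;
    }
}
```

Under this model, `accepts(null, 5 * MB)` passes even though the effective default quota is 10G, while `accepts(1 * GB, 5 * MB)` is rejected. That is the asymmetry between the default and an explicitly set quota.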
----
2020-03-06 18:31:26 UTC - Luke Lu: backlog quota is essentially a buffer size 
that’s more related to storage limit for operators, which shouldn’t be limited 
by retention which is usually requested by applications (e.g. to replay 
messages in retention).
----
2020-03-06 18:39:42 UTC - Addison Higham: okay, so back to a topic from a few 
days ago, in pulsar functions, when you use the 
`context.newOutputMessage().sendAsync`, I don't believe you still get 
at-least-once guarantees?
looking at the code, I don't see how it could, once you return control back to 
the function runtime and don't have a return value, it immediately acks the 
message. I don't see any evidence of anything fancy going on that would make it 
such that a send has to happen?
It appears the `ContextImpl` does some logging of errors for any `sendAsync` 
calls, but doesn't do anything with those exceptions other than log them.

It seems like for those use cases where messages are multi-plexed or if 
multiple messages are produced AND you want good performance, the blocking 
nature of the functions API and no ability to control acking causes a problem
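
The ordering problem described above can be illustrated with a small single-threaded simulation. This is only a sketch: it uses a plain `CompletableFuture` in place of the future returned by `sendAsync`, and `FireAndForgetDemo` does not reproduce any actual Pulsar runtime code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class FireAndForgetDemo {
    private FireAndForgetDemo() {}

    /** Runs one message through a toy runtime and returns the ordered event log. */
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Stands in for the future returned by sendAsync; completed later by the "broker".
        CompletableFuture<Void> pendingSend = new CompletableFuture<>();

        // The user function starts the send, does not wait for it, and returns no value.
        events.add("function: sendAsync started");
        pendingSend.whenComplete((ok, err) ->
                events.add(err == null ? "send: succeeded" : "send: failed (only logged)"));

        // Toy runtime: the function has returned, so the input is acked right away.
        events.add("runtime: input acked");

        // The send fails afterwards; the input is already acked and is not redelivered.
        pendingSend.completeExceptionally(new RuntimeException("broker unavailable"));
        return events;
    }
}
```

In the resulting log the ack precedes the failed send, which is why at-least-once cannot hold for output produced this way.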
----
2020-03-06 18:39:48 UTC - Addison Higham: am I missing something there?
----
2020-03-06 18:45:08 UTC - Addison Higham: I explored this by doing a small WIP 
of what I think could work that I will publish to a branch, but essentially, 
what I think may make sense is to introduce a new functions interface that 
allows the user to return a `Record` object instead of the raw object. What 
that should allow for is:
• the ability to just return a record with an overridden destination topic and 
still have all the machinery of functions do the sending/acking to guarantee 
at-least-once/effectively once etc 
• the ability to override the ack implementation on the source `Record`, this 
gives some extension points for making sure some other things complete before 
the record is acked
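
A rough sketch of the first bullet, a function that returns a record carrying its own destination topic while the runtime keeps control of sending and acking. All names here (`OutputRecord`, `RecordFunction`, `RecordRuntimeSketch`) are hypothetical and are not taken from any Pulsar API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical output wrapper: a value plus an optional destination topic override. */
final class OutputRecord<T> {
    final T value;
    final String destinationTopic; // null means "use the function's configured output topic"

    private OutputRecord(T value, String destinationTopic) {
        this.value = value;
        this.destinationTopic = destinationTopic;
    }

    static <T> OutputRecord<T> of(T value) {
        return new OutputRecord<>(value, null);
    }

    static <T> OutputRecord<T> to(String topic, T value) {
        return new OutputRecord<>(value, topic);
    }
}

/** Hypothetical function interface that returns a record instead of a raw value. */
interface RecordFunction<I, O> {
    OutputRecord<O> process(I input);
}

/** Toy runtime: sends the returned record first and acks the input only after that. */
final class RecordRuntimeSketch {
    final List<String> log = new ArrayList<>();
    private final String defaultTopic;

    RecordRuntimeSketch(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    <I, O> void handle(I input, RecordFunction<I, O> fn) {
        OutputRecord<O> out = fn.process(input);
        String topic = out.destinationTopic != null ? out.destinationTopic : defaultTopic;
        log.add("send " + out.value + " -> " + topic); // a real sink would wait for the send here
        log.add("ack " + input);
    }
}
```

Because the function only returns data, the ordering of send and ack stays entirely in the runtime's hands.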
----
2020-03-06 18:50:37 UTC - Addison Higham: 
<https://github.com/apache/pulsar/pull/6502> <- WIP on the idea, happy to 
make a PIP for it, but the code was helpful for me to make sure I understood 
the problem :slightly_smiling_face:
----
2020-03-06 19:35:23 UTC - Sijie Guo: backlog is more of a subscription-side 
concept, while retention is more of a topic-side concept. Both of them are 
essentially related to storage usage.

How would you expect Pulsar to behave if we introduced another storage limit? 
How would these settings interact with each other?
----
2020-03-06 19:41:27 UTC - Sijie Guo: @Addison Higham - 
`newOutputMessage().sendAsync` is not in the processing guarantee scope. the 
current processing guarantee relies on returning value.

instead of returning an `OutputRecord` object, a simpler alternative is 
introducing an AsyncFunction which returns a `CompletableFuture<T>` as 
the result. the runtime relies on the completed future to know when the 
function is done.
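
A minimal sketch of that alternative, assuming a hypothetical `AsyncFunction` interface (no such interface is confirmed by this thread to exist in Pulsar). The toy runtime acks the input only when the returned future completes; treating a failed future as a negative ack is also an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical interface; name and shape are assumptions based on the message above. */
interface AsyncFunction<I, O> {
    CompletableFuture<O> process(I input);
}

final class AsyncRuntimeSketch {
    final List<String> log = new ArrayList<>();

    /** Acks the input only once the returned future completes successfully. */
    <I, O> CompletableFuture<Void> handle(I input, AsyncFunction<I, O> fn) {
        return fn.process(input).handle((result, error) -> {
            if (error == null) {
                log.add("emit " + result);
                log.add("ack " + input);
            } else {
                // Assumption: a failed future leads to a negative ack so the input is redelivered.
                log.add("nack " + input);
            }
            return null;
        });
    }
}
```

Nothing is logged until the function's future completes, so a slow or failing downstream call delays or prevents the ack instead of racing with it.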
----
2020-03-06 19:41:38 UTC - Luke Lu: I’d propose that we simply make them more 
independent of each other as they address somewhat separate concerns, by simply 
removing the backlog vs retention check. This allows admins to set policies for 
different requirements. e.g.:
1. larger backlog quota to make sure retention policies can be satisfied to 
retain enough data
2. smaller backlog quota to limit storage usage.
----
2020-03-06 19:42:48 UTC - Addison Higham: okay, just wanted to make sure I had 
that right before going forward, the docs don't really make that clear
----
2020-03-06 19:48:33 UTC - Addison Higham: and yes, I considered using a 
CompletableFuture, from an implementation perspective it may actually be a bit 
more complex just because the machinery around it is all blocking. The reason I 
went this way is that the `PulsarSink` already handles writing to multiple 
producers, ensuring a send before acking, and all that. So long as you can 
control the `Record` it gets handed, we get a lot pretty easily, including 
simple keyed messages (whereas you can't do keyed output right now with a 
returned value). Another option I considered was not changing the interface at 
all and just doing an `instanceof` check on the results, but that felt ugly.

I am totally cool with an `AsyncFunction` , but it just seemed like it would be 
a bigger change and still not be as flexible as the `Record` though I can 
understand not wanting to expose the `Record` objects as part of the function 
API
----
2020-03-06 20:05:59 UTC - Sijie Guo: My feeling is that returning 
`OutputRecord` is opening up a hole in the system, because people have to know 
exactly what the system is running in order to return a correct `OutputRecord`. 
The idea of functions is to make programming easier for users.
----
2020-03-06 20:15:49 UTC - Addison Higham: two thoughts:
1. I agree and think it is important to still keep a simple API for most use 
cases, but I would love an extension point or two for more advanced use cases. 
Maybe this isn't the right way to do it, but I do think the functions API could 
benefit from a few more points to hook into.  In addition to being able to 
control output a bit more (where an `AsyncFunction` could work), I think 
instead of a single functional interface, it would be great to have a layer 
like flink, where you can get lifecycle methods (open, close, etc) 
2. For this path though, what if the output record was a more concrete class? 
Instead of an interface, could have a default implementation that the 
controller sets the output record on and all that. Or instead of `process` 
returning an output record, we could have the existing API as is plus a `wrap` 
method with a default implementation that is extended
----
2020-03-06 20:36:29 UTC - Sijie Guo: yes, agreed that a layered API is useful.

• we should use `context` to build the output record and avoid users 
implementing their own output record, which would introduce uncertainty.
• taking it one step further, we should consider returning an OutputRecordSet to 
accumulate multiple outputs, since 1) people may produce multiple results per 
function; 2) in future we can easily integrate transactions into it.
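
One way to read the two bullets above, as a sketch only: the context is the sole factory for outputs, and a set type accumulates several outputs for one input. `SketchContext`, `Output` and `OutputSet` are hypothetical names chosen for illustration, not a proposed or existing Pulsar API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical context; only it can construct Output instances. */
final class SketchContext {

    /** An output the runtime knows how to send; the private constructor keeps users from building one directly. */
    static final class Output {
        final String topic;
        final String value;

        private Output(String topic, String value) {
            this.topic = topic;
            this.value = value;
        }
    }

    /** Accumulates several outputs produced while handling one input. */
    static final class OutputSet {
        private final List<Output> outputs = new ArrayList<>();

        OutputSet add(Output output) {
            outputs.add(output);
            return this;
        }

        List<Output> outputs() {
            return Collections.unmodifiableList(outputs);
        }
    }

    private final String defaultTopic;

    SketchContext(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    /** Builds an output for the function's configured output topic. */
    Output newOutput(String value) {
        return new Output(defaultTopic, value);
    }

    /** Builds an output for an explicitly chosen topic. */
    Output newOutput(String topic, String value) {
        return new Output(topic, value);
    }

    OutputSet newOutputSet() {
        return new OutputSet();
    }
}
```

A function would return something like `ctx.newOutputSet().add(ctx.newOutput("a")).add(ctx.newOutput("audit", "b"))`, leaving the runtime free to send the whole set, possibly within a transaction later, before acking the input.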
----
2020-03-06 20:51:10 UTC - Addison Higham: like both those ideas 
:slightly_smiling_face: I am out most of next week, will try and get this down 
in a PIP for more discussion and keep going with the implementation. I may also 
look a bit more at async function
----
2020-03-06 20:56:17 UTC - Sijie Guo: AsyncFunction might have a different use 
case: tasks that require a very long processing time, e.g. calling another 
RPC service.
----
2020-03-07 03:19:21 UTC - Joe Francis: Thinking about this -- backlog is what 
is not consumed. Retention is what is retained. Let's say you have retention R 
and you start replaying everything. At that point your backlog B = R, and 
backlog policies kick in if the set backlog quota is less than R.
----
2020-03-07 07:55:11 UTC - Luke Lu: Think about a use case, where you only need 
to keep the latest N messages for any topics in a namespace without blocking 
producers, regardless of consumers. So you want to use backlog quota N with the 
consumer_backlog_eviction policy. Note, this is completely independent of 
retention, which only happens after a topic message is consumed (acked).
----
