2020-05-28 15:01:23 UTC - Paul Wagner: @Paul Wagner has joined the channel
----
2020-05-29 00:02:07 UTC - Matteo Merli: > Sounds like the best path would 
> be to implement a heartbeat in the connections to bookkeeper and to kill 
> sockets if heartbeats fail
Yes, I'd recommend that way in any case.

> When a bookie gets rescheduled, the time for a broker to be happy is just 
> how fast keep alive.
This might also be made worse if the bookie pod is not shut down properly. 
E.g., K8s sends a signal to the pod to allow for a graceful shutdown. If the 
JVM process does not receive it, the process gets forcibly killed (and the pod 
IP stops responding).

Ensuring that the JVM gets the signal and the bookie shuts down gracefully 
will make sure these sockets get closed properly.
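
As a rough illustration of the graceful-shutdown point, here is a minimal Java sketch of a JVM shutdown hook that closes a listening socket. It is not BookKeeper code: the class `GracefulShutdownSketch` and the helper `closeOnShutdown` are hypothetical names made up for this example. Shutdown hooks run when the JVM exits normally or receives SIGTERM; they do not run on SIGKILL, which is the forced-kill case described above.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical example class, not part of BookKeeper or Pulsar.
class GracefulShutdownSketch {

    // Registers a hook that closes the socket when the JVM shuts down
    // (normal exit or SIGTERM). Returns the hook thread so callers can
    // remove or inspect it.
    static Thread closeOnShutdown(ServerSocket server) {
        Thread hook = new Thread(() -> {
            try {
                server.close(); // closing twice is harmless
                System.out.println("socket closed by shutdown hook");
            } catch (IOException e) {
                System.err.println("close failed: " + e.getMessage());
            }
        }, "close-sockets-on-shutdown");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(0); // port 0 = any free port
        closeOnShutdown(server);
        System.out.println("listening on port " + server.getLocalPort());
        // main returns here; the hook runs as part of normal JVM exit.
    }
}
```

If the JVM never sees the signal (or is killed outright), the hook does not run and peers only notice once their own timeouts or keep-alives fire.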
----
2020-05-29 00:02:16 UTC - Matteo Merli: @Addison Higham ^^^
----
2020-05-29 02:51:22 UTC - Addison Higham: Yeah, I assumed that as well
----
2020-05-29 02:52:11 UTC - Addison Higham: About not getting proper shutdown 
that is
----
2020-05-29 06:41:45 UTC - Alex Yaroslavsky: I think I'm misunderstanding 
something, but I can't seem to get key_shared to balance load between two 
consumers.
I use the code below, and consumer1 gets all the messages (from different 
partitions) while consumer2 gets none.

```import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

public class test {
    private static Producer<byte[]> producer;
    private static Consumer<byte[]> consumer1;
    private static Consumer<byte[]> consumer2;
    private static String PULSAR_URL = "pulsar+ssl://pulsar:6651";
    private static String topic = "public/default/partitioned-topic";
    private static String subscription = "test";
    private static SubscriptionType subscriptionType = SubscriptionType.Key_Shared;

    public static PulsarClient getClient(String pulsarUrl) throws PulsarClientException {
        return PulsarClient.builder()
                .serviceUrl(pulsarUrl)
                .authentication(new AuthenticationTls("certs/client.pem", "certs/client.key"))
                .build();
    }

    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        PulsarClient pc = getClient(PULSAR_URL);
        consumer1 = pc.newConsumer()
                .topic(topic)
                .subscriptionType(subscriptionType)
                .subscriptionName(subscription)
                .subscribe();

        PulsarClient pc2 = getClient(PULSAR_URL);
        consumer2 = pc2.newConsumer()
                .topic(topic)
                .subscriptionType(subscriptionType)
                .subscriptionName(subscription)
                .subscribe();

        PulsarClient pc3 = getClient(PULSAR_URL);
        producer = pc3.newProducer().topic(topic).create();

        produce();

        for (int i=0; i < 2000; i++)
        {
            Message<byte[]> msg = consumer1.receive(1, TimeUnit.MILLISECONDS);
            if (msg != null) {
                consumer1.acknowledge(msg);
                System.out.println("Consumer1 Message " + i + ". key: " + msg.getKey()
                        + "_" + msg.getTopicName() + " msg: "
                        + new String(msg.getData(), StandardCharsets.UTF_8));
            }
            msg = consumer2.receive(1, TimeUnit.MILLISECONDS);
            if (msg != null) {
                consumer2.acknowledge(msg);
                System.out.println("Consumer2 Message " + i + ". key: " + msg.getKey()
                        + "_" + msg.getTopicName() + " msg: "
                        + new String(msg.getData(), StandardCharsets.UTF_8));
            }
        }

        try {
            producer.close();
            consumer1.close();
            consumer2.close();
            pc.close();
            pc2.close();
            pc3.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void produce() throws PulsarClientException, InterruptedException {
        for (String k: Arrays.asList("alex", "bibi", "camel", "ultra", "version",
                "water", "xavier", "yatsi", "zoology")) {
            for (int i=1; i < 100; i++) {
                String message = k + "." + i;
                producer.newMessage().key(k).value(message.getBytes()).sendAsync();
            }
        }
    }
}```

----
2020-05-29 07:10:48 UTC - Sijie Guo: Try to disable batching or use key-based 
batcher for your producer.
----
2020-05-29 07:42:59 UTC - Alex Yaroslavsky: @Sijie Guo Thanks a lot!
Adding batcherBuilder(BatcherBuilder.KEY_BASED) made it work.
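
For readers following along: in the test program above, the fix amounts to adding `.batcherBuilder(BatcherBuilder.KEY_BASED)` (or, alternatively, `.enableBatching(false)`) to the `pc3.newProducer()` chain. Why that helps can be sketched with a toy model in plain Java. Everything in it is an assumption made for illustration: the class `KeySharedBatchingSketch`, the rule that a batch is delivered whole to one consumer chosen from the key of its first message, and the use of `String.hashCode()` in place of the broker's real hashing.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy model only; this is not how the Pulsar broker is implemented.
class KeySharedBatchingSketch {

    // Default-style batching: cut the stream into batches in arrival
    // order, so one batch can mix several keys.
    static List<List<String>> batchInArrivalOrder(List<String> keys, int maxBatchSize) {
        List<List<String>> batches = new ArrayList<>();
        for (int start = 0; start < keys.size(); start += maxBatchSize) {
            int end = Math.min(start + maxBatchSize, keys.size());
            batches.add(new ArrayList<>(keys.subList(start, end)));
        }
        return batches;
    }

    // Key-based batching: every batch holds messages of exactly one key.
    static List<List<String>> batchByKey(List<String> keys) {
        Map<String, List<String>> perKey = new LinkedHashMap<>();
        for (String key : keys) {
            perKey.computeIfAbsent(key, k -> new ArrayList<>()).add(key);
        }
        return new ArrayList<>(perKey.values());
    }

    // Assumed dispatch rule: a batch goes as one unit to the consumer
    // selected by the key of its first message.
    static Map<Integer, Set<String>> dispatch(List<List<String>> batches, int consumerCount) {
        Map<Integer, Set<String>> keysSeenByConsumer = new TreeMap<>();
        for (List<String> batch : batches) {
            int consumer = Math.floorMod(batch.get(0).hashCode(), consumerCount);
            keysSeenByConsumer.computeIfAbsent(consumer, c -> new TreeSet<>()).addAll(batch);
        }
        return keysSeenByConsumer;
    }

    public static void main(String[] args) {
        List<String> stream = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            stream.add("alex");
            stream.add("water");
        }
        System.out.println("arrival-order batches: " + dispatch(batchInArrivalOrder(stream, 4), 2));
        System.out.println("key-based batches:     " + dispatch(batchByKey(stream), 2));
    }
}
```

In this model the mixed batches carry every key to a single consumer, while the per-key batches let each key be routed on its own.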
----
2020-05-29 07:56:34 UTC - Alex Yaroslavsky: Another question about key_shared, 
now with a function in the middle.
A publisher publishes with keys (and key-based batching enabled) to a 
partitioned topic.
A routing function (currently in Python, see below) reads from this topic and 
forwards the messages to a different partitioned topic (keeping the key, with 
a slight modification). If I now consume with KEY_SHARED from the destination 
topic, only one consumer gets all the messages.
Will rewriting the function in Java solve this issue, or do functions not 
support KEY_SHARED?

`from pulsar import Function`
`class RoutingFunction(Function):`
    `def process(self, item, context):`
        `properties = context.get_message_properties()`
        `context.publish("persistent://" + properties["tenant"] + "/ns/" + properties["dst_id"], item, message_conf={"partition_key": properties["tenant"] + "_" + context.get_partition_key()})`

----
2020-05-29 08:16:44 UTC - Sijie Guo: Functions don't support the key-based 
batcher yet. You can disable batching in functions; that would probably work. 
Can you create an issue for us? /cc @Penghui Li
----
2020-05-29 08:25:24 UTC - Alex Yaroslavsky: @Sijie Guo Thanks for the quick 
reply! How do I disable batching in functions? And sure, I will create an issue 
for this.
----
2020-05-29 08:34:28 UTC - Alex Yaroslavsky: 
<https://github.com/apache/pulsar/issues/7095>
----
2020-05-29 09:05:10 UTC - Alex Yaroslavsky: I see that batching is hardcoded as 
True for both Python and Java functions. I will try to modify the Python code 
on the worker...
----
