Hello,
currently in 2.8.0-SNAPSHOT we added an API to access PulsarAdmin from
a Pulsar Sink Connector.

The API is Context.getPulsarAdmin() and Context.getPulsarAdmin(clusterName):

https://github.com/apache/pulsar/blob/0469dfe2c7804bd9ca9ea34e95d83b2196216cf9/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java#L272

With this API we introduced a hard link between the Client Admin API
and the Functions API.

If I understand correctly, this is undesirable because it means that the
Functions API now depends on the Pulsar Admin API.

I believe that having the PulsarAdmin handle is very useful and
necessary in some circumstances.
Without such a handle, the only way is to add configuration parameters
to the Connector and bootstrap the PulsarAdmin from the Connector itself.
This comes with two big problems:
- security: you must store in the configuration the credentials to
access the same cluster (or other clusters)
- maintainability: you have to handle all of the relevant options of
the client, and it is hard to be future-proof

The same happens when you need a Pulsar Client.

I have a proposal to resolve this problem before we cut 2.8.0 and
make this a stable API.

What about introducing a little bit of dependency injection in the
Pulsar Functions framework?

Legacy code:

class MyClass {
    public X myCode(Y input) {
        // context is the Context handed to the connector by the runtime
        PulsarAdmin admin = context.getPulsarAdmin();
        admin.doSomething();
    }
}

Dependency injection style code:

class MyClass {
    // both fields are set by the framework before myCode is called
    @Resource
    PulsarAdmin admin;

    @Resource
    PulsarClient pulsarClient;

    public X myCode(Y input) {
        admin.doSomething();
    }
}

This way we do not have a hard reference from "Context" to PulsarAdmin
or to PulsarClient, and the dependency can be resolved at runtime.
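
To make the idea concrete, here is a minimal sketch of how a runtime could
resolve such fields by reflection. Everything in it is an assumption made for
illustration: the Resource annotation is declared locally (the proposal does
not say which annotation would be used), PulsarAdmin is a one-method stand-in
for the real class, and ResourceInjector and MyConnector are hypothetical
names, not existing Pulsar Functions classes.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

// Hypothetical marker annotation; a real implementation might pick another one.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Resource {}

// Stand-in for the real PulsarAdmin, only here to keep the sketch self-contained.
interface PulsarAdmin {
    String doSomething();
}

// Hypothetical injector that the Functions runtime would own.
final class ResourceInjector {
    private final Map<Class<?>, Object> resources = new HashMap<>();

    // Makes an instance available for fields declared with the given type.
    <T> ResourceInjector register(Class<T> type, T instance) {
        resources.put(type, instance);
        return this;
    }

    // Sets every @Resource field declared on the target's class.
    void inject(Object target) {
        for (Field field : target.getClass().getDeclaredFields()) {
            if (!field.isAnnotationPresent(Resource.class)) {
                continue;
            }
            Object value = resources.get(field.getType());
            if (value == null) {
                throw new IllegalStateException(
                        "No resource registered for " + field.getType().getName());
            }
            try {
                field.setAccessible(true);
                field.set(target, value);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot inject " + field.getName(), e);
            }
        }
    }
}

// User code: no reference to Context, only to the resource it needs.
class MyConnector {
    @Resource
    PulsarAdmin admin;

    String myCode() {
        return admin.doSomething();
    }
}
```

In this sketch the runtime would call register once per available resource and
inject once per connector instance, before the first record is processed. The
user code only compiles against the types it actually declares.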

In order to implement getPulsarAdmin(clusterName) we can introduce a
new API PulsarAdminLocator in the Pulsar Admin API package.

interface PulsarAdminLocator {
    PulsarAdmin getPulsarAdmin(String cluster);
}

class MyClass {
    @Resource
    PulsarAdminLocator adminLocator;

    public X myCode(Y input) {
        // cluster is the name of the cluster you want to administer
        PulsarAdmin admin = adminLocator.getPulsarAdmin(cluster);
        admin.doSomething();
    }
}
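
On the runtime side, one possible shape for a locator is a small cache keyed
by cluster name. The sketch below is only an assumption about how it could
look: CachingPulsarAdminLocator and the factory function are hypothetical, and
PulsarAdmin is again a one-method stand-in so that the example compiles on its
own.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Stand-in for the real PulsarAdmin, only here to keep the sketch self-contained.
interface PulsarAdmin {
    String clusterName();
}

interface PulsarAdminLocator {
    PulsarAdmin getPulsarAdmin(String cluster);
}

// Hypothetical implementation: builds one admin per cluster on first use and reuses it.
final class CachingPulsarAdminLocator implements PulsarAdminLocator {
    private final Map<String, PulsarAdmin> cache = new ConcurrentHashMap<>();
    private final Function<String, PulsarAdmin> factory;

    // The factory is where the runtime would apply its own URLs and credentials.
    CachingPulsarAdminLocator(Function<String, PulsarAdmin> factory) {
        this.factory = factory;
    }

    @Override
    public PulsarAdmin getPulsarAdmin(String cluster) {
        return cache.computeIfAbsent(cluster, factory);
    }
}
```

The point of the factory is that credentials stay inside the runtime: the
connector only passes a cluster name and never sees how the admin was built.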

Does this make sense?

I will be happy to follow up with a PIP.

This work is particularly important for fixing the many issues we
have in Connectors that need to access Pulsar itself.

For reference:
https://github.com/apache/pulsar/pull/8668 [pulsar-io] Support authentication on debezium connector
https://github.com/apache/pulsar/pull/10484 Support Pulsar Auth token for Debezium and Kafka connect adaptor



Enrico
