Hi Robert,

Thanks for the KIP updates.

The interfaces look suitable for brokers, with some small changes. If we
can adapt the interface to implement the existing DynamicBrokerConfig, then
we are good.

With broker configs:

   1. We don't know what configs are in ZK since we allow custom configs.
   So we would use `ConfigProvider.get()` without specifying keys.
   2. We want to see all changes (i.e. changes under a path). We can deal
   with this internally by ignoring `keys` and subscribing to everything.
   3. We have two paths (one for per-broker config and another for default
   config shared by all brokers). All methods should ideally take a path;
   see the changes suggested below.
   4. Keys are not independent. We update in batches (e.g. keystore +
   password). We want to see batches of changes, not individual changes. We
   retrieve all values from a path when a change is detected. We can do this
   by ignoring values from the callback, but it would be better if the
   callback interface could be changed - see below.


public interface ConfigProvider extends Configurable, Closeable {

    // KIP-226 will use this.
    ConfigData get(ConfigContext ctx, String path);

    // KIP-226 will never use this; we don't know what keys are in ZK
    // since we allow custom configs.
    ConfigData get(ConfigContext ctx, String path, Set<String> keys);

    // KIP-226 will ignore `key` and subscribe to all changes.
    // But based on the above method, this should perhaps be:
    //   subscribe(String path, Set<String> keys,
    //             ConfigurationChangeCallback callback)?
    void subscribe(String key, ConfigurationChangeCallback callback);

    // As above, unsubscribe(String path, Set<String> keys)?
    void unsubscribe(String key);
}

public interface ConfigurationChangeCallback {
    // For brokers, we want to process all updated keys in a single
    // callback. Perhaps this could be:
    //   onChange(String path, Map<String, String> values)?
    void onChange(String key, String value);
}
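
To make the batching point concrete, here is a rough sketch of how a
path-level callback could behave. It is only an illustration, not part of
the KIP: BatchChangeCallback, InMemoryStore and the path
"/config/brokers/0" are made-up names, and the in-memory map merely stands
in for ZK.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchedConfigChangeSketch {

    /** Hypothetical callback: one call per path, carrying all current values. */
    interface BatchChangeCallback {
        void onChange(String path, Map<String, String> values);
    }

    /** Toy in-memory store standing in for ZK; not a real ConfigProvider. */
    static class InMemoryStore {
        private final Map<String, Map<String, String>> data = new HashMap<>();
        private final Map<String, List<BatchChangeCallback>> subscribers = new HashMap<>();

        void subscribe(String path, BatchChangeCallback callback) {
            subscribers.computeIfAbsent(path, p -> new ArrayList<>()).add(callback);
        }

        /** Applies a batch of updates, then notifies each subscriber exactly once. */
        void update(String path, Map<String, String> changes) {
            Map<String, String> values = data.computeIfAbsent(path, p -> new HashMap<>());
            values.putAll(changes);
            // Hand out a copy so callbacks cannot mutate the stored values.
            Map<String, String> snapshot = new HashMap<>(values);
            for (BatchChangeCallback cb : subscribers.getOrDefault(path, new ArrayList<>())) {
                cb.onChange(path, snapshot);
            }
        }
    }

    /** Updates two dependent keys together; returns one entry per callback invocation. */
    static List<Map<String, String>> demo() {
        List<Map<String, String>> seen = new ArrayList<>();
        InMemoryStore store = new InMemoryStore();
        store.subscribe("/config/brokers/0", (path, values) -> seen.add(values));

        Map<String, String> batch = new HashMap<>();
        batch.put("ssl.keystore.location", "/path/to/keystore.jks");
        batch.put("ssl.keystore.password", "secret");
        store.update("/config/brokers/0", batch);
        return seen;
    }

    public static void main(String[] args) {
        List<Map<String, String>> seen = demo();
        System.out.println(seen.size() + " callback(s) received: " + seen);
    }
}
```

With the per-key onChange(String key, String value), the same update would
arrive as two separate callbacks, and the broker could observe the new
keystore location before the matching password.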

A few other questions (I read your response to Colin, but still didn't get
it. Could be because I am not familiar with the interfaces required for
vaults, sorry):

   1. Are we expecting one provider instance with different contexts
   provided to `ConfigProvider.get()`? If we created a different provider
   instance for each context, we could deal with scheduling reloads in the
   provider implementation?
   2. Couldn't ConfigData be an interface that just returns a map of
   key-value pairs? Providers that return metadata could extend it to provide
   metadata in a meaningful format instead of Map<String, String>.
   3. For ZK, we would use ConfigProvider.get() without `keys` to get all
   keys in the path. Do we have two get() methods since some providers need
   keys to be specified and some don't? How do we decide which one to use?
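
For question 2, this is roughly what I have in mind. Again just a sketch
under my own assumptions: the data() accessor, LeasedConfigData and
leaseDurationMs() are hypothetical names, not anything defined in the KIP.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ConfigDataSketch {

    /** Hypothetical minimal result type: just the key-value pairs. */
    interface ConfigData {
        Map<String, String> data();
    }

    /** Hypothetical extension for providers with leases: typed metadata, not a map. */
    interface LeasedConfigData extends ConfigData {
        long leaseDurationMs();
    }

    /** What a provider without metadata (e.g. one backed by ZK) might return. */
    static ConfigData plain(Map<String, String> values) {
        Map<String, String> copy = Collections.unmodifiableMap(new HashMap<>(values));
        return () -> copy;
    }

    /** What a vault-style provider might return. */
    static LeasedConfigData leased(Map<String, String> values, long leaseDurationMs) {
        Map<String, String> copy = Collections.unmodifiableMap(new HashMap<>(values));
        return new LeasedConfigData() {
            @Override
            public Map<String, String> data() {
                return copy;
            }

            @Override
            public long leaseDurationMs() {
                return leaseDurationMs;
            }
        };
    }

    /** Returns the reload delay if the result carries a lease, or -1 if it does not. */
    static long reloadDelayMs(ConfigData result) {
        if (result instanceof LeasedConfigData) {
            return ((LeasedConfigData) result).leaseDurationMs();
        }
        return -1L;
    }

    public static void main(String[] args) {
        Map<String, String> values = new HashMap<>();
        values.put("jdbc.password", "secret");
        System.out.println(reloadDelayMs(plain(values)));
        System.out.println(reloadDelayMs(leased(values, 3600000L)));
    }
}
```

Callers that only need key-value pairs would depend on ConfigData alone;
only code that schedules reloads would need to know about the lease type.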

Thanks,

Rajini


On Wed, May 16, 2018 at 2:40 AM, Robert Yokota <rayok...@gmail.com> wrote:

> Thanks, Ron!  I will take a look.
>
> Regards,
> Robert
>
> On Tue, May 15, 2018 at 5:59 PM, Ron Dagostino <rndg...@gmail.com> wrote:
>
> > Hi Robert.  Regarding your comment "use the lease duration to schedule a
> > configuration reload in the future", you might be interested in the code
> > that does refresh for OAuth Bearer Tokens in KIP-255; specifically, the
> > class org.apache.kafka.common.security.oauthbearer.internal.expiring.ExpiringCredentialRefreshingLogin.
> > The class performs JAAS logins/relogins based on the expiration time of a
> > retrieved expiring credential.  The implementation of that class is
> > inspired by the code that currently does refresh for Kerberos tickets but
> > is more reusable.  I don't know if you will leverage JAAS for defining
> how
> > to go get a credential (you could since you have to provide credentials
> to
> > authenticate to the remote systems anyway), but regardless, that class
> > should be useful at least in some minimal sense if not more than that.
> See
> > https://github.com/apache/kafka/pull/4994.
> >
> > Ron
> >
> > On Tue, May 15, 2018 at 8:01 PM, Robert Yokota <rayok...@gmail.com>
> wrote:
> >
> > > Hi Colin,
> > >
> > > Thanks for the feedback!
> > >
> > >
> > > > The KIP says that "Vault is very popular and has been described as
> > > > 'the current gold standard in secret management and provisioning'."
> > > > I think this might be a bit too much detail -- we don't really need
> > > > to pick favorites, right? :)
> > >
> > > I've removed this line :)
> > >
> > >
> > > > I think we should make the substitution part of the generic
> > > > configuration code, rather than specific to individual
> > > > ConfigProviders.  We don't really want it to work differently for
> > > > Vault vs. KeyWhiz vs. AWS secrets, etc. etc.
> > >
> > > Yes, the ConfigProviders merely serve up key-value pairs.  A helper
> class
> > > like ConfigTransformer would perform the variable substitutions if
> > desired.
> > >
> > >
> > > > We should also spell out exactly how substitution works.
> > >
> > > By one-level of indirection I just meant a simple replacement of
> > variables
> > > (which are the indirect references).  So if you have foo=${bar} and
> > > bar=${baz} and your file contains bar=hello, baz=world, then the final
> > > result would be foo=hello and bar=world.  I've added this example to
> the
> > > KIP.
> > >
> > > You can see this as the DEFAULT_PATTERN in the ConfigTransformer.  The
> > > ConfigTransformer only provides one level of indirection.
> > >
> > >
> > > > We should also spell out how this interacts with KIP-226
> > configurations.
> > >
> > > Yes, I mention at the beginning that KIP-226 could use the
> ConfigProvider
> > > but not the ConfigTransformer.
> > >
> > >
> > > > Maybe a good generic interface would be like this:
> > >
> > > I've added the subscription APIs but would like to keep the other APIs
> > as I
> > > will need them for integration with Vault.  With Vault I obtain the
> lease
> > > duration at the time the key is obtained, so at that time I would want
> to
> > > use the lease duration to schedule a configuration reload in the
> future.
> > > This is similar to how the integration between Vault and the Spring
> > > Framework works.   Also, the lease duration would be included in the
> > > metadata map vs. the data map.  Finally, I need an additional "path" or
> > > "bucket" parameter, which is used by Vault to indicate which set of
> > > key-values are to be retrieved.
> > >
> > >
> > > > With regard to ConfigTransformer: do we need to include all this code
> > in
> > > the KIP?  Seems like an implementation detail.
> > >
> > > I use the ConfigTransformer to show how the pattern ${provider:key} is
> > > defined and how the substitution only involves one level of
> indirection.
> > > If you feel it does not add anything to the text, I can remove it.
> > >
> > >
> > > > Is there a way to avoid this coupling?
> > >
> > > I'd have to look into it and get back to you.  However, I assume that
> the
> > > answer is not relevant for this KIP :)
> > >
> > >
> > > Thanks,
> > > Robert
> > >
> > >
> > >
> > >
> > >
> > > On Tue, May 15, 2018 at 4:04 PM, Colin McCabe <cmcc...@apache.org>
> > wrote:
> > >
> > > > Hi Robert,
> > > >
> > > > Thanks for posting this.  In the past we've been kind of reluctant to
> > add
> > > > more complexity to configuration.  I think Connect does have a clear
> > need
> > > > for this kind of functionality, though.  As you mention, Connect
> > > integrates
> > > > with external systems, which are very likely to have passwords stored
> > in
> > > > Vault, KeyWhiz or some other external system.
> > > >
> > > > The KIP says that "Vault is very popular and has been described as
> 'the
> > > > current gold standard in secret management and provisioning'."  I
> think
> > > > this might be a bit too much detail -- we don't really need to pick
> > > > favorites, right? :)
> > > >
> > > > I think we should make configuration consistent between the broker
> and
> > > > Connect.  If people can use constructs like
> > > jdbc.config.key="${vault:jdbc.user}${vault:jdbc.password}"
> > > > in Connect, they'll want to do it on the broker too, in a consistent
> > way.
> > > >
> > > > If I understand correctly, ConfigProvider represents an external
> > > > configuration source, such as VaultConfigProvider,
> > KeyWhizConfigProvider,
> > > > etc.
> > > >
> > > > I think we should make the substitution part of the generic
> > configuration
> > > > code, rather than specific to individual ConfigProviders.  We don't
> > > really
> > > > want it to work differently for Vault vs. KeyWhiz vs. AWS secrets,
> etc.
> > > etc.
> > > >
> > > > We should also spell out exactly how substitution works.  For
> example,
> > is
> > > > > substitution limited to 1 level deep?  In other words, if I have
> > > > foo="${bar}" and bar=${baz}, probably foo should just be set equal to
> > > > "${baz}" rather than chasing more than one level of indirection.
> > > >
> > > > We should also spell out how this interacts with KIP-226
> > configurations.
> > > > I would suggest that KIP-226 variables not be subjected to
> > substitution.
> > > > The reason is because in theory substitution could lead to different
> > > > results on different brokers, since the different brokers may not
> have
> > > the
> > > > same ConfigProviders configured.  Also, having substitutions in the
> > > KIP-226
> > > > configuration makes it more difficult for the admin to understand
> what
> > > the
> > > > centrally managed configuration is.
> > > >
> > > > It seems the main goal is the ability to load a batch of key/value
> > pairs
> > > > from the ConfigProvider, and the ability to subscribe to
> notifications
> > > > about changes to certain parameters.  Maybe a good generic interface
> > > would
> > > > be like this:
> > > >
> > > > >  > public interface ConfigProvider extends Closeable {
> > > > >  >     // Batched get is potentially more efficient.
> > > > >  >     Map<String, String> get(Collection<String> keys);
> > > > >  >
> > > > >  >     // The ConfigProvider is responsible for making this callback
> > > > >  >     // whenever the key changes.
> > > > >  >     // Some ConfigProviders may want to have a background thread
> > > > >  >     // with a configurable update interval.
> > > > >  >     void subscribe(String key, ConfigurationChangeCallback callback);
> > > > >  >
> > > > >  >     // Inverse of subscribe
> > > > >  >     void unsubscribe(String key);
> > > > >  >
> > > > >  >     // Close all subscriptions and clean up all resources
> > > > >  >     void close();
> > > > >  > }
> > > > >  >
> > > > >  > interface ConfigurationChangeCallback {
> > > > >  >     void onChange(String key, String value);
> > > > >  > }
> > > >
> > > > With regard to ConfigTransformer: do we need to include all this code
> > in
> > > > the KIP?  Seems like an implementation detail.
> > > >
> > > > > > Other connectors such as the S3 connector are tightly coupled with
> > > > > > a particular secret manager, and may wish to handle rotation on
> > > > > > their own.
> > > > >
> > > > > Is there a way to avoid this coupling?  Seems like some users might
> > > > > want to use their own secret manager here.
> > > >
> > > > best,
> > > > Colin
> > > >
> > > >
> > > > On Wed, May 9, 2018, at 16:32, Robert Yokota wrote:
> > > > > Hi Magesh,
> > > > >
> > > > > I updated the KIP with a link to a PR for a working prototype.  The
> > > > > prototype does not yet use the Connect plugin machinery for class
> > > loader
> > > > > isolation, but should give you an idea of what the final
> > implementation
> > > > > will look like.  Here is the link:
> > > > > https://github.com/apache/kafka/pull/4990/files.
> > > > >
> > > > > I also added an example of a FileConfigProvider to the KIP.
> > > > >
> > > > > Thanks,
> > > > > Robert
> > > > >
> > > > > On Wed, May 9, 2018 at 10:04 AM, Robert Yokota <rayok...@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > Hi Magesh,
> > > > > >
> > > > > > Thanks for the feedback!
> > > > > >
> > > > > > I will put together a PR to demonstrate what the implementation
> > might
> > > > look
> > > > > > like, as well as a reference FileConfigProvider.
> > > > > >
> > > > > > 1.  The delayMs for a (potentially) scheduled reload is
> determined
> > by
> > > > the
> > > > > > ConfigProvider.  For example, a (hypothetical)
> VaultConfigProvider,
> > > > upon
> > > > > > contacting Vault for a particular secret, might also obtain a
> lease
> > > > > > duration indicating that the secret expires in 1 hour. The
> > > > > > VaultConfigProvider could then call scheduleConfigReload with
> > delayMs
> > > > set
> > > > > > to 3600000ms (1 hour).  This would cause the Connector to restart
> > in
> > > an
> > > > > > hour, forcing it to reload the configs and re-resolve all
> indirect
> > > > > > references.
> > > > > >
> > > > > > 2. Yes, the start() methods in SourceTask and SinkTask would get
> > the
> > > > > > configs with all the indirect references resolved.   Those
> config()
> > > > methods
> > > > > > are for Connectors that want to get the latest configs (with all
> > > > indirect
> > > > > > references re-resolved) at some time after start().  For example,
> > if
> > > a
> > > > Task
> > > > > > encountered some security exception because a secret expired, it
> > > could
> > > > call
> > > > > > config() to get the config with the latest values.  This is
> > assuming
> > > > that
> > > > > > the Task can gracefully recover from the security exception.
> > > > > >
> > > > > > 3. Yes, that is up to the ConfigProvider implementation and is
> out
> > of
> > > > > > scope.  If the ConfigProvider also needs some kind of secrets or
> > > other
> > > > > > data, it could possibly be passed in through the param properties
> > > > > > ("config.providers.vault.param.auth=/run/myauth").  For example
> > > Docker
> > > > > > might generate the auth info for Vault in an in-memory tmpfs file
> > > that
> > > > > > could then be passed as a param.
> > > > > >
> > > > > > Thanks,
> > > > > > Robert
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, May 8, 2018 at 10:10 PM, Magesh Nandakumar <
> > > > mage...@confluent.io>
> > > > > > wrote:
> > > > > >
> > > > > >> Hi Robert,
> > > > > >>
> > > > > > >> Thanks for the KIP. I think this will be a great addition to the
> > > > > > >> framework. It would be great if the KIP could elaborate a little
> > > > > > >> more on what implementations would look like, with an example.
> > > > > > >> It would also be good to provide a reference implementation.
> > > > > >>
> > > > > >> The other questions I had were
> > > > > >>
> > > > > > >> 1. How would the framework get the delayMs for
> > > > > > >> void scheduleConfigReload(long delayMs)?
> > > > > > >> 2. Would the start methods in SourceTask and SinkTask get the
> > > > > > >> configs with all the indirect references resolved? If so, I am
> > > > > > >> trying to understand the intent of config() in SourceTaskContext
> > > > > > >> and SinkTaskContext.
> > > > > > >> 3. What if the provider itself needs some kind of secrets to be
> > > > > > >> configured to connect to it? I assume that's out of scope for
> > > > > > >> this proposal, but wanted to clarify.
> > > > > >>
> > > > > >> Thanks
> > > > > >> Magesh
> > > > > >>
> > > > > >> On Tue, May 8, 2018 at 1:52 PM, Robert Yokota <
> rayok...@gmail.com
> > >
> > > > wrote:
> > > > > >>
> > > > > >> > Hi,
> > > > > >> >
> > > > > >> > I would like to start a discussion for KIP-297 to externalize
> > > > secrets
> > > > > >> from
> > > > > >> > Kafka Connect configurations.  Any feedback is appreciated.
> > > > > >> > <
> > > > > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > >> > 297%3A+Externalizing+Secrets+for+Connect+Configurations
> > > > > >> > >
> > > > > >> >
> > > > > >> > JIRA: <https://issues.apache.org/jira/browse/KAFKA-6886>
> > > > > >> >
> > > > > >> > Thanks in advance,
> > > > > >> > Robert
> > > > > >> >
> > > > > >>
> > > > > >
> > > > > >
> > > >
> > >
> >
>
