Thanks for the update, Robert. Looks good to me.

Regards,
Rajini

On Wed, May 16, 2018 at 4:43 PM, Robert Yokota <rayok...@gmail.com> wrote:

Hi Rajini,

Thanks for the excellent feedback!

I've made the API changes that you requested in the KIP.

> 1. Are we expecting one provider instance with different contexts provided to `ConfigProvider.get()`? If we created a different provider instance for each context, we could deal with scheduling reloads in the provider implementation?

Yes, there would be one provider instance. I've collapsed the ConfigContext and the ConfigChangeCallback by adding a parameter, delayMs, that indicates when the change will happen. When a particular ConfigProvider retrieves a lease duration along with a key, it can either 1) schedule a background thread to push out the change when it happens (at which time delayMs will be 0), or 2) invoke the callback immediately with the lease duration set as delayMs (in this case, of course, the values for the keys will still be the old values). A ConfigProvider could be parameterized to do one or the other.

> 2. Couldn't ConfigData be an interface that just returns a map of key-value pairs? Providers that return metadata could extend it to provide metadata in a meaningful format instead of Map<String, String>.

I've replaced ConfigData with Map<String, String> as you suggested.

> 3. For ZK, we would use ConfigProvider.get() without `keys` to get all keys in the path. Do we have two get() methods since some providers need keys to be specified and some don't? How do we decide which one to use?

The ConfigProvider should be thought of like a Map interface; neither signature of get() is preferred over the other. KIP-226 can use get(String path), while Connect will use get(String path, Set<String> keys) since it knows which keys it is interested in.

A few more updates to the KIP:

- I've elided the ConfigTransformer implementation as Colin suggested.
- The variable reference now looks like ${provider:[path:]key}, where the path is optional.

Thanks!
Robert

On Wed, May 16, 2018 at 4:30 AM, Rajini Sivaram <rajinisiva...@gmail.com> wrote:

Hi Robert,

Thanks for the KIP updates.

The interfaces look suitable for brokers, with some small changes. If we can adapt the interface to implement the existing DynamicBrokerConfig, then we are good.

With broker configs:

1. We don't know what configs are in ZK since we allow custom configs, so we would use `ConfigProvider.get()` without specifying keys.
2. We want to see all changes (i.e. changes under a path). We can deal with this internally by ignoring `keys` and subscribing to everything.
3. We have two paths (one for per-broker config and another for default config shared by all brokers). All methods should ideally provide the path; see the changes suggested below.
4. Keys are not independent. We update in batches (e.g. keystore + password), so we want to see batches of changes, not individual changes. We retrieve all values from a path when a change is detected. We can do this by ignoring values from the callback, but it would be better if the callback interface could be changed; see below.

    public interface ConfigProvider extends Configurable, Closeable {

        // KIP-226 will use this
        ConfigData get(ConfigContext ctx, String path);

        // KIP-226 will never use this; we don't know what keys are in ZK
        // since we allow custom configs
        ConfigData get(ConfigContext ctx, String path, Set<String> keys);

        // KIP-226 will ignore `key` and subscribe to all changes.
        // But based on the above method, this should perhaps be:
        // subscribe(String path, Set<String> keys, ConfigurationChangeCallback callback)?
        void subscribe(String key, ConfigurationChangeCallback callback);

        // As above: unsubscribe(String path, Set<String> keys)?
        void unsubscribe(String key);
    }

    public interface ConfigurationChangeCallback {
        // For brokers, we want to process all updated keys in a single callback.
        // Perhaps this could be:
        // onChange(String path, Map<String, String> values)?
        void onChange(String key, String value);
    }

A few other questions (I read your response to Colin, but still didn't get it; that could be because I am not familiar with the interfaces required for vaults, sorry):

1. Are we expecting one provider instance with different contexts provided to `ConfigProvider.get()`? If we created a different provider instance for each context, we could deal with scheduling reloads in the provider implementation?
2. Couldn't ConfigData be an interface that just returns a map of key-value pairs? Providers that return metadata could extend it to provide metadata in a meaningful format instead of Map<String, String>.
3. For ZK, we would use ConfigProvider.get() without `keys` to get all keys in the path. Do we have two get() methods since some providers need keys to be specified and some don't? How do we decide which one to use?
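A minimal Java sketch of the provider shape this exchange converges on, assuming one provider instance, path-scoped reads that return a plain Map<String, String>, and a batched callback that delivers every value under a path together with a delayMs. All names here (PathConfigProvider, PathChangeCallback, InMemoryConfigProvider) are hypothetical and are not the KIP's API; subscriptions are per path only, which matches the broker case of watching everything under a path.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical batched callback: all values under `path`, effective after delayMs (0 = now). */
interface PathChangeCallback {
    void onChange(String path, Map<String, String> values, long delayMs);
}

/** Hypothetical provider shape: one instance, path-scoped reads and subscriptions. */
interface PathConfigProvider {
    Map<String, String> get(String path);                   // every key under path (broker/ZK case)
    Map<String, String> get(String path, Set<String> keys); // only keys the caller knows it needs (Connect case)
    void subscribe(String path, PathChangeCallback callback);
    void unsubscribe(String path, PathChangeCallback callback);
}

/** In-memory implementation that only illustrates the call pattern. */
final class InMemoryConfigProvider implements PathConfigProvider {
    private final Map<String, Map<String, String>> data = new HashMap<>();
    private final Map<String, Set<PathChangeCallback>> subscribers = new HashMap<>();

    @Override
    public synchronized Map<String, String> get(String path) {
        // Return a copy so callers cannot mutate the provider's state.
        return new HashMap<>(data.getOrDefault(path, new HashMap<>()));
    }

    @Override
    public synchronized Map<String, String> get(String path, Set<String> keys) {
        Map<String, String> values = data.getOrDefault(path, new HashMap<>());
        Map<String, String> result = new HashMap<>();
        for (String key : keys) {
            if (values.containsKey(key)) {
                result.put(key, values.get(key)); // keys absent from the path are simply omitted
            }
        }
        return result;
    }

    @Override
    public synchronized void subscribe(String path, PathChangeCallback callback) {
        subscribers.computeIfAbsent(path, p -> new HashSet<>()).add(callback);
    }

    @Override
    public synchronized void unsubscribe(String path, PathChangeCallback callback) {
        Set<PathChangeCallback> set = subscribers.get(path);
        if (set != null) {
            set.remove(callback);
        }
    }

    /** Applies a batch (e.g. keystore + password) and notifies each subscriber once with a snapshot of the path. */
    public synchronized void update(String path, Map<String, String> batch) {
        data.computeIfAbsent(path, p -> new HashMap<>()).putAll(batch);
        Map<String, String> snapshot = get(path);
        // Iterate over a copy so a callback may unsubscribe itself safely.
        for (PathChangeCallback cb : new ArrayList<>(subscribers.getOrDefault(path, Collections.emptySet()))) {
            cb.onChange(path, snapshot, 0L);
        }
    }
}
```

With this shape, a keystore and its password pushed through update() in one call reach each subscriber as a single onChange, which is the batching behavior requested for brokers in point 4. A production provider would likely invoke callbacks outside the lock.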

Thanks,
Rajini

On Wed, May 16, 2018 at 2:40 AM, Robert Yokota <rayok...@gmail.com> wrote:

Thanks, Ron! I will take a look.

Regards,
Robert

On Tue, May 15, 2018 at 5:59 PM, Ron Dagostino <rndg...@gmail.com> wrote:

Hi Robert. Regarding your comment "use the lease duration to schedule a configuration reload in the future", you might be interested in the code that does refresh for OAuth Bearer Tokens in KIP-255, specifically the class org.apache.kafka.common.security.oauthbearer.internal.expiring.ExpiringCredentialRefreshingLogin. The class performs JAAS logins/relogins based on the expiration time of a retrieved expiring credential. Its implementation is inspired by the code that currently does refresh for Kerberos tickets, but is more reusable. I don't know if you will leverage JAAS for defining how to go get a credential (you could, since you have to provide credentials to authenticate to the remote systems anyway), but regardless, that class should be useful at least in some minimal sense, if not more. See https://github.com/apache/kafka/pull/4994.

Ron

On Tue, May 15, 2018 at 8:01 PM, Robert Yokota <rayok...@gmail.com> wrote:

Hi Colin,

Thanks for the feedback!

> The KIP says that "Vault is very popular and has been described as 'the current gold standard in secret management and provisioning'." I think this might be a bit too much detail -- we don't really need to pick favorites, right? :)

I've removed this line :)

> I think we should make the substitution part of the generic configuration code, rather than specific to individual ConfigProviders. We don't really want it to work differently for Vault vs. KeyWhiz vs. AWS secrets, etc. etc.

Yes, the ConfigProviders merely serve up key-value pairs. A helper class like ConfigTransformer would perform the variable substitutions if desired.

> We should also spell out exactly how substitution works.

By one level of indirection I just meant a simple replacement of variables (which are the indirect references). So if you have foo=${bar} and bar=${baz}, and your file contains bar=hello and baz=world, then the final result would be foo=hello and bar=world. I've added this example to the KIP.

You can see this as the DEFAULT_PATTERN in the ConfigTransformer. The ConfigTransformer only provides one level of indirection.

> We should also spell out how this interacts with KIP-226 configurations.

Yes, I mention at the beginning that KIP-226 could use the ConfigProvider but not the ConfigTransformer.

> Maybe a good generic interface would be like this:

I've added the subscription APIs but would like to keep the other APIs, as I will need them for integration with Vault. With Vault I obtain the lease duration at the time the key is obtained, so at that time I would want to use the lease duration to schedule a configuration reload in the future. This is similar to how the integration between Vault and the Spring Framework works. Also, the lease duration would be included in the metadata map rather than the data map. Finally, I need an additional "path" or "bucket" parameter, which Vault uses to indicate which set of key-values to retrieve.

> With regard to ConfigTransformer: do we need to include all this code in the KIP? Seems like an implementation detail.

I use the ConfigTransformer to show how the pattern ${provider:key} is defined and how the substitution only involves one level of indirection. If you feel it does not add anything to the text, I can remove it.

> Is there a way to avoid this coupling?

I'd have to look into it and get back to you. However, I assume that the answer is not relevant for this KIP :)

Thanks,
Robert

On Tue, May 15, 2018 at 4:04 PM, Colin McCabe <cmcc...@apache.org> wrote:

Hi Robert,

Thanks for posting this. In the past we've been kind of reluctant to add more complexity to configuration. I think Connect does have a clear need for this kind of functionality, though. As you mention, Connect integrates with external systems, which are very likely to have passwords stored in Vault, KeyWhiz, or some other external system.

The KIP says that "Vault is very popular and has been described as 'the current gold standard in secret management and provisioning'." I think this might be a bit too much detail -- we don't really need to pick favorites, right? :)

I think we should make configuration consistent between the broker and Connect. If people can use constructs like jdbc.config.key="${vault:jdbc.user}${vault:jdbc.password}" in Connect, they'll want to do it on the broker too, in a consistent way.

If I understand correctly, ConfigProvider represents an external configuration source, such as VaultConfigProvider, KeyWhizConfigProvider, etc.

I think we should make the substitution part of the generic configuration code, rather than specific to individual ConfigProviders. We don't really want it to work differently for Vault vs. KeyWhiz vs. AWS secrets, etc. etc.

We should also spell out exactly how substitution works. For example, is substitution limited to 1 level deep? In other words, if I have foo="${bar}" and bar=${baz}, probably foo should just be set equal to "${baz}" rather than chasing more than one level of indirection.

We should also spell out how this interacts with KIP-226 configurations. I would suggest that KIP-226 variables not be subjected to substitution. The reason is that in theory substitution could lead to different results on different brokers, since the different brokers may not have the same ConfigProviders configured. Also, having substitutions in the KIP-226 configuration makes it more difficult for the admin to understand what the centrally managed configuration is.
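A minimal sketch of one-level substitution for references of the form ${provider:[path:]key}, assuming a caller-supplied lookup function. The VariableLookup and OneLevelSubstitution names are hypothetical, and the regex is this sketch's own, not necessarily the ConfigTransformer's DEFAULT_PATTERN. Each reference is replaced once and the replacement text is never rescanned, so a looked-up value that itself contains ${...} stays literal; unresolved references are kept verbatim.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical lookup: value for (provider, path, key), or null if unknown. path is null when omitted. */
interface VariableLookup {
    String lookup(String provider, String path, String key);
}

final class OneLevelSubstitution {
    // ${provider:key} or ${provider:path:key}; the parts may not contain ':' or '}'.
    static final Pattern VARIABLE =
            Pattern.compile("\\$\\{([^}:]+):(?:([^}:]*):)?([^}:]+)\\}");

    /** Resolves every ${...} reference exactly once; substituted text is not scanned again. */
    static Map<String, String> transform(Map<String, String> configs, VariableLookup lookup) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : configs.entrySet()) {
            Matcher m = VARIABLE.matcher(entry.getValue());
            StringBuilder sb = new StringBuilder();
            while (m.find()) {
                String resolved = lookup.lookup(m.group(1), m.group(2), m.group(3));
                // Keep unknown references as-is instead of replacing them with an empty string.
                String replacement = resolved != null ? resolved : m.group();
                // quoteReplacement stops '$' and '\' in secret values from being treated as group refs.
                m.appendReplacement(sb, Matcher.quoteReplacement(replacement));
            }
            m.appendTail(sb);
            result.put(entry.getKey(), sb.toString());
        }
        return result;
    }
}
```

For example, with a file provider where a=${file:b} and b=world, a config x=${file:a} resolves to ${file:b}, not world, which is the one-level behavior Colin proposes.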

It seems the main goal is the ability to load a batch of key/value pairs from the ConfigProvider, and the ability to subscribe to notifications about changes to certain parameters. Maybe a good generic interface would be like this:

    public interface ConfigProvider extends Closeable {
        // batched get is potentially more efficient
        Map<String, String> get(Collection<String> keys);

        // The ConfigProvider is responsible for making this callback whenever the key changes.
        // Some ConfigProviders may want to have a background thread with a configurable update interval.
        void subscribe(String key, ConfigurationChangeCallback callback);

        // Inverse of subscribe
        void unsubscribe(String key);

        // Close all subscriptions and clean up all resources
        void close();
    }

    interface ConfigurationChangeCallback {
        void onChange(String key, String value);
    }

With regard to ConfigTransformer: do we need to include all this code in the KIP? Seems like an implementation detail.

> Other connectors such as the S3 connector are tightly coupled with a particular secret manager, and may wish to handle rotation on their own.

Is there a way to avoid this coupling? Seems like some users might want to use their own secret manager here.
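One way a provider could offer both lease-handling strategies discussed in this thread (push the change from a background thread when the lease expires, with delayMs = 0, or notify immediately with delayMs set to the lease duration) is a ScheduledExecutorService. This is a hedged sketch: ReloadCallback and LeaseReloadScheduler are hypothetical names, the callback carries only the path and delayMs for brevity, and a real provider would presumably re-fetch the secret before pushing the change.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical callback: delayMs says how far in the future the change takes effect (0 = now). */
interface ReloadCallback {
    void onChange(String path, long delayMs);
}

/** Sketch of the two strategies for a lease-based secret store such as Vault. */
final class LeaseReloadScheduler implements AutoCloseable {
    enum Mode { PUSH_WHEN_DUE, NOTIFY_IMMEDIATELY }

    private final Mode mode;
    // Daemon thread so a forgotten scheduler does not keep the JVM alive.
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "lease-reload");
                t.setDaemon(true);
                return t;
            });

    LeaseReloadScheduler(Mode mode) {
        this.mode = mode;
    }

    /**
     * Called when a secret is fetched together with its lease duration.
     * Returns the scheduled task in PUSH_WHEN_DUE mode, or null if the callback already ran.
     */
    ScheduledFuture<?> leaseObtained(String path, long leaseMs, ReloadCallback callback) {
        if (mode == Mode.NOTIFY_IMMEDIATELY) {
            // Tell the caller right away that the current values expire in leaseMs.
            callback.onChange(path, leaseMs);
            return null;
        }
        // Push the change from the background thread once the lease runs out.
        return executor.schedule(() -> callback.onChange(path, 0L), leaseMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        executor.shutdownNow(); // drops any reloads that have not fired yet
    }
}
```

With a one-hour lease, NOTIFY_IMMEDIATELY reports delayMs = 3600000 at once, while PUSH_WHEN_DUE reports delayMs = 0 an hour later.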

best,
Colin

On Wed, May 9, 2018, at 16:32, Robert Yokota wrote:

Hi Magesh,

I updated the KIP with a link to a PR for a working prototype. The prototype does not yet use the Connect plugin machinery for class loader isolation, but should give you an idea of what the final implementation will look like. Here is the link: https://github.com/apache/kafka/pull/4990/files.

I also added an example of a FileConfigProvider to the KIP.

Thanks,
Robert

On Wed, May 9, 2018 at 10:04 AM, Robert Yokota <rayok...@gmail.com> wrote:

Hi Magesh,

Thanks for the feedback!

I will put together a PR to demonstrate what the implementation might look like, as well as a reference FileConfigProvider.

1. The delayMs for a (potentially) scheduled reload is determined by the ConfigProvider. For example, a (hypothetical) VaultConfigProvider, upon contacting Vault for a particular secret, might also obtain a lease duration indicating that the secret expires in 1 hour. The VaultConfigProvider could then call scheduleConfigReload with delayMs set to 3600000 ms (1 hour). This would cause the Connector to restart in an hour, forcing it to reload the configs and re-resolve all indirect references.

2. Yes, the start() methods in SourceTask and SinkTask would get the configs with all the indirect references resolved. Those config() methods are for Connectors that want to get the latest configs (with all indirect references re-resolved) at some time after start(). For example, if a Task encountered a security exception because a secret expired, it could call config() to get the config with the latest values. This assumes that the Task can gracefully recover from the security exception.

3. Yes, that is up to the ConfigProvider implementation and is out of scope. If the ConfigProvider also needs some kind of secrets or other data, they could be passed in through the param properties ("config.providers.vault.param.auth=/run/myauth"). For example, Docker might generate the auth info for Vault in an in-memory tmpfs file that could then be passed as a param.

Thanks,
Robert

On Tue, May 8, 2018 at 10:10 PM, Magesh Nandakumar <mage...@confluent.io> wrote:

Hi Robert,

Thanks for the KIP. I think this will be a great addition to the framework. It would be great if the KIP could elaborate a little more on what implementations would look like, with an example. It would also be good to provide a reference implementation.
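As a rough illustration of what a reference provider might look like, here is a file-backed sketch that treats `path` as the location of a Java properties file and offers the two get() variants discussed in this thread. FilePropertiesProvider is a hypothetical name; it is not the FileConfigProvider added to the KIP, and it has no subscription or reload support.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/** Hypothetical file-backed provider: `path` names a properties file on local disk. */
final class FilePropertiesProvider {

    /** Returns every key/value pair in the file at `path`. */
    Map<String, String> get(String path) {
        Properties props = load(path);
        Map<String, String> result = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            result.put(name, props.getProperty(name));
        }
        return result;
    }

    /** Returns only the requested keys; keys missing from the file are omitted. */
    Map<String, String> get(String path, Set<String> keys) {
        Properties props = load(path);
        Map<String, String> result = new HashMap<>();
        for (String key : keys) {
            String value = props.getProperty(key);
            if (value != null) {
                result.put(key, value);
            }
        }
        return result;
    }

    // Re-reads the file on every call, so edits are picked up on the next get().
    private static Properties load(String path) {
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(Paths.get(path), StandardCharsets.UTF_8)) {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException("Could not read " + path, e);
        }
        return props;
    }
}
```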

The other questions I had were:

1. How would the framework get the delayMs for void scheduleConfigReload(long delayMs)?
2. Would the start methods in SourceTask and SinkTask get the configs with all the indirect references resolved? If so, I'm trying to understand the intent of config() in SourceTaskContext and SinkTaskContext.
3. What if the provider itself needs some kind of secrets to be configured to connect to it? I assume that's out of scope for this proposal, but wanted to clarify it.

Thanks
Magesh

On Tue, May 8, 2018 at 1:52 PM, Robert Yokota <rayok...@gmail.com> wrote:

Hi,

I would like to start a discussion for KIP-297 to externalize secrets from Kafka Connect configurations. Any feedback is appreciated.

<https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations>

JIRA: <https://issues.apache.org/jira/browse/KAFKA-6886>

Thanks in advance,
Robert
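Provider parameters such as config.providers.vault.param.auth=/run/myauth (the example Robert gives in his answer to question 3) would need to be grouped by provider name before each provider is configured. A sketch, assuming every such property follows the layout config.providers.<name>.param.<key>; only the vault example comes from this thread, and the ProviderParams name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: groups "config.providers.<name>.param.<key>=<value>" properties by provider. */
final class ProviderParams {
    static final String PREFIX = "config.providers.";
    static final String PARAM = ".param.";

    static Map<String, Map<String, String>> extract(Map<String, String> workerProps) {
        Map<String, Map<String, String>> byProvider = new TreeMap<>();
        for (Map.Entry<String, String> e : workerProps.entrySet()) {
            String name = e.getKey();
            if (!name.startsWith(PREFIX)) {
                continue; // unrelated worker property
            }
            String rest = name.substring(PREFIX.length()); // e.g. "vault.param.auth"
            int idx = rest.indexOf(PARAM);
            if (idx <= 0) {
                continue; // not a param property, or empty provider name
            }
            String provider = rest.substring(0, idx);             // "vault"
            String param = rest.substring(idx + PARAM.length());   // "auth"
            if (!param.isEmpty()) {
                byProvider.computeIfAbsent(provider, k -> new HashMap<>()).put(param, e.getValue());
            }
        }
        return byProvider;
    }
}
```

Each provider would then receive only its own map (for vault, {auth=/run/myauth}), so a tmpfs file path generated by Docker never needs to appear in a connector's configuration.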