Hi Hezekiah, I've confirmed that the Kafka properties set in the module specification file (module.yaml) are indeed correctly being parsed and used to construct the internal Kafka clients. StateFun / Flink does not alter or modify the properties.
So, this should be something wrong with your property settings, and causing the Kafka client itself to not pick up the `sasl.jaas.config` property value. >From the resolved producer config in the logs, it looks like your `sasl.jaas.config` is null, but all other properties are being picked up correctly. Please check your properties again, and make sure their keys are correct and values conform to the JAAS config formats. For starters, there's a typo in your `sasl.mechanism` config, you've mis-typed an extra 's'. I've verified that the following properties will work, with SASL JAAS config being picked up correctly: ``` egresses: - egress: meta: type: statefun.kafka.io/generic-egress id: example/greets spec: address: <confluent-bootstrap-server> deliverySemantic: type: exactly-once transactionTimeoutMillis: 100000 properties: - security.protocol: SASL_SSL - sasl.mechanism: PLAIN - sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD"; - ssl.endpoint.identification.algorithm: https ``` Cheers, Gordon On Wed, Oct 7, 2020 at 11:36 PM Till Rohrmann <trohrm...@apache.org> wrote: > Hi Hezekiah, thanks for reporting this issue. I am pulling Gordon and Igal > in who might be able to help you with this problem. > > Cheers, > Till > > On Wed, Oct 7, 2020 at 3:56 PM hezekiah maina <hezekiahmai...@gmail.com> > wrote: > >> Hi, >> >> I'm trying to use Stateful Functions with Kafka as my ingress and egress. >> I'm using the Confluent fully-managed Kafka and I'm having a challenge >> adding my authentication details in the module.yaml file. >> Here is my current config details: >> version: "1.0" >> module: >> meta: >> type: remote >> spec: >> functions: >> - function: >> meta: >> kind: http >> type: example/greeter >> spec: >> endpoint: <https-endpoint> >> states: >> - seen_count >> maxNumBatchRequests: 500 >> timeout: 2min >> ingresses: >> - ingress: >> meta: >> type: statefun.kafka.io/routable-protobuf-ingress >> id: example/names >> spec: >> address: <confluent-bootstrap-server> >> consumerGroupId: statefun-consumer-group >> topics: >> - topic: names >> typeUrl: com.googleapis/example.GreetRequest >> targets: >> - example/greeter >> properties: >> - bootstrap.servers:<confluent-bootstrap-server> >> - security.protocol: SASL_SSL >> - sasl.mechanism: PLAIN >> - sasl.jaas.config: >> org.apache.kafka.common.security.plain.PlainLoginModule required >> username="USERNAME" password="PASSWORD"; >> - ssl.endpoint.identification.algorithm: https >> egresses: >> - egress: >> meta: >> type: statefun.kafka.io/generic-egress >> id: example/greets >> spec: >> address: <confluent-bootstrap-server> >> deliverySemantic: >> type: exactly-once >> transactionTimeoutMillis: 100000 >> properties: >> - bootstrap.servers: <confluent-bootstrap-server> >> - security.protocol: SASL_SSL >> - sasl.mechanisms: PLAIN >> - sasl.jaas.config: >> org.apache.kafka.common.security.plain.PlainLoginModule required >> username="USERNAME" password="PASSWORD"; >> - ssl.endpoint.identification.algorithm: https >> >> After running docker-compose with a master and worker containers I'm >> getting this error: >> Could not find a 'KafkaClient' entry in the JAAS configuration. System >> property 'java.security.auth.login.config' is >> /tmp/jaas-2846080966990890307.conf >> >> The producer config logged : >> worker_1 | 2020-10-07 13:38:08,489 INFO >> org.apache.kafka.clients.producer.ProducerConfig - >> ProducerConfig values: >> worker_1 | acks = 1 >> worker_1 | batch.size = 16384 >> worker_1 | bootstrap.servers = [https:// >> ---.asia-southeast1.gcp.confluent.cloud:9092] >> worker_1 | buffer.memory = 33554432 >> worker_1 | client.dns.lookup = default >> worker_1 | client.id = >> worker_1 | compression.type = none >> worker_1 | connections.max.idle.ms = 540000 >> worker_1 | delivery.timeout.ms = 120000 >> worker_1 | enable.idempotence = false >> worker_1 | interceptor.classes = [] >> worker_1 | key.serializer = class >> org.apache.kafka.common.serialization.ByteArraySerializer >> worker_1 | linger.ms = 0 >> worker_1 | max.block.ms = 60000 >> worker_1 | max.in.flight.requests.per.connection = 5 >> worker_1 | max.request.size = 1048576 >> worker_1 | metadata.max.age.ms = 300000 >> worker_1 | metric.reporters = [] >> worker_1 | metrics.num.samples = 2 >> worker_1 | metrics.recording.level = INFO >> worker_1 | metrics.sample.window.ms = 30000 >> worker_1 | partitioner.class = class >> org.apache.kafka.clients.producer.internals.DefaultPartitioner >> worker_1 | receive.buffer.bytes = 32768 >> worker_1 | reconnect.backoff.max.ms = 1000 >> worker_1 | reconnect.backoff.ms = 50 >> worker_1 | request.timeout.ms = 30000 >> worker_1 | retries = 2147483647 >> worker_1 | retry.backoff.ms = 100 >> worker_1 | sasl.client.callback.handler.class = null >> worker_1 | sasl.jaas.config = null >> worker_1 | sasl.kerberos.kinit.cmd = /usr/bin/kinit >> worker_1 | sasl.kerberos.min.time.before.relogin = 60000 >> worker_1 | sasl.kerberos.service.name = null >> worker_1 | sasl.kerberos.ticket.renew.jitter = 0.05 >> worker_1 | sasl.kerberos.ticket.renew.window.factor = 0.8 >> worker_1 | sasl.login.callback.handler.class = null >> worker_1 | sasl.login.class = null >> worker_1 | sasl.login.refresh.buffer.seconds = 300 >> worker_1 | sasl.login.refresh.min.period.seconds = 60 >> worker_1 | sasl.login.refresh.window.factor = 0.8 >> worker_1 | sasl.login.refresh.window.jitter = 0.05 >> worker_1 | sasl.mechanism = GSSAPI >> worker_1 | security.protocol = SASL_SSL >> worker_1 | send.buffer.bytes = 131072 >> worker_1 | ssl.cipher.suites = null >> worker_1 | ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] >> worker_1 | ssl.endpoint.identification.algorithm = https >> worker_1 | ssl.key.password = null >> worker_1 | ssl.keymanager.algorithm = SunX509 >> worker_1 | ssl.keystore.location = null >> worker_1 | ssl.keystore.password = null >> worker_1 | ssl.keystore.type = JKS >> worker_1 | ssl.protocol = TLS >> worker_1 | ssl.provider = null >> worker_1 | ssl.secure.random.implementation = null >> worker_1 | ssl.trustmanager.algorithm = PKIX >> worker_1 | ssl.truststore.location = null >> worker_1 | ssl.truststore.password = null >> worker_1 | ssl.truststore.type = JKS >> worker_1 | transaction.timeout.ms = 100000 >> worker_1 | transactional.id = null >> worker_1 | value.serializer = class >> org.apache.kafka.common.serialization.ByteArraySerializer >> worker_1 | >> >> Is there something that I'm missing? >> >