Hi, I'm trying to use Stateful Functions with Kafka as my ingress and egress. I'm using the Confluent fully-managed Kafka and I'm having trouble adding my authentication details to the module.yaml file. Here are my current config details: version: "1.0" module: meta: type: remote spec: functions: - function: meta: kind: http type: example/greeter spec: endpoint: <https-endpoint> states: - seen_count maxNumBatchRequests: 500 timeout: 2min ingresses: - ingress: meta: type: statefun.kafka.io/routable-protobuf-ingress id: example/names spec: address: <confluent-bootstrap-server> consumerGroupId: statefun-consumer-group topics: - topic: names typeUrl: com.googleapis/example.GreetRequest targets: - example/greeter properties: - bootstrap.servers:<confluent-bootstrap-server> - security.protocol: SASL_SSL - sasl.mechanism: PLAIN - sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD"; - ssl.endpoint.identification.algorithm: https egresses: - egress: meta: type: statefun.kafka.io/generic-egress id: example/greets spec: address: <confluent-bootstrap-server> deliverySemantic: type: exactly-once transactionTimeoutMillis: 100000 properties: - bootstrap.servers: <confluent-bootstrap-server> - security.protocol: SASL_SSL - sasl.mechanisms: PLAIN - sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD"; - ssl.endpoint.identification.algorithm: https
After running docker-compose with master and worker containers, I'm getting this error: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is /tmp/jaas-2846080966990890307.conf The producer config logged: worker_1 | 2020-10-07 13:38:08,489 INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: worker_1 | acks = 1 worker_1 | batch.size = 16384 worker_1 | bootstrap.servers = [https:// ---.asia-southeast1.gcp.confluent.cloud:9092] worker_1 | buffer.memory = 33554432 worker_1 | client.dns.lookup = default worker_1 | client.id = worker_1 | compression.type = none worker_1 | connections.max.idle.ms = 540000 worker_1 | delivery.timeout.ms = 120000 worker_1 | enable.idempotence = false worker_1 | interceptor.classes = [] worker_1 | key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer worker_1 | linger.ms = 0 worker_1 | max.block.ms = 60000 worker_1 | max.in.flight.requests.per.connection = 5 worker_1 | max.request.size = 1048576 worker_1 | metadata.max.age.ms = 300000 worker_1 | metric.reporters = [] worker_1 | metrics.num.samples = 2 worker_1 | metrics.recording.level = INFO worker_1 | metrics.sample.window.ms = 30000 worker_1 | partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner worker_1 | receive.buffer.bytes = 32768 worker_1 | reconnect.backoff.max.ms = 1000 worker_1 | reconnect.backoff.ms = 50 worker_1 | request.timeout.ms = 30000 worker_1 | retries = 2147483647 worker_1 | retry.backoff.ms = 100 worker_1 | sasl.client.callback.handler.class = null worker_1 | sasl.jaas.config = null worker_1 | sasl.kerberos.kinit.cmd = /usr/bin/kinit worker_1 | sasl.kerberos.min.time.before.relogin = 60000 worker_1 | sasl.kerberos.service.name = null worker_1 | sasl.kerberos.ticket.renew.jitter = 0.05 worker_1 | sasl.kerberos.ticket.renew.window.factor = 0.8 worker_1 | sasl.login.callback.handler.class = null worker_1 | 
sasl.login.class = null worker_1 | sasl.login.refresh.buffer.seconds = 300 worker_1 | sasl.login.refresh.min.period.seconds = 60 worker_1 | sasl.login.refresh.window.factor = 0.8 worker_1 | sasl.login.refresh.window.jitter = 0.05 worker_1 | sasl.mechanism = GSSAPI worker_1 | security.protocol = SASL_SSL worker_1 | send.buffer.bytes = 131072 worker_1 | ssl.cipher.suites = null worker_1 | ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] worker_1 | ssl.endpoint.identification.algorithm = https worker_1 | ssl.key.password = null worker_1 | ssl.keymanager.algorithm = SunX509 worker_1 | ssl.keystore.location = null worker_1 | ssl.keystore.password = null worker_1 | ssl.keystore.type = JKS worker_1 | ssl.protocol = TLS worker_1 | ssl.provider = null worker_1 | ssl.secure.random.implementation = null worker_1 | ssl.trustmanager.algorithm = PKIX worker_1 | ssl.truststore.location = null worker_1 | ssl.truststore.password = null worker_1 | ssl.truststore.type = JKS worker_1 | transaction.timeout.ms = 100000 worker_1 | transactional.id = null worker_1 | value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer worker_1 | Is there something that I'm missing?