Hello, I believe that your assembly plugin configuration doesn't merge files under META-INF/services. Can you unzip your jar and examin manually the contents of: META-INF/services/org.apache.flink.statefun.flink.io.spi.FlinkIoModule
It should include at least the following lines: org.apache.flink.statefun.flink.io.datastream.SourceSinkModule org.apache.flink.statefun.flink.io.kafka.KafkaFlinkIoModule org.apache.flink.statefun.flink.io.kinesis.KinesisFlinkIOModule If you are okay with using an alternative plugin, take a look at the maven-shade-plugin, and how we use it to obtain this task [1]. Side note 1: If you can, please use statefun 2.2.2 instead of 2.2.0, as it fixed an important bug. Side note 2: If you _must_ submit your statefun job to an existing cluster, then consider using the DataStream integration API [2] [1] https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-distribution/pom.xml#L178,L180 [2] https://github.com/apache/flink-statefun/blob/d1744eaa888a530edf102396675dfa4377489560/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java#L45 Good luck! Igal. On Wed, Feb 10, 2021 at 4:47 PM Filip Karnicki <filip.karni...@gmail.com> wrote: > Hi, I modified the Stateful Functions 2.2.0 asyc example to include a real > binding to kafka, I included statefun-flink-distribution and > stateful-kafka-io in the pom and I created a fat jar using the > maven-assembly-plugin, > > and my flink cluster complains about: > > java.lang.IllegalStateException: Unable to find a source translation for > ingress of type IngressType(statefun.kafka.io, universal-ingress), which > is bound for key > IngressIdentifier(org.apache.flink.statefun.examples.async, tasks, class > org.apache.flink.statefun.examples.async.events.TaskStartedEvent) > > org.apache.flink.statefun.flink.core.translation.IngressToSourceFunctionTranslator.sourceFromSpec(IngressToSourceFunctionTranslator.java:45) > > org.apache.flink.statefun.flink.core.common.Maps.transformValues(Maps.java:54) > > org.apache.flink.statefun.flink.core.translation.IngressToSourceFunctionTranslator.translate(IngressToSourceFunctionTranslator.java:37) > > org.apache.flink.statefun.flink.core.translation.Sources.ingressToSourceFunction(Sources.java:117) > > org.apache.flink.statefun.flink.core.translation.Sources.create(Sources.java:52) > > org.apache.flink.statefun.flink.core.translation.FlinkUniverse.configure(FlinkUniverse.java:44) > > org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:74) > > org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:47) > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > java.base/java.lang.reflect.Method.invoke(Method.java:566) > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > ... 12 more > > > Does anyone have any idea why this wouldn't work on a cluster, yet is > completely fine when I'm using the test harness with a real kafka? > > Many thanks > Fil >