Hi Jan,

Judging by the exception message, it seems that the function type "demo/eventCounterPython" is not known to Stateful Functions. This could happen if the module.yaml (provided in your email) was accidentally excluded from the resulting artifact (the Docker image or the jar-with-dependencies). Can you please verify that module.yaml is present at runtime?
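For the jar-with-dependencies case, one possible check is a small stand-alone Java program run with the same classpath as the job. It is a sketch that assumes Stateful Functions discovers module.yaml as a classpath resource named exactly `module.yaml`. The class name ModuleYamlCheck is hypothetical. For a Docker image, inspecting the files inside the running container is the equivalent check.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Diagnostic sketch: lists every resource named "module.yaml" visible to a class loader.
// Assumption: the runtime discovers module.yaml as a classpath resource (jar-with-dependencies case).
public final class ModuleYamlCheck {

    static List<URL> findModuleYamls(ClassLoader loader) {
        try {
            // getResources returns every match across all classpath entries, not just the first one.
            return Collections.list(loader.getResources("module.yaml"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<URL> found = findModuleYamls(Thread.currentThread().getContextClassLoader());
        if (found.isEmpty()) {
            System.out.println("No module.yaml on the classpath");
        } else {
            found.forEach(url -> System.out.println("Found module.yaml at " + url));
        }
    }
}
```

An empty result when running it against the job's jar would support the hypothesis that the file was left out of the artifact. A quicker manual check for the same case is listing the jar's contents with `jar tf`.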
Kind regards,
Igal.

On Thu, Jul 2, 2020 at 7:29 PM Jan Brusch <jan.bru...@neuland-bfi.de> wrote:
> Hi,
>
> based on Gordon's excellent advice on how to handle JSON messages with
> remote functions
> (https://www.mail-archive.com/user@flink.apache.org/msg34385.html) I was
> able to:
>
> 1) Deserialize JSON Messages from a Kafka Stream
>
> 2) Route the message to an embedded StatefulFunction
>
> 3) Serialize the Resulting Protobuf Message to JSON and write it back to
> a Kafka Stream
>
> Now, instead of 2), I would like to route the message to a remote
> function, handle it there and write it back to the Stream as json via
> the serializer defined in Java. From my understanding all this should
> work through address based communication within the Stateful Functions
> Application. Unfortunately I don't get it to work. See the relevant code
> and error message below. The rest of the project structure basically
> follows the walkthrough example from the documentation. Any ideas or
> input would be greatly appreciated.
>
>
> module.yaml:
>
> --------------------------------
>
> ...
>
> spec:
>   functions:
>     - function:
>         meta:
>           kind: http
>           type: demo/eventCounterPython
>         spec:
>           endpoint: http://python-worker:8000/statefun
>           states:
>             - name: count
>           maxNumBatchRequests: 500
>
> ...
>
> ---------------------------------------
>
>
> EventIO.java
>
> ---------------------------------------------
>
> final class EventIO {
>
>   static final EgressIdentifier<GlobalEventCount> EVENT_COUNT_PYTHON_EGRESS_ID =
>       new EgressIdentifier<>("demo", "eventCountPython", GlobalEventCount.class);
>
>   ....
>
>   EgressSpec<GlobalEventCount> getEventCountPythonEgressSpec() {
>     return KafkaEgressBuilder.forIdentifier(EVENT_COUNT_PYTHON_EGRESS_ID)
>         .withKafkaAddress(kafkaAddress)
>         .withSerializer(GlobalEventCountPythonKafkaSerializer.class)
>         .build();
>   }
>
>   ....
>
>   private static final class GlobalEventCountPythonKafkaSerializer
>       implements KafkaEgressSerializer<GlobalEventCount> {
>
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public ProducerRecord<byte[], byte[]> serialize(GlobalEventCount eventCount) {
>       byte[] key = null;
>       byte[] value = null;
>
>       try {
>         String json = JsonFormat
>             .printer()
>             .includingDefaultValueFields()
>             .print(eventCount);
>         value = json.getBytes();
>       } catch (InvalidProtocolBufferException e) {
>         e.printStackTrace();
>       }
>
>       return new ProducerRecord<>("eventCountPython", key, value);
>     }
>   }
>
> }
>
> --------------------------------------------
>
>
> EventModule.java:
>
> --------------------------------------
>
> public final class EventModule implements StatefulFunctionModule {
>
>   @Override
>   public void configure(Map<String, String> globalConfiguration, Binder binder) {
>
>     EventIO ioModule = new EventIO("kafka:9092");
>
>     binder.bindIngress(ioModule.getIngressSpec());
>
>     binder.bindIngressRouter(EventIO.EVENT_INGRESS_ID, new EventRouter());
>
>     binder.bindEgress(ioModule.getEventCountJavaEgressSpec());
>     binder.bindEgress(ioModule.getEventCountPythonEgressSpec());
>
>     binder.bindFunctionProvider(
>         EventCountJavaStatefulFunction.TYPE, unused -> new EventCountJavaStatefulFunction());
>   }
> }
>
> ------------------------------------------
>
>
> EventRouter.java
>
> -------------------------------------------
>
> final class EventRouter implements Router<Event> {
>
>   static final FunctionType PYTHON_EVENT_COUNTER_TYPE =
>       new FunctionType("demo", "eventCounterPython");
>   static final FunctionType JAVA_EVENT_COUNTER_TYPE =
>       new FunctionType("demo", "eventCounterJava");
>
>   @Override
>   public void route(Event event, Downstream<Event> downstream) {
>     downstream.forward(JAVA_EVENT_COUNTER_TYPE, "count", event);
>     downstream.forward(new Address(PYTHON_EVENT_COUNTER_TYPE, "count"), event);
>   }
> }
>
> ------------------------------------------
>
>
> worker.py
>
> -------------------------------------------
>
> @functions.bind("demo/eventCounterPython")
> def handle_event(context, _):
>     state = context.state('count').unpack(GlobalEventCount)
>     if not state:
>         state = GlobalEventCount()
>         state.value = 1
>     else:
>         state.value += 1
>     context.state('count').pack(state)
>
>     egress_message = kafka_egress_record(topic="eventCountPython", value=state)
>     context.pack_and_send_egress("demo/eventCountPython", egress_message)
>
> ------------------------------------------
>
>
> ERROR MESSAGE
>
> ------------------------------------------
>
> worker_1 | 2020-07-02 14:38:35,436 INFO org.apache.flink.runtime.taskmanager.Task - feedback-union -> functions -> (Sink: demo-eventCountPython-egress, Sink: demo-eventCountJava-egress) (1/1) (15ea8a98c215693d6e7d5c80f3e5f8b6) switched from RUNNING to FAILED.
> worker_1 | java.lang.IllegalArgumentException: Unknown provider for type FunctionType(demo, eventCounterPython)
> worker_1 |   at org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:44)
> worker_1 |   at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:63)
> worker_1 |   at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:56)
> worker_1 |   at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:73)
> worker_1 |   at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:50)
> worker_1 |   at org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:135)
> worker_1 |   at org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:130)
> worker_1 |   at org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:82)
> worker_1 |   at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550)
> worker_1 |   at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527)
> worker_1 |   at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487)
> worker_1 |   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> worker_1 |   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> worker_1 |   at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:174)
> worker_1 |   at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:80)
> worker_1 |   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
> worker_1 |   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
> worker_1 |   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
> worker_1 |   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> worker_1 |   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
> worker_1 |   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> worker_1 |   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
> worker_1 |   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
> worker_1 |   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> worker_1 |   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>
> ------------------------------------------
>
>
> Best regards and thanks for taking the time to read all this,
>
> Jan
>
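The trace fails in PredefinedFunctionLoader.load, that is, while the runtime looks up a provider for the target function type. As a rough illustration only, the sketch below is a hypothetical, heavily simplified model of such a lookup; ProviderLookupSketch and TypeKey are made-up names, not StateFun classes. Providers are keyed by (namespace, name); the Java counter is registered much as bindFunctionProvider registers it, while demo/eventCounterPython, which in this setup is declared only in module.yaml, has no entry when that file is not loaded, so the lookup fails with the same message as in the trace.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, simplified model of a provider lookup keyed by function type.
// This is NOT the StateFun implementation; all names here are illustrative.
public final class ProviderLookupSketch {

    // Hypothetical stand-in for FunctionType: two types are equal only if
    // both the namespace and the name match exactly.
    record TypeKey(String namespace, String name) {

        // Parses a "namespace/name" string such as "demo/eventCounterPython".
        static TypeKey parse(String typename) {
            int slash = typename.indexOf('/');
            if (slash <= 0 || slash == typename.length() - 1) {
                throw new IllegalArgumentException("Expected namespace/name, got: " + typename);
            }
            return new TypeKey(typename.substring(0, slash), typename.substring(slash + 1));
        }

        @Override
        public String toString() {
            return "FunctionType(" + namespace + ", " + name + ")";
        }
    }

    // Registered providers; here a provider just produces a label for illustration.
    private final Map<TypeKey, Supplier<String>> providers = new HashMap<>();

    void register(TypeKey type, Supplier<String> provider) {
        providers.put(type, provider);
    }

    // Mirrors the failure mode in the trace: no provider for the type means an exception.
    String load(TypeKey type) {
        Supplier<String> provider = providers.get(type);
        if (provider == null) {
            throw new IllegalArgumentException("Unknown provider for type " + type);
        }
        return provider.get();
    }

    public static void main(String[] args) {
        ProviderLookupSketch lookup = new ProviderLookupSketch();
        // Only the Java type is registered; the Python type would need module.yaml.
        lookup.register(new TypeKey("demo", "eventCounterJava"), () -> "java counter");
        System.out.println(lookup.load(new TypeKey("demo", "eventCounterJava")));
        try {
            lookup.load(TypeKey.parse("demo/eventCounterPython"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the fix is simply to make sure the Python type gets registered, which in the real setup corresponds to module.yaml actually being picked up at runtime.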