Hi Jan,
Judging by the exception message, it seems that the function type
"demo/eventCounterPython" is not known to Stateful Functions.
This could happen if the module.yaml (provided in your email) was
accidentally excluded from the resulting artifact (Docker image or a
jar-with-dependencies).
Can you please verify that the module.yaml is present at runtime?
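
If the artifact is a jar-with-dependencies, one way to run that check is to
inspect the archive directly. The following is only a rough sketch: the jar
path is a hypothetical placeholder, the helper names are made up for this
example, and the type check is a plain text search rather than a real YAML
parse.

```python
import zipfile


def find_module_yaml(jar_path, resource_name="module.yaml"):
    """Return the archive entries whose file name equals resource_name."""
    with zipfile.ZipFile(jar_path) as jar:
        return [
            name
            for name in jar.namelist()
            if name.rsplit("/", 1)[-1] == resource_name
        ]


def declares_function_type(jar_path, entry, function_type):
    """Check (by simple text search) that the entry mentions the function type."""
    with zipfile.ZipFile(jar_path) as jar:
        text = jar.read(entry).decode("utf-8")
    return f"type: {function_type}" in text


if __name__ == "__main__":
    # Hypothetical path; replace it with the jar that is actually deployed.
    jar_path = "target/app-jar-with-dependencies.jar"
    entries = find_module_yaml(jar_path)
    print("module.yaml entries:", entries)
    for entry in entries:
        found = declares_function_type(jar_path, entry, "demo/eventCounterPython")
        print(entry, "declares demo/eventCounterPython:", found)
```

An empty list of entries would mean the module.yaml never made it into the
jar. For a Docker image, the equivalent check is to list the files inside the
running container.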

Kind regards,
Igal.

On Thu, Jul 2, 2020 at 7:29 PM Jan Brusch <jan.bru...@neuland-bfi.de> wrote:

> Hi,
>
> based on Gordon's excellent advice on how to handle JSON messages with
> remote functions
> (https://www.mail-archive.com/user@flink.apache.org/msg34385.html) I was
> able to:
>
> 1) Deserialize JSON Messages from a Kafka Stream
>
> 2) Route the message to an embedded StatefulFunction
>
> 3) Serialize the Resulting Protobuf Message to JSON and write it back to
> a Kafka Stream
>
> Now, instead of 2), I would like to route the message to a remote
> function, handle it there and write it back to the stream as JSON via
> the serializer defined in Java. From my understanding, all of this should
> work through address-based communication within the Stateful Functions
> application. Unfortunately, I can't get it to work. See the relevant code
> and error message below. The rest of the project structure basically
> follows the walkthrough example from the documentation. Any ideas or
> input would be greatly appreciated.
>
>
> module.yaml:
>
> --------------------------------
>
> ...
>
> spec:
>      functions:
>        - function:
>            meta:
>              kind: http
>              type: demo/eventCounterPython
>            spec:
>              endpoint: http://python-worker:8000/statefun
>              states:
>                - name: count
>              maxNumBatchRequests: 500
>
> ...
>
> ---------------------------------------
>
>
> EventIO.java
>
> ---------------------------------------------
>
> final class EventIO {
>
>    static final EgressIdentifier<GlobalEventCount> EVENT_COUNT_PYTHON_EGRESS_ID =
>        new EgressIdentifier<>("demo", "eventCountPython", GlobalEventCount.class);
>
>      ....
>
>    EgressSpec<GlobalEventCount> getEventCountPythonEgressSpec() {
>      return KafkaEgressBuilder.forIdentifier(EVENT_COUNT_PYTHON_EGRESS_ID)
>          .withKafkaAddress(kafkaAddress)
>          .withSerializer(GlobalEventCountPythonKafkaSerializer.class)
>          .build();
>    }
>
>    ....
>
>    private static final class GlobalEventCountPythonKafkaSerializer
>        implements KafkaEgressSerializer<GlobalEventCount> {
>
>      private static final long serialVersionUID = 1L;
>
>      @Override
>      public ProducerRecord<byte[], byte[]> serialize(GlobalEventCount eventCount) {
>        byte[] key = null;
>        byte[] value = null;
>
>        try {
>          String json = JsonFormat
>                  .printer()
>                  .includingDefaultValueFields()
>                  .print(eventCount);
>          value = json.getBytes();
>        } catch (InvalidProtocolBufferException e) {
>          e.printStackTrace();
>        }
>
>        return new ProducerRecord<>("eventCountPython", key, value);
>      }
>    }
>
> }
>
> --------------------------------------------
>
>
> EventModule.java:
>
> --------------------------------------
>
> public final class EventModule implements StatefulFunctionModule {
>
>      @Override
>      public void configure(Map<String, String> globalConfiguration, Binder binder) {
>
>          EventIO ioModule = new EventIO("kafka:9092");
>
>          binder.bindIngress(ioModule.getIngressSpec());
>
>          binder.bindIngressRouter(EventIO.EVENT_INGRESS_ID, new EventRouter());
>
>          binder.bindEgress(ioModule.getEventCountJavaEgressSpec());
>          binder.bindEgress(ioModule.getEventCountPythonEgressSpec());
>
>          binder.bindFunctionProvider(
>                  EventCountJavaStatefulFunction.TYPE,
>                  unused -> new EventCountJavaStatefulFunction());
>      }
> }
>
> ------------------------------------------
>
>
> EventRouter.java
>
> -------------------------------------------
>
> final class EventRouter implements Router<Event> {
>
>      static final FunctionType PYTHON_EVENT_COUNTER_TYPE =
>              new FunctionType("demo", "eventCounterPython");
>      static final FunctionType JAVA_EVENT_COUNTER_TYPE =
>              new FunctionType("demo", "eventCounterJava");
>
>      @Override
>      public void route(Event event, Downstream<Event> downstream) {
>          downstream.forward(
>                  JAVA_EVENT_COUNTER_TYPE,
>                  "count",
>                  event
>          );
>          downstream.forward(
>                  new Address(
>                          PYTHON_EVENT_COUNTER_TYPE,
>                          "count"
>                  ),
>                  event
>          );
>      }
> }
>
> ------------------------------------------
>
>
> worker.py
>
> -------------------------------------------
>
> @functions.bind("demo/eventCounterPython")
> def handle_event(context, _):
>      state = context.state('count').unpack(GlobalEventCount)
>      if not state:
>          state = GlobalEventCount()
>          state.value = 1
>      else:
>          state.value += 1
>      context.state('count').pack(state)
>
>      egress_message = kafka_egress_record(topic="eventCountPython", value=state)
>      context.pack_and_send_egress("demo/eventCountPython", egress_message)
>
> ------------------------------------------
>
>
> ERROR MESSAGE
>
> ------------------------------------------
>
> worker_1         | 2020-07-02 14:38:35,436 INFO org.apache.flink.runtime.taskmanager.Task - feedback-union -> functions -> (Sink: demo-eventCountPython-egress, Sink: demo-eventCountJava-egress) (1/1) (15ea8a98c215693d6e7d5c80f3e5f8b6) switched from RUNNING to FAILED.
> worker_1         | java.lang.IllegalArgumentException: Unknown provider for type FunctionType(demo, eventCounterPython)
> worker_1         |     at org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:44)
> worker_1         |     at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:63)
> worker_1         |     at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:56)
> worker_1         |     at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:73)
> worker_1         |     at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:50)
> worker_1         |     at org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:135)
> worker_1         |     at org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:130)
> worker_1         |     at org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:82)
> worker_1         |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550)
> worker_1         |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527)
> worker_1         |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487)
> worker_1         |     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> worker_1         |     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> worker_1         |     at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:174)
> worker_1         |     at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:80)
> worker_1         |     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
> worker_1         |     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
> worker_1         |     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
> worker_1         |     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> worker_1         |     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
> worker_1         |     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> worker_1         |     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
> worker_1         |     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
> worker_1         |     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> worker_1         |     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533
>
> ------------------------------------------
>
>
> Best regards and thanks for taking the time to read all this,
>
> Jan
>
>
