Hi,

Based on Gordon's excellent advice on how to handle JSON messages with remote functions (https://www.mail-archive.com/user@flink.apache.org/msg34385.html), I was able to:

1) Deserialize JSON messages from a Kafka stream

2) Route the message to an embedded StatefulFunction

3) Serialize the resulting Protobuf message to JSON and write it back to a Kafka stream

Now, instead of 2), I would like to route the message to a remote function, handle it there, and write the result back to the stream as JSON via the serializer defined in Java. As I understand it, all of this should work through address-based communication within the Stateful Functions application. Unfortunately, I can't get it to work. The relevant code and the error message are below; the rest of the project structure basically follows the walkthrough example from the documentation. Any ideas or input would be greatly appreciated.
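For reference, this is roughly how I picture address-based messaging, written as a toy Python model rather than against the actual Stateful Functions API (`Address`, `ToyRuntime`, `bind` and `send` are made-up names for illustration only). A message is delivered by looking up a provider for the target function type, and delivery fails when nothing is registered for that type:

```python
from dataclasses import dataclass
from typing import Callable, Dict, Tuple

# Toy model only: Address, ToyRuntime, bind and send are hypothetical
# names and do not reflect how Stateful Functions is implemented.
FunctionType = Tuple[str, str]  # (namespace, name)


@dataclass(frozen=True)
class Address:
    namespace: str
    name: str
    id: str


Handler = Callable[[Address, object], object]


class ToyRuntime:
    def __init__(self) -> None:
        # Function type -> factory that creates a handler for that type.
        self.providers: Dict[FunctionType, Callable[[], Handler]] = {}

    def bind(self, ftype: FunctionType, factory: Callable[[], Handler]) -> None:
        self.providers[ftype] = factory

    def send(self, address: Address, message: object) -> object:
        ftype = (address.namespace, address.name)
        factory = self.providers.get(ftype)
        if factory is None:
            # Same kind of failure as "Unknown provider for type ..." in the log.
            raise ValueError(f"Unknown provider for type {ftype}")
        return factory()(address, message)


runtime = ToyRuntime()
runtime.bind(("demo", "eventCounterJava"), lambda: (lambda addr, msg: f"java got {msg}"))
print(runtime.send(Address("demo", "eventCounterJava", "count"), "e1"))  # java got e1
try:
    runtime.send(Address("demo", "eventCounterPython", "count"), "e1")
except ValueError as err:
    print(err)  # Unknown provider for type ('demo', 'eventCounterPython')
```

In this picture, the remote Python function has to be registered for `demo/eventCounterPython` (through module.yaml) in the same way the Java function is registered through `bindFunctionProvider`.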


module.yaml:

--------------------------------

...

spec:
    functions:
      - function:
          meta:
            kind: http
            type: demo/eventCounterPython
          spec:
            endpoint: http://python-worker:8000/statefun
            states:
              - name: count
            maxNumBatchRequests: 500

...

---------------------------------------


EventIO.java

---------------------------------------------

final class EventIO {

  static final EgressIdentifier<GlobalEventCount> EVENT_COUNT_PYTHON_EGRESS_ID =
      new EgressIdentifier<>("demo", "eventCountPython", GlobalEventCount.class);

    ....

  EgressSpec<GlobalEventCount> getEventCountPythonEgressSpec() {
    return KafkaEgressBuilder.forIdentifier(EVENT_COUNT_PYTHON_EGRESS_ID)
        .withKafkaAddress(kafkaAddress)
        .withSerializer(GlobalEventCountPythonKafkaSerializer.class)
        .build();
  }

  ....

  private static final class GlobalEventCountPythonKafkaSerializer implements KafkaEgressSerializer<GlobalEventCount> {

    private static final long serialVersionUID = 1L;

    @Override
    public ProducerRecord<byte[], byte[]> serialize(GlobalEventCount eventCount) {
      byte[] key = null;
      byte[] value = null;

      try {
        String json = JsonFormat
                .printer()
                .includingDefaultValueFields()
                .print(eventCount);
        // explicit charset instead of the platform default
        value = json.getBytes(java.nio.charset.StandardCharsets.UTF_8);
      } catch (InvalidProtocolBufferException e) {
        e.printStackTrace();
      }

      return new ProducerRecord<>("eventCountPython", key, value);
    }
  }

}

--------------------------------------------
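To make the expected output of this serializer explicit, here is a small Python sketch of the record it should produce: topic `eventCountPython`, a null key, and UTF-8 JSON bytes as the value. `Record` is a hypothetical stand-in for Kafka's `ProducerRecord`, and the sketch assumes `GlobalEventCount` has a single 32-bit integer field named `value` (as used in worker.py):

```python
import json
from typing import NamedTuple, Optional


class Record(NamedTuple):
    # Hypothetical stand-in for Kafka's ProducerRecord(topic, key, value).
    topic: str
    key: Optional[bytes]
    value: bytes


def serialize_event_count(count: int) -> Record:
    # Assumes GlobalEventCount has a single 32-bit integer field "value".
    # The field is always written, like includingDefaultValueFields().
    payload = json.dumps({"value": count})
    return Record("eventCountPython", None, payload.encode("utf-8"))


print(serialize_event_count(0))
# Record(topic='eventCountPython', key=None, value=b'{"value": 0}')
```

`JsonFormat.printer()` pretty-prints by default, so the real bytes contain newlines and indentation while decoding to the same content. If `value` is an `int64`, the proto3 JSON mapping writes it as a string (`"3"`) rather than a number.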


EventModule.java:

--------------------------------------

public final class EventModule implements StatefulFunctionModule {

    @Override
    public void configure(Map<String, String> globalConfiguration, Binder binder) {

        EventIO ioModule = new EventIO("kafka:9092");

        binder.bindIngress(ioModule.getIngressSpec());

        binder.bindIngressRouter(EventIO.EVENT_INGRESS_ID, new EventRouter());

        binder.bindEgress(ioModule.getEventCountJavaEgressSpec());
        binder.bindEgress(ioModule.getEventCountPythonEgressSpec());

        binder.bindFunctionProvider(EventCountJavaStatefulFunction.TYPE, unused -> new EventCountJavaStatefulFunction());
    }
}

------------------------------------------


EventRouter.java

-------------------------------------------

final class EventRouter implements Router<Event> {

    static final FunctionType PYTHON_EVENT_COUNTER_TYPE = new FunctionType("demo", "eventCounterPython");
    static final FunctionType JAVA_EVENT_COUNTER_TYPE = new FunctionType("demo", "eventCounterJava");

    @Override
    public void route(Event event, Downstream<Event> downstream) {
        downstream.forward(
                JAVA_EVENT_COUNTER_TYPE,
                "count",
                event);
        downstream.forward(
                new Address(
                        PYTHON_EVENT_COUNTER_TYPE,
                        "count"
                ),
                event
        );
    }
}

------------------------------------------
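Spelled out, the router sends every incoming event to two addresses. A plain-Python sketch of that fan-out (the tuple layout is an assumption for illustration, not the SDK's `Downstream` API):

```python
from typing import List, Tuple

# (namespace, name) pairs copied from EventRouter.
JAVA_EVENT_COUNTER_TYPE = ("demo", "eventCounterJava")
PYTHON_EVENT_COUNTER_TYPE = ("demo", "eventCounterPython")

Message = Tuple[Tuple[str, str], str, object]  # (function type, id, payload)


def route(event: object) -> List[Message]:
    # One copy of the event per counter, both sent to the fixed id "count".
    return [
        (JAVA_EVENT_COUNTER_TYPE, "count", event),
        (PYTHON_EVENT_COUNTER_TYPE, "count", event),
    ]


for ftype, fid, payload in route({"type": "click"}):
    print(f"{ftype[0]}/{ftype[1]}:{fid} <- {payload}")
```

Because both forwards use the fixed id `"count"`, each counter type ends up with a single instance, which is what makes the count global. As far as I understand, it also means all events for one counter are handled by a single parallel instance.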


worker.py

-------------------------------------------

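from statefun import StatefulFunctions, kafka_egress_record

# GlobalEventCount is the class generated from the .proto file (import omitted)
functions = StatefulFunctions()


@functions.bind("demo/eventCounterPython")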
def handle_event(context, _):
    state = context.state('count').unpack(GlobalEventCount)
    if not state:
        state = GlobalEventCount()
        state.value = 1
    else:
        state.value += 1
    context.state('count').pack(state)

    egress_message = kafka_egress_record(topic="eventCountPython", value=state)
    context.pack_and_send_egress("demo/eventCountPython", egress_message)

------------------------------------------
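Stripped of the SDK, the per-address counting in worker.py boils down to the following plain-Python sketch. The dict stands in for the function's persisted state, the list for the egress, and the `GlobalEventCount` class here is a hypothetical stand-in for the protobuf message. It assumes `unpack` returns `None` when no state has been stored yet, which is what the `if not state` check relies on:

```python
from typing import Dict, List, Optional, Tuple


class GlobalEventCount:
    # Hypothetical plain-Python stand-in for the protobuf message.
    def __init__(self, value: int = 0) -> None:
        self.value = value


def handle_event(
    store: Dict[str, GlobalEventCount],
    function_id: str,
    egress: List[Tuple[str, int]],
) -> None:
    # Missing state reads as None, as unpack() is assumed to do.
    state: Optional[GlobalEventCount] = store.get(function_id)
    if state is None:
        state = GlobalEventCount(1)
    else:
        state.value += 1
    store[function_id] = state  # write the updated count back
    egress.append(("eventCountPython", state.value))  # (topic, count)


store: Dict[str, GlobalEventCount] = {}
egress: List[Tuple[str, int]] = []
for _ in range(3):
    handle_event(store, "count", egress)
print(egress)  # [('eventCountPython', 1), ('eventCountPython', 2), ('eventCountPython', 3)]
```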


ERROR MESSAGE

------------------------------------------

worker_1         | 2020-07-02 14:38:35,436 INFO org.apache.flink.runtime.taskmanager.Task                     - feedback-union -> functions -> (Sink: demo-eventCountPython-egress, Sink: demo-eventCountJava-egress) (1/1) (15ea8a98c215693d6e7d5c80f3e5f8b6) switched from RUNNING to FAILED.
worker_1         | java.lang.IllegalArgumentException: Unknown provider for type FunctionType(demo, eventCounterPython)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:44)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:63)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:56)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:73)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:50)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:135)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:130)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:82)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487)
worker_1         |     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
worker_1         |     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
worker_1         |     at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:174)
worker_1         |     at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:80)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
worker_1         |     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
worker_1         |     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
worker_1         |     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
worker_1         |     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
worker_1         |     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533

------------------------------------------


Best regards and thanks for taking the time to read all this,

Jan
