Hi Igal,

thanks for your reply. Initially I thought the same thing, but it turns out I am able to call the remote function from an embedded "wrapper" function using the exact same setup (relevant code below). So that's one kind of solution to the problem. But to me it seems like a bit of a hack and not the idiomatic way to solve this...

From my understanding of address-based communication within Flink Stateful Functions, I feel like it should be possible to call that function from the router directly. But I am probably either using the router wrong or misunderstanding some of the ideas behind address-based communication...


EventRouter.java

------------------------------------------------------------------------------------

final class EventRouter implements Router<Event> {

  @Override
  public void route(Event event, Downstream<Event> downstream) {
    downstream.forward(EventCounterWrapper.TYPE, "_", event);
  }
}

--------------------------------------------------------------------------------------
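In address-based messaging, a target is the pair (function type, id), and each distinct address gets its own state. Because the router always forwards with the constant id "_", every event lands on the same function instance, which is what yields one global count. The toy in-memory model below illustrates that effect; it is not StateFun's implementation, and `Address`, `ToyRuntime` and `forward` are hypothetical names.

```python
from collections import defaultdict
from typing import DefaultDict, NamedTuple


class Address(NamedTuple):
    """Hypothetical stand-in for an address: function type plus id."""
    namespace: str
    name: str
    id: str


class ToyRuntime:
    """Keeps one counter per address, mimicking per-address function state."""

    def __init__(self) -> None:
        self.state: DefaultDict[Address, int] = defaultdict(int)

    def forward(self, address: Address, event: str) -> None:
        # Every message sent to the same address updates the same state slot;
        # the event payload itself is ignored in this toy.
        self.state[address] += 1


runtime = ToyRuntime()
for user in ["alice", "bob", "alice"]:
    # Constant id "_": all events share one instance (a global count).
    runtime.forward(Address("demo", "eventCounterWrapper", "_"), user)
    # Per-user id: each user gets an independent count.
    runtime.forward(Address("demo", "eventCounterWrapper", user), user)

print(runtime.state[Address("demo", "eventCounterWrapper", "_")])      # 3
print(runtime.state[Address("demo", "eventCounterWrapper", "alice")])  # 2
```

Choosing the id is therefore a modelling decision: a constant id gives one global counter, while an id derived from the event gives one counter per key.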


EventCounterWrapper.java

---------------------------------------------------------------------------------------

public class EventCounterWrapper implements StatefulFunction {

    static final FunctionType TYPE = new FunctionType("demo", "eventCounterWrapper");
    public static final FunctionType REMOTE_FUNCTION_TYPE = new FunctionType("demo/external", "eventCounterPython");

    @Override
    public void invoke(Context context, Object input) {
        if (input instanceof Event) {
            Event event = (Event) input;
            Any message = Any.pack(event);
            context.send(REMOTE_FUNCTION_TYPE, "_", message);
        }

        if (input instanceof Any) {
            final EventCount eventCount;
            try {
                eventCount = ((Any) input).unpack(EventCount.class);
            } catch (InvalidProtocolBufferException e) {
                throw new RuntimeException("Unexpected type", e);
            }
            context.send(SessionIO.EVENT_COUNT_EGRESS_ID, eventCount);
        }
    }
}

-----------------------------------------------------------------------------------


worker.py
----------------------------------------------------
@functions.bind("demo/external/eventCounterPython")
def handle_event(context, _):
    state = context.state('count').unpack(EventCount)
    if not state:
        state = EventCount()
        state.count = 1
    else:
        state.count += 1
    context.state('count').pack(state)

    # Wrap the updated count in an Any and send it back to the calling function.
    envelope = Any()
    envelope.Pack(state)
    context.reply(envelope)
----------------------------------------------------
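The read-modify-write step in `handle_event` can be pulled out into a plain function, which makes the "no state yet" branch easy to test outside of Flink. A minimal sketch: `EventCountStub` is a hypothetical stand-in for the generated `EventCount` protobuf class, and `None` plays the role of an unset state value.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class EventCountStub:
    """Hypothetical stand-in for the generated EventCount message."""
    count: int = 0


def next_count(state: Optional[EventCountStub]) -> EventCountStub:
    # An unset state starts the count at 1; otherwise increment it.
    if state is None:
        return EventCountStub(count=1)
    return EventCountStub(count=state.count + 1)


state = None
for _ in range(3):
    state = next_count(state)
print(state.count)  # 3
```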


module.yaml

---------------------------------------------------------

spec:
    functions:
      - function:
          meta:
            kind: http
            type: demo/external/eventCounterPython
          spec:
            endpoint: http://python-worker:8000/statefun
            states:
              - count

---------------------------------------------------------
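The same function type is spelled three ways in this setup: `demo/external/eventCounterPython` in module.yaml, the same string in `@functions.bind`, and `FunctionType("demo/external", "eventCounterPython")` in `EventCounterWrapper`. The quick consistency check below assumes the typename string is split into namespace and name at the last "/", which is how the Java constant in the wrapper treats it; `split_typename` is a hypothetical helper.

```python
from typing import Tuple


def split_typename(typename: str) -> Tuple[str, str]:
    """Split 'namespace/name' at the last slash, e.g. 'a/b/c' -> ('a/b', 'c')."""
    namespace, sep, name = typename.rpartition("/")
    if not sep or not namespace or not name:
        raise ValueError(f"expected 'namespace/name', got {typename!r}")
    return namespace, name


yaml_type = "demo/external/eventCounterPython"       # module.yaml: meta.type
python_bind = "demo/external/eventCounterPython"     # worker.py: @functions.bind
java_type = ("demo/external", "eventCounterPython")  # FunctionType(namespace, name)

assert split_typename(yaml_type) == split_typename(python_bind) == java_type
print(split_typename(yaml_type))  # ('demo/external', 'eventCounterPython')
```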


Best Regards

Jan


On 03.07.20 17:33, Igal Shilman wrote:
Hi Jan,
Judging by the exception message, it seems like the function type "demo/eventCounterPython" is not known to Stateful Functions. This could happen if the module.yaml (provided in your email) was accidentally excluded from the resulting artifact (a Docker image or a jar-with-dependencies).
Can you please verify that the module.yaml is present at runtime?
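One way to check this for the jar case is to list the archive's entries. A minimal sketch, assuming module.yaml is expected at the root of the jar (which is where Maven places files from `src/main/resources`); `jar_contains` and the jar path in the comment are hypothetical. For a Docker image, the equivalent is to inspect the file system of the built image instead.

```python
import io
import zipfile


def jar_contains(jar, entry: str = "module.yaml") -> bool:
    """Return True if the jar (a path or file-like object) has the given entry."""
    with zipfile.ZipFile(jar) as archive:
        return entry in archive.namelist()


# Demo with an in-memory jar; in practice pass the real path, e.g.
# jar_contains("target/my-statefun-app.jar")  (hypothetical file name).
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("module.yaml", "spec: {}\n")
    archive.writestr("com/example/EventModule.class", b"")

print(jar_contains(buffer))  # True
```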

Kind regards,
Igal.

On Thu, Jul 2, 2020 at 7:29 PM Jan Brusch <jan.bru...@neuland-bfi.de> wrote:

    Hi,

    based on Gordon's excellent advice on how to handle JSON messages with
    remote functions
    (https://www.mail-archive.com/user@flink.apache.org/msg34385.html),
    I was able to:

    1) Deserialize JSON messages from a Kafka stream

    2) Route the message to an embedded StatefulFunction

    3) Serialize the resulting Protobuf message to JSON and write it back
    to a Kafka stream

    Now, instead of 2), I would like to route the message to a remote
    function, handle it there, and write it back to the stream as JSON via
    the serializer defined in Java. From my understanding, all of this
    should work through address-based communication within the Stateful
    Functions application. Unfortunately, I can't get it to work. See the
    relevant code and error message below. The rest of the project
    structure basically follows the walkthrough example from the
    documentation. Any ideas or input would be greatly appreciated.


    module.yaml:

    --------------------------------

    ...

    spec:
         functions:
           - function:
               meta:
                 kind: http
                 type: demo/eventCounterPython
               spec:
                 endpoint: http://python-worker:8000/statefun
                 states:
                   - name: count
                 maxNumBatchRequests: 500

    ...

    ---------------------------------------


    EventIO.java

    ---------------------------------------------

    final class EventIO {

      static final EgressIdentifier<GlobalEventCount> EVENT_COUNT_PYTHON_EGRESS_ID =
          new EgressIdentifier<>("demo", "eventCountPython", GlobalEventCount.class);

      ....

      EgressSpec<GlobalEventCount> getEventCountPythonEgressSpec() {
        return KafkaEgressBuilder.forIdentifier(EVENT_COUNT_PYTHON_EGRESS_ID)
            .withKafkaAddress(kafkaAddress)
            .withSerializer(GlobalEventCountPythonKafkaSerializer.class)
            .build();
      }

      ....

      private static final class GlobalEventCountPythonKafkaSerializer
          implements KafkaEgressSerializer<GlobalEventCount> {

        private static final long serialVersionUID = 1L;

        @Override
        public ProducerRecord<byte[], byte[]> serialize(GlobalEventCount eventCount) {
          byte[] key = null;
          byte[] value = null;

          try {
            String json = JsonFormat
                .printer()
                .includingDefaultValueFields()
                .print(eventCount);
            value = json.getBytes();
          } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
          }

          return new ProducerRecord<>("eventCountPython", key, value);
        }
      }

    }

    --------------------------------------------
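Two details of this serializer shape the JSON that lands in Kafka. Proto3 JSON output omits fields that hold their default value, so without `includingDefaultValueFields()` a count of 0 would be printed as an empty JSON object; and `String.getBytes()` without an argument uses the platform's default charset, so encoding JSON as UTF-8 explicitly is the safer choice. The Python sketch below mimics both ideas on a plain dict rather than the protobuf API; `to_json_bytes` is a hypothetical helper.

```python
import json


def to_json_bytes(fields: dict, include_defaults: bool = True) -> bytes:
    """Encode a message-like dict as UTF-8 JSON.

    With include_defaults=False, values equal to 0, "", False or None are
    dropped, mimicking proto3's default JSON output; True mimics
    includingDefaultValueFields().
    """
    if not include_defaults:
        fields = {k: v for k, v in fields.items() if v not in (0, "", False, None)}
    # Encode explicitly instead of relying on a platform default charset.
    return json.dumps(fields, separators=(",", ":")).encode("utf-8")


print(to_json_bytes({"value": 0}))                          # b'{"value":0}'
print(to_json_bytes({"value": 0}, include_defaults=False))  # b'{}'
```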


    EventModule.java:

    --------------------------------------

    public final class EventModule implements StatefulFunctionModule {

        @Override
        public void configure(Map<String, String> globalConfiguration, Binder binder) {

            EventIO ioModule = new EventIO("kafka:9092");

            binder.bindIngress(ioModule.getIngressSpec());

            binder.bindIngressRouter(EventIO.EVENT_INGRESS_ID, new EventRouter());

            binder.bindEgress(ioModule.getEventCountJavaEgressSpec());
            binder.bindEgress(ioModule.getEventCountPythonEgressSpec());

            binder.bindFunctionProvider(
                    EventCountJavaStatefulFunction.TYPE,
                    unused -> new EventCountJavaStatefulFunction());
        }
    }

    ------------------------------------------
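Conceptually, the runtime can only deliver a message to a function type for which it has a provider: embedded types come from `binder.bindFunctionProvider(...)`, while remote types come from a loaded module.yaml. This module binds only one Java function provider, so `demo/eventCounterPython` has to come from the YAML side, and if that file is not loaded the lookup fails much like the exception in the log. The toy model below shows that lookup; it is not StateFun's actual code, all names are hypothetical, and the error text is only shaped to resemble the logged message.

```python
from typing import Callable, Dict, Iterable, Tuple

FunctionType = Tuple[str, str]  # hypothetical stand-in: (namespace, name)
Provider = Callable[[], str]


def build_registry(
    embedded: Dict[FunctionType, Provider],
    remote_types: Iterable[FunctionType],
) -> Dict[FunctionType, Provider]:
    """Merge code-bound providers with function types declared in a module.yaml."""
    registry = dict(embedded)
    for ftype in remote_types:
        # A remote type gets a generic HTTP-backed provider; here it is just a label.
        registry[ftype] = lambda ftype=ftype: f"remote {ftype[0]}/{ftype[1]}"
    return registry


def load_function(registry: Dict[FunctionType, Provider], ftype: FunctionType) -> str:
    """Look up a provider by type, failing for types nobody registered."""
    provider = registry.get(ftype)
    if provider is None:
        raise ValueError(f"Unknown provider for type FunctionType({ftype[0]}, {ftype[1]})")
    return provider()


embedded = {("demo", "eventCounterJava"): lambda: "embedded demo/eventCounterJava"}
with_yaml = build_registry(embedded, [("demo", "eventCounterPython")])
without_yaml = build_registry(embedded, [])  # module.yaml missing from the artifact

print(load_function(with_yaml, ("demo", "eventCounterPython")))
try:
    load_function(without_yaml, ("demo", "eventCounterPython"))
except ValueError as err:
    print(err)  # Unknown provider for type FunctionType(demo, eventCounterPython)
```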


    EventRouter.java

    -------------------------------------------

    final class EventRouter implements Router<Event> {

        static final FunctionType PYTHON_EVENT_COUNTER_TYPE =
                new FunctionType("demo", "eventCounterPython");
        static final FunctionType JAVA_EVENT_COUNTER_TYPE =
                new FunctionType("demo", "eventCounterJava");

        @Override
        public void route(Event event, Downstream<Event> downstream) {
            downstream.forward(JAVA_EVENT_COUNTER_TYPE, "count", event);
            downstream.forward(new Address(PYTHON_EVENT_COUNTER_TYPE, "count"), event);
        }
    }

    ------------------------------------------


    worker.py

    -------------------------------------------

    @functions.bind("demo/eventCounterPython")
    def handle_event(context, _):
        state = context.state('count').unpack(GlobalEventCount)
        if not state:
            state = GlobalEventCount()
            state.value = 1
        else:
            state.value += 1
        context.state('count').pack(state)

        egress_message = kafka_egress_record(topic="eventCountPython", value=state)
        context.pack_and_send_egress("demo/eventCountPython", egress_message)

    ------------------------------------------


    ERROR MESSAGE

    ------------------------------------------

    worker_1         | 2020-07-02 14:38:35,436 INFO org.apache.flink.runtime.taskmanager.Task                     - feedback-union -> functions -> (Sink: demo-eventCountPython-egress, Sink: demo-eventCountJava-egress) (1/1) (15ea8a98c215693d6e7d5c80f3e5f8b6) switched from RUNNING to FAILED.
    worker_1         | java.lang.IllegalArgumentException: Unknown provider for type FunctionType(demo, eventCounterPython)
    worker_1         |     at org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:44)
    worker_1         |     at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:63)
    worker_1         |     at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:56)
    worker_1         |     at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:73)
    worker_1         |     at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:50)
    worker_1         |     at org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:135)
    worker_1         |     at org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:130)
    worker_1         |     at org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:82)
    worker_1         |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550)
    worker_1         |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527)
    worker_1         |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487)
    worker_1         |     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    worker_1         |     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    worker_1         |     at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:174)
    worker_1         |     at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:80)
    worker_1         |     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
    worker_1         |     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
    worker_1         |     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
    worker_1         |     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
    worker_1         |     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
    worker_1         |     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
    worker_1         |     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
    worker_1         |     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
    worker_1         |     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    worker_1         |     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533

    ------------------------------------------


    Best regards and thanks for taking the time to read all this,

    Jan

--
neuland  – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Telefon (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID. DE 246585501
