Hi Jan,
Two follow-up questions:
1. Judging by the stack trace in your email, the function type does
seem to be unavailable, and I'd like to follow up on that. Can you
please share your Dockerfile, so that we have the complete picture? If
you are not comfortable sharing it, can you please exec into the
container and manually verify that the module.yaml is present on both
the "worker" image and the "master" image, and that it defines the
remote function name correctly?
2. The router in your original email does not route messages of type
Any; it forwards them as-is. The remote functions API requires that
the message sent to a remote function is of type Any. Can you try
something like this:
final class EventRouter implements Router<com.google.protobuf.Message> {
  static final FunctionType PYTHON_EVENT_COUNTER_TYPE =
      new FunctionType("demo", "eventCounterPython");
  static final FunctionType JAVA_EVENT_COUNTER_TYPE =
      new FunctionType("demo", "eventCounterJava");

  @Override
  public void route(
      com.google.protobuf.Message event,
      Downstream<com.google.protobuf.Message> downstream) {
    downstream.forward(JAVA_EVENT_COUNTER_TYPE, "count", event);
    downstream.forward(
        new Address(PYTHON_EVENT_COUNTER_TYPE, "count"),
        Any.pack(event));
  }
}
In addition, you would have to change the definition of your ingress
identifier so that its produced type is com.google.protobuf.Message
instead of Event.
Good luck!
Igal
On Fri, Jul 3, 2020 at 10:09 PM Jan Brusch <jan.bru...@neuland-bfi.de> wrote:
Hi Igal,
thanks for your reply. Initially I thought the same thing, but it
turns out I am able to call the remote function from an embedded
"wrapper" function using the exact same setup (relevant code
below). So that's one kind of solution to the problem, but to me
it seems like a bit of a hack and not the idiomatic way to
solve this...
From my understanding of the address-based communication within
Flink Stateful Functions, I feel like it should be possible to
call that function from the router directly. But I am probably
either using the Router wrong or misunderstanding some of the ideas
behind address-based communication...
EventRouter.java
------------------------------------------------------------------------------------
final class EventRouter implements Router<Event> {
  @Override
  public void route(Event event, Downstream<Event> downstream) {
    downstream.forward(EventCounterWrapper.TYPE, "_", event);
  }
}
--------------------------------------------------------------------------------------
EventCounterWrapper.java
---------------------------------------------------------------------------------------
public class EventCounterWrapper implements StatefulFunction {
  static final FunctionType TYPE =
      new FunctionType("demo", "eventCounterWrapper");
  public static final FunctionType REMOTE_FUNCTION_TYPE =
      new FunctionType("demo/external", "eventCounterPython");

  @Override
  public void invoke(Context context, Object input) {
    if (input instanceof Event) {
      Event event = (Event) input;
      Any message = Any.pack(event);
      context.send(REMOTE_FUNCTION_TYPE, "_", message);
    }
    if (input instanceof Any) {
      final EventCount eventCount;
      try {
        eventCount = ((Any) input).unpack(EventCount.class);
      } catch (InvalidProtocolBufferException e) {
        throw new RuntimeException("Unexpected type", e);
      }
      context.send(SessionIO.EVENT_COUNT_EGRESS_ID, eventCount);
    }
  }
}
-----------------------------------------------------------------------------------
worker.py
----------------------------------------------------
@functions.bind("demo/external/eventCounterPython")
def handle_event(context, _):
    state = context.state('count').unpack(EventCount)
    if not state:
        state = EventCount()
        state.count = 1
    else:
        state.count += 1
    context.state('count').pack(state)
    envelope = Any()
    envelope.Pack(state)
    context.reply(envelope)
----------------------------------------------------
module.yaml
---------------------------------------------------------
spec:
  functions:
    - function:
        meta:
          kind: http
          type: demo/external/eventCounterPython
        spec:
          endpoint: http://python-worker:8000/statefun
          states:
            - count
---------------------------------------------------------
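For reference, the pairing above of "type: demo/external/eventCounterPython" with FunctionType("demo/external", "eventCounterPython") is what you get when the type string is split at its last slash. The following is a minimal Python sketch of that rule, assuming last-slash splitting is how the namespace and name are resolved; split_typename is a hypothetical helper for illustration, not part of any SDK.

```python
from typing import Tuple


def split_typename(typename: str) -> Tuple[str, str]:
    """Split 'namespace/name' at the last slash (assumed rule)."""
    namespace, sep, name = typename.rpartition("/")
    # Reject strings with no slash, an empty namespace, or an empty name.
    if not sep or not namespace or not name:
        raise ValueError(f"expected namespace/name, got {typename!r}")
    return namespace, name


if __name__ == "__main__":
    for t in ("demo/eventCounterPython", "demo/external/eventCounterPython"):
        print(t, "->", split_typename(t))
```

Under this assumption, everything before the last slash, including any inner slashes, belongs to the namespace.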
Best Regards
Jan
On 03.07.20 17:33, Igal Shilman wrote:
Hi Jan,
Judging by the exception message, it seems that the function type
"demo/eventCounterPython" is not known to Stateful Functions.
This could happen if the module.yaml (provided in your email) was
accidentally excluded from the resulting artifact (Docker image
or a jar-with-dependencies).
Can you please verify that the module.yaml is present at runtime?
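For the jar-with-dependencies case, one way to run this check is to list the archive and look for the file. This is a minimal Python sketch, assuming the artifact is an ordinary jar (zip) archive. find_module_yaml is a hypothetical helper name, and the script only reports whether a module.yaml entry exists, not whether the runtime will pick it up from that location.

```python
import sys
import zipfile


def find_module_yaml(jar_path: str) -> list:
    """Return every archive entry whose file name is module.yaml."""
    with zipfile.ZipFile(jar_path) as jar:
        return [n for n in jar.namelist() if n.rsplit("/", 1)[-1] == "module.yaml"]


if __name__ == "__main__" and len(sys.argv) > 1:
    # The jar path comes from the command line; no default is assumed.
    hits = find_module_yaml(sys.argv[1])
    print("\n".join(hits) if hits else "module.yaml not found in archive")
```

It can be run with the path to the built jar as its only argument. An empty result suggests the file was dropped during packaging.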
Kind regards,
Igal.
On Thu, Jul 2, 2020 at 7:29 PM Jan Brusch <jan.bru...@neuland-bfi.de> wrote:
Hi,
based on Gordon's excellent advice on how to handle JSON messages with
remote functions
(https://www.mail-archive.com/user@flink.apache.org/msg34385.html) I was
able to:
1) Deserialize JSON messages from a Kafka stream
2) Route the message to an embedded StatefulFunction
3) Serialize the resulting Protobuf message to JSON and write it back
to a Kafka stream
Now, instead of 2), I would like to route the message to a remote
function, handle it there, and write it back to the stream as JSON via
the serializer defined in Java. From my understanding, all this should
work through address-based communication within the Stateful Functions
application. Unfortunately, I can't get it to work. See the relevant
code and error message below. The rest of the project structure
basically follows the walkthrough example from the documentation. Any
ideas or input would be greatly appreciated.
module.yaml:
--------------------------------
...
spec:
  functions:
    - function:
        meta:
          kind: http
          type: demo/eventCounterPython
        spec:
          endpoint: http://python-worker:8000/statefun
          states:
            - name: count
          maxNumBatchRequests: 500
...
---------------------------------------
EventIO.java
---------------------------------------------
final class EventIO {
  static final EgressIdentifier<GlobalEventCount> EVENT_COUNT_PYTHON_EGRESS_ID =
      new EgressIdentifier<>("demo", "eventCountPython", GlobalEventCount.class);
  ....
  EgressSpec<GlobalEventCount> getEventCountPythonEgressSpec() {
    return KafkaEgressBuilder.forIdentifier(EVENT_COUNT_PYTHON_EGRESS_ID)
        .withKafkaAddress(kafkaAddress)
        .withSerializer(GlobalEventCountPythonKafkaSerializer.class)
        .build();
  }
  ....
  private static final class GlobalEventCountPythonKafkaSerializer
      implements KafkaEgressSerializer<GlobalEventCount> {
    private static final long serialVersionUID = 1L;

    @Override
    public ProducerRecord<byte[], byte[]> serialize(GlobalEventCount eventCount) {
      byte[] key = null;
      byte[] value = null;
      try {
        String json = JsonFormat
            .printer()
            .includingDefaultValueFields()
            .print(eventCount);
        value = json.getBytes();
      } catch (InvalidProtocolBufferException e) {
        e.printStackTrace();
      }
      return new ProducerRecord<>("eventCountPython", key, value);
    }
  }
}
--------------------------------------------
EventModule.java:
--------------------------------------
public final class EventModule implements StatefulFunctionModule {
  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    EventIO ioModule = new EventIO("kafka:9092");
    binder.bindIngress(ioModule.getIngressSpec());
    binder.bindIngressRouter(EventIO.EVENT_INGRESS_ID, new EventRouter());
    binder.bindEgress(ioModule.getEventCountJavaEgressSpec());
    binder.bindEgress(ioModule.getEventCountPythonEgressSpec());
    binder.bindFunctionProvider(
        EventCountJavaStatefulFunction.TYPE,
        unused -> new EventCountJavaStatefulFunction());
  }
}
------------------------------------------
EventRouter.java
-------------------------------------------
final class EventRouter implements Router<Event> {
  static final FunctionType PYTHON_EVENT_COUNTER_TYPE =
      new FunctionType("demo", "eventCounterPython");
  static final FunctionType JAVA_EVENT_COUNTER_TYPE =
      new FunctionType("demo", "eventCounterJava");

  @Override
  public void route(Event event, Downstream<Event> downstream) {
    downstream.forward(JAVA_EVENT_COUNTER_TYPE, "count", event);
    downstream.forward(
        new Address(PYTHON_EVENT_COUNTER_TYPE, "count"),
        event);
  }
}
------------------------------------------
worker.py
-------------------------------------------
@functions.bind("demo/eventCounterPython")
def handle_event(context, _):
    state = context.state('count').unpack(GlobalEventCount)
    if not state:
        state = GlobalEventCount()
        state.value = 1
    else:
        state.value += 1
    context.state('count').pack(state)
    egress_message = kafka_egress_record(topic="eventCountPython", value=state)
    context.pack_and_send_egress("demo/eventCountPython", egress_message)
------------------------------------------
ERROR MESSAGE
------------------------------------------
worker_1 | 2020-07-02 14:38:35,436 INFO org.apache.flink.runtime.taskmanager.Task - feedback-union -> functions -> (Sink: demo-eventCountPython-egress, Sink: demo-eventCountJava-egress) (1/1) (15ea8a98c215693d6e7d5c80f3e5f8b6) switched from RUNNING to FAILED.
worker_1 | java.lang.IllegalArgumentException: Unknown provider for type FunctionType(demo, eventCounterPython)
worker_1 |     at org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:44)
worker_1 |     at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:63)
worker_1 |     at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:56)
worker_1 |     at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:73)
worker_1 |     at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:50)
worker_1 |     at org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:135)
worker_1 |     at org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:130)
worker_1 |     at org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:82)
worker_1 |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550)
worker_1 |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527)
worker_1 |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487)
worker_1 |     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
worker_1 |     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
worker_1 |     at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:174)
worker_1 |     at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:80)
worker_1 |     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
worker_1 |     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
worker_1 |     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
worker_1 |     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
worker_1 |     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
worker_1 |     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
worker_1 |     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
worker_1 |     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
worker_1 |     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
worker_1 |     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
------------------------------------------
Best regards and thanks for taking the time to read all this,
Jan
--
neuland – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen
Phone (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de
https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi
Managing Directors: Thomas Gebauer, Jan Zander
Register Court: Amtsgericht Bremen, HRB 23395 HB
VAT ID: DE 246585501