Hi,
based on Gordon's excellent advice on how to handle JSON messages with
remote functions
(https://www.mail-archive.com/user@flink.apache.org/msg34385.html) I was
able to:
1) Deserialize JSON messages from a Kafka stream
2) Route the message to an embedded StatefulFunction
3) Serialize the resulting Protobuf message to JSON and write it back to
a Kafka stream
Now, instead of 2), I would like to route the message to a remote
function, handle it there, and write it back to the stream as JSON via
the serializer defined in Java. As I understand it, all of this should
work through address-based communication within the Stateful Functions
application. Unfortunately, I can't get it to work. See the relevant code
and error message below. The rest of the project structure basically
follows the walkthrough example from the documentation. Any ideas or
input would be greatly appreciated.
module.yaml:
--------------------------------
...
spec:
  functions:
    - function:
        meta:
          kind: http
          type: demo/eventCounterPython
        spec:
          endpoint: http://python-worker:8000/statefun
          states:
            - name: count
          maxNumBatchRequests: 500
...
---------------------------------------
EventIO.java
---------------------------------------------
final class EventIO {
  static final EgressIdentifier<GlobalEventCount> EVENT_COUNT_PYTHON_EGRESS_ID =
      new EgressIdentifier<>("demo", "eventCountPython", GlobalEventCount.class);
  ....
  EgressSpec<GlobalEventCount> getEventCountPythonEgressSpec() {
    return KafkaEgressBuilder.forIdentifier(EVENT_COUNT_PYTHON_EGRESS_ID)
        .withKafkaAddress(kafkaAddress)
        .withSerializer(GlobalEventCountPythonKafkaSerializer.class)
        .build();
  }
  ....
  private static final class GlobalEventCountPythonKafkaSerializer
      implements KafkaEgressSerializer<GlobalEventCount> {
    private static final long serialVersionUID = 1L;

    @Override
    public ProducerRecord<byte[], byte[]> serialize(GlobalEventCount eventCount) {
      byte[] key = null;
      byte[] value = null;
      try {
        String json = JsonFormat
            .printer()
            .includingDefaultValueFields()
            .print(eventCount);
        value = json.getBytes();
      } catch (InvalidProtocolBufferException e) {
        e.printStackTrace();
      }
      return new ProducerRecord<>("eventCountPython", key, value);
    }
  }
}
--------------------------------------------
EventModule.java:
--------------------------------------
public final class EventModule implements StatefulFunctionModule {
  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    EventIO ioModule = new EventIO("kafka:9092");
    binder.bindIngress(ioModule.getIngressSpec());
    binder.bindIngressRouter(EventIO.EVENT_INGRESS_ID, new EventRouter());
    binder.bindEgress(ioModule.getEventCountJavaEgressSpec());
    binder.bindEgress(ioModule.getEventCountPythonEgressSpec());
    binder.bindFunctionProvider(
        EventCountJavaStatefulFunction.TYPE,
        unused -> new EventCountJavaStatefulFunction());
  }
}
------------------------------------------
EventRouter.java
-------------------------------------------
final class EventRouter implements Router<Event> {
  static final FunctionType PYTHON_EVENT_COUNTER_TYPE =
      new FunctionType("demo", "eventCounterPython");
  static final FunctionType JAVA_EVENT_COUNTER_TYPE =
      new FunctionType("demo", "eventCounterJava");

  @Override
  public void route(Event event, Downstream<Event> downstream) {
    downstream.forward(JAVA_EVENT_COUNTER_TYPE, "count", event);
    downstream.forward(new Address(PYTHON_EVENT_COUNTER_TYPE, "count"), event);
  }
}
------------------------------------------
worker.py
-------------------------------------------
@functions.bind("demo/eventCounterPython")
def handle_event(context, _):
    state = context.state('count').unpack(GlobalEventCount)
    if not state:
        state = GlobalEventCount()
        state.value = 1
    else:
        state.value += 1
    context.state('count').pack(state)
    egress_message = kafka_egress_record(topic="eventCountPython", value=state)
    context.pack_and_send_egress("demo/eventCountPython", egress_message)
------------------------------------------
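For what it's worth, the counting logic of the handler can be exercised
without StateFun at all. The following is only a minimal sketch under
stated assumptions: FakeCount, FakeState and FakeContext are hypothetical
stand-ins that I made up for the Protobuf message, the state cell and the
SDK context, and a plain dict replaces the Kafka egress record. It shows
nothing about the real SDK beyond the control flow of the handler.

```python
from dataclasses import dataclass


@dataclass
class FakeCount:
    """Hypothetical stand-in for the GlobalEventCount Protobuf message."""
    value: int = 0


class FakeState:
    """Hypothetical stand-in for one persisted state cell."""

    def __init__(self):
        self._stored = None

    def unpack(self, _message_type):
        # Returns None until something has been packed.
        return self._stored

    def pack(self, message):
        self._stored = message


class FakeContext:
    """Hypothetical stand-in for the context handed to the function."""

    def __init__(self):
        self._states = {"count": FakeState()}
        self.egress = []  # (egress id, record) pairs in send order

    def state(self, name):
        return self._states[name]

    def pack_and_send_egress(self, egress_id, record):
        self.egress.append((egress_id, record))


def handle_event(context, _):
    # Same control flow as the handler in worker.py.
    state = context.state("count").unpack(FakeCount)
    if not state:
        state = FakeCount()
        state.value = 1
    else:
        state.value += 1
    context.state("count").pack(state)
    # A plain dict stands in for kafka_egress_record(...).
    record = {"topic": "eventCountPython", "value": state.value}
    context.pack_and_send_egress("demo/eventCountPython", record)


context = FakeContext()
for _ in range(3):
    handle_event(context, None)

# Counter values that would have been sent to the egress, in order.
emitted = [record["value"] for _, record in context.egress]
```

With these stand-ins the handler counts up as intended, so I assume the
problem is in how the function is wired into the application rather than
in the handler body.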
ERROR MESSAGE
------------------------------------------
worker_1 | 2020-07-02 14:38:35,436 INFO org.apache.flink.runtime.taskmanager.Task - feedback-union -> functions -> (Sink: demo-eventCountPython-egress, Sink: demo-eventCountJava-egress) (1/1) (15ea8a98c215693d6e7d5c80f3e5f8b6) switched from RUNNING to FAILED.
worker_1 | java.lang.IllegalArgumentException: Unknown provider for type FunctionType(demo, eventCounterPython)
worker_1 | at org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:44)
worker_1 | at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:63)
worker_1 | at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:56)
worker_1 | at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:73)
worker_1 | at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:50)
worker_1 | at org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:135)
worker_1 | at org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:130)
worker_1 | at org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:82)
worker_1 | at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550)
worker_1 | at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527)
worker_1 | at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487)
worker_1 | at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
worker_1 | at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
worker_1 | at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:174)
worker_1 | at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:80)
worker_1 | at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
worker_1 | at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
worker_1 | at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
worker_1 | at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
worker_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
worker_1 | at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
worker_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
worker_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
worker_1 | at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
worker_1 | at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
------------------------------------------
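My reading of the trace (an assumption on my part, not verified against
the StateFun source) is that the runtime looks up a provider for the
target function type and fails when none is registered for it. The toy
model below only illustrates that reading. ToyFunctionRepository,
UnknownProviderError and their methods are hypothetical names and do not
correspond to any real StateFun API.

```python
class UnknownProviderError(ValueError):
    """Raised by the toy repository when a function type has no provider."""


class ToyFunctionRepository:
    """Hypothetical model: maps (namespace, name) to a provider callable."""

    def __init__(self):
        self._providers = {}

    def bind(self, function_type, provider):
        self._providers[function_type] = provider

    def load(self, function_type):
        provider = self._providers.get(function_type)
        if provider is None:
            namespace, name = function_type
            raise UnknownProviderError(
                f"Unknown provider for type FunctionType({namespace}, {name})"
            )
        return provider()


repository = ToyFunctionRepository()
# Only the embedded Java function is bound, as in EventModule.configure.
repository.bind(("demo", "eventCounterJava"), lambda: "java function instance")

try:
    repository.load(("demo", "eventCounterPython"))
    error_message = None
except UnknownProviderError as error:
    error_message = str(error)
```

If this reading is right, the embedded Java function bound in EventModule
is known to the runtime, while the remote function declared in
module.yaml is not, which would point at the module.yaml not being picked
up rather than at the router or the Python worker.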
Best regards and thanks for taking the time to read all this,
Jan