I'm trying to write very simple echo app with Stateful Function to prove it as a technology for some of my use cases.
I have not been able to accept different content types though. Here is an example of my code for a simple echo function: My Echo stateful function class. package statefun.implementation.functions; import statefun.api.annotations.Statefun; import statefun.api.factories.StatefulFunctionSpecFactory; import com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.statefun.sdk.java.Context; import org.apache.flink.statefun.sdk.java.StatefulFunction; import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec; import org.apache.flink.statefun.sdk.java.TypeName; import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder; import org.apache.flink.statefun.sdk.java.message.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @Statefun public class EchoFn implements StatefulFunction, StatefulFunctionSpecFactory { private final static Logger logger = LoggerFactory.getLogger(EchoFn.class); private final static TypeName fnType = TypeName.typeNameOf("std.fns", "echo"); public final static StatefulFunctionSpec SPEC = StatefulFunctionSpec.builder(fnType).withSupplier(EchoFn::new).build(); @Override public CompletableFuture<Void> apply(Context context, Message message) throws Throwable { if (message.is(Types.JAVA_STRING_TYPE)) { final String request = message.as(Types.JAVA_STRING_TYPE); context.send(EgressMessageBuilder.forEgress(fnType).withValue(request).build()); } else if (message.is(Types.JSON_TYPE)) { final JsonNode json = message.as(Types.JSON_TYPE); final String request = json.asText(); context.send(EgressMessageBuilder.forEgress(fnType).withValue(request).build()); } return context.done(); } @Override public StatefulFunctionSpec createSpec(Supplier<? extends StatefulFunction> supplier) { return StatefulFunctionSpec.builder(EchoFn.fnType) .withSupplier(supplier) .build(); } @Override public String toString() { return fnType.toString(); } } My Types class, which kind of copies https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java package statefun.implementation.functions; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.statefun.sdk.java.TypeName; import org.apache.flink.statefun.sdk.java.types.SimpleType; import org.apache.flink.statefun.sdk.java.types.Type; public class Types { public static final String JAVA_LANG_NAMESPACE = "java.lang"; private final static ObjectMapper objectMapper = new ObjectMapper(); public static final SimpleType.Fn<String, byte[]> javaStringSerialize = input -> input == null ? new byte[0] : input.getBytes(); public static final SimpleType.Fn<byte[], String> javaStringDeserialize = input -> input == null || input.length == 0 ? null : new String(input); public static final Type<String> JAVA_STRING_TYPE = SimpleType.simpleImmutableTypeFrom( TypeName.typeNameOf(JAVA_LANG_NAMESPACE, "String"), javaStringSerialize, javaStringDeserialize ); public static final Type<JsonNode> JSON_TYPE = SimpleType.simpleImmutableTypeFrom( TypeName.typeNameOf("application", JsonNode.class.getName()), objectMapper::writeValueAsBytes, bytes -> objectMapper.readValue(bytes, JsonNode.class)); } My Undertow Handler, copied from https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/undertow/UndertowHttpHandler.java package statefun.implementation.undertow; import io.undertow.server.HttpHandler; import io.undertow.server.HttpServerExchange; import io.undertow.util.Headers; import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler; import org.apache.flink.statefun.sdk.java.slice.Slice; import org.apache.flink.statefun.sdk.java.slice.Slices; import java.util.Objects; import java.util.concurrent.CompletableFuture; /** * A simple Undertow {@link HttpHandler} that delegates requests from StateFun runtime processes to * a StateFun {@link RequestReplyHandler}. */ public final class UndertowHttpHandler implements HttpHandler { private final RequestReplyHandler handler; public UndertowHttpHandler(RequestReplyHandler handler) { this.handler = Objects.requireNonNull(handler); } @Override public void handleRequest(HttpServerExchange exchange) { exchange.getRequestReceiver().receiveFullBytes(this::onRequestBody); } private void onRequestBody(HttpServerExchange exchange, byte[] requestBytes) { exchange.dispatch(); CompletableFuture<Slice> future = handler.handle(Slices.wrap(requestBytes)); future.whenComplete((response, exception) -> onComplete(exchange, response, exception)); } private void onComplete(HttpServerExchange exchange, Slice responseBytes, Throwable ex) { if (ex != null) { ex.printStackTrace(System.out); exchange.getResponseHeaders().put(Headers.STATUS, 500); exchange.endExchange(); return; } exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/octet-stream"); exchange.getResponseSender().send(responseBytes.asReadOnlyByteBuffer()); } } My App Server copied from https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreeterAppServer.java package statefun; import statefun.implementation.functions.EchoFn; import statefun.implementation.undertow.UndertowHttpHandler; import io.undertow.Undertow; import org.apache.flink.statefun.sdk.java.StatefulFunctions; import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler; public final class StatefulFunctionsAppServer { public static void main(String[] args) { final StatefulFunctions functions = new StatefulFunctions(); functions.withStatefulFunction(EchoFn.SPEC); final RequestReplyHandler requestReplyHandler = functions.requestReplyHandler(); final Undertow httpServer = Undertow.builder() .addHttpListener(8080, "0.0.0.0") .setHandler(new UndertowHttpHandler(requestReplyHandler)) .build(); httpServer.start(); } } However, when I run curl -X POST -H "Content-Type: java.lang/String" -d 'Hello World' http://localhost:8080/std.fns/echo or curl -X POST -H "Content-Type: application/com.fasterxml.jackson.databind.JsonNode" -d '{"name": "Hello World"}' http://localhost:8080/std.fns/echo I get the following error on the server. org.apache.flink.statefun.sdk.shaded.com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length. at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:84) at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.CodedInputStream$ArrayDecoder.readRawLittleEndian64(CodedInputStream.java:1151) at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.CodedInputStream$ArrayDecoder.readFixed64(CodedInputStream.java:767) at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:503) at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.GeneratedMessageV3.parseUnknownField(GeneratedMessageV3.java:298) at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.<init>(ToFunction.java:65) at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.<init>(ToFunction.java:14) at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction$1.parsePartialFrom(ToFunction.java:4090) at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction$1.parsePartialFrom(ToFunction.java:4084) at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:100) at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:120) at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:125) at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48) at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.parseFrom(ToFunction.java:3666) at org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler.handle(ConcurrentRequestReplyHandler.java:60) at statefun.implementation.undertow.UndertowHttpHandler.onRequestBody(UndertowHttpHandler.java:49) at io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:399) at io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:481) at statefun.implementation.undertow.UndertowHttpHandler.handleRequest(UndertowHttpHandler.java:44) at io.undertow.server.Connectors.executeRootHandler(Connectors.java:387) at io.undertow.server.protocol.http.HttpReadListener.handleEventWithNoRunningRequest(HttpReadListener.java:256) at io.undertow.server.protocol.http.HttpReadListener.handleEvent(HttpReadListener.java:136) at io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:162) at io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:100) at io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:57) at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92) at org.xnio.ChannelListeners$10.handleEvent(ChannelListeners.java:291) at org.xnio.ChannelListeners$10.handleEvent(ChannelListeners.java:286) at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92) at org.xnio.nio.QueuedNioTcpServer2.acceptTask(QueuedNioTcpServer2.java:178) at org.xnio.nio.WorkerThread.safeRun(WorkerThread.java:612) at org.xnio.nio.WorkerThread.run(WorkerThread.java:479) I've been stuck on this for a very long time. Can somebody please advise me as to what I am doing wrong? The framework never even gets to the point where it parses my payload.