Hi Marco,

The problem is that you’re calling the function directly over HTTP, which is not how it is supposed to work. The HTTP server that hosts your function is meant to be invoked by the StateFun runtime, which sends Protobuf-encoded requests. A raw string or JSON body posted with curl cannot be parsed as one, hence the InvalidProtocolBufferException in your stack trace.

Instead, check out how to define your StateFun module https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/overview/ and how to deploy Stateful Functions https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/deployment/overview. A Stateful Functions application consists of a Flink cluster, ingresses that consume incoming data, egresses that produce outgoing data, and the HTTP server that serves the remote Stateful Functions.

You can also check out a full example here: https://medium.com/devoops-and-universe/part-ii-building-next-gen-event-driven-application-powered-by-stateful-functions-a3139f299736

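To make those pieces more concrete, here is a minimal sketch of a module definition, assuming StateFun 3.1 or later with a Kafka ingress and egress. The broker address, topic name, consumer group, and the ingress and egress ids are hypothetical placeholders, not taken from your setup. Only the function type std.fns/echo, port 8080, and the java.lang/String type name come from your code. Please check the exact keys against the module docs for the version you run.

```yaml
# module.yaml (sketch; all ids, topics and addresses below are assumptions)

# Tells the StateFun runtime where the remote functions in namespace std.fns live.
kind: io.statefun.endpoints.v2/http
spec:
  functions: std.fns/*
  urlPathTemplate: http://localhost:8080/
---
# Ingress: consumes records from Kafka and routes them to the echo function.
kind: io.statefun.kafka.v1/ingress
spec:
  id: std.io/echo-in              # hypothetical ingress id
  address: kafka:9092             # hypothetical broker address
  consumerGroupId: echo-group     # hypothetical consumer group
  topics:
    - topic: echo-requests        # hypothetical topic
      valueType: java.lang/String # must match the type name the function checks
      targets:
        - std.fns/echo
---
# Egress: the function must address this id when it sends egress messages.
kind: io.statefun.kafka.v1/egress
spec:
  id: std.io/echo-out             # hypothetical egress id
  address: kafka:9092
```

With something like this deployed, you would test the function by producing a record to the ingress topic and reading from the egress topic, rather than by posting to port 8080 with curl.
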
Best,
Tymur Yarosh

On 14 Apr 2022, 03:51 +0300, Marco Villalobos <mvillalo...@kineteque.com> wrote:
> I'm trying to write a very simple echo app with Stateful Functions to prove it
> as a technology for some of my use cases.
>
> I have not been able to accept different content types though. Here is an
> example of my code for a simple echo function:
>
> My Echo stateful function class.
>
> package statefun.implementation.functions;
>
> import statefun.api.annotations.Statefun;
> import statefun.api.factories.StatefulFunctionSpecFactory;
> import com.fasterxml.jackson.databind.JsonNode;
> import org.apache.flink.statefun.sdk.java.Context;
> import org.apache.flink.statefun.sdk.java.StatefulFunction;
> import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
> import org.apache.flink.statefun.sdk.java.TypeName;
> import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
> import org.apache.flink.statefun.sdk.java.message.Message;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.util.concurrent.CompletableFuture;
> import java.util.function.Supplier;
>
> @Statefun
> public class EchoFn implements StatefulFunction, StatefulFunctionSpecFactory {
>
>     private final static Logger logger = LoggerFactory.getLogger(EchoFn.class);
>
>     private final static TypeName fnType = TypeName.typeNameOf("std.fns", "echo");
>
>     public final static StatefulFunctionSpec SPEC =
>             StatefulFunctionSpec.builder(fnType).withSupplier(EchoFn::new).build();
>
>     @Override
>     public CompletableFuture<Void> apply(Context context, Message message) throws Throwable {
>
>         if (message.is(Types.JAVA_STRING_TYPE)) {
>             final String request = message.as(Types.JAVA_STRING_TYPE);
>             context.send(EgressMessageBuilder.forEgress(fnType).withValue(request).build());
>         } else if (message.is(Types.JSON_TYPE)) {
>             final JsonNode json = message.as(Types.JSON_TYPE);
>             final String request = json.asText();
>             context.send(EgressMessageBuilder.forEgress(fnType).withValue(request).build());
>         }
>
>         return context.done();
>     }
>
>     @Override
>     public StatefulFunctionSpec createSpec(Supplier<? extends StatefulFunction> supplier) {
>         return StatefulFunctionSpec.builder(EchoFn.fnType)
>                 .withSupplier(supplier)
>                 .build();
>     }
>
>     @Override
>     public String toString() {
>         return fnType.toString();
>     }
> }
>
> My Types class, which kind of copies
> https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java
>
> package statefun.implementation.functions;
>
> import com.fasterxml.jackson.databind.JsonNode;
> import com.fasterxml.jackson.databind.ObjectMapper;
> import org.apache.flink.statefun.sdk.java.TypeName;
> import org.apache.flink.statefun.sdk.java.types.SimpleType;
> import org.apache.flink.statefun.sdk.java.types.Type;
>
> public class Types {
>
>     public static final String JAVA_LANG_NAMESPACE = "java.lang";
>     private final static ObjectMapper objectMapper = new ObjectMapper();
>     public static final SimpleType.Fn<String, byte[]> javaStringSerialize =
>             input -> input == null ? new byte[0] : input.getBytes();
>     public static final SimpleType.Fn<byte[], String> javaStringDeserialize =
>             input -> input == null || input.length == 0 ? null : new String(input);
>
>     public static final Type<String> JAVA_STRING_TYPE =
>             SimpleType.simpleImmutableTypeFrom(
>                     TypeName.typeNameOf(JAVA_LANG_NAMESPACE, "String"),
>                     javaStringSerialize,
>                     javaStringDeserialize
>             );
>
>     public static final Type<JsonNode> JSON_TYPE =
>             SimpleType.simpleImmutableTypeFrom(
>                     TypeName.typeNameOf("application", JsonNode.class.getName()),
>                     objectMapper::writeValueAsBytes,
>                     bytes -> objectMapper.readValue(bytes, JsonNode.class));
> }
>
> My Undertow Handler, copied from
> https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/undertow/UndertowHttpHandler.java
>
> package statefun.implementation.undertow;
>
> import io.undertow.server.HttpHandler;
> import io.undertow.server.HttpServerExchange;
> import io.undertow.util.Headers;
> import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
> import org.apache.flink.statefun.sdk.java.slice.Slice;
> import org.apache.flink.statefun.sdk.java.slice.Slices;
>
> import java.util.Objects;
> import java.util.concurrent.CompletableFuture;
>
> /**
>  * A simple Undertow {@link HttpHandler} that delegates requests from StateFun runtime processes to
>  * a StateFun {@link RequestReplyHandler}.
>  */
> public final class UndertowHttpHandler implements HttpHandler {
>     private final RequestReplyHandler handler;
>
>     public UndertowHttpHandler(RequestReplyHandler handler) {
>         this.handler = Objects.requireNonNull(handler);
>     }
>
>     @Override
>     public void handleRequest(HttpServerExchange exchange) {
>         exchange.getRequestReceiver().receiveFullBytes(this::onRequestBody);
>     }
>
>     private void onRequestBody(HttpServerExchange exchange, byte[] requestBytes) {
>         exchange.dispatch();
>         CompletableFuture<Slice> future = handler.handle(Slices.wrap(requestBytes));
>         future.whenComplete((response, exception) -> onComplete(exchange, response, exception));
>     }
>
>     private void onComplete(HttpServerExchange exchange, Slice responseBytes, Throwable ex) {
>         if (ex != null) {
>             ex.printStackTrace(System.out);
>             exchange.getResponseHeaders().put(Headers.STATUS, 500);
>             exchange.endExchange();
>             return;
>         }
>         exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/octet-stream");
>         exchange.getResponseSender().send(responseBytes.asReadOnlyByteBuffer());
>     }
> }
>
> My App Server copied from
> https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreeterAppServer.java
>
> package statefun;
>
> import statefun.implementation.functions.EchoFn;
> import statefun.implementation.undertow.UndertowHttpHandler;
> import io.undertow.Undertow;
> import org.apache.flink.statefun.sdk.java.StatefulFunctions;
> import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
>
> public final class StatefulFunctionsAppServer {
>
>     public static void main(String[] args) {
>         final StatefulFunctions functions = new StatefulFunctions();
>         functions.withStatefulFunction(EchoFn.SPEC);
>
>         final RequestReplyHandler requestReplyHandler = functions.requestReplyHandler();
>         final Undertow httpServer =
>                 Undertow.builder()
>                         .addHttpListener(8080, "0.0.0.0")
>                         .setHandler(new UndertowHttpHandler(requestReplyHandler))
>                         .build();
>         httpServer.start();
>     }
> }
>
> However, when I run
>
> curl -X POST -H "Content-Type: java.lang/String" -d 'Hello World' http://localhost:8080/std.fns/echo
>
> or
>
> curl -X POST -H "Content-Type: application/com.fasterxml.jackson.databind.JsonNode" -d '{"name": "Hello World"}' http://localhost:8080/std.fns/echo
>
> I get the following error on the server.
>
> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length.
>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:84)
>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.CodedInputStream$ArrayDecoder.readRawLittleEndian64(CodedInputStream.java:1151)
>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.CodedInputStream$ArrayDecoder.readFixed64(CodedInputStream.java:767)
>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:503)
>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.GeneratedMessageV3.parseUnknownField(GeneratedMessageV3.java:298)
>     at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.<init>(ToFunction.java:65)
>     at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.<init>(ToFunction.java:14)
>     at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction$1.parsePartialFrom(ToFunction.java:4090)
>     at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction$1.parsePartialFrom(ToFunction.java:4084)
>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:100)
>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:120)
>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:125)
>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
>     at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.parseFrom(ToFunction.java:3666)
>     at org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler.handle(ConcurrentRequestReplyHandler.java:60)
>     at statefun.implementation.undertow.UndertowHttpHandler.onRequestBody(UndertowHttpHandler.java:49)
>     at io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:399)
>     at io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:481)
>     at statefun.implementation.undertow.UndertowHttpHandler.handleRequest(UndertowHttpHandler.java:44)
>     at io.undertow.server.Connectors.executeRootHandler(Connectors.java:387)
>     at io.undertow.server.protocol.http.HttpReadListener.handleEventWithNoRunningRequest(HttpReadListener.java:256)
>     at io.undertow.server.protocol.http.HttpReadListener.handleEvent(HttpReadListener.java:136)
>     at io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:162)
>     at io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:100)
>     at io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:57)
>     at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
>     at org.xnio.ChannelListeners$10.handleEvent(ChannelListeners.java:291)
>     at org.xnio.ChannelListeners$10.handleEvent(ChannelListeners.java:286)
>     at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
>     at org.xnio.nio.QueuedNioTcpServer2.acceptTask(QueuedNioTcpServer2.java:178)
>     at org.xnio.nio.WorkerThread.safeRun(WorkerThread.java:612)
>     at org.xnio.nio.WorkerThread.run(WorkerThread.java:479)
>
> I've been stuck on this for a very long time. Can somebody please advise me
> as to what I am doing wrong? The framework never even gets to the point where
> it parses my payload.