IK If what you're saying is true, then why do most of the examples in the flink-statefun-playground example use HTTP as an alternative entry point?
Here is the greeter example: https://github.com/apache/flink-statefun-playground/tree/main/java/greeter <https://github.com/apache/flink-statefun-playground/tree/main/java/greeter> > On Apr 16, 2022, at 6:06 PM, Tymur Yarosh <ti.yar...@gmail.com> wrote: > > Hi Marco, > The problem is that you’re trying to call the function directly via HTTP. > This is not how it's supposed to work. Instead, check out how to define your > statefun module > https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/overview/ > > <https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/overview/> > and how to deploy Stateful Functions > https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/deployment/overview > > <https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/deployment/overview>. > The Stateful Functions application consists of Flink Cluster, ingresses to > consume incoming data, egresses to produce outgoing data and the HTTP server > that serves remote Stateful Functions. Also you can checkout the full example > here > https://medium.com/devoops-and-universe/part-ii-building-next-gen-event-driven-application-powered-by-stateful-functions-a3139f299736 > > <https://medium.com/devoops-and-universe/part-ii-building-next-gen-event-driven-application-powered-by-stateful-functions-a3139f299736> > > Best, > Tymur Yarosh > 14 квіт. 2022 р., 03:51 +0300, Marco Villalobos <mvillalo...@kineteque.com>, > писав(-ла): >> I'm trying to write very simple echo app with Stateful Function to prove it >> as a technology for some of my use cases. >> >> I have not been able to accept different content types though. Here is an >> example of my code for a simple >> echo function: >> >> My Echo stateful function class. >> >> package statefun.implementation.functions; >> >> import statefun.api.annotations.Statefun; >> import statefun.api.factories.StatefulFunctionSpecFactory; >> import com.fasterxml.jackson.databind.JsonNode; >> import org.apache.flink.statefun.sdk.java.Context; >> import org.apache.flink.statefun.sdk.java.StatefulFunction; >> import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec; >> import org.apache.flink.statefun.sdk.java.TypeName; >> import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder; >> import org.apache.flink.statefun.sdk.java.message.Message; >> import org.slf4j.Logger; >> import org.slf4j.LoggerFactory; >> >> import java.util.concurrent.CompletableFuture; >> import java.util.function.Supplier; >> >> @Statefun >> public class EchoFn implements StatefulFunction, StatefulFunctionSpecFactory >> { >> >> private final static Logger logger = >> LoggerFactory.getLogger(EchoFn.class); >> >> private final static TypeName fnType = TypeName.typeNameOf("std.fns", >> "echo"); >> >> public final static StatefulFunctionSpec SPEC = >> >> StatefulFunctionSpec.builder(fnType).withSupplier(EchoFn::new).build(); >> >> @Override >> public CompletableFuture<Void> apply(Context context, Message message) >> throws Throwable { >> >> if (message.is(Types.JAVA_STRING_TYPE)) { >> final String request = message.as(Types.JAVA_STRING_TYPE); >> >> context.send(EgressMessageBuilder.forEgress(fnType).withValue(request).build()); >> } else if (message.is(Types.JSON_TYPE)) { >> final JsonNode json = message.as(Types.JSON_TYPE); >> final String request = json.asText(); >> >> context.send(EgressMessageBuilder.forEgress(fnType).withValue(request).build()); >> } >> >> return context.done(); >> } >> >> @Override >> public StatefulFunctionSpec createSpec(Supplier<? extends >> StatefulFunction> supplier) { >> return StatefulFunctionSpec.builder(EchoFn.fnType) >> .withSupplier(supplier) >> .build(); >> } >> >> @Override >> public String toString() { >> return fnType.toString(); >> } >> } >> My Types class, which kind of copies >> https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java >> >> <https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java> >> >> package statefun.implementation.functions; >> >> import com.fasterxml.jackson.databind.JsonNode; >> import com.fasterxml.jackson.databind.ObjectMapper; >> import org.apache.flink.statefun.sdk.java.TypeName; >> import org.apache.flink.statefun.sdk.java.types.SimpleType; >> import org.apache.flink.statefun.sdk.java.types.Type; >> >> public class Types { >> >> public static final String JAVA_LANG_NAMESPACE = "java.lang"; >> private final static ObjectMapper objectMapper = new ObjectMapper(); >> public static final SimpleType.Fn<String, byte[]> javaStringSerialize = >> input -> input == null ? new byte[0] : input.getBytes(); >> public static final SimpleType.Fn<byte[], String> javaStringDeserialize = >> input -> input == null || input.length == 0 ? null : new String(input); >> >> public static final Type<String> JAVA_STRING_TYPE = >> SimpleType.simpleImmutableTypeFrom( >> TypeName.typeNameOf(JAVA_LANG_NAMESPACE, "String"), >> javaStringSerialize, >> javaStringDeserialize >> ); >> >> public static final Type<JsonNode> JSON_TYPE = >> SimpleType.simpleImmutableTypeFrom( >> TypeName.typeNameOf("application", JsonNode.class.getName()), >> objectMapper::writeValueAsBytes, >> bytes -> objectMapper.readValue(bytes, JsonNode.class)); >> } >> >> My Undertow Handler, copied from >> https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/undertow/UndertowHttpHandler.java >> >> <https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/undertow/UndertowHttpHandler.java> >> >> package statefun.implementation.undertow; >> >> import io.undertow.server.HttpHandler; >> import io.undertow.server.HttpServerExchange; >> import io.undertow.util.Headers; >> import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler; >> import org.apache.flink.statefun.sdk.java.slice.Slice; >> import org.apache.flink.statefun.sdk.java.slice.Slices; >> >> import java.util.Objects; >> import java.util.concurrent.CompletableFuture; >> >> /** >> * A simple Undertow {@link HttpHandler} that delegates requests from >> StateFun runtime processes to >> * a StateFun {@link RequestReplyHandler}. >> */ >> public final class UndertowHttpHandler implements HttpHandler { >> private final RequestReplyHandler handler; >> >> public UndertowHttpHandler(RequestReplyHandler handler) { >> this.handler = Objects.requireNonNull(handler); >> } >> >> @Override >> public void handleRequest(HttpServerExchange exchange) { >> exchange.getRequestReceiver().receiveFullBytes(this::onRequestBody); >> } >> >> private void onRequestBody(HttpServerExchange exchange, byte[] >> requestBytes) { >> exchange.dispatch(); >> CompletableFuture<Slice> future = >> handler.handle(Slices.wrap(requestBytes)); >> future.whenComplete((response, exception) -> onComplete(exchange, >> response, exception)); >> } >> >> private void onComplete(HttpServerExchange exchange, Slice responseBytes, >> Throwable ex) { >> if (ex != null) { >> ex.printStackTrace(System.out); >> exchange.getResponseHeaders().put(Headers.STATUS, 500); >> exchange.endExchange(); >> return; >> } >> exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, >> "application/octet-stream"); >> exchange.getResponseSender().send(responseBytes.asReadOnlyByteBuffer()); >> } >> } >> >> My App Server copied from >> https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreeterAppServer.java >> >> <https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreeterAppServer.java> >> package statefun; >> >> import statefun.implementation.functions.EchoFn; >> import statefun.implementation.undertow.UndertowHttpHandler; >> import io.undertow.Undertow; >> import org.apache.flink.statefun.sdk.java.StatefulFunctions; >> import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler; >> >> >> public final class StatefulFunctionsAppServer { >> >> public static void main(String[] args) { >> final StatefulFunctions functions = new StatefulFunctions(); >> functions.withStatefulFunction(EchoFn.SPEC); >> >> final RequestReplyHandler requestReplyHandler = >> functions.requestReplyHandler(); >> final Undertow httpServer = >> Undertow.builder() >> .addHttpListener(8080, "0.0.0.0") >> .setHandler(new UndertowHttpHandler(requestReplyHandler)) >> .build(); >> httpServer.start(); >> } >> } >> >> >> However, when I run >> >> curl -X POST -H "Content-Type: java.lang/String" -d 'Hello World' >> http://localhost:8080/std.fns/echo <http://localhost:8080/std.fns/echo> >> >> or >> >> curl -X POST -H "Content-Type: >> application/com.fasterxml.jackson.databind.JsonNode" -d '{"name": "Hello >> World"}' http://localhost:8080/std.fns/echo >> <http://localhost:8080/std.fns/echo> >> >> I get the following error on the server. >> >> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.InvalidProtocolBufferException: >> While parsing a protocol message, the input ended unexpectedly in the >> middle of a field. This could mean either that the input has been truncated >> or that an embedded message misreported its own length. >> at >> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:84) >> at >> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.CodedInputStream$ArrayDecoder.readRawLittleEndian64(CodedInputStream.java:1151) >> at >> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.CodedInputStream$ArrayDecoder.readFixed64(CodedInputStream.java:767) >> at >> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:503) >> at >> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.GeneratedMessageV3.parseUnknownField(GeneratedMessageV3.java:298) >> at >> org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.<init>(ToFunction.java:65) >> at >> org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.<init>(ToFunction.java:14) >> at >> org.apache.flink.statefun.sdk.reqreply.generated.ToFunction$1.parsePartialFrom(ToFunction.java:4090) >> at >> org.apache.flink.statefun.sdk.reqreply.generated.ToFunction$1.parsePartialFrom(ToFunction.java:4084) >> at >> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:100) >> at >> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:120) >> at >> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:125) >> at >> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48) >> at >> org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.parseFrom(ToFunction.java:3666) >> at >> org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler.handle(ConcurrentRequestReplyHandler.java:60) >> at >> statefun.implementation.undertow.UndertowHttpHandler.onRequestBody(UndertowHttpHandler.java:49) >> at >> io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:399) >> at >> io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:481) >> at >> statefun.implementation.undertow.UndertowHttpHandler.handleRequest(UndertowHttpHandler.java:44) >> at io.undertow.server.Connectors.executeRootHandler(Connectors.java:387) >> at >> io.undertow.server.protocol.http.HttpReadListener.handleEventWithNoRunningRequest(HttpReadListener.java:256) >> at >> io.undertow.server.protocol.http.HttpReadListener.handleEvent(HttpReadListener.java:136) >> at >> io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:162) >> at >> io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:100) >> at >> io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:57) >> at >> org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92) >> at org.xnio.ChannelListeners$10.handleEvent(ChannelListeners.java:291) >> at org.xnio.ChannelListeners$10.handleEvent(ChannelListeners.java:286) >> at >> org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92) >> at >> org.xnio.nio.QueuedNioTcpServer2.acceptTask(QueuedNioTcpServer2.java:178) >> at org.xnio.nio.WorkerThread.safeRun(WorkerThread.java:612) >> at org.xnio.nio.WorkerThread.run(WorkerThread.java:479) >> >> >> >> I've been stuck on this for a very long time. Can somebody please advise me >> as to what I am doing wrong? The framework never even gets to the point >> where it parses my payload. >> >>