You're right. I didn't notice that the ports were different. That was very subtle.
Thank you for pointing this out to me. I was stuck on it for quite a while.

> On Apr 16, 2022, at 6:17 PM, Marco Villalobos <mvillalo...@kineteque.com> wrote:
>
> I'm sorry, I accidentally hit send before I was finished.
>
> Yes, I understand that Flink Stateful Functions run as part of a cluster in
> which state and throughput are managed by Flink. However, in
> Stateful Functions, remote functions are still implemented as remote HTTP
> microservices, often deployed as part of a Lambda deployment.
>
> However, the examples that are linked to Flink Stateful Functions, such as
> those in the Flink Stateful Functions playground, show HTTP as an entry point
> into the application.
>
> I can't get that to work, and I am not sure if it's even correct.
>
>> On Apr 16, 2022, at 6:12 PM, Marco Villalobos <mvillalo...@kineteque.com> wrote:
>>
>> IK
>>
>> If what you're saying is true, then why do most of the examples in the
>> flink-statefun-playground example use HTTP as an alternative entry point?
>>
>> Here is the greeter example:
>>
>> https://github.com/apache/flink-statefun-playground/tree/main/java/greeter
>>
>>> On Apr 16, 2022, at 6:06 PM, Tymur Yarosh <ti.yar...@gmail.com> wrote:
>>>
>>> Hi Marco,
>>> The problem is that you’re trying to call the function directly via HTTP.
>>> This is not how it's supposed to work. Instead, check out how to define
>>> your statefun module
>>> https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/overview/
>>> and how to deploy Stateful Functions
>>> https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/deployment/overview
>>> The Stateful Functions application consists of a Flink cluster, ingresses to
>>> consume incoming data, egresses to produce outgoing data, and the HTTP
>>> server that serves remote Stateful Functions. You can also check out the
>>> full example here:
>>> https://medium.com/devoops-and-universe/part-ii-building-next-gen-event-driven-application-powered-by-stateful-functions-a3139f299736
>>>
>>> Best,
>>> Tymur Yarosh
>>> On Apr 14, 2022, 03:51 +0300, Marco Villalobos <mvillalo...@kineteque.com> wrote:
>>>> I'm trying to write a very simple echo app with Stateful Functions to prove
>>>> it as a technology for some of my use cases.
>>>>
>>>> I have not been able to accept different content types, though. Here is an
>>>> example of my code for a simple echo function:
>>>>
>>>> My Echo stateful function class.
>>>>
>>>> package statefun.implementation.functions;
>>>>
>>>> import statefun.api.annotations.Statefun;
>>>> import statefun.api.factories.StatefulFunctionSpecFactory;
>>>> import com.fasterxml.jackson.databind.JsonNode;
>>>> import org.apache.flink.statefun.sdk.java.Context;
>>>> import org.apache.flink.statefun.sdk.java.StatefulFunction;
>>>> import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
>>>> import org.apache.flink.statefun.sdk.java.TypeName;
>>>> import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
>>>> import org.apache.flink.statefun.sdk.java.message.Message;
>>>> import org.slf4j.Logger;
>>>> import org.slf4j.LoggerFactory;
>>>>
>>>> import java.util.concurrent.CompletableFuture;
>>>> import java.util.function.Supplier;
>>>>
>>>> @Statefun
>>>> public class EchoFn implements StatefulFunction, StatefulFunctionSpecFactory {
>>>>
>>>>     private final static Logger logger = LoggerFactory.getLogger(EchoFn.class);
>>>>
>>>>     private final static TypeName fnType = TypeName.typeNameOf("std.fns", "echo");
>>>>
>>>>     public final static StatefulFunctionSpec SPEC =
>>>>         StatefulFunctionSpec.builder(fnType).withSupplier(EchoFn::new).build();
>>>>
>>>>     @Override
>>>>     public CompletableFuture<Void> apply(Context context, Message message) throws Throwable {
>>>>
>>>>         if (message.is(Types.JAVA_STRING_TYPE)) {
>>>>             final String request = message.as(Types.JAVA_STRING_TYPE);
>>>>             context.send(EgressMessageBuilder.forEgress(fnType).withValue(request).build());
>>>>         } else if (message.is(Types.JSON_TYPE)) {
>>>>             final JsonNode json = message.as(Types.JSON_TYPE);
>>>>             final String request = json.asText();
>>>>             context.send(EgressMessageBuilder.forEgress(fnType).withValue(request).build());
>>>>         }
>>>>
>>>>         return context.done();
>>>>     }
>>>>
>>>>     @Override
>>>>     public StatefulFunctionSpec createSpec(Supplier<? extends StatefulFunction> supplier) {
>>>>         return StatefulFunctionSpec.builder(EchoFn.fnType)
>>>>             .withSupplier(supplier)
>>>>             .build();
>>>>     }
>>>>
>>>>     @Override
>>>>     public String toString() {
>>>>         return fnType.toString();
>>>>     }
>>>> }
>>>>
>>>> My Types class, which kind of copies
>>>> https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java
>>>>
>>>> package statefun.implementation.functions;
>>>>
>>>> import com.fasterxml.jackson.databind.JsonNode;
>>>> import com.fasterxml.jackson.databind.ObjectMapper;
>>>> import org.apache.flink.statefun.sdk.java.TypeName;
>>>> import org.apache.flink.statefun.sdk.java.types.SimpleType;
>>>> import org.apache.flink.statefun.sdk.java.types.Type;
>>>>
>>>> public class Types {
>>>>
>>>>     public static final String JAVA_LANG_NAMESPACE = "java.lang";
>>>>     private final static ObjectMapper objectMapper = new ObjectMapper();
>>>>     public static final SimpleType.Fn<String, byte[]> javaStringSerialize =
>>>>         input -> input == null ? new byte[0] : input.getBytes();
>>>>     public static final SimpleType.Fn<byte[], String> javaStringDeserialize =
>>>>         input -> input == null || input.length == 0 ? null : new String(input);
>>>>
>>>>     public static final Type<String> JAVA_STRING_TYPE =
>>>>         SimpleType.simpleImmutableTypeFrom(
>>>>             TypeName.typeNameOf(JAVA_LANG_NAMESPACE, "String"),
>>>>             javaStringSerialize,
>>>>             javaStringDeserialize
>>>>         );
>>>>
>>>>     public static final Type<JsonNode> JSON_TYPE =
>>>>         SimpleType.simpleImmutableTypeFrom(
>>>>             TypeName.typeNameOf("application", JsonNode.class.getName()),
>>>>             objectMapper::writeValueAsBytes,
>>>>             bytes -> objectMapper.readValue(bytes, JsonNode.class));
>>>> }
>>>>
>>>> My Undertow Handler, copied from
>>>> https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/undertow/UndertowHttpHandler.java
>>>>
>>>> package statefun.implementation.undertow;
>>>>
>>>> import io.undertow.server.HttpHandler;
>>>> import io.undertow.server.HttpServerExchange;
>>>> import io.undertow.util.Headers;
>>>> import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
>>>> import org.apache.flink.statefun.sdk.java.slice.Slice;
>>>> import org.apache.flink.statefun.sdk.java.slice.Slices;
>>>>
>>>> import java.util.Objects;
>>>> import java.util.concurrent.CompletableFuture;
>>>>
>>>> /**
>>>>  * A simple Undertow {@link HttpHandler} that delegates requests from StateFun runtime processes to
>>>>  * a StateFun {@link RequestReplyHandler}.
>>>>  */
>>>> public final class UndertowHttpHandler implements HttpHandler {
>>>>     private final RequestReplyHandler handler;
>>>>
>>>>     public UndertowHttpHandler(RequestReplyHandler handler) {
>>>>         this.handler = Objects.requireNonNull(handler);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void handleRequest(HttpServerExchange exchange) {
>>>>         exchange.getRequestReceiver().receiveFullBytes(this::onRequestBody);
>>>>     }
>>>>
>>>>     private void onRequestBody(HttpServerExchange exchange, byte[] requestBytes) {
>>>>         exchange.dispatch();
>>>>         CompletableFuture<Slice> future = handler.handle(Slices.wrap(requestBytes));
>>>>         future.whenComplete((response, exception) -> onComplete(exchange, response, exception));
>>>>     }
>>>>
>>>>     private void onComplete(HttpServerExchange exchange, Slice responseBytes, Throwable ex) {
>>>>         if (ex != null) {
>>>>             ex.printStackTrace(System.out);
>>>>             exchange.getResponseHeaders().put(Headers.STATUS, 500);
>>>>             exchange.endExchange();
>>>>             return;
>>>>         }
>>>>         exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/octet-stream");
>>>>         exchange.getResponseSender().send(responseBytes.asReadOnlyByteBuffer());
>>>>     }
>>>> }
>>>>
>>>> My App Server copied from
>>>> https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreeterAppServer.java
>>>>
>>>> package statefun;
>>>>
>>>> import statefun.implementation.functions.EchoFn;
>>>> import statefun.implementation.undertow.UndertowHttpHandler;
>>>> import io.undertow.Undertow;
>>>> import org.apache.flink.statefun.sdk.java.StatefulFunctions;
>>>> import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
>>>>
>>>> public final class StatefulFunctionsAppServer {
>>>>
>>>>     public static void main(String[] args) {
>>>>         final StatefulFunctions functions = new StatefulFunctions();
>>>>         functions.withStatefulFunction(EchoFn.SPEC);
>>>>
>>>>         final RequestReplyHandler requestReplyHandler = functions.requestReplyHandler();
>>>>         final Undertow httpServer =
>>>>             Undertow.builder()
>>>>                 .addHttpListener(8080, "0.0.0.0")
>>>>                 .setHandler(new UndertowHttpHandler(requestReplyHandler))
>>>>                 .build();
>>>>         httpServer.start();
>>>>     }
>>>> }
>>>>
>>>> However, when I run
>>>>
>>>> curl -X POST -H "Content-Type: java.lang/String" -d 'Hello World' http://localhost:8080/std.fns/echo
>>>>
>>>> or
>>>>
>>>> curl -X POST -H "Content-Type: application/com.fasterxml.jackson.databind.JsonNode" -d '{"name": "Hello World"}' http://localhost:8080/std.fns/echo
>>>>
>>>> I get the following error on the server.
>>>>
>>>> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.InvalidProtocolBufferException:
>>>> While parsing a protocol message, the input ended unexpectedly in the
>>>> middle of a field. This could mean either that the input has been
>>>> truncated or that an embedded message misreported its own length.
>>>>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:84)
>>>>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.CodedInputStream$ArrayDecoder.readRawLittleEndian64(CodedInputStream.java:1151)
>>>>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.CodedInputStream$ArrayDecoder.readFixed64(CodedInputStream.java:767)
>>>>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:503)
>>>>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.GeneratedMessageV3.parseUnknownField(GeneratedMessageV3.java:298)
>>>>     at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.<init>(ToFunction.java:65)
>>>>     at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.<init>(ToFunction.java:14)
>>>>     at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction$1.parsePartialFrom(ToFunction.java:4090)
>>>>     at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction$1.parsePartialFrom(ToFunction.java:4084)
>>>>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:100)
>>>>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:120)
>>>>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:125)
>>>>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
>>>>     at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.parseFrom(ToFunction.java:3666)
>>>>     at org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler.handle(ConcurrentRequestReplyHandler.java:60)
>>>>     at statefun.implementation.undertow.UndertowHttpHandler.onRequestBody(UndertowHttpHandler.java:49)
>>>>     at io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:399)
>>>>     at io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:481)
>>>>     at statefun.implementation.undertow.UndertowHttpHandler.handleRequest(UndertowHttpHandler.java:44)
>>>>     at io.undertow.server.Connectors.executeRootHandler(Connectors.java:387)
>>>>     at io.undertow.server.protocol.http.HttpReadListener.handleEventWithNoRunningRequest(HttpReadListener.java:256)
>>>>     at io.undertow.server.protocol.http.HttpReadListener.handleEvent(HttpReadListener.java:136)
>>>>     at io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:162)
>>>>     at io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:100)
>>>>     at io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:57)
>>>>     at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
>>>>     at org.xnio.ChannelListeners$10.handleEvent(ChannelListeners.java:291)
>>>>     at org.xnio.ChannelListeners$10.handleEvent(ChannelListeners.java:286)
>>>>     at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
>>>>     at org.xnio.nio.QueuedNioTcpServer2.acceptTask(QueuedNioTcpServer2.java:178)
>>>>     at org.xnio.nio.WorkerThread.safeRun(WorkerThread.java:612)
>>>>     at org.xnio.nio.WorkerThread.run(WorkerThread.java:479)
>>>>
>>>> I've been stuck on this for a very long time. Can somebody please advise
>>>> me as to what I am doing wrong? The framework never even gets to the point
>>>> where it parses my payload.
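
The stack trace above fails inside ToFunction.parseFrom, which suggests the handler treats the entire HTTP body as a protobuf message sent by the StateFun runtime. A plain-text or JSON curl body would then be decoded as protobuf field tags. Here is a minimal sketch of how the first byte of each curl payload would be read under the standard protobuf tag layout. ProtobufTagPeek and its methods are hypothetical names used only for illustration; they are not part of the StateFun SDK, and the sketch does not try to reproduce the exact failure path in the trace.

```java
import java.nio.charset.StandardCharsets;

/**
 * Illustration only (hypothetical class, not part of the StateFun SDK):
 * interprets the first byte of a request body the way a protobuf parser
 * reads a field tag.
 */
public final class ProtobufTagPeek {

    /** A protobuf tag is encoded as (fieldNumber << 3) | wireType. */
    public static int fieldNumber(byte firstByte) {
        return (firstByte & 0xFF) >>> 3;
    }

    /** The low three bits of the tag carry the wire type. */
    public static int wireType(byte firstByte) {
        return firstByte & 0x07;
    }

    /**
     * Describes the first byte of the body as a tag. ASCII bytes are below
     * 128, so each one is a complete single-byte varint tag.
     */
    public static String describe(String body) {
        byte first = body.getBytes(StandardCharsets.UTF_8)[0];
        return "field=" + fieldNumber(first) + " wireType=" + wireType(first);
    }

    public static void main(String[] args) {
        // The two curl payloads from the original message.
        System.out.println(describe("Hello World"));                 // field=9 wireType=0
        System.out.println(describe("{\"name\": \"Hello World\"}")); // field=15 wireType=3
    }
}
```

In both cases the parser sees field numbers and wire types that have nothing to do with the payload, which is consistent with the request failing before the function is ever invoked.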