You're right. I didn't notice that the ports were different. That was very 
subtle.

Thank you for pointing this out to me. I was stuck on it for quite a while.
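
For later readers, the distinction can be sketched as follows. This is a minimal illustration, not the playground's actual configuration: it assumes the StateFun runtime exposes an HTTP ingress on its own port (8090 is an assumed value) with a /namespace/type/id path, while the function server from the quoted message listens on 8080 and only understands requests sent by the runtime. The class name, the "user-1" id and the PUT method are likewise assumptions.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class IngressRequestSketch {

    // Assumed value: the ingress port is illustrative and not taken from this thread.
    static final int ASSUMED_INGRESS_PORT = 8090;
    // Port of the function server in the quoted app server; curl should not target it.
    static final int FUNCTION_SERVER_PORT = 8080;

    /** Builds (but does not send) a request aimed at the runtime's HTTP ingress. */
    static HttpRequest buildIngressRequest(String namespace, String type, String id, String body) {
        URI uri = URI.create(
                "http://localhost:" + ASSUMED_INGRESS_PORT + "/" + namespace + "/" + type + "/" + id);
        return HttpRequest.newBuilder(uri)
                // The content type names the StateFun type of the payload.
                .header("Content-Type", "java.lang/String")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildIngressRequest("std.fns", "echo", "user-1", "Hello World");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Only the port and the path differ from the curl commands in the quoted message, which is why the mismatch is easy to miss.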


> On Apr 16, 2022, at 6:17 PM, Marco Villalobos <mvillalo...@kineteque.com> 
> wrote:
> 
> I'm sorry, I accidentally hit send before I was finished.
> 
> Yes, I understand that Flink Stateful Functions run as part of a cluster in 
> which state and throughput are managed by Flink. However, in 
> StatefulFunctions, remote functions are still
> implemented as remote HTTP micro-services often deployed as part of a Lambda 
> deployment.
> 
> However, the examples that are linked to Flink Stateful Functions, such as 
> those in the Flink Stateful Functions playground, show HTTP as an entry point 
> into the application.
> 
> I can't get that to work, and I am not sure if it's even correct.
> 
>> On Apr 16, 2022, at 6:12 PM, Marco Villalobos <mvillalo...@kineteque.com> 
>> wrote:
>> 
>> IK 
>> 
>> If what you're saying is true, then why do most of the examples in the 
>> flink-statefun-playground example use HTTP as an alternative entry point?
>> 
>> Here is the greeter example:
>> 
>> https://github.com/apache/flink-statefun-playground/tree/main/java/greeter
>> 
>> 
>> 
>>> On Apr 16, 2022, at 6:06 PM, Tymur Yarosh <ti.yar...@gmail.com> wrote:
>>> 
>>> Hi Marco, 
>>> The problem is that you're trying to call the function directly via HTTP. 
>>> This is not how it's supposed to work. Instead, check out how to define 
>>> your statefun module:
>>> https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/overview/
>>> and how to deploy Stateful Functions:
>>> https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/deployment/overview
>>> The Stateful Functions application consists of a Flink cluster, ingresses to 
>>> consume incoming data, egresses to produce outgoing data, and the HTTP 
>>> server that serves remote Stateful Functions. You can also check out the 
>>> full example here:
>>> https://medium.com/devoops-and-universe/part-ii-building-next-gen-event-driven-application-powered-by-stateful-functions-a3139f299736
>>> 
>>> Best,
>>> Tymur Yarosh
>>> On Apr 14, 2022, at 03:51 +0300, Marco Villalobos 
>>> <mvillalo...@kineteque.com> wrote:
>>>> I'm trying to write a very simple echo app with Stateful Functions to 
>>>> prove it as a technology for some of my use cases.
>>>> 
>>>> I have not been able to accept different content types, though. Here is 
>>>> my code for a simple echo function:
>>>> 
>>>> My Echo stateful function class.
>>>> 
>>>> package statefun.implementation.functions;
>>>> 
>>>> import statefun.api.annotations.Statefun;
>>>> import statefun.api.factories.StatefulFunctionSpecFactory;
>>>> import com.fasterxml.jackson.databind.JsonNode;
>>>> import org.apache.flink.statefun.sdk.java.Context;
>>>> import org.apache.flink.statefun.sdk.java.StatefulFunction;
>>>> import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
>>>> import org.apache.flink.statefun.sdk.java.TypeName;
>>>> import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
>>>> import org.apache.flink.statefun.sdk.java.message.Message;
>>>> import org.slf4j.Logger;
>>>> import org.slf4j.LoggerFactory;
>>>> 
>>>> import java.util.concurrent.CompletableFuture;
>>>> import java.util.function.Supplier;
>>>> 
>>>> @Statefun
>>>> public class EchoFn implements StatefulFunction, StatefulFunctionSpecFactory {
>>>> 
>>>>    private final static Logger logger = LoggerFactory.getLogger(EchoFn.class);
>>>> 
>>>>    private final static TypeName fnType = TypeName.typeNameOf("std.fns", "echo");
>>>> 
>>>>    public final static StatefulFunctionSpec SPEC =
>>>>          StatefulFunctionSpec.builder(fnType).withSupplier(EchoFn::new).build();
>>>> 
>>>>    @Override
>>>>    public CompletableFuture<Void> apply(Context context, Message message) throws Throwable {
>>>> 
>>>>       if (message.is(Types.JAVA_STRING_TYPE)) {
>>>>          final String request = message.as(Types.JAVA_STRING_TYPE);
>>>>          context.send(EgressMessageBuilder.forEgress(fnType).withValue(request).build());
>>>>       } else if (message.is(Types.JSON_TYPE)) {
>>>>          final JsonNode json = message.as(Types.JSON_TYPE);
>>>>          final String request = json.asText();
>>>>          context.send(EgressMessageBuilder.forEgress(fnType).withValue(request).build());
>>>>       }
>>>> 
>>>>       return context.done();
>>>>    }
>>>> 
>>>>    @Override
>>>>    public StatefulFunctionSpec createSpec(Supplier<? extends StatefulFunction> supplier) {
>>>>       return StatefulFunctionSpec.builder(EchoFn.fnType)
>>>>             .withSupplier(supplier)
>>>>             .build();
>>>>    }
>>>> 
>>>>    @Override
>>>>    public String toString() {
>>>>       return fnType.toString();
>>>>    }
>>>> }
>>>> 
>>>> My Types class, which is loosely copied from 
>>>> https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java
>>>> 
>>>> package statefun.implementation.functions;
>>>> 
>>>> import com.fasterxml.jackson.databind.JsonNode;
>>>> import com.fasterxml.jackson.databind.ObjectMapper;
>>>> import org.apache.flink.statefun.sdk.java.TypeName;
>>>> import org.apache.flink.statefun.sdk.java.types.SimpleType;
>>>> import org.apache.flink.statefun.sdk.java.types.Type;
>>>> 
>>>> public class Types {
>>>> 
>>>>    public static final String JAVA_LANG_NAMESPACE = "java.lang";
>>>>    private final static ObjectMapper objectMapper = new ObjectMapper();
>>>>    public static final SimpleType.Fn<String, byte[]> javaStringSerialize =
>>>>          input -> input == null ? new byte[0] : input.getBytes();
>>>>    public static final SimpleType.Fn<byte[], String> javaStringDeserialize =
>>>>          input -> input == null || input.length == 0 ? null : new String(input);
>>>> 
>>>>    public static final Type<String> JAVA_STRING_TYPE = SimpleType.simpleImmutableTypeFrom(
>>>>          TypeName.typeNameOf(JAVA_LANG_NAMESPACE, "String"),
>>>>          javaStringSerialize,
>>>>          javaStringDeserialize
>>>>    );
>>>> 
>>>>    public static final Type<JsonNode> JSON_TYPE =
>>>>          SimpleType.simpleImmutableTypeFrom(
>>>>                TypeName.typeNameOf("application", JsonNode.class.getName()),
>>>>                objectMapper::writeValueAsBytes,
>>>>                bytes -> objectMapper.readValue(bytes, JsonNode.class));
>>>> }
>>>> 
>>>> My Undertow Handler, copied from 
>>>> https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/undertow/UndertowHttpHandler.java
>>>> 
>>>> package statefun.implementation.undertow;
>>>> 
>>>> import io.undertow.server.HttpHandler;
>>>> import io.undertow.server.HttpServerExchange;
>>>> import io.undertow.util.Headers;
>>>> import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
>>>> import org.apache.flink.statefun.sdk.java.slice.Slice;
>>>> import org.apache.flink.statefun.sdk.java.slice.Slices;
>>>> 
>>>> import java.util.Objects;
>>>> import java.util.concurrent.CompletableFuture;
>>>> 
>>>> /**
>>>>  * A simple Undertow {@link HttpHandler} that delegates requests from StateFun runtime
>>>>  * processes to a StateFun {@link RequestReplyHandler}.
>>>>  */
>>>> public final class UndertowHttpHandler implements HttpHandler {
>>>>   private final RequestReplyHandler handler;
>>>> 
>>>>   public UndertowHttpHandler(RequestReplyHandler handler) {
>>>>     this.handler = Objects.requireNonNull(handler);
>>>>   }
>>>> 
>>>>   @Override
>>>>   public void handleRequest(HttpServerExchange exchange) {
>>>>     exchange.getRequestReceiver().receiveFullBytes(this::onRequestBody);
>>>>   }
>>>> 
>>>>   private void onRequestBody(HttpServerExchange exchange, byte[] requestBytes) {
>>>>     exchange.dispatch();
>>>>     CompletableFuture<Slice> future = handler.handle(Slices.wrap(requestBytes));
>>>>     future.whenComplete((response, exception) -> onComplete(exchange, response, exception));
>>>>   }
>>>> 
>>>>   private void onComplete(HttpServerExchange exchange, Slice responseBytes, Throwable ex) {
>>>>     if (ex != null) {
>>>>       ex.printStackTrace(System.out);
>>>>       exchange.setStatusCode(500);
>>>>       exchange.endExchange();
>>>>       return;
>>>>     }
>>>>     exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/octet-stream");
>>>>     exchange.getResponseSender().send(responseBytes.asReadOnlyByteBuffer());
>>>>   }
>>>> }
>>>> 
>>>> My App Server, copied from 
>>>> https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreeterAppServer.java
>>>> 
>>>> package statefun;
>>>> 
>>>> import statefun.implementation.functions.EchoFn;
>>>> import statefun.implementation.undertow.UndertowHttpHandler;
>>>> import io.undertow.Undertow;
>>>> import org.apache.flink.statefun.sdk.java.StatefulFunctions;
>>>> import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
>>>> 
>>>> 
>>>> public final class StatefulFunctionsAppServer {
>>>> 
>>>>   public static void main(String[] args) {
>>>>     final StatefulFunctions functions = new StatefulFunctions();
>>>>     functions.withStatefulFunction(EchoFn.SPEC);
>>>> 
>>>>     final RequestReplyHandler requestReplyHandler = functions.requestReplyHandler();
>>>>     final Undertow httpServer =
>>>>         Undertow.builder()
>>>>             .addHttpListener(8080, "0.0.0.0")
>>>>             .setHandler(new UndertowHttpHandler(requestReplyHandler))
>>>>             .build();
>>>>     httpServer.start();
>>>>   }
>>>> }
>>>> 
>>>> 
>>>> However, when I run
>>>> 
>>>> curl -X POST -H "Content-Type: java.lang/String" -d 'Hello World' http://localhost:8080/std.fns/echo
>>>> 
>>>> or 
>>>> 
>>>> curl -X POST -H "Content-Type: application/com.fasterxml.jackson.databind.JsonNode" -d '{"name": "Hello World"}' http://localhost:8080/std.fns/echo
>>>> 
>>>> I get the following error on the server.  
>>>> 
>>>> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field.  This could mean either that the input has been truncated or that an embedded message misreported its own length.
>>>>    at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:84)
>>>>    at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.CodedInputStream$ArrayDecoder.readRawLittleEndian64(CodedInputStream.java:1151)
>>>>    at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.CodedInputStream$ArrayDecoder.readFixed64(CodedInputStream.java:767)
>>>>    at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:503)
>>>>    at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.GeneratedMessageV3.parseUnknownField(GeneratedMessageV3.java:298)
>>>>    at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.<init>(ToFunction.java:65)
>>>>    at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.<init>(ToFunction.java:14)
>>>>    at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction$1.parsePartialFrom(ToFunction.java:4090)
>>>>    at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction$1.parsePartialFrom(ToFunction.java:4084)
>>>>    at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:100)
>>>>    at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:120)
>>>>    at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:125)
>>>>    at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
>>>>    at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.parseFrom(ToFunction.java:3666)
>>>>    at org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler.handle(ConcurrentRequestReplyHandler.java:60)
>>>>    at statefun.implementation.undertow.UndertowHttpHandler.onRequestBody(UndertowHttpHandler.java:49)
>>>>    at io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:399)
>>>>    at io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:481)
>>>>    at statefun.implementation.undertow.UndertowHttpHandler.handleRequest(UndertowHttpHandler.java:44)
>>>>    at io.undertow.server.Connectors.executeRootHandler(Connectors.java:387)
>>>>    at io.undertow.server.protocol.http.HttpReadListener.handleEventWithNoRunningRequest(HttpReadListener.java:256)
>>>>    at io.undertow.server.protocol.http.HttpReadListener.handleEvent(HttpReadListener.java:136)
>>>>    at io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:162)
>>>>    at io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:100)
>>>>    at io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:57)
>>>>    at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
>>>>    at org.xnio.ChannelListeners$10.handleEvent(ChannelListeners.java:291)
>>>>    at org.xnio.ChannelListeners$10.handleEvent(ChannelListeners.java:286)
>>>>    at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
>>>>    at org.xnio.nio.QueuedNioTcpServer2.acceptTask(QueuedNioTcpServer2.java:178)
>>>>    at org.xnio.nio.WorkerThread.safeRun(WorkerThread.java:612)
>>>>    at org.xnio.nio.WorkerThread.run(WorkerThread.java:479)
>>>> 
>>>> 
>>>> 
>>>> I've been stuck on this for a very long time. Can somebody please advise 
>>>> me as to what I am doing wrong? The framework never even gets to the point 
>>>> where it parses my payload.
>>>> 
>>>> 
>> 
> 
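
For reference, a rough sketch of why the function server rejected the curl payloads before any user code ran: the handler parses the request body as a Protobuf message (ToFunction in the trace), so the first byte of a plain-text body is read as a field tag. The class and method names below are hypothetical, and the sketch only decodes that first byte; it does not try to reproduce the exact path through the parser shown in the stack trace.

```java
import java.nio.charset.StandardCharsets;

final class ProtobufTagSketch {

    // A Protobuf tag is (fieldNumber << 3) | wireType, encoded as a varint.
    // ASCII bytes are below 0x80, so each one is a complete single-byte tag.
    static int fieldNumber(byte firstByte) {
        return (firstByte & 0xFF) >>> 3;
    }

    static int wireType(byte firstByte) {
        return firstByte & 0x07;
    }

    /** Describes how a Protobuf parser would read the first byte of a text payload. */
    static String describe(String payload) {
        byte first = payload.getBytes(StandardCharsets.UTF_8)[0];
        return String.format("'%c' -> field %d, wire type %d",
                (char) first, fieldNumber(first), wireType(first));
    }

    public static void main(String[] args) {
        // The two bodies sent with curl in the original message.
        System.out.println(describe("Hello World"));
        System.out.println(describe("{\"name\": \"Hello World\"}"));
    }
}
```

Neither payload starts with a tag the message schema is likely to define, so the parser treats the text as unknown fields and eventually runs out of input, which is consistent with an error raised before the function itself is invoked.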
