IK 

If what you're saying is true, then why do most of the examples in 
flink-statefun-playground use HTTP as an alternative entry point?

Here is the greeter example:

https://github.com/apache/flink-statefun-playground/tree/main/java/greeter 



> On Apr 16, 2022, at 6:06 PM, Tymur Yarosh <ti.yar...@gmail.com> wrote:
> 
> Hi Marco, 
> The problem is that you’re trying to call the function directly via HTTP. 
> This is not how it's supposed to work. Instead, check out how to define your 
> statefun module:
> https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/overview/
> and how to deploy Stateful Functions:
> https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/deployment/overview
> A Stateful Functions application consists of a Flink cluster, ingresses to 
> consume incoming data, egresses to produce outgoing data, and the HTTP server 
> that serves remote Stateful Functions. You can also check out the full example 
> here:
> https://medium.com/devoops-and-universe/part-ii-building-next-gen-event-driven-application-powered-by-stateful-functions-a3139f299736
> 
> Best,
> Tymur Yarosh
> On Apr 14, 2022, at 03:51 +0300, Marco Villalobos <mvillalo...@kineteque.com> 
> wrote:
>> I'm trying to write a very simple echo app with Stateful Functions to 
>> evaluate the technology for some of my use cases.
>> 
>> I have not been able to accept different content types, though. Here is 
>> my code for a simple echo function:
>> 
>> My Echo stateful function class.
>> 
>> package statefun.implementation.functions;
>> 
>> import statefun.api.annotations.Statefun;
>> import statefun.api.factories.StatefulFunctionSpecFactory;
>> import com.fasterxml.jackson.databind.JsonNode;
>> import org.apache.flink.statefun.sdk.java.Context;
>> import org.apache.flink.statefun.sdk.java.StatefulFunction;
>> import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
>> import org.apache.flink.statefun.sdk.java.TypeName;
>> import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
>> import org.apache.flink.statefun.sdk.java.message.Message;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>> 
>> import java.util.concurrent.CompletableFuture;
>> import java.util.function.Supplier;
>> 
>> @Statefun
>> public class EchoFn implements StatefulFunction, StatefulFunctionSpecFactory {
>> 
>>    private final static Logger logger = LoggerFactory.getLogger(EchoFn.class);
>> 
>>    private final static TypeName fnType = TypeName.typeNameOf("std.fns", "echo");
>> 
>>    public final static StatefulFunctionSpec SPEC =
>>          StatefulFunctionSpec.builder(fnType).withSupplier(EchoFn::new).build();
>> 
>>    @Override
>>    public CompletableFuture<Void> apply(Context context, Message message) throws Throwable {
>> 
>>       if (message.is(Types.JAVA_STRING_TYPE)) {
>>          final String request = message.as(Types.JAVA_STRING_TYPE);
>>          context.send(EgressMessageBuilder.forEgress(fnType).withValue(request).build());
>>       } else if (message.is(Types.JSON_TYPE)) {
>>          final JsonNode json = message.as(Types.JSON_TYPE);
>>          final String request = json.asText();
>>          context.send(EgressMessageBuilder.forEgress(fnType).withValue(request).build());
>>       }
>> 
>>       return context.done();
>>    }
>> 
>>    @Override
>>    public StatefulFunctionSpec createSpec(Supplier<? extends StatefulFunction> supplier) {
>>       return StatefulFunctionSpec.builder(EchoFn.fnType)
>>             .withSupplier(supplier)
>>             .build();
>>    }
>> 
>>    @Override
>>    public String toString() {
>>       return fnType.toString();
>>    }
>> }
>> My Types class, which is adapted from 
>> https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java
>>  
>> 
>> package statefun.implementation.functions;
>> 
>> import com.fasterxml.jackson.databind.JsonNode;
>> import com.fasterxml.jackson.databind.ObjectMapper;
>> import org.apache.flink.statefun.sdk.java.TypeName;
>> import org.apache.flink.statefun.sdk.java.types.SimpleType;
>> import org.apache.flink.statefun.sdk.java.types.Type;
>> 
>> public class Types {
>> 
>>    public static final String JAVA_LANG_NAMESPACE = "java.lang";
>>    private final static ObjectMapper objectMapper = new ObjectMapper();
>>    public static final SimpleType.Fn<String, byte[]> javaStringSerialize =
>>          input -> input == null ? new byte[0] : input.getBytes();
>>    public static final SimpleType.Fn<byte[], String> javaStringDeserialize =
>>          input -> input == null || input.length == 0 ? null : new String(input);
>>    
>>    public static final Type<String> JAVA_STRING_TYPE =
>>          SimpleType.simpleImmutableTypeFrom(
>>                TypeName.typeNameOf(JAVA_LANG_NAMESPACE, "String"),
>>                javaStringSerialize,
>>                javaStringDeserialize);
>> 
>>    public static final Type<JsonNode> JSON_TYPE =
>>          SimpleType.simpleImmutableTypeFrom(
>>                TypeName.typeNameOf("application", JsonNode.class.getName()),
>>                objectMapper::writeValueAsBytes,
>>                bytes -> objectMapper.readValue(bytes, JsonNode.class));
>> }
>> 
>> My Undertow Handler, copied from 
>> https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/undertow/UndertowHttpHandler.java
>>  
>> 
>> package statefun.implementation.undertow;
>> 
>> import io.undertow.server.HttpHandler;
>> import io.undertow.server.HttpServerExchange;
>> import io.undertow.util.Headers;
>> import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
>> import org.apache.flink.statefun.sdk.java.slice.Slice;
>> import org.apache.flink.statefun.sdk.java.slice.Slices;
>> 
>> import java.util.Objects;
>> import java.util.concurrent.CompletableFuture;
>> 
>> /**
>>  * A simple Undertow {@link HttpHandler} that delegates requests from StateFun runtime
>>  * processes to a StateFun {@link RequestReplyHandler}.
>>  */
>> public final class UndertowHttpHandler implements HttpHandler {
>>   private final RequestReplyHandler handler;
>> 
>>   public UndertowHttpHandler(RequestReplyHandler handler) {
>>     this.handler = Objects.requireNonNull(handler);
>>   }
>> 
>>   @Override
>>   public void handleRequest(HttpServerExchange exchange) {
>>     exchange.getRequestReceiver().receiveFullBytes(this::onRequestBody);
>>   }
>> 
>>   private void onRequestBody(HttpServerExchange exchange, byte[] requestBytes) {
>>     exchange.dispatch();
>>     CompletableFuture<Slice> future = handler.handle(Slices.wrap(requestBytes));
>>     future.whenComplete((response, exception) -> onComplete(exchange, response, exception));
>>   }
>> 
>>   private void onComplete(HttpServerExchange exchange, Slice responseBytes, Throwable ex) {
>>     if (ex != null) {
>>       ex.printStackTrace(System.out);
>>       exchange.getResponseHeaders().put(Headers.STATUS, 500);
>>       exchange.endExchange();
>>       return;
>>     }
>>     exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/octet-stream");
>>     exchange.getResponseSender().send(responseBytes.asReadOnlyByteBuffer());
>>   }
>> }
>> 
>> My App Server copied from 
>> https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreeterAppServer.java
>>  
>> package statefun;
>> 
>> import statefun.implementation.functions.EchoFn;
>> import statefun.implementation.undertow.UndertowHttpHandler;
>> import io.undertow.Undertow;
>> import org.apache.flink.statefun.sdk.java.StatefulFunctions;
>> import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
>> 
>> 
>> public final class StatefulFunctionsAppServer {
>> 
>>   public static void main(String[] args) {
>>     final StatefulFunctions functions = new StatefulFunctions();
>>     functions.withStatefulFunction(EchoFn.SPEC);
>> 
>>     final RequestReplyHandler requestReplyHandler = functions.requestReplyHandler();
>>     final Undertow httpServer =
>>         Undertow.builder()
>>             .addHttpListener(8080, "0.0.0.0")
>>             .setHandler(new UndertowHttpHandler(requestReplyHandler))
>>             .build();
>>     httpServer.start();
>>   }
>> }
>> 
>> 
>> However, when I run
>> 
>> curl -X POST -H "Content-Type: java.lang/String" -d 'Hello World' http://localhost:8080/std.fns/echo
>> 
>> or 
>> 
>> curl -X POST -H "Content-Type: application/com.fasterxml.jackson.databind.JsonNode" -d '{"name": "Hello World"}' http://localhost:8080/std.fns/echo
>> 
>> I get the following error on the server.  
>> 
>> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field.  This could mean either that the input has been truncated or that an embedded message misreported its own length.
>>      at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:84)
>>      at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.CodedInputStream$ArrayDecoder.readRawLittleEndian64(CodedInputStream.java:1151)
>>      at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.CodedInputStream$ArrayDecoder.readFixed64(CodedInputStream.java:767)
>>      at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:503)
>>      at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.GeneratedMessageV3.parseUnknownField(GeneratedMessageV3.java:298)
>>      at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.<init>(ToFunction.java:65)
>>      at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.<init>(ToFunction.java:14)
>>      at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction$1.parsePartialFrom(ToFunction.java:4090)
>>      at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction$1.parsePartialFrom(ToFunction.java:4084)
>>      at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:100)
>>      at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:120)
>>      at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:125)
>>      at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
>>      at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.parseFrom(ToFunction.java:3666)
>>      at org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler.handle(ConcurrentRequestReplyHandler.java:60)
>>      at statefun.implementation.undertow.UndertowHttpHandler.onRequestBody(UndertowHttpHandler.java:49)
>>      at io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:399)
>>      at io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:481)
>>      at statefun.implementation.undertow.UndertowHttpHandler.handleRequest(UndertowHttpHandler.java:44)
>>      at io.undertow.server.Connectors.executeRootHandler(Connectors.java:387)
>>      at io.undertow.server.protocol.http.HttpReadListener.handleEventWithNoRunningRequest(HttpReadListener.java:256)
>>      at io.undertow.server.protocol.http.HttpReadListener.handleEvent(HttpReadListener.java:136)
>>      at io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:162)
>>      at io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:100)
>>      at io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:57)
>>      at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
>>      at org.xnio.ChannelListeners$10.handleEvent(ChannelListeners.java:291)
>>      at org.xnio.ChannelListeners$10.handleEvent(ChannelListeners.java:286)
>>      at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
>>      at org.xnio.nio.QueuedNioTcpServer2.acceptTask(QueuedNioTcpServer2.java:178)
>>      at org.xnio.nio.WorkerThread.safeRun(WorkerThread.java:612)
>>      at org.xnio.nio.WorkerThread.run(WorkerThread.java:479)
>> 
>> 
>> 
>> I've been stuck on this for a very long time. Can somebody please advise me 
>> as to what I am doing wrong? The framework never even gets to the point 
>> where it parses my payload.
>> 
>> 
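
For what it's worth, the trace above points at ToFunction.parseFrom, which suggests the handler tries to read the whole request body as a protobuf message rather than as the raw payload. Below is a minimal sketch of what a protobuf parser would make of the first byte of each curl body, assuming the standard protobuf wire format where a field key is (fieldNumber << 3) | wireType. The class and method names (WireTagDemo, describeFirstByte) are made up for illustration and are not part of the StateFun SDK.

```java
import java.nio.charset.StandardCharsets;

public class WireTagDemo {

    // In the protobuf wire format a field key is (fieldNumber << 3) | wireType.
    // For a byte below 0x80 (every ASCII character) the key fits in that one byte.
    static int fieldNumber(int tagByte) {
        return tagByte >>> 3;
    }

    static int wireType(int tagByte) {
        return tagByte & 0x7;
    }

    // Interprets the first byte of a text body as if it were a protobuf field key.
    static String describeFirstByte(String body) {
        int first = body.getBytes(StandardCharsets.UTF_8)[0] & 0xFF;
        return "field " + fieldNumber(first) + ", wire type " + wireType(first);
    }

    public static void main(String[] args) {
        // 'H' is 0x48: read as field 9 with wire type 0 (varint).
        System.out.println(describeFirstByte("Hello World"));
        // prints "field 9, wire type 0"

        // '{' is 0x7B: read as field 15 with wire type 3 (start group).
        System.out.println(describeFirstByte("{\"name\": \"Hello World\"}"));
        // prints "field 15, wire type 3"
    }
}
```

Either way the parser sees arbitrary field keys instead of a well-formed message, which would be consistent with it failing before the function's own type checks ever run.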
