I'm trying to write a very simple echo app with Stateful Functions to evaluate 
it as a technology for some of my use cases.

However, I have not been able to get it to accept different content types. 
Here is the code for my simple echo function:

My Echo stateful function class.

package statefun.implementation.functions;

import statefun.api.annotations.Statefun;
import statefun.api.factories.StatefulFunctionSpecFactory;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.statefun.sdk.java.Context;
import org.apache.flink.statefun.sdk.java.StatefulFunction;
import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
import org.apache.flink.statefun.sdk.java.TypeName;
import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
import org.apache.flink.statefun.sdk.java.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/**
 * Echo function: forwards the payload of each supported incoming message, as a string, to an
 * egress.
 *
 * <p>Two payload types are recognised: {@link Types#JAVA_STRING_TYPE} and {@link
 * Types#JSON_TYPE}. Messages of any other type are logged and dropped.
 */
@Statefun
public class EchoFn implements StatefulFunction, StatefulFunctionSpecFactory {

   private final static Logger logger = LoggerFactory.getLogger(EchoFn.class);

   private final static TypeName fnType = TypeName.typeNameOf("std.fns", "echo");

   public final static StatefulFunctionSpec SPEC =
         StatefulFunctionSpec.builder(fnType).withSupplier(EchoFn::new).build();

   /**
    * Echoes the incoming message to the egress.
    *
    * @param context the invocation context used to send the egress message
    * @param message the incoming message
    * @return the completed future obtained from {@code context.done()}
    * @throws Throwable if decoding the payload or building the egress message fails
    */
   @Override
   public CompletableFuture<Void> apply(Context context, Message message) throws Throwable {
      if (message.is(Types.JAVA_STRING_TYPE)) {
         echo(context, message.as(Types.JAVA_STRING_TYPE));
      } else if (message.is(Types.JSON_TYPE)) {
         echo(context, jsonToText(message.as(Types.JSON_TYPE)));
      } else {
         // Previously such messages were dropped without any trace.
         logger.warn("Ignoring message with unsupported value type {}", message.valueTypeName());
      }

      return context.done();
   }

   /**
    * Builds the spec for this function type using the given instance supplier.
    *
    * @param supplier supplier of function instances
    * @return a spec bound to this function's type name
    */
   @Override
   public StatefulFunctionSpec createSpec(Supplier<? extends StatefulFunction> supplier) {
      return StatefulFunctionSpec.builder(EchoFn.fnType)
            .withSupplier(supplier)
            .build();
   }

   /** Returns the string form of this function's type name. */
   @Override
   public String toString() {
      return fnType.toString();
   }

   /** Sends {@code text} to the egress. */
   private static void echo(Context context, String text) {
      // NOTE(review): the egress is addressed with the function's own type name; presumably an
      // egress with that identifier has to be declared in the module configuration - confirm.
      context.send(EgressMessageBuilder.forEgress(fnType).withValue(text).build());
   }

   /**
    * Renders a JSON node as the text to echo.
    *
    * <p>{@code asText()} yields an empty string for container nodes (objects and arrays), so
    * those are serialised with {@code toString()} instead; value nodes keep their plain text.
    */
   private static String jsonToText(JsonNode json) {
      return json.isValueNode() ? json.asText() : json.toString();
   }
}
My Types class, which is loosely based on 
https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java

package statefun.implementation.functions;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.statefun.sdk.java.TypeName;
import org.apache.flink.statefun.sdk.java.types.SimpleType;
import org.apache.flink.statefun.sdk.java.types.Type;

import java.nio.charset.StandardCharsets;

/**
 * StateFun {@link Type} definitions for the payloads handled by the echo function: plain Java
 * strings and Jackson {@link JsonNode} trees.
 */
public class Types {

   public static final String JAVA_LANG_NAMESPACE = "java.lang";
   private final static ObjectMapper objectMapper = new ObjectMapper();

   /**
    * Encodes a string as UTF-8 bytes; {@code null} becomes an empty array.
    *
    * <p>The charset is pinned so the wire format does not depend on the JVM's platform default.
    */
   public static final SimpleType.Fn<String, byte[]> javaStringSerialize =
         input -> input == null ? new byte[0] : input.getBytes(StandardCharsets.UTF_8);

   /**
    * Decodes UTF-8 bytes into a string; a {@code null} or empty array becomes {@code null}
    * (so an empty string does not round-trip: it comes back as {@code null}).
    */
   public static final SimpleType.Fn<byte[], String> javaStringDeserialize =
         input -> input == null || input.length == 0
               ? null
               : new String(input, StandardCharsets.UTF_8);

   /** String payload type, named {@code java.lang/String}. */
   public static final Type<String> JAVA_STRING_TYPE =
         SimpleType.simpleImmutableTypeFrom(
               TypeName.typeNameOf(JAVA_LANG_NAMESPACE, "String"),
               javaStringSerialize,
               javaStringDeserialize);

   /** JSON payload type, named {@code application/<JsonNode class name>}. */
   public static final Type<JsonNode> JSON_TYPE =
         SimpleType.simpleImmutableTypeFrom(
               TypeName.typeNameOf("application", JsonNode.class.getName()),
               objectMapper::writeValueAsBytes,
               bytes -> objectMapper.readValue(bytes, JsonNode.class));
}

My Undertow Handler, copied from 
https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/undertow/UndertowHttpHandler.java

package statefun.implementation.undertow;

import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;
import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
import org.apache.flink.statefun.sdk.java.slice.Slice;
import org.apache.flink.statefun.sdk.java.slice.Slices;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/**
 * A simple Undertow {@link HttpHandler} that delegates requests from StateFun runtime processes
 * to a StateFun {@link RequestReplyHandler}.
 *
 * <p>The full request body is handed to the {@link RequestReplyHandler} as-is, and the bytes it
 * produces are written back as the response body.
 */
public final class UndertowHttpHandler implements HttpHandler {
  private final RequestReplyHandler handler;

  /**
   * @param handler the StateFun handler to delegate to; must not be {@code null}
   * @throws NullPointerException if {@code handler} is {@code null}
   */
  public UndertowHttpHandler(RequestReplyHandler handler) {
    this.handler = Objects.requireNonNull(handler);
  }

  /** Reads the complete request body, then continues in {@link #onRequestBody}. */
  @Override
  public void handleRequest(HttpServerExchange exchange) {
    exchange.getRequestReceiver().receiveFullBytes(this::onRequestBody);
  }

  /** Passes the request body to the StateFun handler and registers the completion callback. */
  private void onRequestBody(HttpServerExchange exchange, byte[] requestBytes) {
    exchange.dispatch();
    CompletableFuture<Slice> future = handler.handle(Slices.wrap(requestBytes));
    future.whenComplete((response, exception) -> onComplete(exchange, response, exception));
  }

  /**
   * Writes the outcome of the StateFun handler to the HTTP response.
   *
   * @param exchange the exchange to respond on
   * @param responseBytes the response payload; only used when {@code ex} is {@code null}
   * @param ex the failure, or {@code null} on success
   */
  private void onComplete(HttpServerExchange exchange, Slice responseBytes, Throwable ex) {
    if (ex != null) {
      ex.printStackTrace(System.out);
      // Set the real HTTP status code. Putting 500 into a "Status" response header leaves the
      // status line at 200, so the caller would not see the failure.
      exchange.setStatusCode(500);
      exchange.endExchange();
      return;
    }
    exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/octet-stream");
    exchange.getResponseSender().send(responseBytes.asReadOnlyByteBuffer());
  }
}

My App Server copied from 
https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreeterAppServer.java
package statefun;

import statefun.implementation.functions.EchoFn;
import statefun.implementation.undertow.UndertowHttpHandler;
import io.undertow.Undertow;
import org.apache.flink.statefun.sdk.java.StatefulFunctions;
import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;


/**
 * Application entry point: registers {@link EchoFn} and serves it through an Undertow HTTP
 * listener on port 8080, bound to all interfaces.
 */
public final class StatefulFunctionsAppServer {

  /**
   * Registers the echo function and starts the HTTP server.
   *
   * @param args command-line arguments (unused)
   */
  public static void main(String[] args) {
    final StatefulFunctions registry = new StatefulFunctions();
    registry.withStatefulFunction(EchoFn.SPEC);

    // Wrap the StateFun request/reply handler in the Undertow adapter.
    final UndertowHttpHandler httpHandler =
        new UndertowHttpHandler(registry.requestReplyHandler());

    Undertow.builder()
        .addHttpListener(8080, "0.0.0.0")
        .setHandler(httpHandler)
        .build()
        .start();
  }
}


However, when I run

curl -X POST -H "Content-Type: java.lang/String" -d 'Hello World' 
http://localhost:8080/std.fns/echo

or 

curl -X POST -H "Content-Type: 
application/com.fasterxml.jackson.databind.JsonNode" -d '{"name": "Hello 
World"}' http://localhost:8080/std.fns/echo 

I get the following error on the server.  

org.apache.flink.statefun.sdk.shaded.com.google.protobuf.InvalidProtocolBufferException:
 While parsing a protocol message, the input ended unexpectedly in the middle 
of a field.  This could mean either that the input has been truncated or that 
an embedded message misreported its own length.
        at 
org.apache.flink.statefun.sdk.shaded.com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:84)
        at 
org.apache.flink.statefun.sdk.shaded.com.google.protobuf.CodedInputStream$ArrayDecoder.readRawLittleEndian64(CodedInputStream.java:1151)
        at 
org.apache.flink.statefun.sdk.shaded.com.google.protobuf.CodedInputStream$ArrayDecoder.readFixed64(CodedInputStream.java:767)
        at 
org.apache.flink.statefun.sdk.shaded.com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:503)
        at 
org.apache.flink.statefun.sdk.shaded.com.google.protobuf.GeneratedMessageV3.parseUnknownField(GeneratedMessageV3.java:298)
        at 
org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.<init>(ToFunction.java:65)
        at 
org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.<init>(ToFunction.java:14)
        at 
org.apache.flink.statefun.sdk.reqreply.generated.ToFunction$1.parsePartialFrom(ToFunction.java:4090)
        at 
org.apache.flink.statefun.sdk.reqreply.generated.ToFunction$1.parsePartialFrom(ToFunction.java:4084)
        at 
org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:100)
        at 
org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:120)
        at 
org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:125)
        at 
org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
        at 
org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.parseFrom(ToFunction.java:3666)
        at 
org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler.handle(ConcurrentRequestReplyHandler.java:60)
        at 
statefun.implementation.undertow.UndertowHttpHandler.onRequestBody(UndertowHttpHandler.java:49)
        at 
io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:399)
        at 
io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:481)
        at 
statefun.implementation.undertow.UndertowHttpHandler.handleRequest(UndertowHttpHandler.java:44)
        at io.undertow.server.Connectors.executeRootHandler(Connectors.java:387)
        at 
io.undertow.server.protocol.http.HttpReadListener.handleEventWithNoRunningRequest(HttpReadListener.java:256)
        at 
io.undertow.server.protocol.http.HttpReadListener.handleEvent(HttpReadListener.java:136)
        at 
io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:162)
        at 
io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:100)
        at 
io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:57)
        at 
org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
        at org.xnio.ChannelListeners$10.handleEvent(ChannelListeners.java:291)
        at org.xnio.ChannelListeners$10.handleEvent(ChannelListeners.java:286)
        at 
org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
        at 
org.xnio.nio.QueuedNioTcpServer2.acceptTask(QueuedNioTcpServer2.java:178)
        at org.xnio.nio.WorkerThread.safeRun(WorkerThread.java:612)
        at org.xnio.nio.WorkerThread.run(WorkerThread.java:479)



I've been stuck on this for a very long time. Can somebody please advise me as 
to what I am doing wrong? The framework never even gets to the point where it 
parses my payload.


Reply via email to