I found the reason: There is a class incompatibility because I changed from
    Statefun 2.2.1 + Flink 1.11.1
to
    Statefun 2.2.1 + Flink 1.12.0

But even the newest version of Statefun 2.2.2 refers to Flink 1.11.3.

Is there a possibility to use the newest version of Flink in combination with 
the newest version of Statefun? I'm wondering why there is no Statefun version 
matching the current stable version of Flink?

Stephan


Von: Stephan Pelikan <stephan.peli...@phactum.at>
Gesendet: Montag, 11. Jänner 2021 19:37
An: user@flink.apache.org
Betreff: Statefun with RabbitMQ consumes message but does not run statefun

Hi,

I try to use RabbitMQ as a Source. My source consumes messages of the queue but 
the statefun is not execution - not even created.

This is my main function:

1 public static void main(String[] args) throws Exception {
2
3     final var env = StreamExecutionEnvironment.getExecutionEnvironment();
4
5     env.registerTypeWithKryoSerializer(Any.class, ProtobufSerializer.class);
6
7     env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
8     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
9     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
10     
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
11
12     final var statefunConfig = StatefulFunctionsConfig.fromEnvironment(env);
13     statefunConfig.setFlinkJobName("test");
14     statefunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);
15
16     final var connectionConfig = new RMQConnectionConfig.Builder()
17             .setHost("localhost")
18             .setUserName("guest")
19             .setPassword("guest")
20             .setPort(5672)
21             .setVirtualHost("test")
22             .setPrefetchCount(5000)
23             .build();
24
25     final var deserializationSchema = new 
TypeInformationSerializationSchema<>(
26             new ProtobufTypeInformation<>(Any.class), env.getConfig());
27     final var rmqSource = new RMQSource<>(connectionConfig, TEST_INGRESS, 
true, deserializationSchema);
28
29     final var source = env
30             .addSource(rmqSource, TEST_INGRESS)
31             .setParallelism(1)
32             .map(msg -> {
33                 return RoutableMessageBuilder
34                     .builder()
35                     .withTargetAddress(MyStatefun.TYPE, Utils.getUUID())
36                     .withMessageBody(msg)
37                     .build();
38             });
39
40     StatefulFunctionDataStreamBuilder
41             .builder("test")
42             .withDataStreamAsIngress(source)
43             .withFunctionProvider(MyStatefun.TYPE, unused -> {
44                 return new MyStatefun();
45             })
46             .withEgressId(MyStatefun.EGRESS)
47             .withConfiguration(statefunConfig)
48             .build(env)
49             .getDataStreamForEgressId(MyStatefun.EGRESS)
50             .addSink(new PrintSinkFunction<>(true));
51
52     env.execute();
53
54 }

A breakpoint in line 33 shows me the messages consumed. A breakpoint in line 44 
is never called. The message is reportingly consumed but never acknowledged or 
processed. Before using RabbitMQ I used a custom SourceFunction to fake input 
data and it worked well.

To setup things I use a local environment but logging does not show up any 
errors. Before my current problem I had another error during message 
deserialization and it wasn't reported either. Unfortunately I didn't manage to 
get the exception in the log/stdout. I had to use the debugger to find the 
reason of the former problem. In this situation now the debugger shows no 
thrown or caught exceptions. That's way I stuck.

Of course I would like to know what's the problem with my code. But I guess it 
is not obviously. Maybe some can give me a hint how to turn on exception 
logging which might help to get closer to the origin of the phenomenon.

Thanks in advance,
Stephan

Reply via email to