Hi there,

I have a flink streaming app where my source is Kafka and a custom sink to
Cassandra(I can't use standard C* sink that comes with flink as I have
customized auth to C*). I'm currently have the following:

messageStream
        .rebalance()

        .map( s-> {

    return mapper.readValue(s, JsonNode.class);)

        .filter(//filter some messages)

        .map(

         (MapFunction<JsonNode, String>) message -> {

         getDbSession.execute("QUERY_TO_EXEC")

         })

private static Session getDbSession() {
    if(dbSession == null && store!=null) {
        dbSession = getSession();
    }

    return dbSession;
}

1. Is this the right way to add a custom sink? As you can see, I have
dbSession as a class variable here and I'm storing its state.

2. This setup works fine in a standalone flink (java -jar MyJar.jar).
When I run using flink with YARN on EMR I get a NPE at the session
which is kind of weird.


Thanks

Reply via email to