Hi there,

I have a Flink streaming app that reads from Kafka and writes to Cassandra through a custom sink (I can't use the standard C* sink that ships with Flink because I have customized auth to C*). I currently have the following:

    messageStream
        .rebalance()
        .map(s -> { return mapper.readValue(s, JsonNode.class); })
        .filter(/* filter some messages */)
        .map((MapFunction<JsonNode, String>) message -> {
            getDbSession().execute("QUERY_TO_EXEC");
        });

    private static Session getDbSession() {
        // Lazily create the session on first use and cache it in a static field
        if (dbSession == null && store != null) {
            dbSession = getSession();
        }
        return dbSession;
    }

1. Is this the right way to add a custom sink? As you can see, dbSession is a static class variable here, so I'm keeping its state in the class.

2. This setup works fine in standalone Flink (java -jar MyJar.jar). When I run it with Flink on YARN on EMR, I get an NPE at the session, which is kind of weird.

Thanks