Hi Meghashyam, 1.
You can perform initializations in the open method of the RichSinkFunction interface. The open method will be called once for every sub task when initializing it. If you want to share the resource across multiple sub tasks running in the same JVM you can also store the dbSession in a class variable. 2. The Flink community is currently working on adding security support including ssl encryption to Flink. So maybe in the future you can use Flink’s Cassandra sink again. Cheers, Till On Fri, Dec 9, 2016 at 8:05 PM, Meghashyam Sandeep V < vr1meghash...@gmail.com> wrote: > Thanks a lot for the quick reply Shannon. > > 1. I will create a class that extends SinkFunction and write my connection > logic there. My only question here is- will a dbSession be created for each > message/partition which might affect the performance? Thats the reason why > I added this line to create a connection once and use it along the > datastream. if(dbSession == null && store!=null) { dbSession = > getSession();} > > 2. I couldn't use flink-connector-cassandra as I have SSL enabled for my > C* cluster and I couldn't get it work with all my SSL > config(truststore,keystore etc) added to cluster building. I didn't find a > proper example with SSL enabled flink-connector-cassandra > > > Thanks > > > > > On Fri, Dec 9, 2016 at 10:54 AM, Shannon Carey <sca...@expedia.com> wrote: > >> You haven't really added a sink in Flink terminology, you're just >> performing a side effect within a map operator. So while it may work, if >> you want to add a sink proper you need have an object that extends >> SinkFunction or RichSinkFunction. The method call on the stream should be >> ".addSink(…)". >> >> Also, the dbSession isn't really Flink state as it will not vary based on >> the position in or content in the stream. It's a necessary helper object, >> yes, but you don't need Flink to checkpoint it. >> >> You can still use the sinks provided with flink-connector-cassandra and >> customize the cluster building by passing your own ClusterBuilder into the >> constructor. >> >> -Shannon >> >> From: Meghashyam Sandeep V <vr1meghash...@gmail.com> >> Date: Friday, December 9, 2016 at 12:26 PM >> To: <u...@flink.apache.org>, <dev@flink.apache.org> >> Subject: Reg. custom sinks in Flink >> >> Hi there, >> >> I have a flink streaming app where my source is Kafka and a custom sink >> to Cassandra(I can't use standard C* sink that comes with flink as I have >> customized auth to C*). I'm currently have the following: >> >> messageStream >> .rebalance() >> >> .map( s-> { >> >> return mapper.readValue(s, JsonNode.class);) >> >> .filter(//filter some messages) >> >> .map( >> >> (MapFunction<JsonNode, String>) message -> { >> >> getDbSession.execute("QUERY_TO_EXEC") >> >> }) >> >> private static Session getDbSession() { >> if(dbSession == null && store!=null) { >> dbSession = getSession(); >> } >> >> return dbSession; >> } >> >> 1. Is this the right way to add a custom sink? As you can see, I have >> dbSession as a class variable here and I'm storing its state. >> >> 2. This setup works fine in a standalone flink (java -jar MyJar.jar). When I >> run using flink with YARN on EMR I get a NPE at the session which is kind of >> weird. >> >> >> Thanks >> >> >