Hi Till,

Thanks for the information.
1. What do you mean by 'subtask'? Is it every partition or every message in
the stream?

2. I tried using CassandraSink with a POJO. Is there a way to specify a TTL,
since I can't use a query when I have a DataStream of POJOs?

    CassandraSink.addSink(messageStream)
        .setClusterBuilder(new ClusterBuilder() {
            @Override
            protected Cluster buildCluster(Cluster.Builder builder) {
                return buildCassandraCluster();
            }
        })
        .build();

Thanks,

On Mon, Dec 12, 2016 at 3:17 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Meghashyam,
>
> 1. You can perform initializations in the open() method of the
> RichSinkFunction interface. The open() method will be called once for
> every subtask when it is initialized. If you want to share the resource
> across multiple subtasks running in the same JVM, you can also store the
> dbSession in a class variable.
>
> 2. The Flink community is currently working on adding security support,
> including SSL encryption, to Flink. So maybe in the future you can use
> Flink's Cassandra sink again.
>
> Cheers,
> Till
>
> On Fri, Dec 9, 2016 at 8:05 PM, Meghashyam Sandeep V <
> vr1meghash...@gmail.com> wrote:
>
>> Thanks a lot for the quick reply, Shannon.
>>
>> 1. I will create a class that extends SinkFunction and write my
>> connection logic there. My only question here is: will a dbSession be
>> created for each message/partition, which might affect performance?
>> That's the reason why I added this line, to create a connection once and
>> use it along the datastream:
>>
>>     if (dbSession == null && store != null) {
>>         dbSession = getSession();
>>     }
>>
>> 2. I couldn't use flink-connector-cassandra as I have SSL enabled for my
>> C* cluster and I couldn't get it to work with all my SSL config
>> (truststore, keystore, etc.) added to the cluster building.
>> I didn't find a proper example with an SSL-enabled
>> flink-connector-cassandra.
>>
>> Thanks
>>
>> On Fri, Dec 9, 2016 at 10:54 AM, Shannon Carey <sca...@expedia.com>
>> wrote:
>>
>>> You haven't really added a sink in Flink terminology; you're just
>>> performing a side effect within a map operator. So while it may work,
>>> if you want to add a sink proper you need an object that extends
>>> SinkFunction or RichSinkFunction. The method call on the stream should
>>> be ".addSink(…)".
>>>
>>> Also, the dbSession isn't really Flink state, as it will not vary based
>>> on the position in or content of the stream. It's a necessary helper
>>> object, yes, but you don't need Flink to checkpoint it.
>>>
>>> You can still use the sinks provided with flink-connector-cassandra and
>>> customize the cluster building by passing your own ClusterBuilder into
>>> the constructor.
>>>
>>> -Shannon
>>>
>>> From: Meghashyam Sandeep V <vr1meghash...@gmail.com>
>>> Date: Friday, December 9, 2016 at 12:26 PM
>>> To: <u...@flink.apache.org>, <dev@flink.apache.org>
>>> Subject: Reg. custom sinks in Flink
>>>
>>> Hi there,
>>>
>>> I have a Flink streaming app where my source is Kafka and I use a
>>> custom sink to Cassandra (I can't use the standard C* sink that comes
>>> with Flink as I have customized auth to C*). I currently have the
>>> following:
>>>
>>>     messageStream
>>>         .rebalance()
>>>         .map(s -> mapper.readValue(s, JsonNode.class))
>>>         .filter(/* filter some messages */)
>>>         .map((MapFunction<JsonNode, String>) message -> {
>>>             getDbSession().execute("QUERY_TO_EXEC");
>>>         });
>>>
>>>     private static Session getDbSession() {
>>>         if (dbSession == null && store != null) {
>>>             dbSession = getSession();
>>>         }
>>>         return dbSession;
>>>     }
>>>
>>> 1. Is this the right way to add a custom sink? As you can see, I have
>>> dbSession as a class variable here and I'm storing its state.
>>>
>>> 2.
>>> This setup works fine in standalone Flink (java -jar MyJar.jar). When
>>> I run using Flink with YARN on EMR, I get an NPE at the session, which
>>> is kind of weird.
>>>
>>> Thanks
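For reference, the pattern Till describes (initialize in open(), share one
session across subtasks in the same JVM via a class variable) can be sketched
in plain Java. This is only an illustration of the lifecycle, not the real
connector: FakeSession is a hypothetical stand-in for the driver's Session,
and SharedSessionSink mimics RichSinkFunction's open()/invoke() without the
Flink dependency.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the driver's Session; counts how often it is built. */
class FakeSession {
    static final AtomicInteger creations = new AtomicInteger();

    static FakeSession connect() {
        creations.incrementAndGet();
        return new FakeSession();
    }

    void execute(String cql) { /* no-op in this sketch */ }
}

/** Sketch of a sink whose open() lazily initializes one session per JVM. */
class SharedSessionSink {
    // static, so all subtasks in the same JVM (task manager) share it
    private static volatile FakeSession dbSession;

    /** Called once per subtask, like RichSinkFunction#open. */
    void open() {
        if (dbSession == null) {
            synchronized (SharedSessionSink.class) {
                if (dbSession == null) {
                    dbSession = FakeSession.connect();
                }
            }
        }
    }

    /** Called per record, like SinkFunction#invoke. */
    void invoke(String value) {
        dbSession.execute("INSERT ..."); // placeholder query
    }
}
```

The double-checked locking around the static field is what prevents a session
per message (or per subtask): open() may run once per subtask, but the session
is built only once per JVM.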
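On the TTL question: one workaround (not confirmed in this thread, so treat it
as a hedged sketch) is to map the POJO to a Tuple and use the query-based
CassandraSink with a CQL "USING TTL" clause instead of the POJO mapper. The
helper below only builds the CQL string; the table and column names are
hypothetical, and the Flink wiring is shown in comments because it needs the
Flink and connector jars.

```java
/** Builds an INSERT statement with a TTL clause; names are illustrative only. */
class TtlQuery {
    static String insertWithTtl(String table, int ttlSeconds) {
        return "INSERT INTO " + table
                + " (id, payload) VALUES (?, ?) USING TTL " + ttlSeconds + ";";
    }
}

// Hypothetical Flink wiring (Message, getId, getPayload are assumptions):
//
// DataStream<Tuple2<String, String>> tuples =
//     messageStream.map(m -> Tuple2.of(m.getId(), m.getPayload()));
//
// CassandraSink.addSink(tuples)
//     .setQuery(TtlQuery.insertWithTtl("my_ks.messages", 86400))
//     .setClusterBuilder(new ClusterBuilder() {
//         @Override
//         protected Cluster buildCluster(Cluster.Builder builder) {
//             return buildCassandraCluster();
//         }
//     })
//     .build();
```

The trade-off is giving up the POJO mapping for an explicit query, but it
keeps the TTL entirely inside CQL, where the connector does not need any
special support for it.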