Hi Till,

Thanks for the information.

1. What do you mean by 'subtask'? Is it every partition or every message in
the stream?

2. I tried using CassandraSink with a POJO. Is there a way to specify a TTL,
given that I can't supply a query when my DataStream carries POJOs?

         .setClusterBuilder(new ClusterBuilder() {
             protected Cluster buildCluster(Cluster.Builder builder) {
                 return buildCassandraCluster();


On Mon, Dec 12, 2016 at 3:17 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Meghashyam,
> 1. You can perform initializations in the open method of
>    RichSinkFunction. The open method is called once for every
>    subtask when it is initialized. If you want to share the resource
>    across multiple subtasks running in the same JVM, you can also
>    store the dbSession in a class variable.
> 2. The Flink community is currently working on adding security
>    support, including SSL encryption, to Flink. So maybe in the future
>    you can use Flink’s Cassandra sink again.
> Cheers,
> Till
>
> On Fri, Dec 9, 2016 at 8:05 PM, Meghashyam Sandeep V <
> vr1meghash...@gmail.com> wrote:
>> Thanks a lot for the quick reply Shannon.
>> 1. I will create a class that extends SinkFunction and write my
>> connection logic there. My only question here is: will a dbSession be
>> created for each message/partition, which might affect performance?
>> That's the reason why I added this line to create a connection once and
>> reuse it along the datastream:
>> if(dbSession == null && store!=null) { dbSession = getSession();}
>> 2. I couldn't use flink-connector-cassandra as I have SSL enabled for my
>> C* cluster and I couldn't get it to work with all my SSL config
>> (truststore, keystore, etc.) added to the cluster building. I didn't find a
>> proper example of flink-connector-cassandra with SSL enabled.
>> Thanks
>> On Fri, Dec 9, 2016 at 10:54 AM, Shannon Carey <sca...@expedia.com>
>> wrote:
>>> You haven't really added a sink in Flink terminology, you're just
>>> performing a side effect within a map operator. So while it may work, if
>>> you want to add a sink proper you need to have an object that extends
>>> SinkFunction or RichSinkFunction. The method call on the stream should be
>>> ".addSink(…)".
>>> Also, the dbSession isn't really Flink state, as it does not vary with
>>> the position in the stream or with the stream's content. It's a necessary
>>> helper object, yes, but you don't need Flink to checkpoint it.
>>> You can still use the sinks provided with flink-connector-cassandra and
>>> customize the cluster building by passing your own ClusterBuilder into the
>>> constructor.
>>> -Shannon
>>> From: Meghashyam Sandeep V <vr1meghash...@gmail.com>
>>> Date: Friday, December 9, 2016 at 12:26 PM
>>> To: <u...@flink.apache.org>, <dev@flink.apache.org>
>>> Subject: Reg. custom sinks in Flink
>>> Hi there,
>>> I have a Flink streaming app where my source is Kafka and my sink is a
>>> custom sink to Cassandra (I can't use the standard C* sink that comes with
>>> Flink as I have customized auth to C*). I currently have the following:
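>>> messageStream
>>>         .rebalance()
>>>         .map(s -> {
>>>             return mapper.readValue(s, JsonNode.class);
>>>         })
>>>         .filter(/* filter some messages */)
>>>         .map(
>>>          (MapFunction<JsonNode, String>) message -> {
>>>              // side effect: write to Cassandra from inside the map
>>>              getDbSession().execute("QUERY_TO_EXEC");
>>>              return message.toString(); // placeholder return value
>>>          });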
>>> private static Session getDbSession() {
>>>     if(dbSession == null && store!=null) {
>>>         dbSession = getSession();
>>>     }
>>>     return dbSession;
>>> }
>>> 1. Is this the right way to add a custom sink? As you can see, I have
>>> dbSession as a class variable here and I'm storing its state.
>>> 2. This setup works fine in a standalone Flink (java -jar MyJar.jar). When
>>> I run it on Flink with YARN on EMR, I get an NPE at the session, which is
>>> kind of weird.
>>> Thanks
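
For reference, the lifecycle Till describes earlier in the thread (open
called once per subtask, where a subtask is one parallel instance of the
sink operator, and the per-record method called once per message) can be
modelled in plain Java without Flink on the classpath. This is only a
sketch under stated assumptions: FakeSession, SessionSink and run are
hypothetical stand-ins invented for illustration, not Flink or Cassandra
driver classes, and the round-robin distribution is a simplification of
what rebalance() does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class SubtaskLifecycleSketch {

    /** Hypothetical stand-in for an expensive resource such as a Cassandra session. */
    static final class FakeSession {
        static final AtomicInteger CREATED = new AtomicInteger();
        final List<String> executed = new ArrayList<>();

        FakeSession() {
            CREATED.incrementAndGet(); // count how many sessions get built
        }

        void execute(String query) {
            executed.add(query);
        }
    }

    /** Hypothetical stand-in for a sink with an open/invoke/close lifecycle. */
    static final class SessionSink {
        private FakeSession session;

        void open() {                 // once per subtask
            session = new FakeSession();
        }

        void invoke(String record) {  // once per record, reuses the session
            session.execute("INSERT " + record);
        }

        void close() {                // once per subtask, on shutdown
            session = null;
        }
    }

    /** Simulates `parallelism` subtasks and returns how many sessions were created. */
    static int run(int parallelism, List<String> records) {
        int before = FakeSession.CREATED.get();
        List<SessionSink> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            SessionSink sink = new SessionSink();
            sink.open();
            subtasks.add(sink);
        }
        // Simplified round-robin distribution of records over the subtasks.
        for (int i = 0; i < records.size(); i++) {
            subtasks.get(i % parallelism).invoke(records.get(i));
        }
        for (SessionSink sink : subtasks) {
            sink.close();
        }
        return FakeSession.CREATED.get() - before;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c", "d", "e", "f");
        // Six records over two subtasks still build only two sessions.
        System.out.println("sessions created: " + run(2, records));
    }
}
```

In this model the number of sessions follows the parallelism, not the
number of messages, which is the point of doing the setup in open rather
than in the per-record method.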
