Hi Till,
Thanks for the information.
1. What do you mean by 'subtask'? Is it every partition, or every message in
the stream?
2. I tried using CassandraSink with a POJO. Is there a way to specify a TTL,
given that I can't use a query when I have a DataStream of POJOs?
CassandraSink.addSink(messageStream)
        .setClusterBuilder(new ClusterBuilder() {
            @Override
            protected Cluster buildCluster(Cluster.Builder builder) {
                return buildCassandraCluster();
            }
        })
        .build();
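
For reference, with a tuple stream I could have baked the TTL into the CQL
itself via setQuery() (the keyspace/table below are made up), but the builder
doesn't accept a query when the input is a POJO stream:

CassandraSink.addSink(tupleStream)
        .setQuery("INSERT INTO my_ks.messages (id, body) VALUES (?, ?) USING TTL 86400;")
        .setClusterBuilder(new ClusterBuilder() {
            @Override
            protected Cluster buildCluster(Cluster.Builder builder) {
                return buildCassandraCluster();
            }
        })
        .build();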
Thanks,
On Mon, Dec 12, 2016 at 3:17 AM, Till Rohrmann <[email protected]> wrote:
> Hi Meghashyam,
>
> 1.
>
> You can perform initializations in the open method of the
> RichSinkFunction interface. The open method is called once per sub task
> when the sub task is initialized (see the sketch after this list). If
> you want to share the resource across multiple sub tasks running in the
> same JVM, you can also store the dbSession in a class variable.
> 2.
>
> The Flink community is currently working on adding security support
> including ssl encryption to Flink. So maybe in the future you can use
> Flink’s Cassandra sink again.
>
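> A minimal sketch of what I mean in 1. (the class name and the Message
> type are just placeholders):
>
> public class MyCassandraSink extends RichSinkFunction<Message> {
>     private transient Session dbSession;
>
>     @Override
>     public void open(Configuration parameters) {
>         // Called once per sub task, before any records arrive.
>         dbSession = buildCassandraCluster().connect();
>     }
>
>     @Override
>     public void invoke(Message message) {
>         dbSession.execute("QUERY_TO_EXEC");
>     }
>
>     @Override
>     public void close() {
>         if (dbSession != null) {
>             dbSession.close();
>         }
>     }
> }
>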
> Cheers,
> Till
>
>
> On Fri, Dec 9, 2016 at 8:05 PM, Meghashyam Sandeep V <
> [email protected]> wrote:
>
>> Thanks a lot for the quick reply Shannon.
>>
>> 1. I will create a class that extends SinkFunction and write my
>> connection logic there. My only question here is: will a dbSession be
>> created for each message/partition, which might affect performance?
>> That's the reason I added this line, to create a connection once and
>> reuse it along the datastream: if (dbSession == null && store != null) {
>> dbSession = getSession(); }
>>
>> 2. I couldn't use flink-connector-cassandra as I have SSL enabled for my
>> C* cluster, and I couldn't get it to work with all my SSL config
>> (truststore, keystore, etc.) added to the cluster building. I didn't find
>> a proper example of an SSL-enabled flink-connector-cassandra setup.
>>
>> Thanks
>>
>> On Fri, Dec 9, 2016 at 10:54 AM, Shannon Carey <[email protected]>
>> wrote:
>>
>>> You haven't really added a sink in Flink terminology; you're just
>>> performing a side effect within a map operator. So while it may work, if
>>> you want to add a proper sink you need to have an object that extends
>>> SinkFunction or RichSinkFunction. The method call on the stream should be
>>> ".addSink(…)", as in the snippet below.
>>>
>>> Also, the dbSession isn't really Flink state as it will not vary based
>>> on the position in or content in the stream. It's a necessary helper
>>> object, yes, but you don't need Flink to checkpoint it.
>>>
>>> You can still use the sinks provided with flink-connector-cassandra and
>>> customize the cluster building by passing your own ClusterBuilder into the
>>> constructor.
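>>>
>>> Something along these lines, assuming the DataStax 3.x driver (the host,
>>> credentials, and mySslContext built from your truststore/keystore are
>>> placeholders):
>>>
>>> CassandraSink.addSink(messageStream)
>>>     .setClusterBuilder(new ClusterBuilder() {
>>>         @Override
>>>         protected Cluster buildCluster(Cluster.Builder builder) {
>>>             return builder
>>>                 .addContactPoint("cassandra-host")
>>>                 .withCredentials("user", "pass")
>>>                 .withSSL(JdkSSLOptions.builder()
>>>                     .withSSLContext(mySslContext)
>>>                     .build())
>>>                 .build();
>>>         }
>>>     })
>>>     .build();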
>>>
>>> -Shannon
>>>
>>> From: Meghashyam Sandeep V <[email protected]>
>>> Date: Friday, December 9, 2016 at 12:26 PM
>>> To: <[email protected]>, <[email protected]>
>>> Subject: Reg. custom sinks in Flink
>>>
>>> Hi there,
>>>
>>> I have a Flink streaming app where my source is Kafka and a custom sink
>>> to Cassandra (I can't use the standard C* sink that comes with Flink as
>>> I have customized auth to C*). I currently have the following:
>>>
>>> messageStream
>>>     .rebalance()
>>>     .map(s -> mapper.readValue(s, JsonNode.class))
>>>     .filter(message -> true /* filter some messages */)
>>>     .map((MapFunction<JsonNode, String>) message -> {
>>>         // map used only for its side effect; the return value is unused
>>>         getDbSession().execute("QUERY_TO_EXEC");
>>>         return message.toString();
>>>     });
>>>
>>> private static Session getDbSession() {
>>>     if (dbSession == null && store != null) {
>>>         dbSession = getSession();
>>>     }
>>>     return dbSession;
>>> }
>>>
>>> 1. Is this the right way to add a custom sink? As you can see, I have
>>> dbSession as a class variable here and I'm storing its state.
>>>
>>> 2. This setup works fine in standalone Flink (java -jar MyJar.jar). When
>>> I run Flink with YARN on EMR, I get an NPE at the session, which is kind
>>> of weird.
>>>
>>>
>>> Thanks
>>>
>>>
>>
>