Hello,

The query is generated automatically from the POJO by the datastax MappingManager in the CassandraPojoSink; Flink isn't generating anything itself.

On the MappingManager you can set the TTL for all queries (it also allows some other stuff). So, to allow the user to set the TTL we must add a hook to configure the MappingManager; this can be done the same way the Cluster is configured using the ClusterBuilder.
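
To make the idea concrete, here is a rough, self-contained sketch of what such a hook could look like. Everything in it is hypothetical: MapperConfigurer, StubMapper and PojoSinkSketch are stand-ins invented for this example, not existing Flink or datastax classes. In the real driver the TTL would be passed as one of the mapper's save options (Mapper.Option.ttl, if I recall correctly); the sketch only shows the shape of the callback, mirroring how ClusterBuilder is handed to the sink.

```java
import java.io.Serializable;

final class MapperHookSketch {

    /** Stand-in for the driver's mapper; it only records a default TTL. Hypothetical. */
    static final class StubMapper {
        private int defaultTtlSeconds; // 0 means "no TTL"

        void setDefaultTtl(int seconds) {
            this.defaultTtlSeconds = seconds;
        }

        /** Renders the kind of INSERT a mapper with this configuration might issue. */
        String insertFor(String table) {
            String cql = "INSERT INTO " + table + " (...) VALUES (...)";
            return defaultTtlSeconds > 0 ? cql + " USING TTL " + defaultTtlSeconds : cql;
        }
    }

    /** Hypothetical hook modeled on ClusterBuilder: serializable so it can ship with the sink. */
    abstract static class MapperConfigurer implements Serializable {
        private static final long serialVersionUID = 1L;

        protected abstract void configure(StubMapper mapper);
    }

    /** Stand-in for the POJO sink: applies the user's hook once, when it is opened. */
    static final class PojoSinkSketch {
        private final MapperConfigurer configurer;
        private StubMapper mapper;

        PojoSinkSketch(MapperConfigurer configurer) {
            this.configurer = configurer;
        }

        void open() {
            mapper = new StubMapper();
            if (configurer != null) {
                configurer.configure(mapper); // user-supplied settings, e.g. a TTL
            }
        }

        String describeInsert(String table) {
            return mapper.insertFor(table);
        }
    }

    public static void main(String[] args) {
        PojoSinkSketch sink = new PojoSinkSketch(new MapperConfigurer() {
            @Override
            protected void configure(StubMapper mapper) {
                mapper.setDefaultTtl(3600); // expire rows after one hour
            }
        });
        sink.open();
        System.out.println(sink.describeInsert("events"));
    }
}
```

The user-facing part would then be a single extra setter on the sink builder, next to setClusterBuilder.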

Regards,
Chesnay

On 12.12.2016 19:12, Meghashyam Sandeep V wrote:
Thank you Till. I wanted to contribute towards Flink. Looks like this could be a good start. I couldn't find the place where the insert query is built for Pojo sinks in CassandraSink.java, CassandraPojoSink.java, or CassandraSinkBase.java. Could you shed some light on how that insert query is built automatically by the sink?

Thanks,

On Mon, Dec 12, 2016 at 7:56 AM, Till Rohrmann <trohrm...@apache.org <mailto:trohrm...@apache.org>> wrote:

    (1) A subtask is a parallel instance of an operator and thus
    responsible for a partition (possibly infinite) of the whole
    DataStream/DataSet.

    (2) Maybe you can add this feature to Flink's Cassandra Sink.

    Cheers,
    Till

    On Mon, Dec 12, 2016 at 4:30 PM, Meghashyam Sandeep V
    <vr1meghash...@gmail.com <mailto:vr1meghash...@gmail.com>> wrote:

        Data piles up in Cassandra without TTL. Is there a workaround
        for this problem? Is there a way to specify my query and still
        use Pojo?

        Thanks,

        On Mon, Dec 12, 2016 at 7:26 AM, Chesnay Schepler
        <ches...@apache.org <mailto:ches...@apache.org>> wrote:

            Regarding 2) I don't think so. That would require access
            to the datastax MappingManager.
            We could add something similar to the ClusterBuilder for
            that though.

            Regards,
            Chesnay


            On 12.12.2016 16:15, Meghashyam Sandeep V wrote:
            Hi Till,

            Thanks for the information.

            1. What do you mean by 'subtask', is it every partition
            or every message in the stream?

            2. I tried using CassandraSink with a Pojo. Is there a
            way to specify TTL as I can't use a query when I have a
            datastream with Pojo?

            CassandraSink.addSink(messageStream)
                      .setClusterBuilder(new ClusterBuilder() {
                          @Override
                          protected Cluster buildCluster(Cluster.Builder builder) {
                              return buildCassandraCluster();
                          }
                      })
                      .build();
            Thanks,
            On Mon, Dec 12, 2016 at 3:17 AM, Till Rohrmann
            <trohrm...@apache.org <mailto:trohrm...@apache.org>> wrote:

                Hi Meghashyam,

                1.

                    You can perform initializations in the open
                    method of the |RichSinkFunction| interface. The
                    |open| method will be called once for every sub
                    task when initializing it. If you want to share
                    the resource across multiple sub tasks running in
                    the same JVM you can also store the |dbSession|
                    in a class variable.

                2.

                    The Flink community is currently working on
                    adding security support including ssl encryption
                    to Flink. So maybe in the future you can use
                    Flink’s Cassandra sink again.
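
                As a sketch of point 1: the class below is a plain-Java
                stand-in, not Flink code. SessionSinkSketch, FakeSession
                and acquireSession() are made-up names; in a real job the
                class would extend RichSinkFunction and Flink would call
                open and invoke itself. It only illustrates an open() that
                runs once per sub task and reuses one session per JVM
                through a class variable.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class SessionSinkSketch {

    /** Stand-in for a database session; hypothetical. */
    static final class FakeSession {
        final AtomicInteger executed = new AtomicInteger();

        void execute(String query) {
            executed.incrementAndGet();
        }
    }

    // Counts how often a connection is actually established in this JVM.
    static final AtomicInteger CONNECTIONS = new AtomicInteger();

    // Class variable: shared by all sub tasks that run in the same JVM.
    private static FakeSession sharedSession;

    // Synchronized so that concurrently opening sub tasks create only one session.
    private static synchronized FakeSession acquireSession() {
        if (sharedSession == null) {
            CONNECTIONS.incrementAndGet();
            sharedSession = new FakeSession(); // real code would connect here
        }
        return sharedSession;
    }

    // Not shipped with the serialized function; assigned in open() on the worker.
    private transient FakeSession session;

    /** Called once per sub task, before any record arrives. */
    void open() {
        session = acquireSession();
    }

    /** Called once per record. */
    void invoke(String query) {
        session.execute(query);
    }

    public static void main(String[] args) {
        SessionSinkSketch first = new SessionSinkSketch();
        SessionSinkSketch second = new SessionSinkSketch();
        first.open();
        second.open();
        first.invoke("q1");
        second.invoke("q2");
        System.out.println(CONNECTIONS.get() + " connection(s), "
                + acquireSession().executed.get() + " statement(s)");
    }
}
```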

                Cheers, Till

                On Fri, Dec 9, 2016 at 8:05 PM, Meghashyam Sandeep V
                <vr1meghash...@gmail.com
                <mailto:vr1meghash...@gmail.com>> wrote:

                    Thanks a lot for the quick reply Shannon.
                    1. I will create a class that extends
                    SinkFunction and write my connection logic there.
                    My only question here is: will a dbSession be
                    created for each message/partition, which might
                    affect the performance? That's the reason why I
                    added this line to create a connection once and
                    use it along the datastream: if(dbSession == null
                    && store!=null) { dbSession = getSession();}
                    2. I couldn't use flink-connector-cassandra as I
                    have SSL enabled for my C* cluster and I couldn't
                    get it to work with all my SSL
                    config (truststore, keystore, etc.) added to cluster
                    building. I didn't find a proper example of an
                    SSL-enabled flink-connector-cassandra.
                    Thanks
                    On Fri, Dec 9, 2016 at 10:54 AM, Shannon Carey
                    <sca...@expedia.com <mailto:sca...@expedia.com>>
                    wrote:

                        You haven't really added a sink in Flink
                        terminology, you're just performing a side
                        effect within a map operator. So while it may
                        work, if you want to add a proper sink you
                        need to have an object that extends SinkFunction
                        or RichSinkFunction. The method call on the
                        stream should be ".addSink(…)".
                        Also, the dbSession isn't really Flink state
                        as it will not vary based on the position in
                        or content in the stream. It's a necessary
                        helper object, yes, but you don't need Flink
                        to checkpoint it.
                        You can still use the sinks provided with
                        flink-connector-cassandra and customize the
                        cluster building by passing your
                        own ClusterBuilder into the constructor.
                        -Shannon
                        From: Meghashyam Sandeep V
                        <vr1meghash...@gmail.com
                        <mailto:vr1meghash...@gmail.com>>
                        Date: Friday, December 9, 2016 at 12:26 PM
                        To: <user@flink.apache.org
                        <mailto:user@flink.apache.org>>,
                        <d...@flink.apache.org
                        <mailto:d...@flink.apache.org>>
                        Subject: Reg. custom sinks in Flink
                        Hi there,
                        I have a flink streaming app where my source
                        is Kafka and a custom sink to Cassandra (I
                        can't use the standard C* sink that comes with
                        flink as I have customized auth to C*). I
                        currently have the following:

                        messageStream
                                 .rebalance()
                                 .map(s -> {
                                     return mapper.readValue(s, JsonNode.class);
                                 })
                                 .filter(/* filter some messages */)
                                 .map(
                                  (MapFunction<JsonNode, String>) message -> {
                                      getDbSession().execute("QUERY_TO_EXEC");
                                  })

                        private static Session getDbSession() {
                             if (dbSession == null && store != null) {
                                 dbSession = getSession();
                             }

                             return dbSession;
                        }

                        1. Is this the right way to add a custom sink? As you
                        can see, I have dbSession as a class variable here and
                        I'm storing its state.

                        2. This setup works fine in a standalone flink (java
                        -jar MyJar.jar). When I run using flink with YARN on
                        EMR I get an NPE at the session, which is kind of weird.

                        Thanks
