Hello,
the query is generated automatically from the POJO by the DataStax
MappingManager in the CassandraPojoSink; Flink isn't generating anything
itself.
On the MappingManager you can set the TTL for all queries (along with
some other options). So, to let the user set the TTL, we must add a hook
to configure the MappingManager; this can be done the same way the
Cluster is configured using the ClusterBuilder.
Regards,
Chesnay
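A minimal sketch of what such a hook could look like, in plain Java. MapperStub, MapperConfigurer, and PojoSinkSketch are hypothetical names invented for this illustration so that it runs without the DataStax driver or Flink on the classpath; they are not the connector's API. The idea mirrors ClusterBuilder: the user supplies a small serializable object, and the sink applies it once when it opens.

```java
import java.io.Serializable;

// Hypothetical stand-in for the DataStax mapper; it only records a default TTL.
class MapperStub {
    private int defaultTtlSeconds = 0; // 0 means "no TTL" in this sketch

    void setDefaultTtlSeconds(int ttlSeconds) {
        this.defaultTtlSeconds = ttlSeconds;
    }

    int getDefaultTtlSeconds() {
        return defaultTtlSeconds;
    }
}

// Hypothetical hook, modeled on how ClusterBuilder lets users customize the Cluster.
// Serializable so that it could be shipped together with the sink.
abstract class MapperConfigurer implements Serializable {
    private static final long serialVersionUID = 1L;

    protected abstract void configure(MapperStub mapper);
}

// Hypothetical sink skeleton showing where the hook would be applied.
class PojoSinkSketch {
    private final MapperConfigurer configurer; // may be null: keep the defaults
    private MapperStub mapper;

    PojoSinkSketch(MapperConfigurer configurer) {
        this.configurer = configurer;
    }

    // Called once per sink instance, before any record is written.
    void open() {
        mapper = new MapperStub();
        if (configurer != null) {
            configurer.configure(mapper);
        }
    }

    MapperStub getMapper() {
        return mapper;
    }
}
```

With this shape, a user would pass an anonymous MapperConfigurer whose configure method calls setDefaultTtlSeconds(3600), and every write issued through the mapper would pick up that TTL without the user having to write a query.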
On 12.12.2016 19:12, Meghashyam Sandeep V wrote:
Thank you Till. I wanted to contribute towards Flink. Looks like this
could be a good start. I couldn't find the place where the insert
query is built for Pojo sinks in CassandraSink.java,
CassandraPojoSink.java, or CassandraSinkBase.java. Could you shed
some light on how that insert query is built automatically by the
sink?
Thanks,
On Mon, Dec 12, 2016 at 7:56 AM, Till Rohrmann <trohrm...@apache.org> wrote:
(1) A subtask is a parallel instance of an operator and thus
responsible for a partition (possibly infinite) of the whole
DataStream/DataSet.
(2) Maybe you can add this feature to Flink's Cassandra Sink.
Cheers,
Till
On Mon, Dec 12, 2016 at 4:30 PM, Meghashyam Sandeep V
<vr1meghash...@gmail.com> wrote:
Data piles up in Cassandra without TTL. Is there a workaround
for this problem? Is there a way to specify my query and still
use Pojo?
Thanks,
On Mon, Dec 12, 2016 at 7:26 AM, Chesnay Schepler
<ches...@apache.org> wrote:
Regarding 2) I don't think so. That would require access
to the datastax MappingManager.
We could add something similar to the ClusterBuilder for
that though.
Regards,
Chesnay
On 12.12.2016 16:15, Meghashyam Sandeep V wrote:
Hi Till,
Thanks for the information.
1. What do you mean by 'subtask', is it every partition
or every message in the stream?
2. I tried using CassandraSink with a Pojo. Is there a
way to specify TTL as I can't use a query when I have a
datastream with Pojo?
CassandraSink.addSink(messageStream)
    .setClusterBuilder(new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return buildCassandraCluster();
        }
    })
    .build();
Thanks,
On Mon, Dec 12, 2016 at 3:17 AM, Till Rohrmann
<trohrm...@apache.org> wrote:
Hi Meghashyam,
1. You can perform initializations in the open method of the
RichSinkFunction class. The open method is called once for every
subtask when it is initialized. If you want to share the resource
across multiple subtasks running in the same JVM, you can also
store the dbSession in a class variable.
2. The Flink community is currently working on adding security
support, including SSL encryption, to Flink. So maybe in the future
you can use Flink's Cassandra sink again.
Cheers, Till
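The class-variable idea from point 1 can be sketched in plain Java as follows. DbSessionStub and SharedSession are hypothetical names standing in for the real session type and for whatever holder class the job would use. The reference counting on release is an added design choice in this sketch, not something stated in the thread.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical stand-in for a database session (for example a Cassandra Session).
class DbSessionStub {
    private boolean closed = false;

    void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

// Keeps one session per JVM in a class (static) variable.
// All access is synchronized because several subtasks may open concurrently.
final class SharedSession {
    private static DbSessionStub session;
    private static int users = 0;

    private SharedSession() {
    }

    // Call from each subtask's open(): the session is created on first use only.
    static synchronized DbSessionStub acquire(Supplier<DbSessionStub> factory) {
        if (session == null) {
            session = Objects.requireNonNull(factory.get(), "factory returned null");
        }
        users++;
        return session;
    }

    // Call from each subtask's close(): the last user closes the session.
    static synchronized void release() {
        if (users > 0 && --users == 0) {
            session.close();
            session = null;
        }
    }
}
```

Each subtask would call SharedSession.acquire(...) in its open method and SharedSession.release() in its close method, so the session is built once per JVM rather than once per message.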
On Fri, Dec 9, 2016 at 8:05 PM, Meghashyam Sandeep V
<vr1meghash...@gmail.com> wrote:
Thanks a lot for the quick reply Shannon.
1. I will create a class that extends SinkFunction and write my
connection logic there. My only question here is: will a dbSession
be created for each message/partition, which might affect
performance? That's the reason why I added this line to create a
connection once and use it along the datastream:
if (dbSession == null && store != null) { dbSession = getSession(); }
2. I couldn't use flink-connector-cassandra as I have SSL enabled
for my C* cluster and I couldn't get it to work with all my SSL
config (truststore, keystore, etc.) added to the cluster building. I
didn't find a proper example of flink-connector-cassandra with SSL
enabled.
Thanks
On Fri, Dec 9, 2016 at 10:54 AM, Shannon Carey
<sca...@expedia.com> wrote:
You haven't really added a sink in Flink terminology; you're just
performing a side effect within a map operator. So while it may
work, if you want to add a proper sink you need to have an object
that implements SinkFunction or extends RichSinkFunction. The
method call on the stream should be ".addSink(…)".
Also, the dbSession isn't really Flink state
as it will not vary based on the position in
or content in the stream. It's a necessary
helper object, yes, but you don't need Flink
to checkpoint it.
You can still use the sinks provided with
flink-connector-cassandra and customize the
cluster building by passing your
own ClusterBuilder into the constructor.
-Shannon
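To illustrate the difference between a side effect inside a map and a proper sink, here is a small plain-Java sketch of the sink lifecycle. SinkLifecycle, RecordingClient, QuerySink, and SinkDriver are hypothetical stand-ins written for this example so that it runs without Flink; in a real job the class would extend RichSinkFunction and be passed to addSink. The client is a transient helper created in open, not state that needs checkpointing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the lifecycle of a rich sink function.
interface SinkLifecycle<T> {
    void open();

    void invoke(T value);

    void close();
}

// Hypothetical stand-in for a database client; it records what it was asked to run.
class RecordingClient {
    final List<String> executed = new ArrayList<>();
    private boolean closed = false;

    void execute(String statement) {
        executed.add(statement);
    }

    void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

class QuerySink implements SinkLifecycle<String> {
    // Helper object, not stream state: created in open(), never checkpointed.
    private transient RecordingClient client;
    private int opens = 0;

    @Override
    public void open() {
        client = new RecordingClient();
        opens++;
    }

    @Override
    public void invoke(String value) {
        client.execute("INSERT " + value);
    }

    @Override
    public void close() {
        client.close();
    }

    int opens() {
        return opens;
    }

    RecordingClient client() {
        return client;
    }
}

// Drives a sink the way a runtime would: open once, invoke per record, then close.
final class SinkDriver {
    private SinkDriver() {
    }

    static <T> void run(SinkLifecycle<T> sink, List<T> records) {
        sink.open();
        try {
            for (T record : records) {
                sink.invoke(record);
            }
        } finally {
            sink.close();
        }
    }
}
```

The point of the separation is that connection setup happens once in open, while invoke only does the per-record write.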
From: Meghashyam Sandeep V <vr1meghash...@gmail.com>
Date: Friday, December 9, 2016 at 12:26 PM
To: <user@flink.apache.org>, <d...@flink.apache.org>
Subject: Reg. custom sinks in Flink
Hi there,
I have a Flink streaming app where my source is Kafka, with a
custom sink to Cassandra (I can't use the standard C* sink that
comes with Flink as I have customized auth to C*). I currently
have the following:
messageStream
    .rebalance()
    .map(s -> mapper.readValue(s, JsonNode.class))
    .filter(/* filter some messages */)
    .map((MapFunction<JsonNode, String>) message -> {
        getDbSession().execute("QUERY_TO_EXEC");
        return message.toString(); // a MapFunction must return a value
    });

private static Session getDbSession() {
    // Lazily create the session once and reuse it on later calls.
    if (dbSession == null && store != null) {
        dbSession = getSession();
    }
    return dbSession;
}
1. Is this the right way to add a custom sink? As you
can see, I have dbSession as a class variable here and I'm storing its state.
2. This setup works fine in a standalone flink (java
-jar MyJar.jar). When I run using flink with YARN on EMR I get a NPE at the
session which is kind of weird.
Thanks