If I read the code correctly, the CassandraSink automatically uses the element type of its input stream, so in your case Tuple2<String, Tuple6<String, String, Date, String, String, Double>>.
What you want is for it to use only Tuple6<String, String, Date, String, String, Double>, without the first part of the Tuple2 (probably your key?). A simple map function before adding the sink should do the trick:

    DataStream<Tuple2<String, Tuple6<String, String, Date, String, String, Double>>> dataStream = //...

    // map() returns a new stream; pass that one to CassandraSink.addSink(...)
    DataStream<Tuple6<String, String, Date, String, String, Double>> tuple6Stream =
        dataStream.map(new MapFunction<Tuple2<String, Tuple6<String, String, Date, String, String, Double>>, Tuple6<String, String, Date, String, String, Double>>() {
            @Override
            public Tuple6<String, String, Date, String, String, Double> map(Tuple2<String, Tuple6<String, String, Date, String, String, Double>> value) throws Exception {
                // drop the key (f0) and keep only the six values to insert
                return value.f1;
            }
        });

Nico

On Sunday, 13 August 2017 19:23:54 CEST AndreaKinn wrote:
> Ok, this is my situation:
>
> I have a stream of Tuple2<String, Tuple6<String, String, Date, String,
> String, Double>>
>
> the cassandra code:
>
> CassandraSink.addSink(stream)
>     .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
>         + " (user, sensor, timestamp, json_ld, observed_value, value)"
>         + " VALUES (?, ?, ?, ?, ?, ?);")
>     .setClusterBuilder(new ClusterBuilder() {
>         @Override
>         public Cluster buildCluster(Cluster.Builder builder) {
>             return builder.addContactPoint("127.0.0.1").build();
>         }
>     })
>     .build();
>
> the values to insert in VALUES clause are exactly the values of Tuple6. How
> can I indicate that to my statement ?
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Writing-on-Cassandra-tp14744p14867.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
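
P.S. To illustrate why the unwrapping matters, here is a minimal plain-Java sketch that does not use Flink at all. It assumes that the sink binds the top-level fields of each element, in order, to the ? placeholders of the query; that is my reading of the code, not something I have checked against every version. The Keyed class, the sample values, and the method names are hypothetical stand-ins, not Flink or Cassandra API.

```java
import java.util.Date;

public class TupleUnwrapSketch {

    // The query from the original message: six '?' placeholders.
    static final String QUERY =
        "INSERT INTO keyspace_local.values_by_sensors_users"
        + " (user, sensor, timestamp, json_ld, observed_value, value)"
        + " VALUES (?, ?, ?, ?, ?, ?);";

    // Hypothetical stand-in for Tuple2<String, Tuple6<...>>:
    // f0 is the key, f1 holds the six values to insert.
    static final class Keyed {
        final String f0;
        final Object[] f1;

        Keyed(String f0, Object... f1) {
            this.f0 = f0;
            this.f1 = f1;
        }

        // Top-level fields of the outer tuple: only two (key and nested tuple).
        Object[] topLevelFields() {
            return new Object[] { f0, f1 };
        }
    }

    // Counts the '?' placeholders in a CQL statement.
    static int countPlaceholders(String query) {
        int count = 0;
        for (char c : query.toCharArray()) {
            if (c == '?') {
                count++;
            }
        }
        return count;
    }

    // Equivalent of the map function above: keep only the nested values.
    static Object[] unwrap(Keyed value) {
        return value.f1;
    }

    public static void main(String[] args) {
        // Sample values are made up for illustration.
        Keyed element = new Keyed("user-1",
            "user-1", "sensor-1", new Date(0L), "{}", "temperature", 21.5);

        System.out.println("placeholders:      " + countPlaceholders(QUERY));
        System.out.println("fields (wrapped):  " + element.topLevelFields().length);
        System.out.println("fields (unwrapped): " + unwrap(element).length);
    }
}
```

The wrapped element exposes two top-level fields while the statement expects six values; after unwrapping, the counts match. That is all the map function achieves, it just has to run before the stream is handed to the sink.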