Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r147442488 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java --- @@ -205,12 +207,16 @@ private CassandraSink(SingleOutputStreamOperator<IN> sink) { * @param <IN> input type * @return CassandraSinkBuilder, to further configure the sink */ - public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) { + public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) { TypeInformation<IN> typeInfo = input.getType(); if (typeInfo instanceof TupleTypeInfo) { - DataStream<T> tupleInput = (DataStream<T>) input; + DataStream<Tuple> tupleInput = (DataStream<Tuple>) input; return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig())); } + if (typeInfo instanceof RowTypeInfo) { --- End diff -- Can check here for a concrete class with `Row.class.equals(typeInfo.getTypeClass())` to also support `GenericType<Row>` as well.
---