In Flink 1.1.1, I am seeing what looks like a serialization issue with org.apache.hadoop.conf.Configuration when it is used with org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat. When I use the mapred.HadoopOutputFormat version instead, it works just fine.
Specifically, the job fails with "java.lang.UnsupportedOperationException: You must set the ColumnFamily schema using setColumnFamilySchema." I am definitely setting that property, and it appears to get serialized, but after the config is deserialized the setting is gone. Anybody have any ideas? In the meantime, I will continue using the "mapred" package.

Stack trace:

java.lang.UnsupportedOperationException: You must set the ColumnFamily schema using setColumnFamilySchema.
    at org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat.getColumnFamilySchema(CqlBulkOutputFormat.java:184)
    at org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.setConfigs(CqlBulkRecordWriter.java:94)
    at org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.<init>(CqlBulkRecordWriter.java:74)
    at org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat.getRecordWriter(CqlBulkOutputFormat.java:86)
    at org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat.getRecordWriter(CqlBulkOutputFormat.java:52)
    at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.open(HadoopOutputFormatBase.java:146)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:176)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)

Code that works:

val insertStmt = s"INSERT INTO $keyspace.$colFamily (user_id, updated_time, value) VALUES (?, ?, ?)"

val config = new JobConf()
ConfigHelper.setOutputInitialAddress(config, initialOutputAddress.getHostAddress)
CqlBulkOutputFormat.setColumnFamilySchema(config, colFamily, cqlTableSchema)
CqlBulkOutputFormat.setColumnFamilyInsertStatement(config, colFamily, insertStmt)
CqlBulkOutputFormat.setDeleteSourceOnSuccess(config, true)
ConfigHelper.setOutputColumnFamily(config, keyspace, colFamily)
ConfigHelper.setOutputPartitioner(config, partitionerClass)

val outputFormat = new mapred.HadoopOutputFormat[Object, java.util.List[ByteBuffer]](
  new CqlBulkOutputFormat, config)

Code that doesn't work:

val insertStmt = s"INSERT INTO $keyspace.$colFamily (user_id, updated_time, value) VALUES (?, ?, ?)"

val config = new Configuration()
ConfigHelper.setOutputInitialAddress(config, initialOutputAddress.getHostAddress)
CqlBulkOutputFormat.setColumnFamilySchema(config, colFamily, cqlTableSchema)
CqlBulkOutputFormat.setColumnFamilyInsertStatement(config, colFamily, insertStmt)
CqlBulkOutputFormat.setDeleteSourceOnSuccess(config, true)
ConfigHelper.setOutputColumnFamily(config, keyspace, colFamily)
ConfigHelper.setOutputPartitioner(config, partitionerClass)

val hadoopJob: Job = Job.getInstance(config)
val outputFormat = new mapreduce.HadoopOutputFormat[Object, java.util.List[ByteBuffer]](
  new CqlBulkOutputFormat, hadoopJob)
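To check the "lost during deserialization" theory, here is a small diagnostic sketch I have not run as-is. It assumes the Configuration is shipped to the task managers via Hadoop's Writable write/readFields, and that getColumnFamilySchema(conf, colFamily) is the static accessor paired with the setter above (roundTrip is just a hypothetical helper name):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat
import org.apache.hadoop.conf.Configuration

// Serialize the Configuration the way Hadoop Writables are serialized,
// then read it back into a fresh Configuration (no defaults loaded).
def roundTrip(conf: Configuration): Configuration = {
  val buffer = new ByteArrayOutputStream()
  conf.write(new DataOutputStream(buffer))
  val restored = new Configuration(false)
  restored.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray)))
  restored
}

// Throws the same UnsupportedOperationException if the schema is lost in transit.
println(CqlBulkOutputFormat.getColumnFamilySchema(roundTrip(config), colFamily))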
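For completeness, one variation I have not verified is to skip the standalone Configuration and apply the same setters directly to the Job's own configuration, so nothing relies on Job.getInstance(config) copying the values:

val hadoopJob: Job = Job.getInstance()
val jobConf = hadoopJob.getConfiguration

// Same setters as above, but applied to the Job's backing Configuration.
ConfigHelper.setOutputInitialAddress(jobConf, initialOutputAddress.getHostAddress)
CqlBulkOutputFormat.setColumnFamilySchema(jobConf, colFamily, cqlTableSchema)
CqlBulkOutputFormat.setColumnFamilyInsertStatement(jobConf, colFamily, insertStmt)
CqlBulkOutputFormat.setDeleteSourceOnSuccess(jobConf, true)
ConfigHelper.setOutputColumnFamily(jobConf, keyspace, colFamily)
ConfigHelper.setOutputPartitioner(jobConf, partitionerClass)

val outputFormat = new mapreduce.HadoopOutputFormat[Object, java.util.List[ByteBuffer]](
  new CqlBulkOutputFormat, hadoopJob)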