Hi Shannon,

I tried to reproduce the problem in a unit test, without success. My test configures a HadoopOutputFormat object, serializes and deserializes it, calls open(), and verifies that a configured String property is present in the getRecordWriter() method.
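(For illustration only, a minimal sketch of that kind of round-trip check, not the actual test. The property key is made up, and the getConfiguration() accessor on the deserialized wrapper is an assumption; if it is not available, the same check can be done inside a custom OutputFormat's getRecordWriter(), as described above.)

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

val job = Job.getInstance()
job.getConfiguration.set("test.property", "test-value")  // made-up key, stands in for the Cassandra schema entry

val format = new HadoopOutputFormat[Text, Text](new TextOutputFormat[Text, Text], job)

// Round-trip through Java serialization, as happens when the sink is shipped to the TaskManagers.
val buffer = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(buffer)
oos.writeObject(format)
oos.flush()
val restored = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
  .readObject().asInstanceOf[HadoopOutputFormat[Text, Text]]

// Assumes the wrapper still exposes its Hadoop Configuration after deserialization.
assert(restored.getConfiguration.get("test.property") == "test-value")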
Next I would try to reproduce the error with Cassandra. Which version are you using? Can you also open a JIRA issue for this bug?

Thanks, Fabian

2016-10-11 23:40 GMT+02:00 Shannon Carey <sca...@expedia.com>:
> In Flink 1.1.1, I am seeing what looks like a serialization issue with
> org.apache.hadoop.conf.Configuration when used with
> org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat. When I use
> the mapred.HadoopOutputFormat version, it works just fine.
>
> Specifically, the job fails because "java.lang.UnsupportedOperationException:
> You must set the ColumnFamily schema using setColumnFamilySchema." I am
> definitely setting that property, and it appears to be getting serialized,
> but when the config is deserialized the setting is gone. Anybody have any
> ideas? In the meantime, I will continue using the "mapred" package.
>
> Stack trace:
> java.lang.UnsupportedOperationException: You must set the ColumnFamily schema using setColumnFamilySchema.
>     at org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat.getColumnFamilySchema(CqlBulkOutputFormat.java:184)
>     at org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.setConfigs(CqlBulkRecordWriter.java:94)
>     at org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.<init>(CqlBulkRecordWriter.java:74)
>     at org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat.getRecordWriter(CqlBulkOutputFormat.java:86)
>     at org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat.getRecordWriter(CqlBulkOutputFormat.java:52)
>     at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.open(HadoopOutputFormatBase.java:146)
>     at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:176)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
>
> Code that works:
>
> val insertStmt = s"INSERT INTO $keyspace.$colFamily (user_id, updated_time, value) VALUES (?, ?, ?)"
> val config = new JobConf()
>
> ConfigHelper.setOutputInitialAddress(config, initialOutputAddress.getHostAddress)
>
> CqlBulkOutputFormat.setColumnFamilySchema(config, colFamily, cqlTableSchema)
> CqlBulkOutputFormat.setColumnFamilyInsertStatement(config, colFamily, insertStmt)
> CqlBulkOutputFormat.setDeleteSourceOnSuccess(config, true)
> ConfigHelper.setOutputColumnFamily(config, keyspace, colFamily)
> ConfigHelper.setOutputPartitioner(config, partitionerClass)
>
> val outputFormat = new mapred.HadoopOutputFormat[Object, java.util.List[ByteBuffer]](
>   new CqlBulkOutputFormat,
>   config)
>
> Code that doesn't work:
>
> val insertStmt = s"INSERT INTO $keyspace.$colFamily (user_id, updated_time, value) VALUES (?, ?, ?)"
> val config = new Configuration()
>
> ConfigHelper.setOutputInitialAddress(config, initialOutputAddress.getHostAddress)
>
> CqlBulkOutputFormat.setColumnFamilySchema(config, colFamily, cqlTableSchema)
> CqlBulkOutputFormat.setColumnFamilyInsertStatement(config, colFamily, insertStmt)
> CqlBulkOutputFormat.setDeleteSourceOnSuccess(config, true)
> ConfigHelper.setOutputColumnFamily(config, keyspace, colFamily)
> ConfigHelper.setOutputPartitioner(config, partitionerClass)
>
> val hadoopJob: Job = Job.getInstance(config)
>
> val outputFormat = new mapreduce.HadoopOutputFormat[Object, java.util.List[ByteBuffer]](
>   new CqlBulkOutputFormat,
>   hadoopJob)
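(A debugging aid related to the quoted report, not part of the original thread's code: a minimal sketch that round-trips a bare Hadoop Configuration through its Writable serialization and diffs the entries before and after, to check whether the schema entry itself survives. The table name and schema string are illustrative.)

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat
import org.apache.hadoop.conf.Configuration
import scala.collection.JavaConverters._

val conf = new Configuration(false)  // skip defaults so only explicitly set entries are compared
CqlBulkOutputFormat.setColumnFamilySchema(conf, "my_table", "CREATE TABLE ks.my_table (...)")  // illustrative values

// Write and read back the Configuration via Hadoop's Writable serialization.
val out = new ByteArrayOutputStream()
conf.write(new DataOutputStream(out))
val restored = new Configuration(false)
restored.readFields(new DataInputStream(new ByteArrayInputStream(out.toByteArray)))

// Compare the full key/value sets; any lost entry is reported in the assertion message.
val before = conf.iterator().asScala.map(e => e.getKey -> e.getValue).toMap
val after  = restored.iterator().asScala.map(e => e.getKey -> e.getValue).toMap
assert(before == after, s"entries lost in the round trip: ${before.keySet -- after.keySet}")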