In Flink 1.1.1, I am seeing what looks like a serialization issue with
org.apache.hadoop.conf.Configuration when it is used with
org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat. When I use the
mapred.HadoopOutputFormat version instead, it works just fine.
Specifically, the job fails with "java.lang.UnsupportedOperationException:
You must set the ColumnFamily schema using setColumnFamilySchema." I am
definitely setting that property, and it appears to be serialized, but
after the config is deserialized the setting is gone. Does anybody have any
ideas? In the meantime, I will continue using the "mapred" package.
Stack trace:
java.lang.UnsupportedOperationException: You must set the ColumnFamily schema using setColumnFamilySchema.
    at org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat.getColumnFamilySchema(CqlBulkOutputFormat.java:184)
    at org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.setConfigs(CqlBulkRecordWriter.java:94)
    at org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.<init>(CqlBulkRecordWriter.java:74)
    at org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat.getRecordWriter(CqlBulkOutputFormat.java:86)
    at org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat.getRecordWriter(CqlBulkOutputFormat.java:52)
    at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.open(HadoopOutputFormatBase.java:146)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:176)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Code that works:
val insertStmt = s"INSERT INTO $keyspace.$colFamily (user_id, updated_time, value) VALUES (?, ?, ?)"
val config = new JobConf()
ConfigHelper.setOutputInitialAddress(config, initialOutputAddress.getHostAddress)
CqlBulkOutputFormat.setColumnFamilySchema(config, colFamily, cqlTableSchema)
CqlBulkOutputFormat.setColumnFamilyInsertStatement(config, colFamily, insertStmt)
CqlBulkOutputFormat.setDeleteSourceOnSuccess(config, true)
ConfigHelper.setOutputColumnFamily(config, keyspace, colFamily)
ConfigHelper.setOutputPartitioner(config, partitionerClass)
val outputFormat = new mapred.HadoopOutputFormat[Object, java.util.List[ByteBuffer]](
  new CqlBulkOutputFormat,
  config)
Code that doesn't work:
val insertStmt = s"INSERT INTO $keyspace.$colFamily (user_id, updated_time, value) VALUES (?, ?, ?)"
val config = new Configuration()
ConfigHelper.setOutputInitialAddress(config, initialOutputAddress.getHostAddress)
CqlBulkOutputFormat.setColumnFamilySchema(config, colFamily, cqlTableSchema)
CqlBulkOutputFormat.setColumnFamilyInsertStatement(config, colFamily, insertStmt)
CqlBulkOutputFormat.setDeleteSourceOnSuccess(config, true)
ConfigHelper.setOutputColumnFamily(config, keyspace, colFamily)
ConfigHelper.setOutputPartitioner(config, partitionerClass)
val hadoopJob: Job = Job.getInstance(config)
val outputFormat = new mapreduce.HadoopOutputFormat[Object, java.util.List[ByteBuffer]](
  new CqlBulkOutputFormat,
  hadoopJob)
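
A possible way to narrow this down is to check whether the settings survive a plain serialization round trip. The sketch below assumes the Configuration travels to the task managers through Hadoop's own Writable methods (write/readFields), which I have not confirmed for this Flink version. The object and helper names (ConfigRoundTripCheck, roundTrip, entries, lostKeys) are hypothetical and exist only for this check.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import org.apache.hadoop.conf.Configuration
import scala.collection.JavaConverters._

// Hypothetical diagnostic helper, not part of Flink, Hadoop or Cassandra.
object ConfigRoundTripCheck {

  // Serialize a Configuration with write() and read it back with readFields().
  def roundTrip(conf: Configuration): Configuration = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    conf.write(out)
    out.flush()
    // false: do not load the default resources, so only deserialized keys are present
    val copy = new Configuration(false)
    copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray)))
    copy
  }

  // Snapshot of all key/value pairs exposed by the Configuration's iterator.
  def entries(conf: Configuration): Map[String, String] =
    conf.iterator().asScala.map(e => e.getKey -> e.getValue).toMap

  // Keys present in `before` that are missing or have a different value in `after`.
  def lostKeys(before: Configuration, after: Configuration): Set[String] = {
    val b = entries(before)
    val a = entries(after)
    b.keySet.filter(k => a.get(k) != b.get(k))
  }
}
```

Calling ConfigRoundTripCheck.lostKeys(hadoopJob.getConfiguration, ConfigRoundTripCheck.roundTrip(hadoopJob.getConfiguration)) just before constructing the output format should return an empty set if the round trip itself is lossless. An empty result would suggest the setting is dropped somewhere else, for example where the record writer obtains its configuration, rather than in serialization.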