Hi, could you elaborate, please? Do you mean that marking conf, connection and table as transient wouldn't help because of the HTableDescriptor reference?
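Here is a minimal, dependency-free sketch of the point under discussion. It assumes that Flink ships the OutputFormat instance to the workers using plain Java serialization. `FakeDescriptor` and `FakeConnection` are hypothetical stand-ins for `HTableDescriptor` and `Connection`, which are both non-serializable. The two format classes and `SerializationCheck` are illustrative only and are not Flink or HBase APIs.

In Scala, a constructor parameter that is referenced inside a method, as `tableDescriptor` is in `open()`, is kept as a private field of the instance. Because of that, marking the other fields `@transient` is not enough on its own. One possible workaround is to pass only serializable data, such as the table name as a `String`, and rebuild the heavy objects on the worker.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-ins for the HBase types: neither implements
// java.io.Serializable, just like HTableDescriptor and Connection.
class FakeDescriptor(val tableName: String)
class FakeConnection(val target: String)

// Mirrors the original design: `descriptor` is used in open(), so the
// compiler keeps it as a field and serialization must write it out.
class CapturingFormat(descriptor: FakeDescriptor) extends Serializable {
  @transient var connection: FakeConnection = _
  def open(): Unit = { connection = new FakeConnection(descriptor.tableName) }
}

// Alternative: keep only serializable state (the table name) and create the
// connection on the worker; the @transient field is skipped when serializing.
class NameOnlyFormat(tableName: String) extends Serializable {
  @transient var connection: FakeConnection = _
  def open(): Unit = { connection = new FakeConnection(tableName) }
}

object SerializationCheck {
  // Returns true if plain Java serialization of `obj` succeeds.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

`SerializationCheck.isSerializable(new CapturingFormat(new FakeDescriptor("t")))` is false even before `open()` is called. A `NameOnlyFormat` passes the check even after `open()` has populated its transient connection.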
2017-08-27 12:44 GMT+02:00 Jörn Franke <jornfra...@gmail.com>:

> It looks like in your case everything should be serializable. An
> alternative would be to mark certain non-serializable things as transient,
> but as far as I can see this is not possible in your case.
>
> On 27. Aug 2017, at 11:02, Federico D'Ambrosio <federico.dambro...@smartlab.ws> wrote:
>
> Hi,
>
> I'm trying to write to HBase using writeUsingOutputFormat with a custom
> HBase format inspired by this example
> <https://github.com/apache/flink/blob/master/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java>
> in flink-hbase (mind you, I'm using Scala instead of Java), and I'm
> encountering the error reported in the subject of this mail.
>
> Now, the OutputFormat I'm using is the following:
>
> abstract class HBaseOutputFormat[T](tableDescriptor: HTableDescriptor,
>     confPath: Path) extends OutputFormat[T] {
>
>   private val LOG = LoggerFactory.getLogger(this.getClass)
>
>   var conf: org.apache.hadoop.conf.Configuration = _
>   var connection: Connection = _
>   var table: Table = _
>   var taskNumber: String = _
>
>   @throws[IOException]
>   def configure(parameters: Configuration): Unit = {
>     conf = HBaseConfiguration.create()
>     conf.addResource(confPath.getPath)
>     connection = ConnectionFactory.createConnection(conf)
>   }
>
>   @throws[IOException]
>   def close(): Unit = {
>     table.close()
>   }
>
>   @throws[IOException]
>   def open(taskNumber: Int, numTasks: Int): Unit = {
>     this.taskNumber = String.valueOf(taskNumber)
>     val admin = connection.getAdmin
>
>     if (!admin.tableExists(tableDescriptor.getTableName))
>       admin.createTable(tableDescriptor)
>
>     table = connection.getTable(tableDescriptor.getTableName)
>   }
> }
>
> which is extended by the format I actually use, which implements the
> writeRecord method:
>
> class HBaseBatchFormat(tableDescriptor: HTableDescriptor, confPath: Path)
>   extends HBaseOutputFormat[BatchContainer](tableDescriptor, confPath)
>
> with BatchContainer being
>
> case class BatchContainer(batch: Iterable[(String, String, String, Int)])
>   extends Serializable
>
> I'd like to ask you: what needs to be Serializable? As far as I can see,
> conf, connection and table are not Serializable, so they are surely part
> of the issue. Do the constructor parameters need to be considered in this
> case, especially tableDescriptor, which is not Serializable? Should all
> the methods implemented from the OutputFormat interface use only
> Serializable variables?
>
> Thank you for your attention,
> Federico
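On the question of what needs to be Serializable, assume again that plain Java serialization is used. Under that assumption, only the object graph reachable from the format instance's non-transient fields is written out. Local variables inside methods, such as `admin` in `open()`, are never serialized.

The hypothetical sketch below round-trips a format through `ObjectOutputStream`/`ObjectInputStream`. It shows that a `@transient` field arrives as `null` on the other side, which is why such fields have to be rebuilt in `configure()`/`open()` on the worker. `StubConnection`, `StubAdmin`, `SketchFormat` and `RoundTrip` are made-up names, not HBase or Flink classes.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical non-serializable stand-ins for HBase's Connection and Admin.
class StubConnection(val table: String)
class StubAdmin {
  private var tables = Set.empty[String]
  def tableExists(name: String): Boolean = tables(name)
  def createTable(name: String): Unit = tables += name
}

class SketchFormat(val tableName: String) extends Serializable {
  // Skipped by Java serialization, so it is null after deserialization.
  @transient var connection: StubConnection = _

  // Mirrors open(): rebuilds transient state on the worker side.
  // `admin` is a local variable, so its type never needs to be Serializable.
  def open(): Unit = {
    val admin = new StubAdmin
    if (!admin.tableExists(tableName)) admin.createTable(tableName)
    connection = new StubConnection(tableName)
  }
}

object RoundTrip {
  // Serializes and deserializes `obj` with plain Java serialization,
  // roughly what happens when the format is shipped to a worker.
  def copy[T <: java.io.Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

The same reasoning applies to a logger held in a field, like `LOG`. Whether the logger instance is serializable depends on the logging backend, so keeping it in a companion object or declaring it as a `@transient lazy val` avoids relying on that.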