Hi,

Could you elaborate, please? Do you mean that marking conf, connection and
table as transient wouldn't help, because the format also holds a reference
to the HTableDescriptor?
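A minimal, self-contained Scala sketch of why @transient on conf, connection and table is not enough on its own. Descriptor and TableRef are hypothetical stand-ins for HTableDescriptor and Table (plain classes that are not Serializable), and plain Java serialization roughly stands in for Flink shipping the format to the workers. Because tableDescriptor is read inside open(), the Scala compiler keeps that constructor parameter as a private field, so it gets serialized along with the format. A parameter that is only read while the object is being constructed does not become a field.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-ins: neither class is Serializable, like HTableDescriptor and Table.
class Descriptor(val tableName: String)
class TableRef(val name: String)

// Same shape as HBaseOutputFormat: the runtime handle is @transient, but
// `descriptor` is read inside open(), so scalac stores it as a private field.
final class CapturingFormat(descriptor: Descriptor) extends Serializable {
  @transient var table: TableRef = _
  def open(): Unit = {
    table = new TableRef(descriptor.tableName)
  }
}

// Here the parameter is only read during construction, so no field is kept
// for it; only the String copy is serialized.
final class ConstructorOnlyFormat(descriptor: Descriptor) extends Serializable {
  private val tableName: String = descriptor.tableName
  @transient var table: TableRef = _
  def open(): Unit = {
    table = new TableRef(tableName)
  }
}

object CaptureDemo {
  // Attempts Java serialization and reports whether it succeeded.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val capturing = new CapturingFormat(new Descriptor("events"))
    capturing.open()
    println(isSerializable(capturing)) // false: the descriptor field is still written
    println(isSerializable(new ConstructorOnlyFormat(new Descriptor("events")))) // true
  }
}
```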

2017-08-27 12:44 GMT+02:00 Jörn Franke <jornfra...@gmail.com>:

> It looks like, in your case, everything should be serializable. An
> alternative would be to mark certain non-serializable things as transient,
> but as far as I can see this is not possible in your case.
>
> On 27. Aug 2017, at 11:02, Federico D'Ambrosio <
> federico.dambro...@smartlab.ws> wrote:
>
> Hi,
>
> I'm trying to write to HBase with writeUsingOutputFormat, using a custom
> HBase format inspired by this example
> <https://github.com/apache/flink/blob/master/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java>
> in flink-hbase (mind you, I'm using Scala instead of Java), and I'm
> encountering the error reported in the mail subject.
>
> Now, the OutputFormat I'm using is the following:
>
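> import java.io.IOException
>
> import org.apache.flink.api.common.io.OutputFormat
> import org.apache.flink.configuration.Configuration
> // Assumed: Flink's Path, since confPath.getPath is called below
> import org.apache.flink.core.fs.Path
> import org.apache.hadoop.fs.{Path => HadoopPath}
> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
> import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Table}
> import org.slf4j.LoggerFactory
>
> abstract class HBaseOutputFormat[T](tableDescriptor: HTableDescriptor,
>     confPath: Path) extends OutputFormat[T] {
>
>   private val LOG = LoggerFactory.getLogger(this.getClass)
>
>   var conf: org.apache.hadoop.conf.Configuration = _
>   var connection: Connection = _
>   var table: Table = _
>   var taskNumber: String = _
>
>   // Builds the HBase configuration and opens the connection.
>   @throws[IOException]
>   def configure(parameters: Configuration): Unit = {
>     conf = HBaseConfiguration.create()
>     // addResource(String) looks the name up on the classpath;
>     // a Hadoop Path makes it read the file directly.
>     conf.addResource(new HadoopPath(confPath.getPath))
>     connection = ConnectionFactory.createConnection(conf)
>   }
>
>   // Releases the table and the connection (null-safe in case open() failed).
>   @throws[IOException]
>   def close(): Unit = {
>     if (table != null) table.close()
>     if (connection != null) connection.close()
>   }
>
>   // Creates the table if it does not exist yet, then obtains a handle to it.
>   @throws[IOException]
>   def open(taskNumber: Int, numTasks: Int): Unit = {
>     this.taskNumber = String.valueOf(taskNumber)
>     val admin = connection.getAdmin
>     try {
>       if (!admin.tableExists(tableDescriptor.getTableName))
>         admin.createTable(tableDescriptor)
>     } finally {
>       admin.close()
>     }
>     table = connection.getTable(tableDescriptor.getTableName)
>   }
> }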
>
> which is extended by the format I actually use, which implements the
> writeRecord method:
>
>
> class HBaseBatchFormat(tableDescriptor: HTableDescriptor, confPath : Path)
>   extends HBaseOutputFormat[BatchContainer](tableDescriptor, confPath)
>
> with BatchContainer being
>
> case class BatchContainer(batch: Iterable[(String, String, String, Int)]) 
> extends Serializable
>
> I'd like to ask you: what needs to be Serializable? As far as I can see,
> conf, connection and table are not Serializable, so they are surely part of
> the issue. Do the constructor parameters also count here, especially
> tableDescriptor, which is not Serializable? Should all the methods
> implemented from the OutputFormat interface use only Serializable
> variables?
>
> Thank you for your attention,
> Federico
>
>
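One common way to make such a format serializable, sketched in plain Scala so it runs without Flink or HBase: keep only serializable values in the constructor (here a table name and column families instead of an HTableDescriptor, and the configuration path as a String), and create every non-serializable object in configure()/open() as a @transient field. TableHandle, ConnectionHandle and SafeFormat are hypothetical stand-ins, not Flink or HBase classes; their methods only mirror the shape of OutputFormat's configure/open/writeRecord/close lifecycle.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-ins for the HBase client types; like Connection and Table,
// neither is Serializable.
class TableHandle(val name: String, val families: Seq[String]) {
  var isOpen = true
  def put(row: String): Unit = require(isOpen, s"table $name is closed")
  def close(): Unit = { isOpen = false }
}

class ConnectionHandle(val configPath: String) {
  var isOpen = true
  def getTable(name: String, families: Seq[String]): TableHandle =
    new TableHandle(name, families)
  def close(): Unit = { isOpen = false }
}

// All constructor parameters are plain serializable values; everything built
// from them at runtime is @transient and recreated on each parallel instance.
final class SafeFormat(tableName: String, columnFamilies: Seq[String], confPath: String)
    extends Serializable {

  @transient private var connection: ConnectionHandle = _
  @transient private var table: TableHandle = _

  def configure(): Unit = {
    connection = new ConnectionHandle(confPath)
  }

  def open(taskNumber: Int, numTasks: Int): Unit = {
    // A real HBase format would build the HTableDescriptor here from tableName
    // and columnFamilies, and create the table if it is missing.
    table = connection.getTable(tableName, columnFamilies)
  }

  def writeRecord(row: String): Unit = table.put(row)

  def close(): Unit = {
    if (table != null) table.close()
    if (connection != null) connection.close()
  }

  def isOpen: Boolean = table != null && table.isOpen
}

object SafeFormatDemo {
  // Serializes and deserializes obj, roughly what happens when the format is
  // shipped from the client to the workers.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    // "/path/to/hbase-site.xml" is a placeholder, not a real location.
    val shipped = roundTrip(new SafeFormat("events", Seq("cf"), "/path/to/hbase-site.xml"))
    shipped.configure()
    shipped.open(taskNumber = 0, numTasks = 1)
    shipped.writeRecord("row-1")
    println(shipped.isOpen) // true
    shipped.close()
    println(shipped.isOpen) // false
  }
}
```

The @transient fields are null again after deserialization, which is why they have to be created in configure()/open() on the worker rather than in the constructor on the client.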
