Hi Kevin,

Currently, POJO types are not supported in the Python DataStream API because the conversion between Python objects and Java objects is hard to handle. As a workaround, you could represent the POJO class as a Row type, e.g. Types.ROW_NAMED(['id', 'created_at', 'updated_at'], [Types.LONG(), Types.LONG(), Types.LONG()]). We will try to support POJO types in the future.
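To make that concrete, here is a rough, untested sketch of the workaround. from_collection is only a placeholder for your custom source, the field names are taken from your Record example, and the Option[Long] fields are assumed to map to nullable LONG columns:

```
from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, MapFunction

# Row type standing in for the Scala Record case class.
record_type = Types.ROW_NAMED(
    ['id', 'created_at', 'updated_at'],
    [Types.LONG(), Types.LONG(), Types.LONG()])

class Mutate(MapFunction):
    def map(self, value):
        # value arrives as a pyflink.common.Row; fields can be read by name.
        print(value.id)
        # Return a new Row rather than mutating the input in place.
        return Row(id=123, created_at=value.created_at,
                   updated_at=value.updated_at)

env = StreamExecutionEnvironment.get_execution_environment()
# Placeholder for your custom source; the source itself would need to
# emit a type PyFlink understands, such as this Row type.
records = env.from_collection(
    [Row(id=1, created_at=1615800000, updated_at=None)],
    type_info=record_type)
records = records.map(Mutate(), output_type=record_type)
env.execute('row-type-example')
```

Passing output_type to map() tells PyFlink how to serialize the mapped records for downstream operators.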
Best,
Shuiqiang

Kevin Lam <kevin....@shopify.com> wrote on Mon, Mar 15, 2021 at 10:46 PM:

> Hi all,
>
> Looking to use Pyflink to work with some scala-defined objects being
> emitted from a custom source. When trying to manipulate the objects in a
> pyflink-defined MapFunction
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/python/pyflink.datastream.html#pyflink.datastream.MapFunction>,
> I'm hitting an error like:
>
> Caused by: java.lang.UnsupportedOperationException: The type information:
> Option[<...>$Record(id: Long, created_at: Option[Long], updated_at:
> Option[Long])] is not supported in PyFlink currently.
>
> The scala object is defined something like:
>
> ```
> object <...> {
>   case class Record(
>     id: Long,
>     created_at: Option[Long],
>     updated_at: Option[Long],
>     ...
>   )
> }
> ```
>
> The pyflink code is something like:
>
> ```
> class Mutate(MapFunction):
>     def map(self, value):
>         print(value.id)
>         value.id = 123
>
> ...
>
> records = env.add_source(..)
> records = records.map(Mutate())
> ```
>
> Can you provide any advice on how to work with these kinds of objects in
> Pyflink?
>
> Thanks in advance!