Thank you for the info!

Is there a timetable for when the next version with this change might be
released?

On Wed, Nov 4, 2020 at 2:44 AM Timo Walther <twal...@apache.org> wrote:

> Hi Rex,
>
> sorry for the late reply. POJOs will have much better support in the
> upcoming Flink versions because they have been fully integrated with the
> new table type system described in FLIP-37 [1] (e.g. support for
> immutable POJOs, nested DataTypeHints, etc.).
>
> For queries and for scalar and table functions, you can already use full
> POJOs within the table ecosystem.
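>
> As a rough sketch (the function name and registration call here are
> illustrative, and this assumes the case class can be extracted as a
> structured type), a scalar function can already take such a POJO
> directly:
>
>      import org.apache.flink.table.functions.ScalarFunction
>
>      // Hypothetical scalar function that reads a field from the POJO
>      // argument; the new type inference derives the structured type.
>      class ProductName extends ScalarFunction {
>        def eval(product: ProductItem): String = product.name
>      }
>
>      // Register and use it, e.g.:
>      // tEnv.createTemporarySystemFunction("ProductName", classOf[ProductName])
>      // tEnv.sqlQuery("SELECT ProductName(product) FROM OrderB")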
>
> However, the one missing piece is the new translation of POJOs from the
> Table API to the DataStream API. This will be fixed in FLIP-136 [2].
> Until then, I would recommend either using `Row` as the output of the
> Table API or applying a scalar function that maps to the desired data
> structure before the conversion.
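>
> For the job below, a minimal sketch of the `Row` variant could look like
> this (the field positions are an assumption based on the query schema in
> the error message):
>
>      import org.apache.flink.types.Row
>
>      // Convert to Row instead of the POJO to sidestep the LEGACY
>      // STRUCTURED_TYPE mismatch, then map back to the case classes.
>      result.toAppendStream[Row]
>        .map { row =>
>          val product = row.getField(1).asInstanceOf[Row]
>          Order(
>            row.getField(0).asInstanceOf[Long],
>            ProductItem(
>              product.getField(0).asInstanceOf[String],
>              product.getField(1).asInstanceOf[Long]
>            ),
>            row.getField(2).asInstanceOf[Int]
>          )
>        }
>        .addSink(sink)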
>
> I hope this helps a bit.
>
> Regards,
> Timo
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>
> On 02.11.20 21:44, Rex Fenley wrote:
> > My jobs normally use the blink planner; I noticed that with this test
> > that may not be the case.
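> >
> > For reference, a sketch of pinning the planner explicitly when building
> > the settings (blink should already be the default in 1.11, so this only
> > makes it explicit):
> >
> >     val settings = EnvironmentSettings
> >       .newInstance()
> >       .useBlinkPlanner()
> >       .inStreamingMode()
> >       .build()
> >     val tEnv = StreamTableEnvironment.create(env, settings)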
> >
> > On Mon, Nov 2, 2020 at 12:38 PM Rex Fenley <r...@remind101.com
> > <mailto:r...@remind101.com>> wrote:
> >
> >     Flink 1.11.2 with Scala 2.12
> >
> >     Error:
> >     [info] JobScalaTest:
> >     [info] - dummy *** FAILED ***
> >     [info]   org.apache.flink.table.api.ValidationException: Field types
> >     of query result and registered TableSink  do not match.
> >     [info] Query schema: [user: BIGINT, product: ROW<`name`
> >     VARCHAR(2147483647), `id` BIGINT>, amount: INT]
> >     [info] Sink schema: [user: BIGINT, product:
> >     LEGACY('STRUCTURED_TYPE', 'ANY<ProductItem,
> >     rO0ABXNyAB1Kb2JTY2FsYVRlc3Q... [serialized TypeInformation elided] ...>'),
> >     amount: INT]
> >     [info]   at
> >
>  
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:103)
> >     [info]   at
> >
>  
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:260)
> >     [info]   at
> >
>  
> org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163)
> >     [info]   at
> >
>  scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
> >     [info]   at scala.collection.Iterator.foreach(Iterator.scala:943)
> >     [info]   at scala.collection.Iterator.foreach$(Iterator.scala:943)
> >     [info]   at
> >     scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> >     [info]   at
> scala.collection.IterableLike.foreach(IterableLike.scala:74)
> >     [info]   at
> >     scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> >     [info]   at
> scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> >
> >     Code:
> >     import com.remind.graph.people.PeopleJobScala
> >
> >     import org.scalatest.funsuite._
> >     import org.scalatest.BeforeAndAfter
> >
> >     import org.apache.flink.streaming.api.scala.{
> >     DataStream,
> >     StreamExecutionEnvironment
> >     }
> >     import org.apache.flink.streaming.util.TestStreamEnvironment
> >     import org.apache.flink.table.runtime.util._
> >     import org.apache.flink.test.util.AbstractTestBase
> >     import org.apache.flink.table.api._
> >     import org.apache.flink.table.api.bridge.scala._
> >     import org.apache.flink.streaming.api.scala._
> >     import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
> >     import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
> >     import org.apache.flink.api.common.state.ListState
> >     import org.apache.flink.runtime.state.FunctionInitializationContext
> >     import org.apache.flink.api.common.state.ListStateDescriptor
> >     import org.apache.flink.runtime.state.FunctionSnapshotContext
> >     import org.apache.flink.types.Row
> >
> >     import java.io.Serializable;
> >     import java.sql.Timestamp;
> >     import java.text.SimpleDateFormat
> >     import java.util.concurrent.atomic.AtomicInteger
> >     import java.{util => ju}
> >
> >     import scala.collection.JavaConverters._
> >     import scala.collection.mutable
> >     import scala.util.Try
> >
> >     case class Order(user: Long, product: ProductItem, amount: Int) {
> >       def this() {
> >         this(0, null, 0)
> >       }
> >
> >       override def toString(): String = {
> >         return "Order{" +
> >           "user=" + user +
> >           ", product='" + product + '\'' +
> >           ", amount=" + amount +
> >           '}';
> >       }
> >     }
> >
> >     case class ProductItem(name: String, id: Long) {
> >       def this() {
> >         this(null, 0)
> >       }
> >
> >       override def toString(): String = {
> >         return "Product{" +
> >           "name='" + name + '\'' +
> >           ", id=" + id +
> >           '}';
> >       }
> >     }
> >
> >     class JobScalaTest extends AnyFunSuite with BeforeAndAfter {
> >       var env: StreamExecutionEnvironment = _
> >       var tEnv: StreamTableEnvironment = _
> >
> >       before {
> >         this.env = StreamExecutionEnvironment.getExecutionEnvironment
> >         this.env.setParallelism(2)
> >         this.env.getConfig.enableObjectReuse()
> >         val setting =
> >           EnvironmentSettings.newInstance().inStreamingMode().build()
> >         this.tEnv = StreamTableEnvironment.create(env, setting)
> >       }
> >
> >       after {
> >         StreamTestSink.clear()
> >         // TestValuesTableFactory.clearAllData()
> >       }
> >
> >       def dateFrom(stringDate: String): java.sql.Date = {
> >         val date = new SimpleDateFormat("dd/MM/yyyy")
> >           .parse(stringDate)
> >         return new java.sql.Date(date.getTime())
> >       }
> >
> >       def printTable(table: Table) = {
> >         println(table)
> >         table.printSchema()
> >         println(table.getSchema().getFieldNames().mkString(", "))
> >       }
> >
> >       def printDataStream(dataStream: DataStream[_]) = {
> >         println(dataStream)
> >         println(dataStream.dataType)
> >       }
> >
> >     test("dummy") {
> >     valorderA: DataStream[Order] = this.env.fromCollection(
> >     Seq(
> >     newOrder(1L, newProductItem("beer", 10L), 3),
> >     newOrder(1L, newProductItem("diaper", 11L), 4),
> >     newOrder(3L, newProductItem("rubber", 12L), 2)
> >     )
> >     )
> >
> >     valorderB: DataStream[Order] = this.env.fromCollection(
> >     Seq(
> >     newOrder(2L, newProductItem("pen", 13L), 3),
> >     newOrder(2L, newProductItem("rubber", 12L), 3),
> >     newOrder(4L, newProductItem("beer", 10L), 1)
> >     )
> >     )
> >
> >     println(orderB)
> >     println(orderB.dataType)
> >
> >     // convert DataStream to Table
> >     valtableA =
> >     this.tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
> >     println(tableA)
> >     tableA.printSchema()
> >     println(tableA.getSchema().getFieldNames().mkString(", "))
> >     // register DataStream as Table
> >     this.tEnv.createTemporaryView("OrderB", orderB, 'user, 'product,
> >     'amount)
> >
> >     // union the two tables
> >     valresult = this.tEnv.sqlQuery(s"""
> >     |SELECT * FROM $tableAWHERE amount > 2
> >     |UNION ALL
> >     |SELECT * FROM OrderB WHERE amount < 2
> >     """.stripMargin)
> >
> >     valsink = newStringSink[Order]()
> >     result.toAppendStream[Order].addSink(sink)
> >
> >     this.env.execute()
> >
> >     valexpected = List(
> >     "Order{user=1, product='Product{name='beer', id=10}', amount=3}",
> >     "Order{user=1, product='Product{name='diaper', id=11}', amount=4}",
> >     "Order{user=4, product='Product{name='beer', id=10}', amount=1}"
> >     )
> >     valresults = sink.getResults.sorted
> >     println("results")
> >     println(results)
> >     assert(expected.sorted === results)
> >     }
> >     }
> >
> >     /**
> >      * Taken from:
> >      * https://github.com/apache/flink/blob/release-1.11.2/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala
> >      * There's a whole bunch of other test sinks to choose from there.
> >      */
> >     object StreamTestSink {
> >
> >       val idCounter: AtomicInteger = new AtomicInteger(0)
> >
> >       val globalResults =
> >         mutable.HashMap.empty[Int, mutable.Map[Int,
> >           mutable.ArrayBuffer[String]]]
> >       val globalRetractResults =
> >         mutable.HashMap.empty[Int, mutable.Map[Int,
> >           mutable.ArrayBuffer[String]]]
> >       val globalUpsertResults =
> >         mutable.HashMap.empty[Int, mutable.Map[Int, mutable.Map[String,
> >           String]]]
> >
> >       def getNewSinkId: Int = {
> >         val idx = idCounter.getAndIncrement()
> >         this.synchronized {
> >           globalResults.put(
> >             idx,
> >             mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
> >           )
> >           globalRetractResults.put(
> >             idx,
> >             mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
> >           )
> >           globalUpsertResults.put(
> >             idx,
> >             mutable.HashMap.empty[Int, mutable.Map[String, String]]
> >           )
> >         }
> >         idx
> >       }
> >
> >       def clear(): Unit = {
> >         globalResults.clear()
> >         globalRetractResults.clear()
> >         globalUpsertResults.clear()
> >       }
> >     }
> >
> >     abstract class AbstractExactlyOnceSink[T]
> >         extends RichSinkFunction[T]
> >         with CheckpointedFunction {
> >       protected var resultsState: ListState[String] = _
> >       protected var localResults: mutable.ArrayBuffer[String] = _
> >       protected val idx: Int = StreamTestSink.getNewSinkId
> >
> >       protected var globalResults: mutable.Map[Int,
> >         mutable.ArrayBuffer[String]] = _
> >       protected var globalRetractResults
> >         : mutable.Map[Int, mutable.ArrayBuffer[String]] = _
> >       protected var globalUpsertResults
> >         : mutable.Map[Int, mutable.Map[String, String]] = _
> >
> >       def isInitialized: Boolean = globalResults != null
> >
> >       override def initializeState(
> >           context: FunctionInitializationContext): Unit = {
> >         resultsState = context.getOperatorStateStore
> >           .getListState(
> >             new ListStateDescriptor[String]("sink-results", Types.STRING)
> >           )
> >
> >         localResults = mutable.ArrayBuffer.empty[String]
> >
> >         if (context.isRestored) {
> >           for (value <- resultsState.get().asScala) {
> >             localResults += value
> >           }
> >         }
> >
> >         val taskId = getRuntimeContext.getIndexOfThisSubtask
> >         StreamTestSink.synchronized(
> >           StreamTestSink.globalResults(idx) += (taskId -> localResults)
> >         )
> >       }
> >
> >       override def snapshotState(context: FunctionSnapshotContext): Unit = {
> >         resultsState.clear()
> >         for (value <- localResults) {
> >           resultsState.add(value)
> >         }
> >       }
> >
> >       protected def clearAndStashGlobalResults(): Unit = {
> >         if (globalResults == null) {
> >           StreamTestSink.synchronized {
> >             globalResults = StreamTestSink.globalResults.remove(idx).get
> >             globalRetractResults =
> >               StreamTestSink.globalRetractResults.remove(idx).get
> >             globalUpsertResults =
> >               StreamTestSink.globalUpsertResults.remove(idx).get
> >           }
> >         }
> >       }
> >
> >       protected def getResults: List[String] = {
> >         clearAndStashGlobalResults()
> >         val result = mutable.ArrayBuffer.empty[String]
> >         this.globalResults.foreach {
> >           case (_, list) => result ++= list
> >         }
> >         result.toList
> >       }
> >     }
> >
> >     final class StringSink[T] extends AbstractExactlyOnceSink[T]() {
> >       override def invoke(value: T) {
> >         localResults += value.toString
> >       }
> >
> >       override def getResults: List[String] = super.getResults
> >     }
> >
> >
> >
> >     On Mon, Nov 2, 2020 at 5:23 AM Aljoscha Krettek <aljos...@apache.org
> >     <mailto:aljos...@apache.org>> wrote:
> >
> >         @Timo: Is this something that would work when using the new
> >         type stack? From the message I'm assuming it's using the older
> >         type stack.
> >
> >         @Rex: Which Flink version are you using, and could you maybe
> >         post the code snippet that you use to do the conversions?
> >
> >         Best,
> >         Aljoscha
> >
> >         On 02.11.20 06:50, Rex Fenley wrote:
> >          > Maybe this is related to this issue?
> >          > https://issues.apache.org/jira/browse/FLINK-17683
> >          >
> >          > On Fri, Oct 30, 2020 at 8:43 PM Rex Fenley <r...@remind101.com
> >         <mailto:r...@remind101.com>> wrote:
> >          >
> >          >> Correction: I'm using Scala case classes, not strictly Java
> >          >> POJOs, just to be clear.
> >          >>
> >          >> On Fri, Oct 30, 2020 at 7:56 PM Rex Fenley
> >         <r...@remind101.com <mailto:r...@remind101.com>> wrote:
> >          >>
> >          >>> Hello,
> >          >>>
> >          >>> I keep running into trouble moving between DataStream and
> >          >>> SQL with POJOs because my nested POJOs turn into
> >          >>> LEGACY('STRUCTURED_TYPE', ...). Is there any way to convert
> >          >>> them back to POJOs in Flink when converting a SQL Table
> >          >>> back to a DataStream?
> >          >>>
> >          >>> Thanks!
>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/>  |  BLOG <http://blog.remind.com/>  |
FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>
