Hi Rex,

sorry for the late reply. POJOs will have much better support in the upcoming Flink versions because they have been fully integrated with the new table type system mentioned in FLIP-37 [1] (e.g. support for immutable POJOs, nested DataTypeHints, etc.).

For queries, scalar functions, and table functions, you can already use full POJOs within the table ecosystem.
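For example, a scalar function can take such a POJO directly. A rough sketch (not compiled or verified, using the ProductItem case class from your test; the function and registration names are made up):

import org.apache.flink.table.functions.ScalarFunction

class ProductName extends ScalarFunction {
  // the new type system derives a structured type for ProductItem
  def eval(p: ProductItem): String = p.name
}

tEnv.createTemporarySystemFunction("ProductName", classOf[ProductName])
tEnv.sqlQuery("SELECT ProductName(product) FROM OrderB")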

However, the one missing piece is the new translation of POJOs from Table API to DataStream API. This will be fixed in FLIP-136 [2]. Until then, I would recommend either using `Row` as the output of the Table API or applying a scalar function beforehand that maps to the desired data structure.
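In your test, the Row workaround would look roughly like this (again only a sketch, not verified):

import org.apache.flink.types.Row

// field order and types follow the query schema:
// [user BIGINT, product ROW<`name` STRING, `id` BIGINT>, amount INT]
val rows: DataStream[Row] = result.toAppendStream[Row]
rows.print()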

I hope this helps a bit.

Regards,
Timo

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API

On 02.11.20 21:44, Rex Fenley wrote:
My jobs normally use the Blink planner; I noticed that with this test that may not be the case.

On Mon, Nov 2, 2020 at 12:38 PM Rex Fenley <r...@remind101.com <mailto:r...@remind101.com>> wrote:

    Flink 1.11.2 with Scala 2.12

    Error:
    [info] JobScalaTest:
    [info] - dummy *** FAILED ***
    [info]   org.apache.flink.table.api.ValidationException: Field types
    of query result and registered TableSink  do not match.
    [info] Query schema: [user: BIGINT, product: ROW<`name`
    VARCHAR(2147483647), `id` BIGINT>, amount: INT]
    [info] Sink schema: [user: BIGINT, product:
    LEGACY('STRUCTURED_TYPE', 'ANY<ProductItem,
    
rO0ABXNyAB1Kb2JTY2FsYVRlc3QkJGFub24kOSQkYW5vbiQxMODHL_EXRquJAgAAeHIANm9yZy5hcGFjaGUuZmxpbmsuYXBpLnNjYWxhLnR5cGV1dGlscy5DYXNlQ2xhc3NUeXBlSW5mb46d1-iqONqQAgAMTAARUEFUVEVSTl9JTlRfRklFTER0ABlMamF2YS91dGlsL3JlZ2V4L1BhdHRlcm47TAAVUEFUVEVSTl9ORVNURURfRklFTERTcQB-AAJMAB5QQVRURVJOX05FU1RFRF9GSUVMRFNfV0lMRENBUkRxAH4AAkwAC1JFR0VYX0ZJRUxEdAASTGphdmEvbGFuZy9TdHJpbmc7TAAPUkVHRVhfSU5UX0ZJRUxEcQB-AANMABNSRUdFWF9ORVNURURfRklFTERTcQB-AANMABxSRUdFWF9ORVNURURfRklFTERTX1dJTERDQVJEcQB-AANMAA9SRUdFWF9TVFJfRklFTERxAH4AA0wABWNsYXp6dAARTGphdmEvbGFuZy9DbGFzcztMAApmaWVsZE5hbWVzdAAWTHNjYWxhL2NvbGxlY3Rpb24vU2VxO0wACmZpZWxkVHlwZXNxAH4ABVsAEnR5cGVQYXJhbVR5cGVJbmZvc3QAN1tMb3JnL2FwYWNoZS9mbGluay9hcGkvY29tbW9uL3R5cGVpbmZvL1R5cGVJbmZvcm1hdGlvbjt4cgA1b3JnLmFwYWNoZS5mbGluay5hcGkuamF2YS50eXBldXRpbHMuVHVwbGVUeXBlSW5mb0Jhc2UAAAAAAAAAAQIAAkkAC3RvdGFsRmllbGRzWwAFdHlwZXNxAH4ABnhyADNvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLkNvbXBvc2l0ZVR5cGUAAAAAAAAAAQIAAUwACXR5cGVDbGFzc3EAfgAEeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb26UjchIurN66wIAAHhwdnIAC1Byb2R1Y3RJdGVtBwFtETzcflcCAAJKAAJpZEwABG5hbWVxAH4AA3hwAAAAAnVyADdbTG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb247uKBspBqaFLYCAAB4cAAAAAJzcgAyb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkJhc2ljVHlwZUluZm_6BPCCpWndBgIABEwABWNsYXp6cQB-AARMAA9jb21wYXJhdG9yQ2xhc3NxAH4ABFsAF3Bvc3NpYmxlQ2FzdFRhcmdldFR5cGVzdAASW0xqYXZhL2xhbmcvQ2xhc3M7TAAKc2VyaWFsaXplcnQANkxvcmcvYXBhY2hlL2ZsaW5rL2FwaS9jb21tb24vdHlwZXV0aWxzL1R5cGVTZXJpYWxpemVyO3hxAH4ACXZyABBqYXZhLmxhbmcuU3RyaW5noPCkOHo7s0ICAAB4cHZyADtvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuU3RyaW5nQ29tcGFyYXRvcgAAAAAAAAABAgAAeHIAPm9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5CYXNpY1R5cGVDb21wYXJhdG9yAAAAAAAAAAECAAJaABNhc2NlbmRpbmdDb21wYXJpc29uWwALY29tcGFyYXRvcnN0ADdbTG9yZy9hcGFjaGUvZmxpbmsvYXBpL2NvbW1vbi90eXBldXRpbHMvVHlwZUNvbXBhcmF0b3I7eHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZUNvbXBhcmF0b3IAAAAAAAAAAQIAAHhwdXIAEltMamF2YS5sYW5nLkNsYXNzO6sW167LzVqZAgAAeHAAAAAAc3IAO29yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5TdHJpbmdTZXJpYWxpemVyAAAAAAAAAAECAAB4cgBCb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlR5cGVTZXJpYWxpemVyU2luZ2xldG9ueamHqscud0UCAAB4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHBzcgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkludGVnZXJUeXBlSW5mb5AFxEVpQEqVAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5OdW1lcmljVHlwZUluZm-tmMZzMAKMFgIAAHhxAH4AD3ZyAA5qYXZhLmxhbmcuTG9uZzuL5JDMjyPfAgABSgAFdmFsdWV4cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHB2cgA5b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLkxvbmdDb21wYXJhdG9yAAAAAAAAAAECAAB4cQB-ABZ1cQB-ABoAAAADdnIAD2phdmEubGFuZy5GbG9hdNrtyaLbPPDsAgABRgAFdmFsdWV4cQB-ACR2cgAQamF2YS5sYW5nLkRvdWJsZYCzwkopa_sEAgABRAAFdmFsdWV4cQB-ACR2cgATamF2YS5sYW5nLkNoYXJhY3RlcjSLR9lrGiZ4AgABQwAFdmFsdWV4cHNyADlvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuTG9uZ1NlcmlhbGl6ZXIAAAAAAAAAAQIAAHhxAH4AHXNyABdqYXZhLnV0aWwucmVnZXguUGF0dGVybkZn1WtuSQINAgACSQAFZmxhZ3NMAAdwYXR0ZXJucQB-AAN4cAAAAAB0AAZbMC05XStzcQB-ADEAAAAAdAAwKFtccHtMfV9cJF1bXHB7TH1ccHtEaWdpdH1fXCRdKnxbMC05XSspKFwuKC4rKSk_c3EAfgAxAAAAAHQANihbXHB7TH1fXCRdW1xwe0x9XHB7RGlnaXR9X1wkXSp8WzAtOV0rKShcLiguKykpP3xcKnxcX3QAJVtccHtMfV9cJF1bXHB7TH1ccHtEaWdpdH1fXCRdKnxbMC05XStxAH4AM3EAfgA1cQB-ADd0AB5bXHB7TH1fXCRdW1xwe0x9XHB7RGlnaXR9X1wkXSpxAH4ADHNyADJzY2FsYS5jb2xsZWN0aW9uLmltbXV0YWJsZS5MaXN0JFNlcmlhbGl6YXRpb25Qcm94eQAAAAAAAAABAwAAeHB0AARuYW1ldAACaWRzcgAsc2NhbGEuY29sbGVjdGlvbi5pbW11dGFibGUuTGlzdFN
lcmlhbGl6ZUVuZCSKXGNb91MLbQIAAHhweHNxAH4AOnEAfgAScQB-ACJxAH4AP3h1cQB-AA0AAAAA>'),
    amount: INT]
    [info]   at
    
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:103)
    [info]   at
    
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:260)
    [info]   at
    
org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163)
    [info]   at
    scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
    [info]   at scala.collection.Iterator.foreach(Iterator.scala:943)
    [info]   at scala.collection.Iterator.foreach$(Iterator.scala:943)
    [info]   at
    scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    [info]   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    [info]   at
    scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    [info]   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)

    Code:
    import com.remind.graph.people.PeopleJobScala

    import org.scalatest.funsuite._
    import org.scalatest.BeforeAndAfter

    import org.apache.flink.streaming.api.scala.{
    DataStream,
    StreamExecutionEnvironment
    }
    import org.apache.flink.streaming.util.TestStreamEnvironment
    import org.apache.flink.table.runtime.util._
    import org.apache.flink.test.util.AbstractTestBase
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.bridge.scala._
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    import org.apache.flink.api.common.state.ListState
    import org.apache.flink.runtime.state.FunctionInitializationContext
    import org.apache.flink.api.common.state.ListStateDescriptor
    import org.apache.flink.runtime.state.FunctionSnapshotContext
    import org.apache.flink.types.Row

    import java.io.Serializable;
    import java.sql.Timestamp;
    import java.text.SimpleDateFormat
    import java.util.concurrent.atomic.AtomicInteger
    import java.{util => ju}

    import scala.collection.JavaConverters._
    import scala.collection.mutable
    import scala.util.Try

    case class Order(user: Long, product: ProductItem, amount: Int) {
      def this() {
        this(0, null, 0)
      }

      override def toString(): String = {
        return "Order{" +
          "user=" + user +
          ", product='" + product + '\'' +
          ", amount=" + amount +
          '}';
      }
    }

    case class ProductItem(name: String, id: Long) {
      def this() {
        this(null, 0)
      }

      override def toString(): String = {
        return "Product{" +
          "name='" + name + '\'' +
          ", id=" + id +
          '}';
      }
    }

    class JobScalaTest extends AnyFunSuite with BeforeAndAfter {
      var env: StreamExecutionEnvironment = _
      var tEnv: StreamTableEnvironment = _

      before {
        this.env = StreamExecutionEnvironment.getExecutionEnvironment
        this.env.setParallelism(2)
        this.env.getConfig.enableObjectReuse()
        val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
        this.tEnv = StreamTableEnvironment.create(env, setting)
      }

      after {
        StreamTestSink.clear()
        // TestValuesTableFactory.clearAllData()
      }

      def dateFrom(stringDate: String): java.sql.Date = {
        val date = new SimpleDateFormat("dd/MM/yyyy")
          .parse(stringDate)
        return new java.sql.Date(date.getTime())
      }

      def printTable(table: Table) = {
        println(table)
        table.printSchema()
        println(table.getSchema().getFieldNames().mkString(", "))
      }

      def printDataStream(dataStream: DataStream[_]) = {
        println(dataStream)
        println(dataStream.dataType)
      }

    test("dummy") {
    valorderA: DataStream[Order] = this.env.fromCollection(
    Seq(
    newOrder(1L, newProductItem("beer", 10L), 3),
    newOrder(1L, newProductItem("diaper", 11L), 4),
    newOrder(3L, newProductItem("rubber", 12L), 2)
    )
    )

    valorderB: DataStream[Order] = this.env.fromCollection(
    Seq(
    newOrder(2L, newProductItem("pen", 13L), 3),
    newOrder(2L, newProductItem("rubber", 12L), 3),
    newOrder(4L, newProductItem("beer", 10L), 1)
    )
    )

    println(orderB)
    println(orderB.dataType)

    // convert DataStream to Table
    valtableA =
    this.tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
    println(tableA)
    tableA.printSchema()
    println(tableA.getSchema().getFieldNames().mkString(", "))
    // register DataStream as Table
    this.tEnv.createTemporaryView("OrderB", orderB, 'user, 'product,
    'amount)

    // union the two tables
    valresult = this.tEnv.sqlQuery(s"""
    |SELECT * FROM $tableAWHERE amount > 2
    |UNION ALL
    |SELECT * FROM OrderB WHERE amount < 2
    """.stripMargin)

    valsink = newStringSink[Order]()
    result.toAppendStream[Order].addSink(sink)

    this.env.execute()

    valexpected = List(
    "Order{user=1, product='Product{name='beer', id=10}', amount=3}",
    "Order{user=1, product='Product{name='diaper', id=11}', amount=4}",
    "Order{user=4, product='Product{name='beer', id=10}', amount=1}"
    )
    valresults = sink.getResults.sorted
    println("results")
    println(results)
    assert(expected.sorted === results)
    }
    }

    /**
     * Taken from:
     * https://github.com/apache/flink/blob/release-1.11.2/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala
     * There's a whole bunch of other test sinks to choose from there.
     */
    object StreamTestSink {

      val idCounter: AtomicInteger = new AtomicInteger(0)

      val globalResults =
        mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
      val globalRetractResults =
        mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
      val globalUpsertResults =
        mutable.HashMap.empty[Int, mutable.Map[Int, mutable.Map[String, String]]]

      def getNewSinkId: Int = {
        val idx = idCounter.getAndIncrement()
        this.synchronized {
          globalResults.put(
            idx,
            mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
          )
          globalRetractResults.put(
            idx,
            mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
          )
          globalUpsertResults.put(
            idx,
            mutable.HashMap.empty[Int, mutable.Map[String, String]]
          )
        }
        idx
      }

      def clear(): Unit = {
        globalResults.clear()
        globalRetractResults.clear()
        globalUpsertResults.clear()
      }
    }

    abstract class AbstractExactlyOnceSink[T]
        extends RichSinkFunction[T]
        with CheckpointedFunction {
      protected var resultsState: ListState[String] = _
      protected var localResults: mutable.ArrayBuffer[String] = _
      protected val idx: Int = StreamTestSink.getNewSinkId

      protected var globalResults: mutable.Map[Int, mutable.ArrayBuffer[String]] = _
      protected var globalRetractResults: mutable.Map[Int, mutable.ArrayBuffer[String]] = _
      protected var globalUpsertResults: mutable.Map[Int, mutable.Map[String, String]] = _

      def isInitialized: Boolean = globalResults != null

      override def initializeState(context: FunctionInitializationContext): Unit = {
        resultsState = context.getOperatorStateStore
          .getListState(
            new ListStateDescriptor[String]("sink-results", Types.STRING)
          )

        localResults = mutable.ArrayBuffer.empty[String]

        if (context.isRestored) {
          for (value <- resultsState.get().asScala) {
            localResults += value
          }
        }

        val taskId = getRuntimeContext.getIndexOfThisSubtask
        StreamTestSink.synchronized(
          StreamTestSink.globalResults(idx) += (taskId -> localResults)
        )
      }

      override def snapshotState(context: FunctionSnapshotContext): Unit = {
        resultsState.clear()
        for (value <- localResults) {
          resultsState.add(value)
        }
      }

      protected def clearAndStashGlobalResults(): Unit = {
        if (globalResults == null) {
          StreamTestSink.synchronized {
            globalResults = StreamTestSink.globalResults.remove(idx).get
            globalRetractResults = StreamTestSink.globalRetractResults.remove(idx).get
            globalUpsertResults = StreamTestSink.globalUpsertResults.remove(idx).get
          }
        }
      }

      protected def getResults: List[String] = {
        clearAndStashGlobalResults()
        val result = mutable.ArrayBuffer.empty[String]
        this.globalResults.foreach {
          case (_, list) => result ++= list
        }
        result.toList
      }
    }

    final class StringSink[T] extends AbstractExactlyOnceSink[T]() {
      override def invoke(value: T): Unit = {
        localResults += value.toString
      }

      override def getResults: List[String] = super.getResults
    }



    On Mon, Nov 2, 2020 at 5:23 AM Aljoscha Krettek <aljos...@apache.org
    <mailto:aljos...@apache.org>> wrote:

        @Timo: Is this something that would work when using the new type
        stack? From the message I'm assuming it's using the older type stack.

        @Rex: Which Flink version are you using, and could you maybe post
        the code snippet that you use to do the conversions?

        Best,
        Aljoscha

        On 02.11.20 06:50, Rex Fenley wrote:
         > Maybe this is related to this issue?
         > https://issues.apache.org/jira/browse/FLINK-17683
         >
         > On Fri, Oct 30, 2020 at 8:43 PM Rex Fenley <r...@remind101.com
        <mailto:r...@remind101.com>> wrote:
         >
         >> Correction, I'm using Scala case classes not strictly Java
        POJOs just to
         >> be clear.
         >>
         >> On Fri, Oct 30, 2020 at 7:56 PM Rex Fenley
        <r...@remind101.com <mailto:r...@remind101.com>> wrote:
         >>
         >>> Hello,
         >>>
         >>> I keep running into trouble moving between DataStream and
        SQL with POJOs
         >>> because my nested POJOs turn into LEGACY('STRUCTURED_TYPE',
        is there any
         >>> way to convert them back to POJOs in Flink when converting
        a SQL Table back
         >>> to a DataStream?
         >>>
         >>> Thanks!
         >>>
         >>> --
         >>>
         >>> Rex Fenley  |  Software Engineer - Mobile and Backend
         >>>
         >>>
         >>> Remind.com <https://www.remind.com/> |  BLOG
        <http://blog.remind.com/>
         >>>   |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
         >>> <https://www.facebook.com/remindhq>
         >>>
         >>
         >>
         >> --
         >>
         >> Rex Fenley  |  Software Engineer - Mobile and Backend
         >>
         >>
         >> Remind.com <https://www.remind.com/> |  BLOG
        <http://blog.remind.com/>  |
         >>   FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
         >> <https://www.facebook.com/remindhq>
         >>
         >
         >



--
    Rex Fenley | Software Engineer - Mobile and Backend


    Remind.com <https://www.remind.com/> | BLOG
    <http://blog.remind.com/> | FOLLOW US
    <https://twitter.com/remindhq> | LIKE US
    <https://www.facebook.com/remindhq>



--

Rex Fenley | Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>

