Thank you for the info! Is there a timetable for when the next version with this change might be released?
On Wed, Nov 4, 2020 at 2:44 AM Timo Walther <twal...@apache.org> wrote:

> Hi Rex,
>
> sorry for the late reply. POJOs will have much better support in the
> upcoming Flink versions because they have been fully integrated with the
> new table type system mentioned in FLIP-37 [1] (e.g. support for
> immutable POJOs, nested DataTypeHints, etc.).
>
> For queries, scalar functions, and table functions, you can already use
> full POJOs within the table ecosystem.
>
> However, the one missing piece is the new translation of POJOs from the
> Table API to the DataStream API. This will be fixed in FLIP-136 [2].
> Until then, I would recommend either using `Row` as the output of the
> Table API, or applying a scalar function beforehand that maps to the
> desired data structure.
>
> I hope this helps a bit.
>
> Regards,
> Timo
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>
> On 02.11.20 21:44, Rex Fenley wrote:
> > My jobs normally use the Blink planner; I noticed with this test that
> > may not be the case.
> >
> > On Mon, Nov 2, 2020 at 12:38 PM Rex Fenley <r...@remind101.com> wrote:
> >
> > Flink 1.11.2 with Scala 2.12
> >
> > Error:
> > [info] JobScalaTest:
> > [info] - dummy *** FAILED ***
> > [info]   org.apache.flink.table.api.ValidationException: Field types
> > of query result and registered TableSink do not match.
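Timo's interim suggestion (output `Row` from the Table API, then map back by hand) might look roughly like the following under Flink 1.11's Scala bridge. This is an untested sketch, not code from the thread: `toOrderStream` is a hypothetical helper, `Order`/`ProductItem` are the case classes from the test below, and the field indices assume the query's column order (`user`, `product`, `amount`).

```scala
// Untested sketch of the suggested interim workaround: pull Row out of the
// Table API, then rebuild the case class manually instead of relying on the
// legacy POJO translation that produces LEGACY('STRUCTURED_TYPE', ...).
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

case class ProductItem(name: String, id: Long)
case class Order(user: Long, product: ProductItem, amount: Int)

def toOrderStream(result: Table): DataStream[Order] =
  // toAppendStream[Row] keeps the planner's Row schema, so no sink-schema
  // validation against a legacy structured type is needed
  result.toAppendStream[Row].map { row =>
    // nested ROW<`name` ..., `id` ...> columns come through as nested Rows
    val product = row.getField(1).asInstanceOf[Row]
    Order(
      row.getField(0).asInstanceOf[Long],
      ProductItem(
        product.getField(0).asInstanceOf[String],
        product.getField(1).asInstanceOf[Long]
      ),
      row.getField(2).asInstanceOf[Int]
    )
  }
```

The `map` runs on the DataStream side, so the resulting stream carries ordinary `CaseClassTypeInfo` again and can be fed to a typed sink.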
> > [info]   Query schema: [user: BIGINT, product: ROW<`name` VARCHAR(2147483647), `id` BIGINT>, amount: INT]
> > [info]   Sink schema: [user: BIGINT, product: LEGACY('STRUCTURED_TYPE', 'ANY<ProductItem, rO0ABXNyAB1Kb2JTY2FsYVRlc3QkJGFub24kOSQkYW5vbiQxMODHL_EXRquJ... [several kilobytes of base64-serialized CaseClassTypeInfo elided] ...>'), amount: INT]
> > [info]   at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:103)
> > [info]   at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:260)
> > [info]   at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163)
> > [info]   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
> > [info]   at scala.collection.Iterator.foreach(Iterator.scala:943)
> > [info]   at scala.collection.Iterator.foreach$(Iterator.scala:943)
> > [info]   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> > [info]   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> > [info]   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> > [info]   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> >
> > Code:
> >
> > import com.remind.graph.people.PeopleJobScala
> >
> > import org.scalatest.funsuite._
> > import org.scalatest.BeforeAndAfter
> >
> > import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
> > import org.apache.flink.streaming.util.TestStreamEnvironment
> > import org.apache.flink.table.runtime.util._
> > import org.apache.flink.test.util.AbstractTestBase
> > import org.apache.flink.table.api._
> > import org.apache.flink.table.api.bridge.scala._
> > import org.apache.flink.streaming.api.scala._
> > import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
> > import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
> > import org.apache.flink.api.common.state.ListState
> > import org.apache.flink.runtime.state.FunctionInitializationContext
> > import org.apache.flink.api.common.state.ListStateDescriptor
> > import org.apache.flink.runtime.state.FunctionSnapshotContext
> > import org.apache.flink.types.Row
> >
> > import java.io.Serializable
> > import java.sql.Timestamp
> > import java.text.SimpleDateFormat
> > import java.util.concurrent.atomic.AtomicInteger
> > import java.{util => ju}
> >
> > import scala.collection.JavaConverters._
> > import scala.collection.mutable
> > import scala.util.Try
> >
> > case class Order(user: Long, product: ProductItem, amount: Int) {
> >   def this() {
> >     this(0, null, 0)
> >   }
> >
> >   override def toString(): String = {
> >     return "Order{" +
> >       "user=" + user +
> >       ", product='" + product + '\'' +
> >       ", amount=" + amount +
> >       '}'
> >   }
> > }
> >
> > case class ProductItem(name: String, id: Long) {
> >   def this() {
> >     this(null, 0)
> >   }
> >
> >   override def toString(): String = {
> >     return "Product{" +
> >       "name='" + name + '\'' +
> >       ", id=" + id +
> >       '}'
> >   }
> > }
> >
> > class JobScalaTest extends AnyFunSuite with BeforeAndAfter {
> >   var env: StreamExecutionEnvironment = _
> >   var tEnv: StreamTableEnvironment = _
> >
> >   before {
> >     this.env = StreamExecutionEnvironment.getExecutionEnvironment
> >     this.env.setParallelism(2)
> >     this.env.getConfig.enableObjectReuse()
> >     val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
> >     this.tEnv = StreamTableEnvironment.create(env, setting)
> >   }
> >
> >   after {
> >     StreamTestSink.clear()
> >     // TestValuesTableFactory.clearAllData()
> >   }
> >
> >   def dateFrom(stringDate: String): java.sql.Date = {
> >     val date = new SimpleDateFormat("dd/MM/yyyy").parse(stringDate)
> >     return new java.sql.Date(date.getTime())
> >   }
> >
> >   def printTable(table: Table) = {
> >     println(table)
> >     table.printSchema()
> >     println(table.getSchema().getFieldNames().mkString(", "))
> >   }
> >
> >   def printDataStream(dataStream: DataStream[_]) = {
> >     println(dataStream)
> >     println(dataStream.dataType)
> >   }
> >
> >   test("dummy") {
> >     val orderA: DataStream[Order] = this.env.fromCollection(
> >       Seq(
> >         new Order(1L, new ProductItem("beer", 10L), 3),
> >         new Order(1L, new ProductItem("diaper", 11L), 4),
> >         new Order(3L, new ProductItem("rubber", 12L), 2)
> >       )
> >     )
> >
> >     val orderB: DataStream[Order] = this.env.fromCollection(
> >       Seq(
> >         new Order(2L, new ProductItem("pen", 13L), 3),
> >         new Order(2L, new ProductItem("rubber", 12L), 3),
> >         new Order(4L, new ProductItem("beer", 10L), 1)
> >       )
> >     )
> >
> >     println(orderB)
> >     println(orderB.dataType)
> >
> >     // convert DataStream to Table
> >     val tableA = this.tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
> >     println(tableA)
> >     tableA.printSchema()
> >     println(tableA.getSchema().getFieldNames().mkString(", "))
> >     // register DataStream as Table
> >     this.tEnv.createTemporaryView("OrderB", orderB, 'user, 'product, 'amount)
> >
> >     // union the two tables
> >     val result = this.tEnv.sqlQuery(s"""
> >       |SELECT * FROM $tableA WHERE amount > 2
> >       |UNION ALL
> >       |SELECT * FROM OrderB WHERE amount < 2
> >       """.stripMargin)
> >
> >     val sink = new StringSink[Order]()
> >     result.toAppendStream[Order].addSink(sink)
> >
> >     this.env.execute()
> >
> >     val expected = List(
> >       "Order{user=1, product='Product{name='beer', id=10}', amount=3}",
> >       "Order{user=1, product='Product{name='diaper', id=11}', amount=4}",
> >       "Order{user=4, product='Product{name='beer', id=10}', amount=1}"
> >     )
> >     val results = sink.getResults.sorted
> >     println("results")
> >     println(results)
> >     assert(expected.sorted === results)
> >   }
> > }
> >
> > /**
> >  * Taken from:
> >  * https://github.com/apache/flink/blob/release-1.11.2/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala
> >  * There's a whole bunch of other test sinks to choose from there.
> >  */
> > object StreamTestSink {
> >
> >   val idCounter: AtomicInteger = new AtomicInteger(0)
> >
> >   val globalResults =
> >     mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
> >   val globalRetractResults =
> >     mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
> >   val globalUpsertResults =
> >     mutable.HashMap.empty[Int, mutable.Map[Int, mutable.Map[String, String]]]
> >
> >   def getNewSinkId: Int = {
> >     val idx = idCounter.getAndIncrement()
> >     this.synchronized {
> >       globalResults.put(idx, mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]])
> >       globalRetractResults.put(idx, mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]])
> >       globalUpsertResults.put(idx, mutable.HashMap.empty[Int, mutable.Map[String, String]])
> >     }
> >     idx
> >   }
> >
> >   def clear(): Unit = {
> >     globalResults.clear()
> >     globalRetractResults.clear()
> >     globalUpsertResults.clear()
> >   }
> > }
> >
> > abstract class AbstractExactlyOnceSink[T]
> >     extends RichSinkFunction[T]
> >     with CheckpointedFunction {
> >   protected var resultsState: ListState[String] = _
> >   protected var localResults: mutable.ArrayBuffer[String] = _
> >   protected val idx: Int = StreamTestSink.getNewSinkId
> >
> >   protected var globalResults: mutable.Map[Int, mutable.ArrayBuffer[String]] = _
> >   protected var globalRetractResults: mutable.Map[Int, mutable.ArrayBuffer[String]] = _
> >   protected var globalUpsertResults: mutable.Map[Int, mutable.Map[String, String]] = _
> >
> >   def isInitialized: Boolean = globalResults != null
> >
> >   override def initializeState(context: FunctionInitializationContext): Unit = {
> >     resultsState = context.getOperatorStateStore
> >       .getListState(new ListStateDescriptor[String]("sink-results", Types.STRING))
> >
> >     localResults = mutable.ArrayBuffer.empty[String]
> >
> >     if (context.isRestored) {
> >       for (value <- resultsState.get().asScala) {
> >         localResults += value
> >       }
> >     }
> >
> >     val taskId = getRuntimeContext.getIndexOfThisSubtask
> >     StreamTestSink.synchronized(
> >       StreamTestSink.globalResults(idx) += (taskId -> localResults)
> >     )
> >   }
> >
> >   override def snapshotState(context: FunctionSnapshotContext): Unit = {
> >     resultsState.clear()
> >     for (value <- localResults) {
> >       resultsState.add(value)
> >     }
> >   }
> >
> >   protected def clearAndStashGlobalResults(): Unit = {
> >     if (globalResults == null) {
> >       StreamTestSink.synchronized {
> >         globalResults = StreamTestSink.globalResults.remove(idx).get
> >         globalRetractResults = StreamTestSink.globalRetractResults.remove(idx).get
> >         globalUpsertResults = StreamTestSink.globalUpsertResults.remove(idx).get
> >       }
> >     }
> >   }
> >
> >   protected def getResults: List[String] = {
> >     clearAndStashGlobalResults()
> >     val result = mutable.ArrayBuffer.empty[String]
> >     this.globalResults.foreach {
> >       case (_, list) => result ++= list
> >     }
> >     result.toList
> >   }
> > }
> >
> > final class StringSink[T] extends AbstractExactlyOnceSink[T]() {
> >   override def invoke(value: T) {
> >     localResults += value.toString
> >   }
> >
> >   override def getResults: List[String] = super.getResults
> > }
> >
> > On Mon, Nov 2, 2020 at 5:23 AM Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> > @Timo: Is this something that would work when using the new type stack?
> > From the message I'm assuming it's using the older type stack.
> >
> > @Rex: Which Flink version are you using, and could you maybe post the
> > code snippet that you use to do the conversions?
> >
> > Best,
> > Aljoscha
> >
> > On 02.11.20 06:50, Rex Fenley wrote:
> > > Maybe this is related to this issue?
> > > https://issues.apache.org/jira/browse/FLINK-17683
> > >
> > > On Fri, Oct 30, 2020 at 8:43 PM Rex Fenley <r...@remind101.com> wrote:
> > >
> > >> Correction, I'm using Scala case classes, not strictly Java POJOs, just to
> > >> be clear.
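On the case-class/POJO distinction raised in that correction: the Scala DataStream API derives type information for case classes via a macro, producing a `CaseClassTypeInfo` rather than a `PojoTypeInfo` — which is what got serialized into the opaque `ANY<ProductItem, ...>` blob in the error above. A small, hypothetical way to inspect what the Scala API derives (not from the thread; output shape is indicative only):

```scala
// Hypothetical check: print the TypeInformation the Scala API derives for a
// nested case class. The macro yields a CaseClassTypeInfo, which the old
// Table type stack can only carry as a generic/legacy ANY type.
import org.apache.flink.api.scala._

case class ProductItem(name: String, id: Long)
case class Order(user: Long, product: ProductItem, amount: Int)

val info = createTypeInformation[Order]
println(info) // a CaseClassTypeInfo describing Order(user, product, amount)
```

This is why the query side shows a proper `ROW<...>` while the sink side shows `LEGACY('STRUCTURED_TYPE', 'ANY<ProductItem, ...>')`: the two type systems describe the same nested field differently.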
> > >>
> > >> On Fri, Oct 30, 2020 at 7:56 PM Rex Fenley <r...@remind101.com> wrote:
> > >>
> > >>> Hello,
> > >>>
> > >>> I keep running into trouble moving between DataStream and SQL with POJOs,
> > >>> because my nested POJOs turn into LEGACY('STRUCTURED_TYPE', ...). Is there
> > >>> any way to convert them back to POJOs in Flink when converting a SQL Table
> > >>> back to a DataStream?
> > >>>
> > >>> Thanks!

--
Rex Fenley | Software Engineer - Mobile and Backend

Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>
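On the timetable question at the top of the thread: FLIP-136 was ultimately released in Flink 1.13, where the Table-to-DataStream conversion understands structured types directly. A rough sketch of what that later API looks like (method names per the 1.13 Scala `StreamTableEnvironment`; `toOrders` is a hypothetical helper, and `result` stands for a table like the union query in the test above):

```scala
// Sketch of the post-FLIP-136 conversion (Flink 1.13+): toDataStream maps
// structured types back to case classes directly, so the Row detour from the
// interim workaround is no longer needed.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala._

case class ProductItem(name: String, id: Long)
case class Order(user: Long, product: ProductItem, amount: Int)

def toOrders(tEnv: StreamTableEnvironment, result: Table): DataStream[Order] =
  // the target class tells the planner which structured type to produce
  tEnv.toDataStream(result, classOf[Order])
```

Note that `toDataStream` also changes the changelog semantics compared to `toAppendStream` (it handles insert-only streams; `toChangelogStream` covers updating results), so queries with retractions need the changelog variant.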