Thank you for the info!
Is there a timetable for when the next version with this change might
be released?
On Wed, Nov 4, 2020 at 2:44 AM Timo Walther <twal...@apache.org> wrote:
Hi Rex,
sorry for the late reply. POJOs will have much better support in
upcoming Flink versions because they have been fully integrated with
the new table type system mentioned in FLIP-37 [1] (e.g. support for
immutable POJOs, nested DataTypeHints, etc.).
For queries and for scalar and table functions, you can already use
full POJOs within the table ecosystem.
However, the one missing piece is the new translation of POJOs from
the Table API to the DataStream API. This will be fixed in FLIP-136
[2]. Until then, I would recommend either using `Row` as the output of
the Table API or applying a scalar function that maps to the desired
data structure before the conversion.
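
For example, here is a rough, untested sketch of the `Row` variant
based on your snippet (the field order is assumed from the query
schema in your error message):

    import org.apache.flink.types.Row

    // Leave the Table -> DataStream conversion at Row to avoid the
    // legacy STRUCTURED_TYPE, then map back to the case class by hand.
    val rows: DataStream[Row] = result.toAppendStream[Row]
    val orders: DataStream[Order] = rows.map { row =>
      val product = row.getField(1).asInstanceOf[Row]
      Order(
        row.getField(0).asInstanceOf[Long],
        ProductItem(
          product.getField(0).asInstanceOf[String],
          product.getField(1).asInstanceOf[Long]
        ),
        row.getField(2).asInstanceOf[Int]
      )
    }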
I hope this helps a bit.
Regards,
Timo
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
On 02.11.20 21:44, Rex Fenley wrote:
> My jobs normally use the blink planner; I noticed with this test that
> may not be the case.
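> (A sketch of pinning the planner explicitly, in case the default
> differs — assuming the 1.11 builder API:
>
>     val setting = EnvironmentSettings.newInstance()
>       .useBlinkPlanner()
>       .inStreamingMode()
>       .build()
>
> this would replace the bare newInstance() call in the `before` block
> below.)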
>
> On Mon, Nov 2, 2020 at 12:38 PM Rex Fenley <r...@remind101.com> wrote:
>
> Flink 1.11.2 with Scala 2.12
>
> Error:
> [info] JobScalaTest:
> [info] - dummy *** FAILED ***
> [info] org.apache.flink.table.api.ValidationException: Field types
> of query result and registered TableSink do not match.
> [info] Query schema: [user: BIGINT, product: ROW<`name`
> VARCHAR(2147483647), `id` BIGINT>, amount: INT]
> [info] Sink schema: [user: BIGINT, product:
> LEGACY('STRUCTURED_TYPE', 'ANY<ProductItem,
> rO0ABXNyAB1Kb2JTY2FsYVRlc3QkJGFub24kOSQkYW5vbiQxM... (long
> serialized TypeInformation elided) ...>'), amount: INT]
> [info] at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:103)
> [info] at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:260)
> [info] at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163)
> [info] at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
> [info] at scala.collection.Iterator.foreach(Iterator.scala:943)
> [info] at scala.collection.Iterator.foreach$(Iterator.scala:943)
> [info] at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> [info] at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> [info] at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> [info] at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>
> Code:
> import com.remind.graph.people.PeopleJobScala
>
> import org.scalatest.funsuite._
> import org.scalatest.BeforeAndAfter
>
> import org.apache.flink.streaming.api.scala.{
> DataStream,
> StreamExecutionEnvironment
> }
> import org.apache.flink.streaming.util.TestStreamEnvironment
> import org.apache.flink.table.runtime.util._
> import org.apache.flink.test.util.AbstractTestBase
> import org.apache.flink.table.api._
> import org.apache.flink.table.api.bridge.scala._
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
> import org.apache.flink.api.common.state.ListState
> import org.apache.flink.runtime.state.FunctionInitializationContext
> import org.apache.flink.api.common.state.ListStateDescriptor
> import org.apache.flink.runtime.state.FunctionSnapshotContext
> import org.apache.flink.types.Row
>
> import java.io.Serializable;
> import java.sql.Timestamp;
> import java.text.SimpleDateFormat
> import java.util.concurrent.atomic.AtomicInteger
> import java.{util => ju}
>
> import scala.collection.JavaConverters._
> import scala.collection.mutable
> import scala.util.Try
>
> case class Order(user: Long, product: ProductItem, amount: Int) {
>   def this() {
>     this(0, null, 0)
>   }
>
>   override def toString(): String = {
>     "Order{" +
>       "user=" + user +
>       ", product='" + product + '\'' +
>       ", amount=" + amount +
>       '}'
>   }
> }
>
> case class ProductItem(name: String, id: Long) {
>   def this() {
>     this(null, 0)
>   }
>
>   override def toString(): String = {
>     "Product{" +
>       "name='" + name + '\'' +
>       ", id=" + id +
>       '}'
>   }
> }
>
> class JobScalaTest extends AnyFunSuite with BeforeAndAfter {
>   var env: StreamExecutionEnvironment = _
>   var tEnv: StreamTableEnvironment = _
>
>   before {
>     this.env = StreamExecutionEnvironment.getExecutionEnvironment
>     this.env.setParallelism(2)
>     this.env.getConfig.enableObjectReuse()
>     val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
>     this.tEnv = StreamTableEnvironment.create(env, setting)
>   }
>
>   after {
>     StreamTestSink.clear()
>     // TestValuesTableFactory.clearAllData()
>   }
>
>   def dateFrom(stringDate: String): java.sql.Date = {
>     val date = new SimpleDateFormat("dd/MM/yyyy").parse(stringDate)
>     new java.sql.Date(date.getTime())
>   }
>
>   def printTable(table: Table) = {
>     println(table)
>     table.printSchema()
>     println(table.getSchema().getFieldNames().mkString(", "))
>   }
>
>   def printDataStream(dataStream: DataStream[_]) = {
>     println(dataStream)
>     println(dataStream.dataType)
>   }
>
> test("dummy") {
> valorderA: DataStream[Order] = this.env.fromCollection(
> Seq(
> newOrder(1L, newProductItem("beer", 10L), 3),
> newOrder(1L, newProductItem("diaper", 11L), 4),
> newOrder(3L, newProductItem("rubber", 12L), 2)
> )
> )
>
> valorderB: DataStream[Order] = this.env.fromCollection(
> Seq(
> newOrder(2L, newProductItem("pen", 13L), 3),
> newOrder(2L, newProductItem("rubber", 12L), 3),
> newOrder(4L, newProductItem("beer", 10L), 1)
> )
> )
>
> println(orderB)
> println(orderB.dataType)
>
> // convert DataStream to Table
> valtableA =
> this.tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
> println(tableA)
> tableA.printSchema()
> println(tableA.getSchema().getFieldNames().mkString(", "))
> // register DataStream as Table
> this.tEnv.createTemporaryView("OrderB", orderB, 'user, 'product,
> 'amount)
>
> // union the two tables
> valresult = this.tEnv.sqlQuery(s"""
> |SELECT * FROM $tableAWHERE amount > 2
> |UNION ALL
> |SELECT * FROM OrderB WHERE amount < 2
> """.stripMargin)
>
>     val sink = new StringSink[Order]()
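>     // This conversion back to the POJO is the step that fails with the
>     // LEGACY('STRUCTURED_TYPE', ...) mismatch above; per the advice in
>     // this thread, toAppendStream[Row] works until FLIP-136 lands.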
>     result.toAppendStream[Order].addSink(sink)
>
>     this.env.execute()
>
>     val expected = List(
>       "Order{user=1, product='Product{name='beer', id=10}', amount=3}",
>       "Order{user=1, product='Product{name='diaper', id=11}', amount=4}",
>       "Order{user=4, product='Product{name='beer', id=10}', amount=1}"
>     )
>     val results = sink.getResults.sorted
>     println("results")
>     println(results)
>     assert(expected.sorted === results)
>   }
> }
>
> /**
>  * Taken from:
>  * https://github.com/apache/flink/blob/release-1.11.2/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala
>  * There's a whole bunch of other test sinks to choose from there.
>  */
> object StreamTestSink {
>
>   val idCounter: AtomicInteger = new AtomicInteger(0)
>
>   val globalResults =
>     mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
>   val globalRetractResults =
>     mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
>   val globalUpsertResults =
>     mutable.HashMap.empty[Int, mutable.Map[Int, mutable.Map[String, String]]]
>
>   def getNewSinkId: Int = {
>     val idx = idCounter.getAndIncrement()
>     this.synchronized {
>       globalResults.put(
>         idx,
>         mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
>       )
>       globalRetractResults.put(
>         idx,
>         mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
>       )
>       globalUpsertResults.put(
>         idx,
>         mutable.HashMap.empty[Int, mutable.Map[String, String]]
>       )
>     }
>     idx
>   }
>
>   def clear(): Unit = {
>     globalResults.clear()
>     globalRetractResults.clear()
>     globalUpsertResults.clear()
>   }
> }
>
> abstract class AbstractExactlyOnceSink[T]
>     extends RichSinkFunction[T]
>     with CheckpointedFunction {
>   protected var resultsState: ListState[String] = _
>   protected var localResults: mutable.ArrayBuffer[String] = _
>   protected val idx: Int = StreamTestSink.getNewSinkId
>
>   protected var globalResults: mutable.Map[Int, mutable.ArrayBuffer[String]] = _
>   protected var globalRetractResults: mutable.Map[Int, mutable.ArrayBuffer[String]] = _
>   protected var globalUpsertResults: mutable.Map[Int, mutable.Map[String, String]] = _
>
>   def isInitialized: Boolean = globalResults != null
>
>   override def initializeState(context: FunctionInitializationContext): Unit = {
>     resultsState = context.getOperatorStateStore
>       .getListState(
>         new ListStateDescriptor[String]("sink-results", Types.STRING)
>       )
>
>     localResults = mutable.ArrayBuffer.empty[String]
>
>     if (context.isRestored) {
>       for (value <- resultsState.get().asScala) {
>         localResults += value
>       }
>     }
>
>     val taskId = getRuntimeContext.getIndexOfThisSubtask
>     StreamTestSink.synchronized(
>       StreamTestSink.globalResults(idx) += (taskId -> localResults)
>     )
>   }
>
>   override def snapshotState(context: FunctionSnapshotContext): Unit = {
>     resultsState.clear()
>     for (value <- localResults) {
>       resultsState.add(value)
>     }
>   }
>
>   protected def clearAndStashGlobalResults(): Unit = {
>     if (globalResults == null) {
>       StreamTestSink.synchronized {
>         globalResults = StreamTestSink.globalResults.remove(idx).get
>         globalRetractResults = StreamTestSink.globalRetractResults.remove(idx).get
>         globalUpsertResults = StreamTestSink.globalUpsertResults.remove(idx).get
>       }
>     }
>   }
>
>   protected def getResults: List[String] = {
>     clearAndStashGlobalResults()
>     val result = mutable.ArrayBuffer.empty[String]
>     this.globalResults.foreach {
>       case (_, list) => result ++= list
>     }
>     result.toList
>   }
> }
>
> final class StringSink[T] extends AbstractExactlyOnceSink[T]() {
>   override def invoke(value: T): Unit = {
>     localResults += value.toString
>   }
>
>   override def getResults: List[String] = super.getResults
> }
>
>
>
On Mon, Nov 2, 2020 at 5:23 AM Aljoscha Krettek <aljos...@apache.org> wrote:
>
@Timo: Is this something that would work when using the new type
stack? From the message I'm assuming it's using the older type stack.

@Rex: Which Flink version are you using, and could you maybe post the
code snippet that you use to do the conversions?
>
> Best,
> Aljoscha
>
> On 02.11.20 06:50, Rex Fenley wrote:
> > Maybe this is related to this issue?
> > https://issues.apache.org/jira/browse/FLINK-17683
> >
> > On Fri, Oct 30, 2020 at 8:43 PM Rex Fenley <r...@remind101.com> wrote:
> >
> >> Correction: I'm using Scala case classes, not strictly Java POJOs,
> >> just to be clear.
> >>
> >> On Fri, Oct 30, 2020 at 7:56 PM Rex Fenley <r...@remind101.com> wrote:
> >>
> >>> Hello,
> >>>
> >>> I keep running into trouble moving between DataStream and SQL with
> >>> POJOs because my nested POJOs turn into LEGACY('STRUCTURED_TYPE',
> >>> ...). Is there any way to convert them back to POJOs in Flink when
> >>> converting a SQL Table back to a DataStream?
> >>>
> >>> Thanks!
> >>>
--
Rex Fenley | Software Engineer - Mobile and Backend
Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>