My jobs normally use the blink planner; I noticed with this test that may not be the case.
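For context, this is how I'd pin the planner explicitly rather than relying on the default (a minimal sketch; `useBlinkPlanner()` is available on `EnvironmentSettings.Builder` in Flink 1.11):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Request the blink planner explicitly instead of taking the builder default.
val settings = EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val tEnv = StreamTableEnvironment.create(env, settings)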
On Mon, Nov 2, 2020 at 12:38 PM Rex Fenley <r...@remind101.com> wrote:
Flink 1.11.2 with Scala 2.12
Error:
[info] JobScalaTest:
[info] - dummy *** FAILED ***
[info] org.apache.flink.table.api.ValidationException: Field types
of query result and registered TableSink do not match.
[info] Query schema: [user: BIGINT, product: ROW<`name`
VARCHAR(2147483647), `id` BIGINT>, amount: INT]
[info] Sink schema: [user: BIGINT, product:
LEGACY('STRUCTURED_TYPE', 'ANY<ProductItem,
rO0ABXNyAB1Kb2JTY2FsYVRlc3QkJGFub24kOSQkYW5vbiQxMODHL_EXRquJAgAAeHIANm9yZy5hcGFjaGUuZmxpbmsuYXBpLnNjYWxhLnR5cGV1dGlscy5DYXNlQ2xhc3NUeXBlSW5mb46d1-iqONqQAgAMTAARUEFUVEVSTl9JTlRfRklFTER0ABlMamF2YS91dGlsL3JlZ2V4L1BhdHRlcm47TAAVUEFUVEVSTl9ORVNURURfRklFTERTcQB-AAJMAB5QQVRURVJOX05FU1RFRF9GSUVMRFNfV0lMRENBUkRxAH4AAkwAC1JFR0VYX0ZJRUxEdAASTGphdmEvbGFuZy9TdHJpbmc7TAAPUkVHRVhfSU5UX0ZJRUxEcQB-AANMABNSRUdFWF9ORVNURURfRklFTERTcQB-AANMABxSRUdFWF9ORVNURURfRklFTERTX1dJTERDQVJEcQB-AANMAA9SRUdFWF9TVFJfRklFTERxAH4AA0wABWNsYXp6dAARTGphdmEvbGFuZy9DbGFzcztMAApmaWVsZE5hbWVzdAAWTHNjYWxhL2NvbGxlY3Rpb24vU2VxO0wACmZpZWxkVHlwZXNxAH4ABVsAEnR5cGVQYXJhbVR5cGVJbmZvc3QAN1tMb3JnL2FwYWNoZS9mbGluay9hcGkvY29tbW9uL3R5cGVpbmZvL1R5cGVJbmZvcm1hdGlvbjt4cgA1b3JnLmFwYWNoZS5mbGluay5hcGkuamF2YS50eXBldXRpbHMuVHVwbGVUeXBlSW5mb0Jhc2UAAAAAAAAAAQIAAkkAC3RvdGFsRmllbGRzWwAFdHlwZXNxAH4ABnhyADNvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLkNvbXBvc2l0ZVR5cGUAAAAAAAAAAQIAAUwACXR5cGVDbGFzc3EAfgAEeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb26UjchIurN66wIAAHhwdnIAC1Byb2R1Y3RJdGVtBwFtETzcflcCAAJKAAJpZEwABG5hbWVxAH4AA3hwAAAAAnVyADdbTG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb247uKBspBqaFLYCAAB4cAAAAAJzcgAyb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkJhc2ljVHlwZUluZm_6BPCCpWndBgIABEwABWNsYXp6cQB-AARMAA9jb21wYXJhdG9yQ2xhc3NxAH4ABFsAF3Bvc3NpYmxlQ2FzdFRhcmdldFR5cGVzdAASW0xqYXZhL2xhbmcvQ2xhc3M7TAAKc2VyaWFsaXplcnQANkxvcmcvYXBhY2hlL2ZsaW5rL2FwaS9jb21tb24vdHlwZXV0aWxzL1R5cGVTZXJpYWxpemVyO3hxAH4ACXZyABBqYXZhLmxhbmcuU3RyaW5noPCkOHo7s0ICAAB4cHZyADtvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuU3RyaW5nQ29tcGFyYXRvcgAAAAAAAAABAgAAeHIAPm9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5CYXNpY1R5cGVDb21wYXJhdG9yAAAAAAAAAAECAAJaABNhc2NlbmRpbmdDb21wYXJpc29uWwALY29tcGFyYXRvcnN0ADdbTG9yZy9hcGFjaGUvZmxpbmsvYXBpL2NvbW1vbi90eXBldXRpbHMvVHlwZUNvbXBhcmF0b3I7eHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZUNvbXBhcmF0b3IAAAAAAAAAAQIAAHhwdXIAEltMamF2YS5sYW5nLkNsYXNzO6sW167LzVqZAgAAeHAAAAAAc3IAO29yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5TdHJpbmdTZXJpYWxpemVyAAAAAAAAAAECAAB4cgBCb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlR5cGVTZXJpYWxpemVyU2luZ2xldG9ueamHqscud0UCAAB4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHBzcgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkludGVnZXJUeXBlSW5mb5AFxEVpQEqVAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5OdW1lcmljVHlwZUluZm-tmMZzMAKMFgIAAHhxAH4AD3ZyAA5qYXZhLmxhbmcuTG9uZzuL5JDMjyPfAgABSgAFdmFsdWV4cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHB2cgA5b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLkxvbmdDb21wYXJhdG9yAAAAAAAAAAECAAB4cQB-ABZ1cQB-ABoAAAADdnIAD2phdmEubGFuZy5GbG9hdNrtyaLbPPDsAgABRgAFdmFsdWV4cQB-ACR2cgAQamF2YS5sYW5nLkRvdWJsZYCzwkopa_sEAgABRAAFdmFsdWV4cQB-ACR2cgATamF2YS5sYW5nLkNoYXJhY3RlcjSLR9lrGiZ4AgABQwAFdmFsdWV4cHNyADlvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuTG9uZ1NlcmlhbGl6ZXIAAAAAAAAAAQIAAHhxAH4AHXNyABdqYXZhLnV0aWwucmVnZXguUGF0dGVybkZn1WtuSQINAgACSQAFZmxhZ3NMAAdwYXR0ZXJucQB-AAN4cAAAAAB0AAZbMC05XStzcQB-ADEAAAAAdAAwKFtccHtMfV9cJF1bXHB7TH1ccHtEaWdpdH1fXCRdKnxbMC05XSspKFwuKC4rKSk_c3EAfgAxAAAAAHQANihbXHB7TH1fXCRdW1xwe0x9XHB7RGlnaXR9X1wkXSp8WzAtOV0rKShcLiguKykpP3xcKnxcX3QAJVtccHtMfV9cJF1bXHB7TH1ccHtEaWdpdH1fXCRdKnxbMC05XStxAH4AM3EAfgA1cQB-ADd0AB5bXHB7TH1fXCRdW1xwe0x9XHB7RGlnaXR9X1wkXSpxAH4ADHNyADJzY2FsYS5jb2xsZWN0aW9uLmltbXV0YWJsZS5MaXN0JFNlcmlhbGl6YXRpb25Qcm94eQAAAAAAAAABAwAAeHB0AARuYW1ldAACaWRzcgAsc2NhbGEuY29sbGVjdGlvbi5pbW11dGFibGUuTGlzdFN
lcmlhbGl6ZUVuZCSKXGNb91MLbQIAAHhweHNxAH4AOnEAfgAScQB-ACJxAH4AP3h1cQB-AA0AAAAA>'),
amount: INT]
[info] at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:103)
[info] at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:260)
[info] at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163)
[info] at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
[info] at scala.collection.Iterator.foreach(Iterator.scala:943)
[info] at scala.collection.Iterator.foreach$(Iterator.scala:943)
[info] at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
[info] at scala.collection.IterableLike.foreach(IterableLike.scala:74)
[info] at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
[info] at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
Code:
import com.remind.graph.people.PeopleJobScala
import org.scalatest.funsuite._
import org.scalatest.BeforeAndAfter
import org.apache.flink.streaming.api.scala.{
DataStream,
StreamExecutionEnvironment
}
import org.apache.flink.streaming.util.TestStreamEnvironment
import org.apache.flink.table.runtime.util._
import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.api.common.state.ListState
import org.apache.flink.runtime.state.FunctionInitializationContext
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.runtime.state.FunctionSnapshotContext
import org.apache.flink.types.Row
import java.io.Serializable
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.concurrent.atomic.AtomicInteger
import java.{util => ju}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Try
case class Order(user: Long, product: ProductItem, amount: Int) {
  def this() {
    this(0, null, 0)
  }

  override def toString(): String =
    "Order{" +
      "user=" + user +
      ", product='" + product + '\'' +
      ", amount=" + amount +
      '}'
}

case class ProductItem(name: String, id: Long) {
  def this() {
    this(null, 0)
  }

  override def toString(): String =
    "Product{" +
      "name='" + name + '\'' +
      ", id=" + id +
      '}'
}

class JobScalaTest extends AnyFunSuite with BeforeAndAfter {
  var env: StreamExecutionEnvironment = _
  var tEnv: StreamTableEnvironment = _

  before {
    this.env = StreamExecutionEnvironment.getExecutionEnvironment
    this.env.setParallelism(2)
    this.env.getConfig.enableObjectReuse()
    val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
    this.tEnv = StreamTableEnvironment.create(env, setting)
  }

  after {
    StreamTestSink.clear()
    // TestValuesTableFactory.clearAllData()
  }

  def dateFrom(stringDate: String): java.sql.Date = {
    val date = new SimpleDateFormat("dd/MM/yyyy").parse(stringDate)
    new java.sql.Date(date.getTime())
  }

  def printTable(table: Table) = {
    println(table)
    table.printSchema()
    println(table.getSchema().getFieldNames().mkString(", "))
  }

  def printDataStream(dataStream: DataStream[_]) = {
    println(dataStream)
    println(dataStream.dataType)
  }

  test("dummy") {
    val orderA: DataStream[Order] = this.env.fromCollection(
      Seq(
        new Order(1L, new ProductItem("beer", 10L), 3),
        new Order(1L, new ProductItem("diaper", 11L), 4),
        new Order(3L, new ProductItem("rubber", 12L), 2)
      )
    )
    val orderB: DataStream[Order] = this.env.fromCollection(
      Seq(
        new Order(2L, new ProductItem("pen", 13L), 3),
        new Order(2L, new ProductItem("rubber", 12L), 3),
        new Order(4L, new ProductItem("beer", 10L), 1)
      )
    )
    println(orderB)
    println(orderB.dataType)

    // convert DataStream to Table
    val tableA = this.tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
    println(tableA)
    tableA.printSchema()
    println(tableA.getSchema().getFieldNames().mkString(", "))

    // register DataStream as Table
    this.tEnv.createTemporaryView("OrderB", orderB, 'user, 'product, 'amount)

    // union the two tables
    val result = this.tEnv.sqlQuery(s"""
      |SELECT * FROM $tableA WHERE amount > 2
      |UNION ALL
      |SELECT * FROM OrderB WHERE amount < 2
      """.stripMargin)

    val sink = new StringSink[Order]()
    // This conversion back to Order is where the schema validation fails.
    result.toAppendStream[Order].addSink(sink)
    this.env.execute()

    val expected = List(
      "Order{user=1, product='Product{name='beer', id=10}', amount=3}",
      "Order{user=1, product='Product{name='diaper', id=11}', amount=4}",
      "Order{user=4, product='Product{name='beer', id=10}', amount=1}"
    )
    val results = sink.getResults.sorted
    println("results")
    println(results)
    assert(expected.sorted === results)
  }
}
/**
 * Taken from:
 * https://github.com/apache/flink/blob/release-1.11.2/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala
 * There's a whole bunch of other test sinks to choose from there.
 */
object StreamTestSink {
  val idCounter: AtomicInteger = new AtomicInteger(0)

  val globalResults =
    mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
  val globalRetractResults =
    mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
  val globalUpsertResults =
    mutable.HashMap.empty[Int, mutable.Map[Int, mutable.Map[String, String]]]

  def getNewSinkId: Int = {
    val idx = idCounter.getAndIncrement()
    this.synchronized {
      globalResults.put(
        idx,
        mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
      )
      globalRetractResults.put(
        idx,
        mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
      )
      globalUpsertResults.put(
        idx,
        mutable.HashMap.empty[Int, mutable.Map[String, String]]
      )
    }
    idx
  }

  def clear(): Unit = {
    globalResults.clear()
    globalRetractResults.clear()
    globalUpsertResults.clear()
  }
}
abstract class AbstractExactlyOnceSink[T]
    extends RichSinkFunction[T]
    with CheckpointedFunction {

  protected var resultsState: ListState[String] = _
  protected var localResults: mutable.ArrayBuffer[String] = _
  protected val idx: Int = StreamTestSink.getNewSinkId

  protected var globalResults: mutable.Map[Int, mutable.ArrayBuffer[String]] = _
  protected var globalRetractResults: mutable.Map[Int, mutable.ArrayBuffer[String]] = _
  protected var globalUpsertResults: mutable.Map[Int, mutable.Map[String, String]] = _

  def isInitialized: Boolean = globalResults != null

  override def initializeState(context: FunctionInitializationContext): Unit = {
    resultsState = context.getOperatorStateStore
      .getListState(
        new ListStateDescriptor[String]("sink-results", Types.STRING)
      )
    localResults = mutable.ArrayBuffer.empty[String]
    if (context.isRestored) {
      for (value <- resultsState.get().asScala) {
        localResults += value
      }
    }
    val taskId = getRuntimeContext.getIndexOfThisSubtask
    StreamTestSink.synchronized(
      StreamTestSink.globalResults(idx) += (taskId -> localResults)
    )
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    resultsState.clear()
    for (value <- localResults) {
      resultsState.add(value)
    }
  }

  protected def clearAndStashGlobalResults(): Unit = {
    if (globalResults == null) {
      StreamTestSink.synchronized {
        globalResults = StreamTestSink.globalResults.remove(idx).get
        globalRetractResults = StreamTestSink.globalRetractResults.remove(idx).get
        globalUpsertResults = StreamTestSink.globalUpsertResults.remove(idx).get
      }
    }
  }

  protected def getResults: List[String] = {
    clearAndStashGlobalResults()
    val result = mutable.ArrayBuffer.empty[String]
    this.globalResults.foreach {
      case (_, list) => result ++= list
    }
    result.toList
  }
}
final class StringSink[T] extends AbstractExactlyOnceSink[T]() {
  override def invoke(value: T): Unit = {
    localResults += value.toString
  }

  override def getResults: List[String] = super.getResults
}
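For reference, the workaround I've been trying is to come back out of SQL as Row and rebuild the case classes by hand instead of calling toAppendStream[Order] directly. A sketch only, assuming the fields arrive in the query-schema order (user, product, amount):

// Workaround sketch: convert to Row and map back to the case class manually,
// sidestepping the LEGACY structured type on the sink side. Assumes the
// query-schema field order: user BIGINT, product ROW<`name`, `id`>, amount INT.
val orders: DataStream[Order] = result
  .toAppendStream[Row]
  .map { row =>
    val product = row.getField(1).asInstanceOf[Row]
    Order(
      row.getField(0).asInstanceOf[Long],
      ProductItem(
        product.getField(0).asInstanceOf[String],
        product.getField(1).asInstanceOf[Long]
      ),
      row.getField(2).asInstanceOf[Int]
    )
  }
orders.addSink(sink)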
On Mon, Nov 2, 2020 at 5:23 AM Aljoscha Krettek <aljos...@apache.org> wrote:
@Timo: Is this something that would work when using the new type stack? From the message I'm assuming it's using the older type stack.

@Rex: Which Flink version are you using, and could you maybe post the code snippet that you use to do the conversions?

Best,
Aljoscha
On 02.11.20 06:50, Rex Fenley wrote:
> Maybe this is related to this issue?
> https://issues.apache.org/jira/browse/FLINK-17683
>
> On Fri, Oct 30, 2020 at 8:43 PM Rex Fenley <r...@remind101.com> wrote:
>
>> Correction: I'm using Scala case classes, not strictly Java POJOs, just to be clear.
>>
>> On Fri, Oct 30, 2020 at 7:56 PM Rex Fenley <r...@remind101.com> wrote:
>>
>>> Hello,
>>>
>>> I keep running into trouble moving between DataStream and SQL with POJOs because my nested POJOs turn into LEGACY('STRUCTURED_TYPE', ...). Is there any way to convert them back to POJOs in Flink when converting a SQL Table back to a DataStream?
>>>
>>> Thanks!
--
Rex Fenley | Software Engineer - Mobile and Backend
Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>