Hi Jingsong,

Thanks for getting back to me. I’ll try to hook up the UDTF.

I added a unit test that reproduces the issue I’m running into (I tested it 
against Flink 1.6, which is what we’re running, as well as against the latest 
master). Did you have to do anything in particular to hook up the type correctly?

The error I get is: “Caused by: org.apache.calcite.runtime.CalciteContextException: 
From line 1, column 114 to line 1, column 120: List of column aliases must have 
same degree as table; table has 3 columns ('price', 'quantity', 'externalId'), 
whereas alias list has 1 columns”

@Test
def testArrayOfRow(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  StreamITCase.clear

  // Event = (partnerId, products) as a Flink Tuple2
  class Event(t0: Int, t1: Array[Row])
    extends org.apache.flink.api.java.tuple.Tuple2[Int, Array[Row]](t0, t1)
  def row(values: Any*): Row = Row.of(values.map(_.asInstanceOf[AnyRef]): _*)

  // Element type of the products array: ROW(price DOUBLE, quantity INT, externalId INT)
  val rowType = Types.ROW(
    fieldNames = Array("price", "quantity", "externalId"),
    Array(Types.DOUBLE, Types.INT, Types.INT))
  implicit val typeInfo =
    new TupleTypeInfo[Event](Types.INT, Types.OBJECT_ARRAY(rowType))

  val myArr1 = Array[Row](row(12.45, 10, 1))
  val myArr2 = Array[Row](row(10.0, 1, 1), row(20.0, 1, 1))
  val myArr3 = Array[Row](row(12.45, 10, 1))

  val input = env.fromElements[Event](
    new Event(123, myArr1),
    new Event(123, myArr2),
    new Event(456, myArr3)
  )

  tEnv.registerDataStream[Event]("advertiser_event", input, 'partnerId, 'products)

  // Fails validation: alias list t(product) has 1 column, the unnested row has 3
  val table = tEnv.sqlQuery("SELECT partnerId, product.price, product.quantity FROM advertiser_event, UNNEST(advertiser_event.products) AS t (product) ")
  table.toAppendStream[Row](table.getSchema.toRowType).print()
}

When I list all three fields instead of t(product), I don’t run into the issue.

Thanks,

-- Piyush


From: JingsongLee <lzljs3620...@aliyun.com>
Reply-To: JingsongLee <lzljs3620...@aliyun.com>
Date: Tuesday, June 4, 2019 at 2:42 AM
To: JingsongLee <lzljs3620...@aliyun.com>, Piyush Narang <p.nar...@criteo.com>, 
"user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Clean way of expressing UNNEST operations

Hi @Piyush Narang

I tried again: if the type of advertiser_event.products is derived correctly 
(ObjectTypeInfo(RowTypeInfo(fields...))), it works. For details, see 
SqlUnnestOperator.inferReturnType in the Calcite code. So I suspect your type 
is not being passed to the engine correctly.

Best, JingsongLee

------------------------------------------------------------------
From:JingsongLee <lzljs3620...@aliyun.com>
Send Time: Tuesday, June 4, 2019 13:35
To:Piyush Narang <p.nar...@criteo.com>; user@flink.apache.org 
<user@flink.apache.org>
Subject:Re: Clean way of expressing UNNEST operations

Hi @Piyush Narang
It seems that Calcite's type inference is incomplete here: the fields of the 
return type cannot be inferred for UNNEST (the error is reported during 
Calcite's validation phase).

A UDTF does support this usage, though. If it's convenient, you could write a 
UDTF that behaves like UNNEST and try it out (using JOIN LATERAL TABLE).
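A UDTF standing in for UNNEST only has to emit one output row per array element. The sketch below models that in plain Scala, with no Flink dependency, to show the shape of the logic; `ProductRow`, `Event`, and `unnestProducts` are hypothetical names. In Flink, the per-element emission would sit in the `eval` method of a `TableFunction` (calling `collect` once per element) and be invoked via `LATERAL TABLE`; that wiring is not shown here.

```scala
// Plain-Scala model of a UNNEST-like table function (no Flink dependency).
// ProductRow, Event and unnestProducts are hypothetical names for illustration.
final case class ProductRow(price: Double, quantity: Int, externalId: Int)
final case class Event(partnerId: Int, products: Seq[ProductRow])

object UnnestModel {
  // Emits one (partnerId, product) pair per array element, the way a UDTF
  // would call collect() once per element. The product stays a single
  // row-typed value, so callers pick its fields by name.
  def unnestProducts(events: Seq[Event]): Seq[(Int, ProductRow)] =
    for {
      e <- events
      p <- e.products
    } yield (e.partnerId, p)

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Event(123, Seq(ProductRow(12.45, 10, 1))),
      Event(123, Seq(ProductRow(10.0, 1, 1), ProductRow(20.0, 1, 1))),
      Event(456, Seq(ProductRow(12.45, 10, 1)))
    )
    // Rough analogue of: SELECT partnerId, product.price * product.quantity ...
    unnestProducts(events).foreach { case (id, p) =>
      println(s"$id ${p.price * p.quantity}")
    }
  }
}
```

Because the product is carried as one value, adding a field to `ProductRow` does not change any code that reads `price` and `quantity` by name.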

Best, JingsongLee

------------------------------------------------------------------
From:Piyush Narang <p.nar...@criteo.com>
Send Time: Tuesday, June 4, 2019 00:20
To:user@flink.apache.org <user@flink.apache.org>
Subject:Clean way of expressing UNNEST operations

Hi folks,

I’m using the SQL API and trying to figure out the best way to unnest and 
operate on some data.
My data is structured as follows:
Table:
Advertiser_event:

  *   partnerId: Int
  *   products: Array< Row< price: Double, quantity: Int, … > >
  *   …

I’m trying to unnest the products array and then compute something on a couple 
of fields in the product row (e.g., price * quantity).

My query looks like this:
SELECT partnerId, price, quantity FROM advertiser_event, 
UNNEST(advertiser_event.products) AS t (price, quantity, field3, field4, …)

My problem is that when I unnest this array<row>, I need to list all the fields 
of the temporary table as part of the UNNEST (“t” above). If I don’t, I get an 
error saying the number of fields doesn’t match what is expected. This makes the 
query fragile: it breaks whenever fields are added to or removed from the 
product structure.
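The error message suggests that the alias list is matched against the unnested row's columns by position, so both must have the same number of entries. The following simplified plain-Scala model (not Calcite's actual implementation; `AliasModel` and `bindAliases` are hypothetical names) shows why adding a field to the product row breaks an alias list that previously worked.

```scala
// Simplified model (not Calcite's code) of positional column aliasing:
// aliases are bound to columns by position, so the counts must match.
object AliasModel {
  def bindAliases(columns: Seq[String], aliases: Seq[String]): Either[String, Map[String, String]] =
    if (columns.size != aliases.size)
      Left(s"List of column aliases must have same degree as table; " +
        s"table has ${columns.size} columns, whereas alias list has ${aliases.size} columns")
    else
      Right(aliases.zip(columns).toMap) // alias -> original column name

  def main(args: Array[String]): Unit = {
    val v1 = Seq("price", "quantity")
    val v2 = v1 :+ "externalId" // a field added to the product row later
    val aliases = Seq("price", "quantity")
    println(bindAliases(v1, aliases)) // binds successfully
    println(bindAliases(v2, aliases)) // same alias list now fails
  }
}
```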

Does anyone know if there’s a way around this? By contrast, on an engine like 
Presto, the unnest operation yields a single ‘product’ row, from which I can 
pick the fields I want (“product.price”, “product.quantity”).
Presto query:
SELECT partnerId, product.price, product.quantity FROM advertiser_event CROSS 
JOIN UNNEST(products) AS product

Thanks,

-- Piyush
