I understand that complex SQL queries are translated into large DAGs.
However, the submission succeeds in my case if I don't use the union operator,
so this might be a bug related to it. For example, the following code
submits successfully with the default akka.framesize limit.

val sqls: Seq[String] = ...
val sink: JDBCAppendTableSink = ...

sqls foreach {
  sql =>
    val table = tEnv.sqlQuery(sql)
    val outputStream = tEnv.toAppendStream[Row](table) map {
      ...
    }
    tEnv.fromDataStream(outputStream).writeToSink(sink)
}

If I union these outputStreams and send the result to a single sink, the
serialized job grows to about 100 MB.

val outputStream = sqls map {
  sql =>
    val table = tEnv.sqlQuery(sql)
    tEnv.toAppendStream[Row](table) map {
      ...
    }
} reduce {
  (a, b) => a union b
}
tEnv.fromDataStream(outputStream).writeToSink(sink)

I could not reproduce the problem without the actual table schemas and SQL
queries from my production job. In the end I wrote my own JDBC sink with
connection pooling to mitigate the problem. Maybe someone familiar with the
implementation of the union operator can figure out what is going wrong.
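
For illustration only, the connection-pooling idea mentioned above might be
sketched as follows. This is not the sink that was actually written; it is a
minimal, Flink-independent sketch, and the name BoundedPool and its methods
are hypothetical. It assumes the goal is simply to cap the number of open
connections regardless of how many sink subtasks want to write.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Hypothetical helper, not part of Flink or JDBC: a fixed-size pool that
// caps how many resources (for example JDBC connections) exist at once.
final class BoundedPool[A <: AnyRef](size: Int, create: () => A) {
  require(size > 0, "pool size must be positive")

  // All resources are created eagerly, so exactly `size` of them ever exist.
  private val idle = new ArrayBlockingQueue[A](size)
  (1 to size).foreach(_ => idle.put(create()))

  // Borrow a resource, run `f` on it, and always hand the resource back,
  // even if `f` throws. Waits up to `timeoutMs` when all resources are busy.
  def withResource[B](timeoutMs: Long)(f: A => B): B = {
    val res = idle.poll(timeoutMs, TimeUnit.MILLISECONDS)
    if (res == null) {
      throw new IllegalStateException("no resource available within timeout")
    }
    try f(res) finally idle.put(res)
  }

  // Number of resources currently not borrowed.
  def available: Int = idle.size()
}

// Usage sketch with plain strings standing in for connections. In a real
// sink, `create` would open a java.sql.Connection instead.
object BoundedPoolDemo {
  def main(args: Array[String]): Unit = {
    val pool = new BoundedPool[String](2, () => "connection")
    val length = pool.withResource(1000)(conn => conn.length)
    println(s"used one resource (length $length), available: ${pool.available}")
  }
}
```

A sink could hold such a pool in a Scala object so that all sink subtasks in
the same JVM share it, which would bound the connections per JVM rather than
per sink instance. Whether that fits depends on the deployment.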

On Tue, Mar 13, 2018 at 11:42 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Bill,
>
> The size of the program depends on the number and complexity of the SQL
> queries that you are submitting.
> Each query might be translated into a sequence of multiple operators. Each
> operator has a string with generated code that will be compiled on the
> worker nodes. The size of the code depends on the number of fields in the
> schema.
> Operators and code are not shared across queries.
>
> Best, Fabian
>
> 2018-03-09 23:36 GMT+01:00 杨力 <bill.le...@gmail.com>:
>
>> Thank you for your response. It occurs both in a standalone cluster and
>> in a yarn-cluster. I am trying to remove business code and reproduce it
>> with a minimal demo.
>>
>>
>> On Sat, Mar 10, 2018 at 2:27 AM Piotr Nowojski <pi...@data-artisans.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Could you provide more details about your queries and setup? Logs could
>>> be helpful as well.
>>>
>>> Piotrek
>>>
>>> > On 9 Mar 2018, at 11:00, 杨力 <bill.le...@gmail.com> wrote:
>>> >
>>> > I wrote a flink-sql app with the following topology.
>>> >
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
>>> > ...
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
>>> >
>>> > I have a dozen TableSources and tens of SQL queries. As a result, the
>>> > number of JDBCAppendTableSinks times the parallelism, that is, the
>>> > number of concurrent connections to the database, is too large for the
>>> > database server to handle. So I tried to union the DataStreams before
>>> > connecting them to the TableSink.
>>> >
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map
>>> >                                                      \
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map --- union -> JDBCAppendTableSink
>>> > ...                                                  /
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map
>>> >
>>> > With this strategy, job submission failed with an
>>> > OversizedPayloadException of 104 MB. Increasing akka.framesize avoids
>>> > this exception, but job submission then hangs and times out.
>>> >
>>> > I can't understand why a simple union operator would serialize to such
>>> > a large message. Can I avoid this problem?
>>> > Or can I change some configuration to fix the submission timeout?
>>> >
>>> > Regards,
>>> > Bill
>>>
>>>
>
