Can you share the operator plan
(StreamExecutionEnvironment.getExecutionPlan()) for both cases?

Thanks, Fabian

2018-03-14 9:08 GMT+01:00 杨力 <bill.le...@gmail.com>:

> I understand complex SQL queries would be translated into large DAGs.
> However, the submission succeeds in my case if I don't use union operator.
> It might be a potential bug related to it. For example, following code
> submisses successfully with the default limitations of akka.framesize.
>
> val sqls: Seq[String] = ...
> val sink: JDBCAppendTableSink = ...
>
> sqls foreach {
>   sql =>
>     val table = tEnv.sqlQuery(sql)
>     val outputStream = tEnv.toAppendStream[Row](table) map {
>       ...
>     }
>     tEnv.fromDataStream(outputStream).writeToSink(sink)
> }
>
> If I union these outputStreams and send it to a single sink, the size of
> serialized job will be 100 MB.
>
> val outputStream = sqls map {
>   sql =>
>     val table = tEnv.sqlQuery(sql)
>     tEnv.toAppendStream[Row](table) map {
>       ...
>     }
> } reduce {
>   (a, b) => a union b
> }
> tEnv.fromDataStream(outputStream).writeToSink(sink)
>
> I failed to reproduce it without actually used table schemas and SQL
> queries in my production. And at last I wrote my own JDBC sink with
> connection pooling to migrate this problem. Maybe someone familiar with the
> implementation of union operator would figure out what's going wrong.
>
> Fabian Hueske <fhue...@gmail.com> 于 2018年3月13日周二 下午11:42写道:
>
>> Hi Bill,
>>
>> The size of the program depends on the number and complexity SQL queries
>> that you are submitting.
>> Each query might be translated into a sequence of multiple operators.
>> Each operator has a string with generated code that will be compiled on the
>> worker nodes. The size of the code depends on the number of fields in the
>> schema.
>> Operators and code are not shared across queries.
>>
>> Best, Fabian
>>
>> 2018-03-09 23:36 GMT+01:00 杨力 <bill.le...@gmail.com>:
>>
>>> Thank you for your response. It occurs both in a standalone cluster anda
>>> a yarn-cluster. I am trying to remove business code and reproduce it with a
>>> minimal demo.
>>>
>>>
>>> On Sat, Mar 10, 2018 at 2:27 AM Piotr Nowojski <pi...@data-artisans.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Could you provide more details about your queries and setup? Logs could
>>>> be helpful as well.
>>>>
>>>> Piotrek
>>>>
>>>> > On 9 Mar 2018, at 11:00, 杨力 <bill.le...@gmail.com> wrote:
>>>> >
>>>> > I wrote a flink-sql app with following topography.
>>>> >
>>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map ->
>>>> JDBCAppendTableSink
>>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map ->
>>>> JDBCAppendTableSink
>>>> > ...
>>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map ->
>>>> JDBCAppendTableSink
>>>> >
>>>> > I have a dozen of TableSources And tens of SQLs. As a result, the
>>>> number of JDBCAppendTableSink times parallelism, that is the number of
>>>> concurrent connections to database, is too large for the database server to
>>>> handle. So I tried union DataStreams before connecting them to the
>>>> TableSink.
>>>> >
>>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map
>>>> > \
>>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map --- union ->
>>>> JDBCAppendTableSink
>>>> > ... /
>>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map
>>>> >
>>>> > With this strategy, job submission failed with an
>>>> OversizedPayloadException of 104 MB. Increasing akka.framesize helps to
>>>> avoid this exception, but job submission hangs and times out.
>>>> >
>>>> > I can't understand why a simple union operator would serialize to
>>>> such a large message. Can I avoid this problem?
>>>> > Or can I change some configuration to fix the submission time out?
>>>> >
>>>> > Regards,
>>>> > Bill
>>>>
>>>>
>>

Reply via email to