I understand that complex SQL queries are translated into large DAGs. However, in my case the submission succeeds as long as I don't use the union operator, so this might be a bug related to it. For example, the following code submits successfully within the default akka.framesize limit:
val sqls: Seq[String] = ...
val sink: JDBCAppendTableSink = ...

sqls foreach { sql =>
  val table = tEnv.sqlQuery(sql)
  val outputStream = tEnv.toAppendStream[Row](table) map { ... }
  tEnv.fromDataStream(outputStream).writeToSink(sink)
}

If I union these output streams and send them to a single sink, the serialized job grows to about 100 MB.

val outputStream = sqls map { sql =>
  val table = tEnv.sqlQuery(sql)
  tEnv.toAppendStream[Row](table) map { ... }
} reduce { (a, b) => a union b }
tEnv.fromDataStream(outputStream).writeToSink(sink)

I have not managed to reproduce the problem without the actual table schemas and SQL queries used in my production environment. In the end I wrote my own JDBC sink with connection pooling to mitigate the problem (a simplified sketch is at the bottom of this mail). Maybe someone familiar with the implementation of the union operator can figure out what is going wrong.

Fabian Hueske <fhue...@gmail.com> wrote on Tue, Mar 13, 2018 at 11:42 PM:

> Hi Bill,
>
> The size of the program depends on the number and complexity of the SQL
> queries that you are submitting.
> Each query might be translated into a sequence of multiple operators. Each
> operator has a string with generated code that will be compiled on the
> worker nodes. The size of the code depends on the number of fields in the
> schema.
> Operators and code are not shared across queries.
>
> Best, Fabian
>
> 2018-03-09 23:36 GMT+01:00 杨力 <bill.le...@gmail.com>:
>
>> Thank you for your response. It occurs both in a standalone cluster and
>> in a YARN cluster. I am trying to remove the business code and reproduce
>> it with a minimal demo.
>>
>>
>> On Sat, Mar 10, 2018 at 2:27 AM Piotr Nowojski <pi...@data-artisans.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Could you provide more details about your queries and setup? Logs could
>>> be helpful as well.
>>>
>>> Piotrek
>>>
>>> > On 9 Mar 2018, at 11:00, 杨力 <bill.le...@gmail.com> wrote:
>>> >
>>> > I wrote a flink-sql app with the following topology.
>>> >
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
>>> > ...
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
>>> >
>>> > I have a dozen TableSources and tens of SQL queries. As a result, the
>>> number of JDBCAppendTableSink instances times the parallelism, i.e. the
>>> number of concurrent connections to the database, is too large for the
>>> database server to handle. So I tried to union the DataStreams before
>>> connecting them to the TableSink.
>>> >
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map
>>> >                                                      \
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map --- union -> JDBCAppendTableSink
>>> > ...                                                  /
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map
>>> >
>>> > With this strategy, job submission failed with an
>>> OversizedPayloadException of 104 MB. Increasing akka.framesize helps to
>>> avoid this exception, but then job submission hangs and times out.
>>> >
>>> > I can't understand why a simple union operator would serialize to such
>>> a large message. Can I avoid this problem?
>>> > Or can I change some configuration to fix the submission timeout?
>>> >
>>> > Regards,
>>> > Bill
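For reference, here is a minimal sketch of the pooled JDBC sink I mentioned above. It is simplified: HikariCP is just the pool implementation I use as an example, the JDBC URL and insert statement are placeholders, and batching and error handling are omitted.

import java.sql.Connection

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.types.Row

// One pool per TaskManager JVM, shared by all sink subtasks, so the total
// number of database connections stays capped no matter how many sinks
// and subtasks the job has.
object ConnectionPool {
  lazy val dataSource: HikariDataSource = {
    val cfg = new HikariConfig()
    cfg.setJdbcUrl("jdbc:...")  // placeholder
    cfg.setMaximumPoolSize(4)   // caps connections per JVM
    new HikariDataSource(cfg)
  }
}

class PooledJdbcSink(insertSql: String) extends RichSinkFunction[Row] {
  @transient private var connection: Connection = _

  override def open(parameters: Configuration): Unit = {
    // Borrow one connection for the lifetime of this subtask.
    connection = ConnectionPool.dataSource.getConnection
  }

  override def invoke(row: Row): Unit = {
    val stmt = connection.prepareStatement(insertSql)
    try {
      for (i <- 0 until row.getArity) stmt.setObject(i + 1, row.getField(i))
      stmt.executeUpdate()
    } finally stmt.close()
  }

  override def close(): Unit = {
    if (connection != null) connection.close() // returns it to the pool
  }
}

It replaces the table sink, e.g. outputStream.addSink(new PooledJdbcSink("INSERT INTO t VALUES (?, ?)")).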
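Regarding the generated code Fabian mentioned: to get an idea of how many operators and how much generated code each query produces, I believe the translated plan can be printed per query with something like the following (assuming the explain method on the table environment):

sqls foreach { sql =>
  val table = tEnv.sqlQuery(sql)
  // Prints the abstract syntax tree, the optimized logical plan, and
  // the physical execution plan of this single query.
  println(tEnv.explain(table))
}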
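And for completeness, the frame size I referred to is raised in flink-conf.yaml. The value takes a trailing "b"; if I read the defaults correctly, it is 10485760b (10 MB) out of the box.

# flink-conf.yaml: raise the frame size to 200 MB (default is 10485760b)
akka.framesize: 209715200b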