Can you share the operator plan (StreamExecutionEnvironment.getExecutionPlan()) for both cases?

Thanks,
Fabian
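[For reference, a minimal sketch of dumping the plan. The object name PrintPlan and the placeholder pipeline are stand-ins for the real sources, queries, and sinks; the JSON string that getExecutionPlan returns can be rendered with the Flink plan visualizer.]

    import org.apache.flink.streaming.api.scala._

    object PrintPlan {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Placeholder pipeline; build the actual job here instead.
        env.fromElements(1, 2, 3).map(_ * 2).print()
        // Dump the operator plan as a JSON string without executing the job.
        println(env.getExecutionPlan)
      }
    }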
2018-03-14 9:08 GMT+01:00 杨力 <bill.le...@gmail.com>:

> I understand that complex SQL queries are translated into large DAGs.
> However, the submission succeeds in my case if I don't use the union
> operator, so it might be a bug related to that operator. For example, the
> following code submits successfully within the default akka.framesize
> limit:
>
>     val sqls: Seq[String] = ...
>     val sink: JDBCAppendTableSink = ...
>
>     sqls foreach { sql =>
>       val table = tEnv.sqlQuery(sql)
>       val outputStream = tEnv.toAppendStream[Row](table) map {
>         ...
>       }
>       tEnv.fromDataStream(outputStream).writeToSink(sink)
>     }
>
> But if I union these output streams and send them to a single sink, the
> serialized job grows to 100 MB:
>
>     val outputStream = sqls map { sql =>
>       val table = tEnv.sqlQuery(sql)
>       tEnv.toAppendStream[Row](table) map {
>         ...
>       }
>     } reduce {
>       (a, b) => a union b
>     }
>     tEnv.fromDataStream(outputStream).writeToSink(sink)
>
> I failed to reproduce it without the actual table schemas and SQL queries
> used in my production job. In the end I wrote my own JDBC sink with
> connection pooling to mitigate the problem. Maybe someone familiar with
> the implementation of the union operator can figure out what is going
> wrong.
>
> Fabian Hueske <fhue...@gmail.com> wrote on Tue, 13 Mar 2018 at 23:42:
>
>> Hi Bill,
>>
>> The size of the program depends on the number and complexity of the SQL
>> queries that you are submitting. Each query may be translated into a
>> sequence of multiple operators, and each operator carries a string of
>> generated code that is compiled on the worker nodes. The size of that
>> code depends on the number of fields in the schema. Operators and code
>> are not shared across queries.
>>
>> Best, Fabian
>>
>> 2018-03-09 23:36 GMT+01:00 杨力 <bill.le...@gmail.com>:
>>
>>> Thank you for your response. It occurs both in a standalone cluster and
>>> in a YARN cluster. I am trying to remove the business code and
>>> reproduce it with a minimal demo.
>>>
>>> On Sat, Mar 10, 2018 at 2:27 AM Piotr Nowojski <pi...@data-artisans.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Could you provide more details about your queries and setup? Logs
>>>> could be helpful as well.
>>>>
>>>> Piotrek
>>>>
>>>>> On 9 Mar 2018, at 11:00, 杨力 <bill.le...@gmail.com> wrote:
>>>>>
>>>>> I wrote a Flink SQL app with the following topology:
>>>>>
>>>>>     KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
>>>>>     KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
>>>>>     ...
>>>>>     KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
>>>>>
>>>>> I have a dozen TableSources and tens of SQL queries. As a result, the
>>>>> number of JDBCAppendTableSinks times the parallelism, i.e. the number
>>>>> of concurrent connections to the database, is too large for the
>>>>> database server to handle. So I tried to union the DataStreams before
>>>>> connecting them to the TableSink:
>>>>>
>>>>>     KafkaJsonTableSource -> SQL -> toAppendStream -> Map \
>>>>>     KafkaJsonTableSource -> SQL -> toAppendStream -> Map --- union -> JDBCAppendTableSink
>>>>>     ...                                                   /
>>>>>     KafkaJsonTableSource -> SQL -> toAppendStream -> Map /
>>>>>
>>>>> With this strategy, job submission failed with an
>>>>> OversizedPayloadException of 104 MB. Increasing akka.framesize avoids
>>>>> the exception, but then the submission hangs and times out.
>>>>>
>>>>> I can't understand why a simple union operator would serialize to
>>>>> such a large message. Can I avoid this problem? Or can I change some
>>>>> configuration to fix the submission timeout?
>>>>>
>>>>> Regards,
>>>>> Bill
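[Bill's pooled JDBC sink is not shown in the thread. Below is a minimal sketch of how such a sink could look, assuming HikariCP for the pool; the class name PooledJdbcSink, the JDBC URL, credentials, table name, and pool size are all placeholders, and batching, retries, and error handling are omitted.]

    import java.sql.Connection

    import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
    import org.apache.flink.types.Row

    // One pool per TaskManager JVM, shared by every sink subtask running
    // there, which caps the total number of connections to the database.
    object ConnectionPool {
      lazy val dataSource: HikariDataSource = {
        val cfg = new HikariConfig()
        cfg.setJdbcUrl("jdbc:mysql://db-host:3306/mydb") // placeholder URL
        cfg.setUsername("user")                          // placeholder credentials
        cfg.setPassword("password")
        cfg.setMaximumPoolSize(4)                        // placeholder pool size
        new HikariDataSource(cfg)
      }
    }

    class PooledJdbcSink(insertSql: String) extends RichSinkFunction[Row] {
      override def invoke(row: Row): Unit = {
        // Borrow a connection per record; close() returns it to the pool.
        val conn: Connection = ConnectionPool.dataSource.getConnection
        try {
          val stmt = conn.prepareStatement(insertSql)
          try {
            for (i <- 0 until row.getArity) {
              stmt.setObject(i + 1, row.getField(i))
            }
            stmt.executeUpdate()
          } finally {
            stmt.close()
          }
        } finally {
          conn.close()
        }
      }
    }

    // Usage, replacing writeToSink(sink) on the unioned stream:
    //   outputStream.addSink(new PooledJdbcSink("INSERT INTO my_table VALUES (?, ?)"))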