[ https://issues.apache.org/jira/browse/FLINK-9166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
SUBRAMANYA SURESH updated FLINK-9166:
-------------------------------------
Description:
With a high number of Flink SQL queries (100 copies of the query below), the Flink command-line client fails with "JobManager did not respond within 600000 ms" on a YARN cluster.
* The JobManager log contains nothing after the last TaskManager start except DEBUG entries such as "job with ID 5cd95f89ed7a66ec44f2d19eca0592f7 not found in JobManager", indicating that it is likely stuck (creating the ExecutionGraph?).
* The same program works locally as a standalone Java program (with high CPU usage initially).
* Note: each Row in structStream contains 515 columns (many end up null), including a column that holds the raw message.
* On the YARN cluster we specify 18 GB per TaskManager, 18 GB for the JobManager, 145 TaskManagers with 5 slots each, and a parallelism of 725 (the number of partitions in our Kafka source).

*Query:*
{code:java}
select count(*), 'idnumber' as criteria, Environment, CollectedTimestamp,
       EventTimestamp, RawMsg, Source
from structStream
where Environment='MyEnvironment' and Rule='MyRule' and LogType='MyLogType'
  and Outcome='Success'
group by tumble(proctime, INTERVAL '1' SECOND), Environment,
         CollectedTimestamp, EventTimestamp, RawMsg, Source
{code}

*Code:*
{code:java}
public static void main(String[] args) throws Exception {
  FileSystems.newFileSystem(KafkaReadingStreamingJob.class.getResource(WHITELIST_CSV).toURI(), new HashMap<>());

  final StreamExecutionEnvironment streamingEnvironment = getStreamExecutionEnvironment();
  final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(streamingEnvironment);

  final DataStream<Row> structStream = getKafkaStreamOfRows(streamingEnvironment);
  tableEnv.registerDataStream("structStream", structStream);
  tableEnv.scan("structStream").printSchema();

  for (int i = 0; i < 100; i++) {
    for (String query : Queries.sample) {
      // Queries.sample contains the single query shown above.
      Table selectQuery = tableEnv.sqlQuery(query);
      DataStream<Row> selectQueryStream = tableEnv.toAppendStream(selectQuery, Row.class);
      selectQueryStream.print();
    }
  }

  // execute program
  streamingEnvironment.execute("Kafka Streaming SQL");
}

private static DataStream<Row> getKafkaStreamOfRows(StreamExecutionEnvironment environment) throws Exception {
  Properties properties = getKafkaProperties();
  // TestDeserializer deserializes the JSON into a Row of 515 string columns
  // and also adds a column for the raw message (a hypothetical sketch of such
  // a schema is given after this code block).
  FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011(KAFKA_TOPIC_TO_CONSUME,
      new TestDeserializer(getRowTypeInfo()), properties);
  DataStream<Row> stream = environment.addSource(consumer);
  return stream;
}

private static RowTypeInfo getRowTypeInfo() throws Exception {
  // This has 515 fields.
  List<String> fieldNames = DDIManager.getDDIFieldNames();
  fieldNames.add("rawkafka"); // raw message column added by TestDeserializer
  fieldNames.add("proctime");
  // Fill typeInformationArray with the String type for all but the last field,
  // which is of type Time (see the sketch after this code block).
  .....
  return new RowTypeInfo(typeInformationArray, fieldNamesArray);
}

private static StreamExecutionEnvironment getStreamExecutionEnvironment() throws IOException {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  env.enableCheckpointing(60000);
  env.setStateBackend(new FsStateBackend(CHECKPOINT_DIR));
  env.setParallelism(725);
  return env;
}
{code}
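The TestDeserializer referenced above is not included in this report. Purely for illustration, a minimal sketch of such a deserialization schema could look like the following; the class name, the JSON handling, and the field positions are assumptions, not the actual implementation:
{code:java}
// Hypothetical sketch only; the real TestDeserializer is not shown in this issue.
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.types.Row;

public class TestDeserializerSketch implements DeserializationSchema<Row> {
  private final RowTypeInfo rowTypeInfo;

  public TestDeserializerSketch(RowTypeInfo rowTypeInfo) {
    this.rowTypeInfo = rowTypeInfo;
  }

  @Override
  public Row deserialize(byte[] message) throws IOException {
    String raw = new String(message, StandardCharsets.UTF_8);
    Row row = new Row(rowTypeInfo.getArity());
    // ... parse the JSON payload and populate the 515 string columns here ...
    // The second-to-last column ("rawkafka") carries the raw Kafka message.
    row.setField(rowTypeInfo.getArity() - 2, raw);
    return row;
  }

  @Override
  public boolean isEndOfStream(Row nextElement) {
    return false;
  }

  @Override
  public TypeInformation<Row> getProducedType() {
    return rowTypeInfo;
  }
}
{code}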
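The construction of typeInformationArray is elided above ("....."). Assuming every payload column is mapped to a plain String and only the trailing proctime column uses a SQL time type, it might be built roughly as follows; this is a sketch under those assumptions (class and method names are placeholders), not the reporter's actual code:
{code:java}
// Hypothetical sketch of the elided typeInformationArray construction.
import java.util.List;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class RowTypeInfoSketch {
  public static RowTypeInfo buildRowTypeInfo(List<String> fieldNames) {
    String[] fieldNamesArray = fieldNames.toArray(new String[0]);
    TypeInformation<?>[] typeInformationArray = new TypeInformation<?>[fieldNamesArray.length];
    for (int i = 0; i < typeInformationArray.length; i++) {
      // All payload columns (including "rawkafka") are treated as strings ...
      typeInformationArray[i] = BasicTypeInfo.STRING_TYPE_INFO;
    }
    // ... except the last column ("proctime"), which carries a SQL timestamp.
    typeInformationArray[typeInformationArray.length - 1] = SqlTimeTypeInfo.TIMESTAMP;
    return new RowTypeInfo(typeInformationArray, fieldNamesArray);
  }
}
{code}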
> Performance issue with Flink SQL
> --------------------------------
>
>                 Key: FLINK-9166
>                 URL: https://issues.apache.org/jira/browse/FLINK-9166
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.2
>            Reporter: SUBRAMANYA SURESH
>            Priority: Major
>              Labels: flink, graph, performance, sql, yarn
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)