[ 
https://issues.apache.org/jira/browse/FLINK-9166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SUBRAMANYA SURESH updated FLINK-9166:
-------------------------------------
    Description: 
With a high number of Flink SQL queries (100 copies of the query below), the Flink command-line client fails with "JobManager did not respond within 600000 ms" on a YARN cluster.
 * The JobManager log contains nothing after the last TaskManager start except DEBUG entries saying "job with ID 5cd95f89ed7a66ec44f2d19eca0592f7 not found in JobManager", indicating it is likely stuck (creating the ExecutionGraph?).
 * The same program works locally as a standalone Java program (with high CPU usage initially).
 * Note: each Row in structStream contains 515 columns (many end up null), including a column that holds the raw message.
 * On the YARN cluster we specify 18 GB for the TaskManagers, 18 GB for the JobManager, 5 slots each, and a parallelism of 725 (the number of partitions in our Kafka source).

*Query:*
{code:sql}
select count(*), 'idnumber' as criteria, Environment, CollectedTimestamp,
  EventTimestamp, RawMsg, Source
from structStream
where Environment='MyEnvironment' and Rule='MyRule' and LogType='MyLogType'
  and Outcome='Success'
group by tumble(proctime, INTERVAL '1' SECOND), Environment,
  CollectedTimestamp, EventTimestamp, RawMsg, Source
{code}
*Code:*
{code:java}
public static void main(String[] args) throws Exception {
  FileSystems.newFileSystem(
      KafkaReadingStreamingJob.class.getResource(WHITELIST_CSV).toURI(), new HashMap<>());

  final StreamExecutionEnvironment streamingEnvironment = getStreamExecutionEnvironment();
  final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(streamingEnvironment);

  final DataStream<Row> structStream = getKafkaStreamOfRows(streamingEnvironment);
  tableEnv.registerDataStream("structStream", structStream);
  tableEnv.scan("structStream").printSchema();

  for (int i = 0; i < 100; i++) {
    for (String query : Queries.sample) {
      // Queries.sample contains the single query shown above.
      Table selectQuery = tableEnv.sqlQuery(query);

      DataStream<Row> selectQueryStream = tableEnv.toAppendStream(selectQuery, Row.class);
      selectQueryStream.print();
    }
  }

  // execute program
  streamingEnvironment.execute("Kafka Streaming SQL");
}

private static DataStream<Row> getKafkaStreamOfRows(StreamExecutionEnvironment environment) throws Exception {
  Properties properties = getKafkaProperties();

  // TestDeserializer deserializes the JSON to a Row of 515 string columns
  // and also adds a column for the raw message.
  FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011(
      KAFKA_TOPIC_TO_CONSUME, new TestDeserializer(getRowTypeInfo()), properties);

  DataStream<Row> stream = environment.addSource(consumer);
  return stream;
}

private static RowTypeInfo getRowTypeInfo() throws Exception {
  // This has 515 fields.
  List<String> fieldNames = DDIManager.getDDIFieldNames();
  fieldNames.add("rawkafka");  // raw message column added by TestDeserializer
  fieldNames.add("proctime");

  // Fill typeInformationArray with String types for all but the last field,
  // which is of type Time.
  .....
  return new RowTypeInfo(typeInformationArray, fieldNamesArray);
}

private static StreamExecutionEnvironment getStreamExecutionEnvironment() throws IOException {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

  env.enableCheckpointing(60000);
  env.setStateBackend(new FsStateBackend(CHECKPOINT_DIR));
  env.setParallelism(725);
  return env;
}
{code}
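
The body of getRowTypeInfo() above elides how typeInformationArray is built. For reference, a minimal sketch of that elided part is shown below; the mapping of the payload columns to Types.STRING follows the comment in the code, while using Types.SQL_TIMESTAMP for the last ("proctime") column is an assumption on our side, since the original only says the last field "is of type Time".
{code:java}
// Sketch only. Assumes:
//   org.apache.flink.api.common.typeinfo.TypeInformation
//   org.apache.flink.api.common.typeinfo.Types
//   org.apache.flink.api.java.typeutils.RowTypeInfo
String[] fieldNamesArray = fieldNames.toArray(new String[0]);
TypeInformation<?>[] typeInformationArray = new TypeInformation<?>[fieldNamesArray.length];

// All payload columns (515 DDI fields plus "rawkafka") are plain strings.
for (int i = 0; i < typeInformationArray.length - 1; i++) {
  typeInformationArray[i] = Types.STRING;
}
// Last column is "proctime"; SQL_TIMESTAMP is assumed here, the original comment only says "Time".
typeInformationArray[typeInformationArray.length - 1] = Types.SQL_TIMESTAMP;

return new RowTypeInfo(typeInformationArray, fieldNamesArray);
{code}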

> Performance issue with Flink SQL
> --------------------------------
>
>                 Key: FLINK-9166
>                 URL: https://issues.apache.org/jira/browse/FLINK-9166
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.2
>            Reporter: SUBRAMANYA SURESH
>            Priority: Major
>              Labels: flink, graph, performance, sql, yarn


