Hi Stephan, Thanks for your response!
Task manager lost/killed has been a recurring problem I've had with Flink for the last few months as I try to scale to larger and larger amounts of data, and I would be very grateful for some help figuring out how I can avoid it. The program is set up something like this:

    DataSet<CustomType> data = env.fromCollection(listOfFiles)
        .rebalance()
        .flatMap(new ReadFiles())
        .filter(new FilterData());

    DataSet<Tuple8> computation1 = data
        .map(new Compute1())
        .distinct()
        .map(new Compute2())
        .groupBy(0, 1, 2)
        .aggregate(SUM, 3).and(SUM, 4).and(SUM, 5);

    DataSet<Tuple10> computation2 = data
        .map(new Compute3())
        .distinct()
        .map(new Compute4())
        .groupBy(0, 1, 2)
        .aggregate(SUM, 3).and(SUM, 4).and(SUM, 5);

    DataSet<Tuple12> finalOP = computation1.join(computation2)
        .where(0, 1)
        .equalTo(0, 1)
        .with(new Join1())
        .sortPartition(0, Order.ASCENDING)
        .setParallelism(1);

    finalOP.writeAsCsv("s3://myBucket/myKey.csv");

    ---

    public static final class ReadFiles implements FlatMapFunction<String, CustomType> {
        @Override
        public void flatMap(String fileName, Collector<CustomType> out) throws Exception {
            S3FileReaderAndParser parser = new S3FileReaderAndParser(fileName);
            List<CustomType> dataList = parser.parseFiles();
            for (CustomType data : dataList) {
                out.collect(data);
            }
        }
    }

The task manager is killed/lost during the ReadFiles() flatMap. ReadFiles is a flatMap function that reads each of the files from S3 using the AWS S3 Java SDK, then parses and emits each of the protobufs.

And yes, I can find a message like this in the logs about "gated" systems:

    2017-10-12 20:46:00,355 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@ip-172-31-8-29:38763] has failed, address is now gated for [5000] ms.
    Reason: [Association failed with [akka.tcp://flink@ip-172-31-8-29:38763]]
    Caused by: [Connection refused: ip-172-31-8-29/172.31.8.29:38763]

Thank you!

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
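For context, here is a stripped-down sketch of the two ways the ReadFiles emit loop can be written. This is not my actual code: Flink's Collector is stood in by a plain interface so it compiles without Flink, the records are just Strings, and the parser is replaced by a hypothetical iterator. My current ReadFiles does the equivalent of variant A (parseFiles() materializes the whole file as a List before any record is emitted); variant B emits each record as soon as it is parsed, so only one record is live on the heap at a time. I wonder whether the difference matters for the task manager's heap when the files are large.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Stand-in for org.apache.flink.util.Collector, so this sketch compiles without Flink.
interface Collector<T> {
    void collect(T record);
}

public class ReadFilesSketch {

    // Variant A: materialize the whole file into a List, then emit each element
    // (this mirrors what my ReadFiles.flatMap does via parser.parseFiles()).
    static void emitMaterialized(Iterator<String> parsed, Collector<String> out) {
        List<String> all = new ArrayList<>();
        while (parsed.hasNext()) {
            all.add(parsed.next()); // every record of the file is held on the heap at once
        }
        for (String record : all) {
            out.collect(record);
        }
    }

    // Variant B: emit each record as soon as it is parsed; no per-file List is built.
    static void emitStreaming(Iterator<String> parsed, Collector<String> out) {
        while (parsed.hasNext()) {
            out.collect(parsed.next());
        }
    }
}
```

Both variants emit the same records in the same order; they differ only in peak heap usage per file.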