[ https://issues.apache.org/jira/browse/FLINK-18811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kai Chen updated FLINK-18811:
-----------------------------
    Description: 
I met this exception when a hard disk was damaged:

!flink_disk_error.png!

I checked the code and found that Flink creates a temp file in one of the `tempDirs` when the record length is greater than 5 MB:

{code:java}
// SpillingAdaptiveSpanningRecordDeserializer.java
if (nextRecordLength > THRESHOLD_FOR_SPILLING) {
   // create a spilling channel and put the data there
   this.spillingChannel = createSpillingChannel();

   ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk);
   FileUtils.writeCompletely(this.spillingChannel, toWrite);
}
{code}

The temp directory is picked at random from all `tempDirs`. In YARN mode, one `tempDir` usually corresponds to one hard disk.

In my opinion, if a hard disk is damaged, the TaskManager should pick another disk (tempDir) for the spilling channel rather than throw an IOException, which causes the Flink job to restart over and over again.

We could change "SpillingAdaptiveSpanningRecordDeserializer" like this:

{code:java}
// SpillingAdaptiveSpanningRecordDeserializer.java
private FileChannel createSpillingChannel() throws IOException {
   if (spillFile != null) {
      throw new IllegalStateException("Spilling file already exists.");
   }

   // try to find a unique file name for the spilling channel
   int maxAttempts = 10;
   // local copy, so that failed directories can be dropped without touching the field
   String[] tempDirs = this.tempDirs;
   for (int attempt = 0; attempt < maxAttempts; attempt++) {
      int dirIndex = rnd.nextInt(tempDirs.length);
      String directory = tempDirs[dirIndex];
      spillFile = new File(directory, randomString(rnd) + ".inputchannel");
      try {
         if (spillFile.createNewFile()) {
            return new RandomAccessFile(spillFile, "rw").getChannel();
         }
      } catch (IOException e) {
         // if there is no tempDir left to try
         if (tempDirs.length <= 1) {
            throw e;
         }
         LOG.warn("Caught an IOException when creating spill file: " + directory + ". Attempt " + attempt, e);
         // drop the failed directory so it is not picked again
         tempDirs = (String[]) ArrayUtils.remove(tempDirs, dirIndex);
      }
   }

   // all attempts were used up without creating a file
   throw new IOException("Could not create a spill file after " + maxAttempts + " attempts.");
}
{code}

  was:
I met this exception when a hard disk was damaged:

!flink_disk_error.png!
I checked the code and found that Flink creates a temp file in one of the `tempDirs` when the record length is greater than 5 MB:

{code:java}
// SpillingAdaptiveSpanningRecordDeserializer.java
if (nextRecordLength > THRESHOLD_FOR_SPILLING) {
   // create a spilling channel and put the data there
   this.spillingChannel = createSpillingChannel();

   ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk);
   FileUtils.writeCompletely(this.spillingChannel, toWrite);
}
{code}

The temp directory is picked at random from all `tempDirs`. In YARN mode, one `tempDir` usually corresponds to one hard disk.

I think, if a hard disk is damaged, the TaskManager should pick another disk (tempDir) for the spilling channel rather than throw an IOException, which causes the Flink job to restart over and over again.

We could change "SpillingAdaptiveSpanningRecordDeserializer" like this:

{code:java}
// SpillingAdaptiveSpanningRecordDeserializer.java
private FileChannel createSpillingChannel() throws IOException {
   if (spillFile != null) {
      throw new IllegalStateException("Spilling file already exists.");
   }

   // try to find a unique file name for the spilling channel
   int maxAttempts = 10;
   // local copy, so that failed directories can be dropped without touching the field
   String[] tempDirs = this.tempDirs;
   for (int attempt = 0; attempt < maxAttempts; attempt++) {
      int dirIndex = rnd.nextInt(tempDirs.length);
      String directory = tempDirs[dirIndex];
      spillFile = new File(directory, randomString(rnd) + ".inputchannel");
      try {
         if (spillFile.createNewFile()) {
            return new RandomAccessFile(spillFile, "rw").getChannel();
         }
      } catch (IOException e) {
         // if there is no tempDir left to try
         if (tempDirs.length <= 1) {
            throw e;
         }
         LOG.warn("Caught an IOException when creating spill file: " + directory + ". Attempt " + attempt, e);
         // drop the failed directory so it is not picked again
         tempDirs = (String[]) ArrayUtils.remove(tempDirs, dirIndex);
      }
   }

   // all attempts were used up without creating a file
   throw new IOException("Could not create a spill file after " + maxAttempts + " attempts.");
}
{code}


> if a disk is damaged, taskmanager should choose another disk for temp dir,
> rather than throw an IOException, which causes flink job restart over and
> over again
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-18811
>                 URL: https://issues.apache.org/jira/browse/FLINK-18811
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>         Environment: flink-1.10
>            Reporter: Kai Chen
>            Priority: Major
>         Attachments: flink_disk_error.png
>
>
> I met this exception when a hard disk was damaged:
> !flink_disk_error.png!
> I checked the code and found that Flink creates a temp file in one of the
> `tempDirs` when the record length is greater than 5 MB:
>
> {code:java}
> // SpillingAdaptiveSpanningRecordDeserializer.java
> if (nextRecordLength > THRESHOLD_FOR_SPILLING) {
>    // create a spilling channel and put the data there
>    this.spillingChannel = createSpillingChannel();
>    ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk);
>    FileUtils.writeCompletely(this.spillingChannel, toWrite);
> }
> {code}
> The temp directory is picked at random from all `tempDirs`. In YARN mode, one
> `tempDir` usually corresponds to one hard disk.
>
> In my opinion, if a hard disk is damaged, the TaskManager should pick another
> disk (tempDir) for the spilling channel rather than throw an IOException, which
> causes the Flink job to restart over and over again.
> We could change "SpillingAdaptiveSpanningRecordDeserializer" like this:
> {code:java}
> // SpillingAdaptiveSpanningRecordDeserializer.java
> private FileChannel createSpillingChannel() throws IOException {
>    if (spillFile != null) {
>       throw new IllegalStateException("Spilling file already exists.");
>    }
>    // try to find a unique file name for the spilling channel
>    int maxAttempts = 10;
>    // local copy, so that failed directories can be dropped without touching the field
>    String[] tempDirs = this.tempDirs;
>    for (int attempt = 0; attempt < maxAttempts; attempt++) {
>       int dirIndex = rnd.nextInt(tempDirs.length);
>       String directory = tempDirs[dirIndex];
>       spillFile = new File(directory, randomString(rnd) + ".inputchannel");
>       try {
>          if (spillFile.createNewFile()) {
>             return new RandomAccessFile(spillFile, "rw").getChannel();
>          }
>       } catch (IOException e) {
>          // if there is no tempDir left to try
>          if (tempDirs.length <= 1) {
>             throw e;
>          }
>          LOG.warn("Caught an IOException when creating spill file: "
>             + directory + ". Attempt " + attempt, e);
>          // drop the failed directory so it is not picked again
>          tempDirs = (String[]) ArrayUtils.remove(tempDirs, dirIndex);
>       }
>    }
>    // all attempts were used up without creating a file
>    throw new IOException("Could not create a spill file after " + maxAttempts + " attempts.");
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
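
The directory fallback proposed in the description can be tried outside Flink. The following is only a minimal standalone sketch of that idea, not Flink code: the class `SpillFileFallback` and the method `createSpillFile` are hypothetical names, a `UUID` stands in for Flink's `randomString(rnd)`, and a non-existent directory is assumed to behave like a damaged disk, since `File.createNewFile()` throws an `IOException` in both cases.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.UUID;

public class SpillFileFallback {

    /**
     * Tries to create a new, uniquely named file in one of the given directories.
     * A directory that fails with an IOException is removed from the candidate
     * list, so later attempts only pick from the remaining directories.
     */
    public static File createSpillFile(String[] tempDirs, Random rnd, int maxAttempts)
            throws IOException {
        List<String> candidates = new ArrayList<>(Arrays.asList(tempDirs));
        IOException lastError = null;

        for (int attempt = 0; attempt < maxAttempts && !candidates.isEmpty(); attempt++) {
            int dirIndex = rnd.nextInt(candidates.size());
            String directory = candidates.get(dirIndex);
            File file = new File(directory, UUID.randomUUID() + ".inputchannel");
            try {
                if (file.createNewFile()) {
                    return file;
                }
                // Name collision: keep the directory and retry with a new name.
            } catch (IOException e) {
                // Directory is unusable (for example a damaged disk): exclude it.
                lastError = e;
                candidates.remove(dirIndex);
            }
        }
        throw new IOException(
                "Could not create a spill file in any of " + Arrays.toString(tempDirs), lastError);
    }

    public static void main(String[] args) throws IOException {
        File good = Files.createTempDirectory("spill-good").toFile();
        // A directory that does not exist simulates the broken disk.
        String broken = new File(good, "does-not-exist").getPath();

        File spill = createSpillFile(new String[] {broken, good.getPath()}, new Random(), 10);
        System.out.println("Spill file created in: " + spill.getParent());

        spill.delete();
        good.delete();
    }
}
```

With one broken and one healthy directory, the call succeeds no matter which directory is drawn first, because the broken one is dropped after its first failure. Unlike the snippet in the description, this sketch throws only once every directory has failed or the attempts are used up, and it attaches the last `IOException` as the cause.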