[ https://issues.apache.org/jira/browse/FLINK-18811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kai Chen updated FLINK-18811:
-----------------------------
    Description: 
I hit this exception when a hard disk was damaged:

!flink_disk_error.png!

I checked the code and found that Flink creates a temporary spill file in one of the `tempDirs` when a record is longer than `THRESHOLD_FOR_SPILLING` (5 MB):

 
{code:java}
// SpillingAdaptiveSpanningRecordDeserializer.java
if (nextRecordLength > THRESHOLD_FOR_SPILLING) {
   // create a spilling channel and put the data there
   this.spillingChannel = createSpillingChannel();

   ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk);
   FileUtils.writeCompletely(this.spillingChannel, toWrite);
}
{code}
The `tempDir` is picked at random from all `tempDirs`, and in YARN mode one `tempDir` usually corresponds to one hard disk.

In my opinion, if a hard disk is damaged, the TaskManager should pick another disk (`tempDir`) for the spilling channel rather than throw an IOException, which causes the Flink job to restart over and over again: because the directory is chosen at random for every spill, the restarted job sooner or later picks the damaged disk again.
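To put a rough number on "over and over again", the sketch below assumes that each spill picks one of `numDirs` directories uniformly at random, that exactly one of them sits on the damaged disk, and that creating a file there always throws an IOException that is not retried on another directory. Under those assumptions the chance that `spills` large records hit the bad disk at least once is 1 - (1 - 1/numDirs)^spills. `SpillFailureOdds` is a hypothetical helper for illustration, not Flink code.

{code:java}
import java.util.Locale;

// Hypothetical helper for illustration only; not part of Flink.
public class SpillFailureOdds {

    /**
     * Probability that at least one of {@code spills} spills lands on the single broken
     * directory, assuming each spill picks one of {@code numDirs} directories uniformly
     * at random and a spill on the broken directory always fails.
     */
    static double probabilityOfAtLeastOneFailure(int numDirs, int spills) {
        if (numDirs < 1 || spills < 0) {
            throw new IllegalArgumentException("need numDirs >= 1 and spills >= 0");
        }
        double avoidBadDiskOnce = 1.0 - 1.0 / numDirs; // one spill misses the broken disk
        return 1.0 - Math.pow(avoidBadDiskOnce, spills); // complement of "every spill misses it"
    }

    public static void main(String[] args) {
        // 10 disks, 10 large records spilled
        System.out.printf(Locale.ROOT, "%.4f%n", probabilityOfAtLeastOneFailure(10, 10)); // prints 0.6513
    }
}
{code}

With 10 disks, 10 spilled records already fail with a probability of about 0.65, and every restart starts the odds over again, so the job keeps failing as long as the disk stays broken.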

We could simply change `SpillingAdaptiveSpanningRecordDeserializer` like this:
{code:java}
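// SpillingAdaptiveSpanningRecordDeserializer.java
// needs java.util.Arrays and org.apache.commons.lang3.ArrayUtils; assumes an SLF4J LOG field
private FileChannel createSpillingChannel() throws IOException {
   if (spillFile != null) {
      throw new IllegalStateException("Spilling file already exists.");
   }

   // try to find a unique file name for the spilling channel
   int maxAttempts = 10;
   // local copy: directories that fail are dropped for this call only
   String[] tempDirs = this.tempDirs;
   for (int attempt = 0; attempt < maxAttempts; attempt++) {
      int dirIndex = rnd.nextInt(tempDirs.length);
      String directory = tempDirs[dirIndex];
      spillFile = new File(directory, randomString(rnd) + ".inputchannel");
      try {
         if (spillFile.createNewFile()) {
            return new RandomAccessFile(spillFile, "rw").getChannel();
         }
      } catch (IOException e) {
         // forget the failed file so a later call does not hit the IllegalStateException above
         spillFile = null;
         // no other tempDir left to try: give up
         if (tempDirs.length <= 1) {
            throw e;
         }
         LOG.warn("Caught an IOException when creating spill file in {}. Attempt {}.",
               directory, attempt, e);
         // skip the damaged directory in the remaining attempts
         tempDirs = ArrayUtils.remove(tempDirs, dirIndex);
      }
   }

   // every attempt ran into an existing file name
   spillFile = null;
   throw new IOException(
         "Could not find a unique file channel name in '" + Arrays.toString(tempDirs) +
               "' for spilling large records during deserialization.");
}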
{code}
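The same fallback idea can be tried outside Flink with the standalone sketch below. It is hypothetical: `SpillDirFallback`, `createSpillFile` and the hex file names are illustrative stand-ins for the deserializer's private method and `randomString(rnd)`, and it keeps the candidates in a `List` instead of using `ArrayUtils` so it only needs the JDK. A directory whose `createNewFile()` throws is dropped, a name collision simply retries, and the last IOException is attached as a suppressed exception when every attempt fails.

{code:java}
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical standalone sketch of the proposed fallback; names are illustrative, not Flink API.
public class SpillDirFallback {

    /**
     * Creates a new, uniquely named spill file in one of {@code tempDirs}.
     * A directory whose file creation throws an IOException is removed from the
     * candidates; a name collision (createNewFile() returns false) just retries.
     */
    static File createSpillFile(String[] tempDirs, Random rnd, int maxAttempts) throws IOException {
        List<String> candidates = new ArrayList<>(Arrays.asList(tempDirs));
        IOException lastError = null;
        for (int attempt = 0; attempt < maxAttempts && !candidates.isEmpty(); attempt++) {
            int dirIndex = rnd.nextInt(candidates.size());
            String directory = candidates.get(dirIndex);
            File spillFile = new File(directory, Long.toHexString(rnd.nextLong()) + ".inputchannel");
            try {
                if (spillFile.createNewFile()) {
                    return spillFile;
                }
            } catch (IOException e) {
                // broken or missing directory: remember the error and stop using it
                lastError = e;
                candidates.remove(dirIndex);
            }
        }
        IOException failure = new IOException(
                "Could not create a spill file in any of " + Arrays.toString(tempDirs));
        if (lastError != null) {
            failure.addSuppressed(lastError);
        }
        throw failure;
    }

    public static void main(String[] args) throws IOException {
        File good = Files.createTempDirectory("spill-ok").toFile();
        // a directory that does not exist stands in for the damaged disk
        String missing = new File(good, "no-such-disk").getPath();
        File f = createSpillFile(new String[] {missing, good.getPath()}, new Random(42), 10);
        System.out.println(f.getParentFile().equals(good)); // prints true
        f.delete();
        good.delete();
    }
}
{code}

Because the missing directory is removed after its first failure, any `maxAttempts` of 2 or more is enough for the file to end up in the healthy directory; only when every directory fails does the caller see an IOException.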

  was:
I hit this exception when a hard disk was damaged:

!flink_disk_error.png!

I checked the code and found that Flink creates a temporary spill file in one of the `tempDirs` when a record is longer than `THRESHOLD_FOR_SPILLING` (5 MB):

 
{code:java}
// SpillingAdaptiveSpanningRecordDeserializer.java
if (nextRecordLength > THRESHOLD_FOR_SPILLING) {
   // create a spilling channel and put the data there
   this.spillingChannel = createSpillingChannel();

   ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk);
   FileUtils.writeCompletely(this.spillingChannel, toWrite);
}
{code}
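The `tempDir` is picked at random from all `tempDirs`, and in YARN mode one `tempDir` usually corresponds to one hard disk.

I think that if a hard disk is damaged, the TaskManager should pick another disk (`tempDir`) for the spilling channel rather than throw an IOException, which causes the Flink job to restart over and over again: because the directory is chosen at random for every spill, the restarted job sooner or later picks the damaged disk again.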

We could simply change `SpillingAdaptiveSpanningRecordDeserializer` like this:
{code:java}
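// SpillingAdaptiveSpanningRecordDeserializer.java
// needs java.util.Arrays and org.apache.commons.lang3.ArrayUtils; assumes an SLF4J LOG field
private FileChannel createSpillingChannel() throws IOException {
   if (spillFile != null) {
      throw new IllegalStateException("Spilling file already exists.");
   }

   // try to find a unique file name for the spilling channel
   int maxAttempts = 10;
   // local copy: directories that fail are dropped for this call only
   String[] tempDirs = this.tempDirs;
   for (int attempt = 0; attempt < maxAttempts; attempt++) {
      int dirIndex = rnd.nextInt(tempDirs.length);
      String directory = tempDirs[dirIndex];
      spillFile = new File(directory, randomString(rnd) + ".inputchannel");
      try {
         if (spillFile.createNewFile()) {
            return new RandomAccessFile(spillFile, "rw").getChannel();
         }
      } catch (IOException e) {
         // forget the failed file so a later call does not hit the IllegalStateException above
         spillFile = null;
         // no other tempDir left to try: give up
         if (tempDirs.length <= 1) {
            throw e;
         }
         LOG.warn("Caught an IOException when creating spill file in {}. Attempt {}.",
               directory, attempt, e);
         // skip the damaged directory in the remaining attempts
         tempDirs = ArrayUtils.remove(tempDirs, dirIndex);
      }
   }

   // every attempt ran into an existing file name
   spillFile = null;
   throw new IOException(
         "Could not find a unique file channel name in '" + Arrays.toString(tempDirs) +
               "' for spilling large records during deserialization.");
}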
{code}


> if a disk is damaged, taskmanager should choose another disk for temp dir , 
> rather than throw an IOException, which causes flink job restart over and 
> over again
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18811
>                 URL: https://issues.apache.org/jira/browse/FLINK-18811
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>         Environment: flink-1.10
>            Reporter: Kai Chen
>            Priority: Major
>         Attachments: flink_disk_error.png
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
