[ 
https://issues.apache.org/jira/browse/FLINK-17253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chaganti spurthi updated FLINK-17253:
-------------------------------------
    Description: 
FLINK-14170 introduced Hadoop version check to support older hadoop versions. 
However the check only included "hdfs" scheme but not "viewfs". We are using 
StreamingFileSink to write data to our federated hadoop cluster with cdh-2.6 
hadoop version and we are hit with
{code:java}
java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only 
supported for HDFS and for Hadoop version 2.7 or newer at 
org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:61)
 at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
 at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
 at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
 at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317)
 at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
 at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
 at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
 at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415) 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at 
java.lang.Thread.run(Thread.java:748)
{code}
The change is remove version check when the scheme is viewfs

  was:
FLINK-14170 introduced Hadoop version check to support older hadoop versions. 
However the check only included "hdfs" scheme but not "viewfs". We are using 
StreamingFileSink to write data to our federated hadoop cluster with cdh-2.6 
hadoop version and we are hit with
{code:java}
java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only 
supported for HDFS and for Hadoop version 2.7 or newer at 
org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:61)
 at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
 at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
 at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
 at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317)
 at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
 at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
 at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
 at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415) 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at 
java.lang.Thread.run(Thread.java:748)
{code}
The change is add viewfs to the scheme check. 


> Support writing to viewfs for hadoop versions < 2.7 when using 
> BulkFormatBuilder in StreamingFileSink
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17253
>                 URL: https://issues.apache.org/jira/browse/FLINK-17253
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.8.0, 1.8.1, 1.8.2, 1.8.3, 1.9.0
>            Reporter: chaganti spurthi
>            Priority: Major
>              Labels: pull-request-available
>
> FLINK-14170 introduced Hadoop version check to support older hadoop versions. 
> However the check only included "hdfs" scheme but not "viewfs". We are using 
> StreamingFileSink to write data to our federated hadoop cluster with cdh-2.6 
> hadoop version and we are hit with
> {code:java}
> java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are 
> only supported for HDFS and for Hadoop version 2.7 or newer at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:61)
>  at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
>  at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
>  at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
>  at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317)
>  at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
>  at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>  at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>  at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at 
> java.lang.Thread.run(Thread.java:748)
> {code}
> The change is remove version check when the scheme is viewfs



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to