Hi Aljoscha,
It was my bad: I had copied some wrong class files during one of the steps. 
I have retried the same steps that I mentioned earlier, and with that I am able 
to see all the debug statements that I added to the RollingSink.
I have found another interesting issue here. I am using an HCFS (Hadoop Compatible 
File System) implementation of the filesystem that we have built in-house (not 
stock HDFS). As part of the recovery process, the restoreState() method of the 
RollingSink class tries to invoke the recoverLease() API, which is available 
only on DistributedFileSystem (or any class inheriting from it), whereas the HCFS 
contract class that we have implemented extends the generic FileSystem. Since that 
code path never invokes recoverLease() for our HCFS implementation class, I am 
seeing the part file with empty content even though the file is renamed from 
"in-progress" to the actual file name.
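
For reference, the check I mean looks roughly like the following (a simplified 
sketch based on my reading of restoreState(), not the exact Flink source; the 
class and helper names here are mine):

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class LeaseRecoverySketch {

    // Lease recovery is only attempted when the FileSystem is an HDFS
    // DistributedFileSystem; for any other Hadoop-compatible FileSystem
    // implementation this step is skipped entirely.
    static void maybeRecoverLease(FileSystem fs, Path partPath) throws Exception {
        if (fs instanceof DistributedFileSystem) {
            DistributedFileSystem dfs = (DistributedFileSystem) fs;
            // ask the NameNode to recover the lease so the file length is finalized
            boolean isClosed = dfs.recoverLease(partPath);
            while (!isClosed) {
                Thread.sleep(500);
                isClosed = dfs.recoverLease(partPath);
            }
        }
        // else: our in-house HCFS only extends the generic FileSystem,
        // so this branch is never taken
    }
}
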
Question: Do you know if the RollingSink implementation has been tested with any 
Hadoop Compatible File System, such as GlusterFS?
Regards,
Vijay

    On Wednesday, March 23, 2016 7:42 AM, Aljoscha Krettek 
<aljos...@apache.org> wrote:
 

 Hmm, that’s strange. Could you maybe send one of the TaskManager logs?

Cheers,
Aljoscha
> On 23 Mar 2016, at 15:28, Vijay <vijikar...@yahoo.com.INVALID> wrote:
> 
> Yes, I have updated on all cluster nodes and restarted entire cluster. 
> 
> Do you see any problems with the steps that I followed?
> 
> Regards,
> Vijay
> 
> Sent from my iPhone
> 
>> On Mar 23, 2016, at 7:18 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>> 
>> Hi,
>> did you update the log4j.properties file on all nodes where the TaskManagers 
>> run and did you restart the whole cluster?
>> 
>> Cheers,
>> Aljoscha
>>> On 23 Mar 2016, at 15:02, Vijay <vijikar...@yahoo.com.INVALID> wrote:
>>> 
>>> Hi Aljoscha,
>>> 
>>> I am using a standalone Flink cluster (3 nodes). I am running the Flink job by 
>>> submitting/uploading the jar through the Flink UI.
>>> 
>>> I have built Flink with Maven and modified the RollingSink code to add new 
>>> debug statements.
>>> 
>>> I have also packaged the streaming filesystem connector package (including 
>>> the RollingSink changes) into the job jar file. The changes include both 
>>> System.out as well as logger statements.
>>> 
>>> Updated the log4j properties file to DEBUG.
>>> 
>>> Regards,
>>> Vijay
>>> 
>>> Sent from my iPhone
>>> 
>>>> On Mar 23, 2016, at 6:48 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>> 
>>>> Hi,
>>>> what were the steps you took? By the way, are you running this on YARN or 
>>>> in standalone mode? How are you starting the Flink job? Do you still not 
>>>> see DEBUG entries in the log?
>>>> 
>>>> Cheers,
>>>> Aljoscha
>>>>> On 23 Mar 2016, at 14:32, Vijay <vijikar...@yahoo.com> wrote:
>>>>> 
>>>>> I have changed the properties file but it did not help.
>>>>> 
>>>>> Regards,
>>>>> Vijay
>>>>> 
>>>>> Sent from my iPhone
>>>>> 
>>>>>> On Mar 23, 2016, at 5:39 AM, Aljoscha Krettek <aljos...@apache.org> 
>>>>>> wrote:
>>>>>> 
>>>>>> Ok, then you should be able to change the log level to DEBUG in 
>>>>>> conf/log4j.properties.
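>>>>>> 
>>>>>> For example (assuming the default log4j.properties that ships with the 
>>>>>> Flink distribution, where the file appender is named "file"):
>>>>>> 
>>>>>> log4j.rootLogger=DEBUG, file
>>>>>> # or, more targeted, only for the filesystem connector classes:
>>>>>> log4j.logger.org.apache.flink.streaming.connectors.fs=DEBUG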
>>>>>> 
>>>>>>> On 23 Mar 2016, at 12:41, Vijay <vijikar...@yahoo.com> wrote:
>>>>>>> 
>>>>>>> I think only the ERROR category gets displayed in the log file
>>>>>>> 
>>>>>>> Regards,
>>>>>>> Vijay
>>>>>>> 
>>>>>>> Sent from my iPhone
>>>>>>> 
>>>>>>>> On Mar 23, 2016, at 2:30 AM, Aljoscha Krettek <aljos...@apache.org> 
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>> Hi,
>>>>>>>> are you seeing the regular log output from the RollingSink in the 
>>>>>>>> TaskManager logs?
>>>>>>>> 
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>>> On 22 Mar 2016, at 20:03, Vijay Srinivasaraghavan 
>>>>>>>>> <vijikar...@yahoo.com> wrote:
>>>>>>>>> 
>>>>>>>>> I have tried both the log4j logger as well as the System.out.println 
>>>>>>>>> option, but none of these worked. 
>>>>>>>>> 
>>>>>>>>> From what I have seen so far, the filesystem streaming connector 
>>>>>>>>> classes are not packaged in the fat jar 
>>>>>>>>> (flink-dist_2.10-1.1-SNAPSHOT.jar) that is copied under the 
>>>>>>>>> <FLINK_HOME>/build-target/lib location as part of the Flink Maven build 
>>>>>>>>> step.
>>>>>>>>> 
>>>>>>>>> So, I manually copied (overwrote) the compiled class files from the 
>>>>>>>>> org.apache.flink.streaming.connectors.fs package into my "Flink 
>>>>>>>>> job" distribution jar (otherwise it was using the standard jars that are 
>>>>>>>>> defined as a Maven dependency in Artifactory) and then uploaded the jar 
>>>>>>>>> to the JobManager.
>>>>>>>>> 
>>>>>>>>> Am I missing something? How do I enable logging for the RollingSink 
>>>>>>>>> class?
>>>>>>>>> 
>>>>>>>>> <dependency>
>>>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>>>   <artifactId>flink-connector-filesystem_2.11</artifactId>
>>>>>>>>>   <version>${flink.version}</version>
>>>>>>>>>   <scope>provided</scope>
>>>>>>>>> </dependency>
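>>>>>>>>> 
>>>>>>>>> (Side note on the scope above: since the dependency is marked as provided, 
>>>>>>>>> my understanding is that the shade/assembly plugin will not bundle the 
>>>>>>>>> connector classes into the job jar, which would explain why I had to copy 
>>>>>>>>> the class files in manually. Declaring it with the default compile scope, 
>>>>>>>>> as below, should get the classes packaged; whether that fits our build 
>>>>>>>>> setup is an assumption on my part.)
>>>>>>>>> 
>>>>>>>>> <dependency>
>>>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>>>   <artifactId>flink-connector-filesystem_2.11</artifactId>
>>>>>>>>>   <version>${flink.version}</version>
>>>>>>>>>   <!-- no <scope> element, i.e. the default compile scope -->
>>>>>>>>> </dependency>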
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Tuesday, March 22, 2016 3:04 AM, Aljoscha Krettek 
>>>>>>>>> <aljos...@apache.org> wrote:
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Hi,
>>>>>>>>> how are you printing the debug statements?
>>>>>>>>> 
>>>>>>>>> But yeah, all the logic of renaming in-progress files and cleaning up 
>>>>>>>>> after a failed job happens in restoreState(BucketState state). The 
>>>>>>>>> steps are roughly these:
>>>>>>>>> 
>>>>>>>>> 1. Move the current in-progress file to its final location
>>>>>>>>> 2. Truncate the file if necessary (if truncate is not available, write 
>>>>>>>>> a .valid-length file; see the sketch after this list)
>>>>>>>>> 3. Move the pending files that were part of the checkpoint to their 
>>>>>>>>> final location
>>>>>>>>> 4. Clean up any leftover pending/in-progress files
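>>>>>>>>> 
>>>>>>>>> To make step 2 a bit more concrete, the logic is roughly the following 
>>>>>>>>> (a simplified sketch, not the actual RollingSink code; truncate only 
>>>>>>>>> exists on Hadoop 2.7+, which is why it is looked up reflectively):
>>>>>>>>> 
>>>>>>>>> import java.lang.reflect.Method;
>>>>>>>>> 
>>>>>>>>> import org.apache.hadoop.fs.FSDataOutputStream;
>>>>>>>>> import org.apache.hadoop.fs.FileSystem;
>>>>>>>>> import org.apache.hadoop.fs.Path;
>>>>>>>>> 
>>>>>>>>> public class TruncateSketch {
>>>>>>>>> 
>>>>>>>>>     // Cut the restored part file back to the length recorded in the
>>>>>>>>>     // checkpoint, or leave a marker file if the FileSystem cannot truncate.
>>>>>>>>>     static void truncateToValidLength(FileSystem fs, Path partFile, long validLength)
>>>>>>>>>             throws Exception {
>>>>>>>>>         Method truncate = null;
>>>>>>>>>         try {
>>>>>>>>>             truncate = fs.getClass().getMethod("truncate", Path.class, long.class);
>>>>>>>>>         } catch (NoSuchMethodException e) {
>>>>>>>>>             // truncate not available on this Hadoop version / FileSystem
>>>>>>>>>         }
>>>>>>>>> 
>>>>>>>>>         if (truncate != null) {
>>>>>>>>>             truncate.invoke(fs, partFile, validLength);
>>>>>>>>>         } else {
>>>>>>>>>             // fallback: write a marker so readers know how many bytes are valid
>>>>>>>>>             Path validLengthFile =
>>>>>>>>>                 new Path(partFile.getParent(), "_" + partFile.getName() + ".valid-length");
>>>>>>>>>             try (FSDataOutputStream out = fs.create(validLengthFile)) {
>>>>>>>>>                 out.writeUTF(String.valueOf(validLength));
>>>>>>>>>             }
>>>>>>>>>         }
>>>>>>>>>     }
>>>>>>>>> }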
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> Aljoscha
>>>>>>>>> 
>>>>>>>>>> On 22 Mar 2016, at 10:08, Vijay Srinivasaraghavan 
>>>>>>>>>> <vijikar...@yahoo.com.INVALID> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hello,
>>>>>>>>>> I have enabled checkpointing and I am using the RollingSink to sink 
>>>>>>>>>> data to HDFS (2.7.x) from the KafkaConsumer. To simulate 
>>>>>>>>>> failover/recovery, I stopped a TaskManager and the job got 
>>>>>>>>>> rescheduled to another TaskManager instance. At that moment, the 
>>>>>>>>>> current "in-progress" file gets closed and renamed from 
>>>>>>>>>> _part-0-1_in-progress to part-0-1. 
>>>>>>>>>> I was hoping to see the debug statements that I added to the 
>>>>>>>>>> "restoreState" method, but none of my debug statements get printed. 
>>>>>>>>>> I am not sure if the restoreState() method gets invoked in this 
>>>>>>>>>> scenario. Could you please help me understand the flow during a 
>>>>>>>>>> "failover" scenario?
>>>>>>>>>> P.S.: Functionally the code appears to be working fine, but I am 
>>>>>>>>>> trying to understand the underlying implementation details of: 
>>>>>>>>>> public void restoreState(BucketState state)
>>>>>>>>>> Regards
>>>>>>>>>> Vijay
>> 


  
