Hi Jagadish,
I resolved the problem. Samza must be closing the FileSystem somewhere when
“shutdown” is triggered. I no longer get the FileSystem instance from the
cache, and everything works fine.
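
For anyone hitting the same symptom, here is a toy model of what seems to
have been going on. It is only a sketch under assumptions, not Hadoop or Samza
code: Handle, HandleCache and the "hdfs://namenode" key are hypothetical
stand-ins. In Hadoop, FileSystem.get(conf) normally returns an instance that
is shared by every caller in the JVM with the same scheme, authority and user,
so a close() by any one holder closes it for all of them.
FileSystem.newInstance(conf) returns an instance that is not shared with other
callers, and the caller is then responsible for closing it.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: Handle and HandleCache are hypothetical stand-ins,
// not Hadoop or Samza classes.
class CachedHandleDemo {

    /** Stand-in for a file system client that rejects writes once closed. */
    static final class Handle {
        private boolean closed = false;

        void write(String data) {
            if (closed) {
                throw new IllegalStateException("Filesystem closed");
            }
        }

        void close() {
            closed = true;
        }
    }

    /** Stand-in for a process-wide cache that hands out shared instances. */
    static final class HandleCache {
        private final Map<String, Handle> cache = new HashMap<>();

        /** Returns the one shared instance for this key. */
        Handle get(String key) {
            return cache.computeIfAbsent(key, k -> new Handle());
        }

        /** Returns a fresh instance that no other caller can obtain. */
        Handle newInstance() {
            return new Handle();
        }
    }

    /** Returns true if a final flush through the handle succeeds. */
    static boolean tryFlush(Handle handle) {
        try {
            handle.write("buffered records");
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        HandleCache cache = new HandleCache();
        String key = "hdfs://namenode"; // hypothetical cache key

        Handle producerShared = cache.get(key);   // what the producer used before
        Handle otherComponent = cache.get(key);   // same object, different owner
        Handle producerOwned = cache.newInstance();

        // Some other component closes "its" handle during shutdown.
        otherComponent.close();

        System.out.println("shared flush ok: " + tryFlush(producerShared));
        System.out.println("owned flush ok: " + tryFlush(producerOwned));

        producerOwned.close(); // the producer closes its own instance last
    }
}
```

In this model the flush through the shared handle fails once the other holder
has closed it, while the flush through the producer's own handle still
succeeds.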
Thanks!
————————
Qi Shu
> On Aug 24, 2017, at 15:06, 舒琦 <[email protected]> wrote:
>
> Yes, in the same thread.
>
> The file list is not empty: after the Samza job shuts down, all of the
> files being written are left in an incomplete state.
>
> ————————
> Qi Shu
>
>> On Aug 24, 2017, at 15:00, Jagadish Venkatraman <[email protected]> wrote:
>>
>>>> Now the problem is not that the program hangs, but that the container quits
>> before the “closeFiles” method has finished executing.
>>
>> It's unlikely that the container quit without returning from *close*. Are
>> you sure the file list is not empty? I'd suggest adding more logging.
>>
>> I'm assuming you are calling *closeFiles* in the same thread?
>>
>> On Wed, Aug 23, 2017 at 11:53 PM, 舒琦 <[email protected]> wrote:
>>
>>> Hi Jagadish,
>>>
>>> When the Samza job is shut down, “Begin to stop” is printed in the
>>> log file, but the log messages in closeFiles are not.
>>> Now the problem is not that the program hangs, but that the container quits
>>> before the “closeFiles” method has finished executing.
>>>
>>> Thanks.
>>>
>>> ————————
>>> Qi Shu
>>>
>>>> On Aug 24, 2017, at 14:44, Jagadish Venkatraman <[email protected]> wrote:
>>>>
>>>> Is "Begin to close file" printed? Where exactly is your application
>>>> stalled? I'd suggest you take a stack dump.
>>>>
>>>> On Wed, Aug 23, 2017 at 11:32 PM, 舒琦 <[email protected]> wrote:
>>>>
>>>>> Hi Jagadish,
>>>>>
>>>>> Below is part of the related code. The log message “Begin to stop” is
>>>>> printed out.
>>>>>
>>>>> Thank you!
>>>>>
>>>>> public abstract class MyProducer implements SystemProducer {
>>>>>
>>>>>     @Override
>>>>>     public void stop() {
>>>>>         LOGGER.info("Begin to stop");
>>>>>
>>>>>         closeFiles();
>>>>>
>>>>>         LOGGER.info("End to stop");
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     protected void closeFiles() throws IOException {
>>>>>         closeFiles(statusFiles);
>>>>>         closeFiles(interactionFiles);
>>>>>     }
>>>>>
>>>>>     private void closeFiles(Map<String, FileInWriting> files) throws IOException {
>>>>>         for (FileInWriting statusFile : files.values()) {
>>>>>             LOGGER.info("Begin to close file: {}", statusFile.getFilePath());
>>>>>             statusFile.getWriter().hflush();
>>>>>             statusFile.getWriter().close();
>>>>>             renameFile(statusFile.getFilePath());
>>>>>             LOGGER.info("Successfully close file: {}", statusFile.getFilePath());
>>>>>         }
>>>>>
>>>>>         files.clear();
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>> ————————
>>>>> Qi Shu
>>>>>
>>>>>> On Aug 24, 2017, at 14:21, Jagadish Venkatraman <[email protected]> wrote:
>>>>>>
>>>>>> Hi Qi,
>>>>>>
>>>>>>>> the stop method in SystemProducer is called, but the close-files
>>>>>> operation in the stop method (which may need some time, since there may
>>>>>> be cached data to flush) does not run to completion
>>>>>>
>>>>>> Are you seeing the *close()* method hang? SystemProducer.*close* is a
>>>>>> synchronous operation, and will block until the *close* method finishes.
>>>>>>
>>>>>> Best,
>>>>>> Jagadish
>>>>>>
>>>>>> On Wed, Aug 23, 2017 at 11:17 PM, 舒琦 <[email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I wrote a SystemProducer for HDFS and everything works fine. When the
>>>>>>> Samza job is shut down, the stop method in SystemProducer is called,
>>>>>>> but the close-files operation in the stop method (which may need some
>>>>>>> time, since there may be cached data to flush) does not run to
>>>>>>> completion.
>>>>>>>
>>>>>>> How can I resolve this problem?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> ————————
>>>>>>> Qi Shu
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Jagadish V,
>>>>>> Graduate Student,
>>>>>> Department of Computer Science,
>>>>>> Stanford University