Hi Jagadish,

        Below is part of the relevant code. The log message "Begin to stop" is printed.

        Thank you!

public abstract class MyProducer implements SystemProducer {

    @Override
    public void stop() {
        LOGGER.info("Begin to stop");

        closeFiles();

        LOGGER.info("End to stop");
    }

    @Override
    protected void closeFiles() throws IOException {
        closeFiles(statusFiles);
        closeFiles(interactionFiles);
    }

    private void closeFiles(Map<String, FileInWriting> files) throws IOException {
        for (FileInWriting statusFile : files.values()) {
            LOGGER.info("Begin to close file: {}", statusFile.getFilePath());
            statusFile.getWriter().hflush();
            statusFile.getWriter().close();
            renameFile(statusFile.getFilePath());
            LOGGER.info("Successfully close file: {}", statusFile.getFilePath());
        }

        files.clear();
    }

}
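
One thing that may be worth ruling out with the code above: the loop in closeFiles propagates the first IOException, so a failure while flushing or closing one file would skip the remaining files and the "End to stop" log line. A minimal sketch of closing every file and collecting the failures, assuming plain java.io.Closeable stand-ins instead of the real FileInWriting writers (CloseAllSketch and closeAll are hypothetical names, not part of Samza or HDFS):

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CloseAllSketch {

    /**
     * Closes every entry, continuing past failures, then clears the map.
     * Returns the keys whose close() threw, so the caller can log or retry them.
     */
    static List<String> closeAll(Map<String, Closeable> files) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Closeable> entry : files.entrySet()) {
            try {
                entry.getValue().close();
            } catch (IOException e) {
                // Record the failure and keep going so the other files still get closed.
                failed.add(entry.getKey());
            }
        }
        files.clear();
        return failed;
    }

    public static void main(String[] args) {
        // Hypothetical stand-ins: one close() fails, the other succeeds.
        Map<String, Closeable> files = new LinkedHashMap<>();
        files.put("status", () -> { throw new IOException("simulated flush failure"); });
        files.put("interaction", () -> System.out.println("closed interaction"));
        System.out.println("failed: " + closeAll(files));
    }
}
```

Whether this matches the actual failure depends on what the job logs show after "Begin to stop"; the sketch only illustrates the close-all-then-report pattern.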


————————
Qi Shu

> On Aug 24, 2017, at 14:21, Jagadish Venkatraman <jagadish1...@gmail.com> wrote:
> 
> Hi Qi,
> 
>>> the stop method in SystemProducer is called, but the close-files
> operation (which may need some time, since there may be cached data to
> flush) in the stop method does not run to completion
> 
> Are you seeing the *close()* method hang? SystemProducer.*close* is a
> synchronous operation, and will block until the *close* method finishes.
> 
> Best,
> Jagadish
> 
> On Wed, Aug 23, 2017 at 11:17 PM, 舒琦 <sh...@eefung.com> wrote:
> 
>> Hi,
>> 
>>        I wrote a SystemProducer for HDFS and everything works fine. When
>> the Samza job is shut down, the stop method in SystemProducer is called,
>> but the close-files operation (which may need some time, since there may
>> be cached data to flush) in the stop method does not run to completion.
>> 
>>        How can I resolve this problem?
>> 
>>        Thanks!
>> 
>> ————————
>> Qi Shu
>> 
>> 
> 
> 
> -- 
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University
