> On Sept. 29, 2016, 10:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala, line 197
> > <https://reviews.apache.org/r/51142/diff/5-7/?file=1493810#file1493810line197>
> >
> > Thinking about this more, I would prefer less dependency between the
> > samza-yarn and samza-hdfs modules. Consider a case where the HDFS consumer
> > is used by a standalone Samza job: there is no YarnConfig object in the
> > job. I think we should make this a required config for HdfsSystemConsumer,
> > just like the ZooKeeper connect string is required for KafkaSystemConsumer.
> >
> > Also, under which conditions do we need to clear the partition descriptor
> > info in the staging dir? We need to think about the cleanup procedure as
> > well.
>
> Hai Lu wrote:
>     We need to remove partition descriptors when the job is done. Not doing so
>     would end up spamming the user's HDFS space, causing immediate trouble for
>     our users.
>
>     But right now there is no way for HdfsSystemConsumer/Admin to know when
>     the job is shut down. So I don't see a solution that does not directly or
>     indirectly depend on YARN, since only the YARN code has this idea of a
>     staging directory and actually cleans up the directory at the end of the
>     job. I think what we really need to do, long term, is to support a staging
>     directory at the Samza level, so that in addition to YARN, other platforms
>     like Docker, Mesos, and Standalone can work as well.
>
>     Plus we have to keep in mind that only YARN has Kerberos support for
>     now. So currently the HDFS systems ARE depending on YARN in that sense.
>     Security is one more thing to deal with (aside from the staging directory)
>     before we can say the HDFS systems no longer depend on YARN.
>
>     What do you think? I will keep this issue open.

There are two different levels of dependency here:
a) a code-level dependency, meaning the HdfsSystemConsumer code depends
   directly on samza-yarn classes;
b) a config/semantic dependency, meaning the expected behavior of a certain
   function (i.e., cleanup) depends on other modules.

I would prefer to remove the code-level dependency from the beginning. We can
still configure HdfsSystemConsumer to use the same staging directory as
samza-yarn to get the cleanup behavior. This means that HdfsSystemConsumer
itself does not support after-completion cleanup yet and relies on samza-yarn
to clean up. It is a config-level dependency, and we are free to remove it
without a code change once either a) HdfsSystemConsumer can clean up the
staging directory after end-of-stream, or b) the staging directory config is
moved to samza-core. Thoughts?


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51142/#review150949
-----------------------------------------------------------


On Oct. 3, 2016, 4:54 p.m., Hai Lu wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51142/
> -----------------------------------------------------------
>
> (Updated Oct. 3, 2016, 4:54 p.m.)
>
>
> Review request for samza, Yi Pan (Data Infrastructure) and Navina Ramesh.
>
>
> Bugs: SAMZA-967
>     https://issues.apache.org/jira/browse/SAMZA-967
>
>
> Repository: samza
>
>
> Description
> -------
>
> Add HDFS System Consumer:
>
> 1. System admin, partitioner
> 2. System consumer with metrics
>
> Design doc can be found here:
> https://issues.apache.org/jira/secure/attachment/12824078/HDFSSystemConsumer.pdf
>
> An overview of the high level architecture:
>
> The system factory is used by Samza to instantiate SystemConsumer,
> SystemProducer, and SystemAdmin for a specific system. The
> FileDataSystemFactory can be reused for other file-system-like sources.
>
> HDFSSystemAdmin will start a “DirectoryPartitioner” to figure out the set of
> HDFS files that need to be consumed for this job. The DirectoryPartitioner
> also uses a “GroupingPattern” to group files into partitions if advanced
> partitioning is required. HDFSSystemAdmin will then persist the
> “PartitionDescriptor” to HDFS.
>
> The HDFSSystemConsumer will then pick up the “PartitionDescriptor” from HDFS.
> Based on this information, as well as the actual assignment of partitions, it
> then knows which files to read from.
>
> The initial implementation of the HDFS system consumer supports only Avro
> data files. It is easy to extend it to a variety of file formats by
> implementing the FileReader interface.
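
A minimal sketch of the file grouping idea from the overview above, in Java. Everything named here (FileGroupingSketch, group, the example regular expression and paths) is hypothetical and is not taken from the patch; the actual DirectoryPartitioner and its GroupingPattern syntax may behave differently. The sketch assumes the pattern has at least one capture group and that files sharing the same first capture group belong to the same partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration only; this is not the DirectoryPartitioner from the patch.
class FileGroupingSketch {

    // Groups file paths by the first capture group of `pattern`.
    // Assumption: `pattern` declares at least one capture group.
    // A path that does not match the pattern becomes its own single-file group.
    static Map<String, List<String>> group(List<String> paths, Pattern pattern) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String path : paths) {
            Matcher m = pattern.matcher(path);
            String key = m.matches() ? m.group(1) : path;
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(path);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Made-up file names; each group would map to one partition.
        List<String> files = Arrays.asList(
            "/data/part-01-a.avro",
            "/data/part-01-b.avro",
            "/data/part-02-a.avro");
        Pattern byPartNumber = Pattern.compile(".*/part-(\\d+)-.*\\.avro");
        group(files, byPartNumber)
            .forEach((key, members) -> System.out.println(key + " -> " + members));
    }
}
```

Using an insertion-ordered map keeps the group order stable across runs, which matters if the resulting descriptor is persisted once and read back by every consumer.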
>
> +-------------------------------------------------------------------------------+
> |                                     HDFS                                      |
> +-+-------------+------------^------------+------------------------^------------+
>   | Obtain      |            | Persist    | Filtering/             |
>   | Partition   |            | Partition  | Grouping               |
>   | Description |            | Description|                        |
>   |  +----------v----------+ |            |                        |
>   |  | HDFSAvroFileReader  | |            |                        |
>   |  +----------+----------+ |            |                        |
>   |             |            |            |                        |
>   |  +----------+----------+ | +----------v----------+  +----------+----------+
>   |  |     IFileReader     | | |Directory Partitioner|  |   HDFSAvroWriter    |
>   |  +----------+----------+ | +----------+----------+  +----------+----------+
>   |             |            |            |                        |
>   |  +----------+----------+ | +----------+----------+  +----------+----------+
>   +->| HDFSSystemConsumer  | +-|   HDFSSystemAdmin   |  | HDFSSystemProducer  |
>      +----------+----------+   +----------+----------+  +----------+----------+
>                 |                         |                        |
>                 +-------------------------+------------------------+
>                                           |
> +-----------------------------------------+-------------------------------------+
> |                               HDFSSystemFactory                               |
> +-------------------------------------------------------------------------------+
>
>
> Diffs
> -----
>
>   build.gradle 2bea27b75288d3103178bc3762b9556f6e69cdd1
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptorUtil.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java PRE-CREATION
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala 61b7570afae3219b618c8830905035063941bdd7
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala 92eb4472533db67dca01f075cb460581b4bdac0d
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala ef3c20a097ddf2feecaf8b0ad4587ea4bf6570b7
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java PRE-CREATION
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestPartitionDesctiptorUtil.java PRE-CREATION
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestDirectoryPartitioner.java PRE-CREATION
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestHdfsFileSystemAdapter.java PRE-CREATION
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestAvroFileHdfsReader.java PRE-CREATION
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestMultiFileHdfsReader.java PRE-CREATION
>   samza-hdfs/src/test/resources/integTest/emptyTestFile PRE-CREATION
>   samza-hdfs/src/test/resources/partitioner/testfile01 PRE-CREATION
>   samza-hdfs/src/test/resources/partitioner/testfile02 PRE-CREATION
>   samza-hdfs/src/test/resources/reader/TestEvent.avsc PRE-CREATION
>   samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala 261310d03de204718621f601117f016da14841df
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java dacc52de0a34498a715a299bc69c95aebd1b08ba
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala 4e328a5f8c2b496a71e36c106339b7af263c96c7
>
> Diff: https://reviews.apache.org/r/51142/diff/
>
>
> Testing
> -------
>
> Unit tests pass.
>
> Manually tested by writing a real HDFS Samza job and deploying it to a YARN
> cluster.
>
>
> Thanks,
>
> Hai Lu
>
>
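
On the config-level dependency discussed in this thread: a minimal sketch in Java, assuming the consumer reads its staging directory from its own required setting, and a YARN deployment simply sets that value to the directory samza-yarn already cleans up. The class, the method, and the key name "systems.<system>.stagingDirectory" are hypothetical and chosen for illustration; they are not confirmed against the patch. The point is only that the lookup goes through a plain string key, so no samza-yarn class is imported.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not the HdfsConfig from the patch.
class StagingDirectorySketch {

    // Assumed key name; %s is replaced by the system name.
    static final String STAGING_DIRECTORY_KEY = "systems.%s.stagingDirectory";

    // Returns the staging directory configured for `systemName`.
    // The setting is treated as required: a missing or empty value is an error.
    static String resolve(Map<String, String> config, String systemName) {
        String key = String.format(STAGING_DIRECTORY_KEY, systemName);
        String value = config.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Missing required config: " + key);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        // Made-up path. In a YARN deployment this would be set to the same
        // directory that the YARN job uses for staging, so existing cleanup
        // also removes the partition descriptors.
        config.put("systems.hdfs.stagingDirectory", "/user/example/.samza/staging");
        System.out.println(resolve(config, "hdfs"));
    }
}
```

With this shape, moving the staging directory setting elsewhere later only changes which value the deployment writes into the config, not the consumer code.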