> On Sept. 13, 2016, 12:33 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java, line 91
> > <https://reviews.apache.org/r/51142/diff/5/?file=1493800#file1493800line91>
> >
> >     You can do:
> >     try (FSDataOutputStream fos = fs.create(targetPath)) {
> >       fos.write(PartitionDescriptionUtil.toJson());
> >     }
>
> Hai Lu wrote:
>     Do you just intend to narrow down the try block? The "getFileSystem" method above will also throw IOException, so I have to include everything here.
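The try-with-resources form of the snippet quoted above can be sketched as a self-contained example. Assumptions: the hypothetical `StreamOpener` interface stands in for Hadoop's `fs.create(targetPath)`, a plain JSON string stands in for the output of `PartitionDescriptionUtil.toJson`, and failures are wrapped in `UncheckedIOException` (the real code may well use a different exception type).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class PersistDescriptorSketch {

  // Hypothetical stand-in for Hadoop's fs.create(targetPath): anything that
  // opens an OutputStream and may throw IOException while doing so.
  @FunctionalInterface
  public interface StreamOpener {
    OutputStream open() throws IOException;
  }

  // Writes the JSON payload. The stream declared in the try header is closed
  // automatically when the block exits, on both the normal and the
  // exceptional path, so no finally block is needed.
  public static void persist(StreamOpener opener, String json) {
    try (OutputStream os = opener.open()) {
      os.write(json.getBytes(StandardCharsets.UTF_8));
    } catch (IOException e) {
      // Covers failures from open(), write(), and close() alike.
      throw new UncheckedIOException("Failed to persist partition descriptor", e);
    }
  }

  public static void main(String[] args) {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    persist(() -> sink, "{\"0\":[\"part-0.avro\"]}");
    System.out.println(new String(sink.toByteArray(), StandardCharsets.UTF_8));
  }
}
```

Only the stream's lifetime is scoped by the resource declaration; an earlier call that also throws IOException (such as obtaining the FileSystem) can share an enclosing try or be declared with `throws`, independently of the resource block.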

I was referring to the try-with-resources pattern introduced in JDK 7. See: https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html


> On Sept. 13, 2016, 12:33 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java, line 105
> > <https://reviews.apache.org/r/51142/diff/5/?file=1493800#file1493800line105>
> >
> >     What if the PartitionDescriptor already exists? Could it be the case that the systemStreamMetadata maintains a different copy of the PartitionDescriptor? It is not clear to me which one is the source of truth: directoryPartitioner.getPartitionMetadataMap() or directoryPartitioner.getPartitionDescriptor()? Maybe I am missing some basic information about the concept of PartitionDescriptor vs. PartitionMetadataMap?
>
> Hai Lu wrote:
>     You are right. This is an unsolved problem, given that we assume the HDFS folder is immutable. So what happens if the HDFS folder really is altered? Until we support mutable HDFS input, I think we have two options:
>     1. Throw a runtime exception to stop the job if we see an inconsistency.
>     2. Ignore the newer folder info and always treat the partition descriptors persisted on HDFS as the source of truth. Use that source of truth to proceed with the job, without killing it or throwing an exception.
>     Do you have a preference?

Actually, until we support mutable HDFS input, I think we need to do the following:
- If the folder info is not consistent with the partition descriptors, validate the partition descriptors to make sure all files exist (i.e. the descriptors are still valid), then continue with the existing partition descriptors (assuming they are immutable).
- If validation fails, throw an exception to stop the job, since the original immutable set of partition descriptors is no longer valid.


> On Sept. 13, 2016, 12:33 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java, line 69
> > <https://reviews.apache.org/r/51142/diff/5/?file=1493801#file1493801line69>
> >
> >     I would prefer to follow the same pattern as KafkaSystemConsumer, i.e. passing in the HdfsSystemConsumerMetrics object instead of the MetricsRegistry. Please check the code in KafkaSystemFactory.getConsumer() to see how the metrics objects are created and passed along.
>
> Hai Lu wrote:
>     I can't avoid using MetricsRegistry directly. At the very least, I have to pass it to the base class, BlockingEnvelopeMap (unless you want to change all the interfaces in BlockingEnvelopeMap as well; even if we want to do that, it seems beyond the scope of this RB, and I could do a separate fix for it). But I will create an HdfsSystemConsumerMetrics.

Please check the code in KafkaSystemConsumer. It extends BlockingEnvelopeMap as well and uses metrics.registry to get the MetricsRegistry to pass into BlockingEnvelopeMap's constructor. No need to change any interface.


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51142/#review148612
-----------------------------------------------------------


On Sept. 9, 2016, 1:34 a.m., Hai Lu wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51142/
> -----------------------------------------------------------
>
> (Updated Sept. 9, 2016, 1:34 a.m.)
>
>
> Review request for samza, Chris Pettitt, Yi Pan (Data Infrastructure), and Navina Ramesh.
>
>
> Bugs: SAMZA-967
>     https://issues.apache.org/jira/browse/SAMZA-967
>
>
> Repository: samza
>
>
> Description
> -------
>
> Add HDFS System Consumer:
>
> 1. System admin, partitioner
> 2. System consumer with metrics
>
> Design doc can be found here:
> https://issues.apache.org/jira/secure/attachment/12824078/HDFSSystemConsumer.pdf
>
> An overview of the high-level architecture:
>
> The system factory is used by Samza to instantiate the SystemConsumer, SystemProducer, and SystemAdmin for a specific system. The FileDataSystemFactory can be reused for other file-system-like sources.
>
> HDFSSystemAdmin will start a “DirectoryPartitioner” to figure out the set of HDFS files that need to be consumed by this job. The DirectoryPartitioner also uses a “GroupingPattern” to group files into partitions if advanced partitioning is required. HDFSSystemAdmin will then persist the “PartitionDescriptor” to HDFS.
>
> The HDFSSystemConsumer will then pick up the “PartitionDescriptor” from HDFS. Based on this information, as well as the actual assignment of partitions, it knows which files to read.
>
> The initial implementation of the HDFS system consumer supports only Avro data files. It can be extended to other file formats by implementing the FileReader interface.
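To make the grouping step described above concrete, here is a sketch of regex-based grouping of files into partitions. It assumes, for illustration only, that a grouping pattern's first capture group is the partition key and that files not matching the pattern are filtered out; `GroupingSketch` and `group` are hypothetical names and do not reflect the actual DirectoryPartitioner or GroupingPattern configuration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class GroupingSketch {

  // Hypothetical grouping: the first capture group of groupPattern is the
  // partition key; files with the same key end up in the same partition.
  public static List<List<String>> group(List<String> files, String groupPattern) {
    Pattern p = Pattern.compile(groupPattern);
    // TreeMap keeps keys sorted (lexicographically, since they are Strings),
    // so partition numbering is deterministic for the same set of files.
    Map<String, List<String>> byKey = new TreeMap<>();
    for (String file : files) {
      Matcher m = p.matcher(file);
      if (!m.matches()) {
        continue; // files that do not match the pattern are filtered out
      }
      byKey.computeIfAbsent(m.group(1), k -> new ArrayList<>()).add(file);
    }
    // Partition i corresponds to the i-th key in sorted order.
    return new ArrayList<>(byKey.values());
  }

  public static void main(String[] args) {
    List<String> files = Arrays.asList(
        "part-0-a.avro", "part-1-a.avro", "part-0-b.avro", "README.txt");
    List<List<String>> partitions = group(files, "part-(\\d+)-.*\\.avro");
    for (int i = 0; i < partitions.size(); i++) {
      System.out.println("partition " + i + " -> " + partitions.get(i));
    }
  }
}
```

Because partition numbers come from the sorted keys rather than from listing order, the same folder contents map to the same partitions on every run, which is what makes persisting the result as a descriptor meaningful.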
>
>
> +--------------------------------------------------------------------------------+
> |                                      HDFS                                      |
> +--------------------------------------------------------------------------------+
>   |             |             ^                   |                  ^
>   | Obtain      |             | Persist           | Filtering/       |
>   | Partition   |             | Partition         | Grouping         |
>   | Description |             | Description       |                  |
>   |   +---------v----------+  |  +----------------v------+  +---------+----------+
>   |   | HDFSAvroFileReader |  |  | DirectoryPartitioner |  |   HDFSAvroWriter   |
>   |   +---------+----------+  |  +----------+-----------+  +---------+----------+
>   |             |             |             |                        |
>   |   +---------+----------+  |             |                        |
>   |   |    IFileReader     |  |             |                        |
>   |   +---------+----------+  |             |                        |
>   |             |             |             |                        |
>   |   +---------+----------+  |  +----------+-----------+  +---------+----------+
>   +-->| HDFSSystemConsumer |  +--|   HDFSSystemAdmin    |  | HDFSSystemProducer |
>       +---------+----------+     +----------+-----------+  +---------+----------+
>                 |                           |                        |
>                 +---------------------------+------------------------+
>                                             |
> +-------------------------------------------+------------------------------------+
> |                               HDFSSystemFactory                                |
> +--------------------------------------------------------------------------------+
>
>
> Diffs
> -----
>
>   build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2
>   gradle/dependency-versions.gradle 47c71bfde027835682889407261d4798b629d214
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptionUtil.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java PRE-CREATION
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala 61b7570afae3219b618c8830905035063941bdd7
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala 92eb4472533db67dca01f075cb460581b4bdac0d
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala ef3c20a097ddf2feecaf8b0ad4587ea4bf6570b7
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java PRE-CREATION
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestPartitionDesctiptionUtil.java PRE-CREATION
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestDirectoryPartitioner.java PRE-CREATION
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestHdfsFileSystemAdapter.java PRE-CREATION
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestAvroFileHdfsReader.java PRE-CREATION
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestMultiFileHdfsReader.java PRE-CREATION
>   samza-hdfs/src/test/resources/integTest/emptyTestFile PRE-CREATION
>   samza-hdfs/src/test/resources/partitioner/testfile01 PRE-CREATION
>   samza-hdfs/src/test/resources/partitioner/testfile02 PRE-CREATION
>   samza-hdfs/src/test/resources/reader/TestEvent.avsc PRE-CREATION
>   samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala 261310d03de204718621f601117f016da14841df
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala 4e328a5f8c2b496a71e36c106339b7af263c96c7
>
> Diff: https://reviews.apache.org/r/51142/diff/
>
>
> Testing
> -------
>
> Unit tests pass.
>
> Manually tested by writing a real HDFS Samza job and deploying it to a YARN cluster.
>
>
> Thanks,
>
> Hai Lu
>
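For the partition-descriptor consistency question raised on HdfsSystemAdmin.java line 105, Yi's proposal (validate the persisted descriptors when the folder listing disagrees, otherwise stop the job) can be sketched as follows. Assumptions: descriptors are modeled as a map from partition number to file paths, the `fileExists` predicate stands in for an existence check such as Hadoop's `FileSystem.exists(Path)`, and `IllegalStateException` stands in for whatever exception the job would actually throw. `DescriptorValidationSketch` and `resolve` are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

public class DescriptorValidationSketch {

  // Hypothetical: decides which partition descriptors the job should use.
  //   persisted  - partition -> files, as previously persisted on HDFS
  //   current    - partition -> files, as computed from the folder right now
  //   fileExists - stand-in for a file-system existence check
  public static Map<Integer, List<String>> resolve(
      Map<Integer, List<String>> persisted,
      Map<Integer, List<String>> current,
      Predicate<String> fileExists) {
    if (persisted.equals(current)) {
      return persisted; // consistent: nothing to reconcile
    }
    // Inconsistent: the persisted descriptors remain the source of truth,
    // but only if every file they reference still exists.
    for (Map.Entry<Integer, List<String>> e : persisted.entrySet()) {
      for (String file : e.getValue()) {
        if (!fileExists.test(file)) {
          throw new IllegalStateException(
              "Partition " + e.getKey() + " references missing file " + file
                  + "; the persisted partition descriptors are no longer valid");
        }
      }
    }
    return persisted;
  }

  public static void main(String[] args) {
    Map<Integer, List<String>> persisted = new LinkedHashMap<>();
    persisted.put(0, Arrays.asList("/data/a.avro"));
    persisted.put(1, Arrays.asList("/data/b.avro"));

    // A new file appeared in the folder, so the two views disagree.
    Map<Integer, List<String>> current = new LinkedHashMap<>(persisted);
    current.put(2, Arrays.asList("/data/c.avro"));

    Set<String> onDisk = new HashSet<>(
        Arrays.asList("/data/a.avro", "/data/b.avro", "/data/c.avro"));
    System.out.println(resolve(persisted, current, onDisk::contains));
  }
}
```

In this sketch the newly added file is simply ignored while the persisted set is still intact, matching the "continue with the existing partition descriptors" branch; only a missing file stops the job.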
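The metrics wiring Yi describes for KafkaSystemConsumer (accept a metrics object, then hand its registry to BlockingEnvelopeMap's constructor) can be sketched with simplified stand-ins. `Registry`, `EnvelopeMapBase`, `ConsumerMetrics`, and `SketchConsumer` are hypothetical classes, not Samza's MetricsRegistry, BlockingEnvelopeMap, or HdfsSystemConsumerMetrics; the counter names are made up too. The sketch only illustrates the constructor wiring.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class MetricsPatternSketch {

  // Hypothetical stand-in for a metrics registry: a map of named counters.
  public static class Registry {
    private final Map<String, AtomicLong> counters = new HashMap<>();

    public AtomicLong counter(String name) {
      return counters.computeIfAbsent(name, n -> new AtomicLong());
    }
  }

  // Hypothetical stand-in for a base class (in the role of BlockingEnvelopeMap)
  // whose constructor needs the raw registry.
  public abstract static class EnvelopeMapBase {
    protected final AtomicLong bufferedMessages;

    protected EnvelopeMapBase(Registry registry) {
      this.bufferedMessages = registry.counter("buffered-messages");
    }
  }

  // Metrics holder that owns the registry plus consumer-specific metrics.
  public static class ConsumerMetrics {
    public final Registry registry;
    public final AtomicLong filesRead;

    public ConsumerMetrics(Registry registry) {
      this.registry = registry;
      this.filesRead = registry.counter("files-read");
    }
  }

  // The consumer takes the metrics holder and passes metrics.registry up to
  // the base class, so the base-class constructor signature stays unchanged.
  public static class SketchConsumer extends EnvelopeMapBase {
    private final ConsumerMetrics metrics;

    public SketchConsumer(ConsumerMetrics metrics) {
      super(metrics.registry);
      this.metrics = metrics;
    }

    public void onFileRead() {
      metrics.filesRead.incrementAndGet();
    }
  }

  public static void main(String[] args) {
    Registry registry = new Registry();
    SketchConsumer consumer = new SketchConsumer(new ConsumerMetrics(registry));
    consumer.onFileRead();
    System.out.println(registry.counter("files-read").get()); // prints 1
  }
}
```

Because the registry travels inside the metrics object, the consumer's constructor receives one argument for all metrics concerns while the base class still gets the registry it expects, which is why no interface change is required.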