> On Sept. 13, 2016, 12:33 a.m., Yi Pan (Data Infrastructure) wrote: > > samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java, > > line 142 > > <https://reviews.apache.org/r/51142/diff/5/?file=1493801#file1493801line142> > > > > Isn't it clearer to have one loop like below instead of two embedded > > loops: > > while (!isShutdown) { > > if (!reader.hasNext()) { > > break; > > } > > IncomingMessageEnvelope messageEnvelope = reader.readNext(); > > try { > > super.put() > > ... > > } catch () { > > ... > > } > > }
No. In your case, if the super.put() fails, your code will skip the current event and read the next one. Unless you throw a runtime exception in the catch block to completely stop the consumption. - Hai ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/51142/#review148612 ----------------------------------------------------------- On Sept. 9, 2016, 1:34 a.m., Hai Lu wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/51142/ > ----------------------------------------------------------- > > (Updated Sept. 9, 2016, 1:34 a.m.) > > > Review request for samza, Chris Pettitt, Yi Pan (Data Infrastructure), and > Navina Ramesh. > > > Bugs: SAMZA-967 > https://issues.apache.org/jira/browse/SAMZA-967 > > > Repository: samza > > > Description > ------- > > Add HDFS System Consumer: > > 1. System admin, partitioner > 2. System consumer with metrics > > Design doc can be found here: > https://issues.apache.org/jira/secure/attachment/12824078/HDFSSystemConsumer.pdf > > An overview of the high level architecture: > > The system factory is used by Samza to instantiate SystemConsumer, > SystemProducer, and SystemAdmin for a specific system. The > FileDataSystemFactory can be reused for other file system like sources. > > HDFSSystemAdmin will start a “DirectoryPartitioner” to figure out the set of > HDFS files need to be consumed for this job. The DirectoryPartitioner also > uses “GroupingPattern” to group files into partitions if advanced > partitioning is required. HDFSSystemAdmin will then persist the > “PartitionDescriptor” to HDFS. > > The HDFSSystemConsumer will then pick up the “PartitionDescriptor” from HDFS. > Based on this information as well as the actual assignment of partitions, it > would then know which files to read from. > > The initial implementation of the HDFS system consumer supports only avro > data files. It’s very easy to extend it to a variety of file format by > implementing the FileReader interface. > > > > > +------------------------------------------------------------------------------+ > > | > | > +-----------------+ HDFS > | > | Obtain | > | > | Partition > +------+----------------------^------+---------------------------------^-------+ > > | Description | | | > | > | | | | > | > | +-------------v-------+ | | > Filtering/ | > | | | | +---+ > Grouping +-----+ > | | HDFSAvroFileReader | | | > | > | | | Persist | | > | > | +---------+-----------+ Partition | | > | > | | Description | > +------v--------------+ +----------+----------+ > | | | | > | | | > | +---------+-----------+ | |Directory > Partitioner| | HDFSAvroWriter | > | | IFileReader | | | > | | | > | | | | > +------+--------------+ +----------+----------+ > | +---------+-----------+ | | > | > | | | | > | > | | | | > | > | +---------+-----------+ > +-+----------+--------+ +----------+----------+ > | | | | > | | | > | | HDFSSystemConsumer | | HDFSSystemAdmin > | | HDFSSystemProducer | > +----------> | | > | | | > +---------+-----------+ > +-----------+---------+ +----------+----------+ > | | > | > > +------------------------------------+------------------------------------+ > > | > > > +---------------------------------------+--------------------------------------+ > > | > | > | HDFSSystemFactory > | > | > | > > +------------------------------------------------------------------------------+ > > > Diffs > ----- > > build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 > gradle/dependency-versions.gradle 47c71bfde027835682889407261d4798b629d214 > samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java > PRE-CREATION > > samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java > PRE-CREATION > > samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptionUtil.java > PRE-CREATION > > samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java > PRE-CREATION > > samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java > PRE-CREATION > > samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java > PRE-CREATION > > samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java > PRE-CREATION > > samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java > PRE-CREATION > > samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java > PRE-CREATION > > samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java > PRE-CREATION > samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala > 61b7570afae3219b618c8830905035063941bdd7 > > samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala > 92eb4472533db67dca01f075cb460581b4bdac0d > > samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala > ef3c20a097ddf2feecaf8b0ad4587ea4bf6570b7 > > samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java > PRE-CREATION > > samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestPartitionDesctiptionUtil.java > PRE-CREATION > > samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestDirectoryPartitioner.java > PRE-CREATION > > samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestHdfsFileSystemAdapter.java > PRE-CREATION > > samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestAvroFileHdfsReader.java > PRE-CREATION > > samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestMultiFileHdfsReader.java > PRE-CREATION > samza-hdfs/src/test/resources/integTest/emptyTestFile PRE-CREATION > samza-hdfs/src/test/resources/partitioner/testfile01 PRE-CREATION > samza-hdfs/src/test/resources/partitioner/testfile02 PRE-CREATION > samza-hdfs/src/test/resources/reader/TestEvent.avsc PRE-CREATION > > samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala > 261310d03de204718621f601117f016da14841df > samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala > 4e328a5f8c2b496a71e36c106339b7af263c96c7 > > Diff: https://reviews.apache.org/r/51142/diff/ > > > Testing > ------- > > unit tests pass. > > manually tested by writing a real hdfs samza job and deploying to a yarn > cluster. > > > Thanks, > > Hai Lu > >