Hi,
I am new to Apache Beam.
I am trying to write a simple pipeline using the Apache Beam Java SDK.
The pipeline will read a bunch of tgz files.
Each tgz file contains multiple CSV files with data.
Here is what I have so far:
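import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TypeDescriptors;

public static void main(String[] args) throws Exception {
  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline pipeline = Pipeline.create(options);
  // Match every .tgz under the input directory.
  PCollection<MatchResult.Metadata> matches =
      pipeline.apply(FileIO.match().filepattern("/tmp/beam5/*.tgz"));
  // Open each match with gzip decompression; what comes out is still a tar stream.
  PCollection<FileIO.ReadableFile> compGz =
      matches.apply(FileIO.readMatches().withCompression(Compression.GZIP));
  PCollection<String> contents = compGz.apply(FlatMapElements
      .into(TypeDescriptors.strings())
      .via((ReadableFile f) -> {
        try {
          // As a regex, "^@" only removes an '@' at the very start of the string.
          return Arrays.asList(
              f.readFullyAsUTF8String().replaceAll("^@", "").split("\\r?\\n|\\r", -1));
        } catch (IOException e) {
          // Fail the bundle instead of returning null from the lambda.
          throw new UncheckedIOException(e);
        }
      }));
  PDone ret =
      contents.apply(TextIO.write().to("/tmp/beam6/output.txt").withoutSharding());
  // Nothing executes until the pipeline is run.
  pipeline.run().waitUntilFinish();
}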
Instead of returning a flat list of strings, I tried parsing the result of
f.readFullyAsUTF8String() into a CSVFileBean, but it does not seem to like that.
Basically, the above program is crude.
I am looking for suggestions on the right transform to turn each tgz into
individual CSV bean POJOs that hold the name of the CSV and its contents.
I am stuck decoding the tgz from readFullyAsUTF8String().
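
To make the decoding problem concrete: after gzip decompression the bytes are still a binary tar archive, so turning them into one UTF-8 string mixes the 512-byte tar headers and NUL padding into the CSV text. Below is a minimal sketch of walking the tar entries with only the JDK. The names CsvFileBean, TgzReader, readTgz and readTar are hypothetical, and the reader assumes a plain ustar layout (names of at most 100 bytes, octal sizes, no GNU long-name or PAX handling). A library such as Apache Commons Compress (TarArchiveInputStream) would likely be more robust than this hand-rolled version.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;

/** Hypothetical bean: the name of one CSV inside the archive plus its text. */
class CsvFileBean implements Serializable {
    final String name;
    final String contents;

    CsvFileBean(String name, String contents) {
        this.name = name;
        this.contents = contents;
    }
}

/** Hypothetical helper; assumes a simple ustar archive (see the notes above). */
class TgzReader {
    private static final int BLOCK = 512;

    /** Reads every regular-file entry from a gzip-compressed tar stream. */
    static List<CsvFileBean> readTgz(InputStream gzipped) throws IOException {
        return readTar(new GZIPInputStream(gzipped));
    }

    /** Reads every regular-file entry from an already-decompressed tar stream. */
    static List<CsvFileBean> readTar(InputStream tar) throws IOException {
        DataInputStream in = new DataInputStream(tar);
        List<CsvFileBean> out = new ArrayList<>();
        byte[] header = new byte[BLOCK];
        while (true) {
            try {
                in.readFully(header);
            } catch (EOFException e) {
                break; // stream ended without the usual zero-block trailer
            }
            if (header[0] == 0) {
                break; // an all-zero block marks the end of the archive
            }
            String name = field(header, 0, 100);
            String sizeField = field(header, 124, 12).trim();
            long size = sizeField.isEmpty() ? 0 : Long.parseLong(sizeField, 8);
            byte type = header[156];

            byte[] data = new byte[(int) size];
            in.readFully(data);
            // Entry data is padded with NULs up to the next 512-byte boundary.
            int pad = (int) ((BLOCK - size % BLOCK) % BLOCK);
            in.readFully(new byte[pad]);

            // '0' (or NUL in old archives) is a regular file; skip directories etc.
            if (type == '0' || type == 0) {
                out.add(new CsvFileBean(name, new String(data, StandardCharsets.UTF_8)));
            }
        }
        return out;
    }

    /** Extracts a NUL-terminated ASCII field from the header block. */
    private static String field(byte[] h, int off, int len) {
        int end = off;
        while (end < off + len && h[end] != 0) {
            end++;
        }
        return new String(h, off, end - off, StandardCharsets.US_ASCII);
    }
}
```

Inside the pipeline, my understanding is that a ReadableFile produced with withCompression(Compression.GZIP) already yields decompressed bytes from open(), so something like readTar(Channels.newInputStream(f.open())) would be the entry point rather than readTgz. I have not verified that wiring.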
Eventually I need to take each CSV bean and combine them.
E.g. test1.tgz has foo_time1.csv and bar_time1.csv, and
test2.tgz has foo_time2.csv and bar_time2.csv,
so I need to extract these CSVs and combine all the foos and all the bars,
possibly manipulate them by adding columns and transforming, and
then send them out to destinations, which can be a filesystem or Kafka.
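
To illustrate the combining I mean, here is a plain-Java sketch of the grouping key, outside of Beam. CsvGrouping, kindOf and combine are hypothetical names, and the sketch assumes file names always look like <kind>_<suffix>.csv and that the CSVs have no header row to deduplicate. In a pipeline, I assume the same key function could feed a keyed grouping step (for example WithKeys followed by GroupByKey), but that part is not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper showing how foo_* and bar_* files would be merged. */
class CsvGrouping {

    /** Derives the grouping key from a file name: "foo_time1.csv" -> "foo". */
    static String kindOf(String fileName) {
        // Drop any directory part the archive may have stored with the entry.
        String base = fileName.substring(fileName.lastIndexOf('/') + 1);
        int cut = base.indexOf('_');
        if (cut < 0) {
            cut = base.lastIndexOf('.'); // no underscore: fall back to the extension
        }
        return cut < 0 ? base : base.substring(0, cut);
    }

    /** Groups (file name -> contents) pairs by kind, concatenating their rows. */
    static Map<String, List<String>> combine(Map<String, String> filesByName) {
        Map<String, List<String>> rowsByKind = new TreeMap<>();
        for (Map.Entry<String, String> e : filesByName.entrySet()) {
            List<String> rows =
                rowsByKind.computeIfAbsent(kindOf(e.getKey()), k -> new ArrayList<>());
            for (String line : e.getValue().split("\\r?\\n|\\r")) {
                if (!line.isEmpty()) {
                    rows.add(line); // skip blank lines between or after rows
                }
            }
        }
        return rowsByKind;
    }
}
```

With the four files from the example, combine would return two entries, one keyed "bar" and one keyed "foo", each holding the rows of both time slices.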
Thanks, any help is appreciated.
Sri