Thanks Kobe, let me give it a try!

From: Kobe Feng <flllbls...@gmail.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Wednesday, March 3, 2021 at 9:33 PM
To: "user@beam.apache.org" <user@beam.apache.org>
Cc: Yuchu Cao <yuc...@trulia.com>
Subject: Re: Does writeDynamic() support writing different element groups to 
different output paths?

A long time ago I used the following approach to write into partitions in HDFS 
(others may have better solutions). The interface may have changed since then, 
so please check it:

val baseDir = HadoopClient.resolve(basePath, env)
datum.apply("darwin.write.hadoop.parquet." + postfix,
  FileIO.writeDynamic[String, GenericRecord]()
    // compute the destination (partition folder) for each record
    .by(recordPartition.partitionFunc)
    .withDestinationCoder(StringUtf8Coder.of())
    .via(DarwinParquetIO.sink(...))
    .to(baseDir)
    ...
    // map each destination to a file naming rooted at its own folder
    .withNaming((partitionFolder: String) =>
      relativeFileNaming(
        StaticValueProvider.of[String](baseDir + Path.SEPARATOR + partitionFolder),
        fileNaming))
    ...

val partitionFunc: T => String



A good practice is to switch automatically: partition by the event-time field 
from the record value when using event-time windows, and by processing time 
otherwise.

partitionFunc can also combine multiple partition columns into subdirectories, 
joined with your file system's path separator (e.g. for S3).
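
To make the two points above concrete, here is a minimal sketch of such a 
partition function. It is not the code from my old job: the Event type, its 
fields, the "country=/dt=/hour=" folder layout, and the nowMillis parameter are 
all made up for illustration, and falling back to processing time when a record 
has no event time is just one way to do the switch. It only uses java.time, so 
it can be tried without Beam on the classpath.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical record type; the real schema is not shown in this thread.
final case class Event(country: String, eventTimeMillis: Option[Long])

object PartitionSketch {
  // Day and hour are formatted in UTC so folder names do not depend on the worker's time zone.
  private val dayFormat =
    DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC)
  private val hourFormat =
    DateTimeFormatter.ofPattern("HH").withZone(ZoneOffset.UTC)

  // Builds a relative folder such as "country=us/dt=2021-03-04/hour=05".
  // separator: the file system's path separator (assumed "/" for HDFS and S3).
  // nowMillis: supplies processing time, used only when the record has no event time.
  def partitionFunc(separator: String, nowMillis: () => Long)(e: Event): String = {
    val instant = Instant.ofEpochMilli(e.eventTimeMillis.getOrElse(nowMillis()))
    Seq(
      s"country=${e.country}",
      s"dt=${dayFormat.format(instant)}",
      s"hour=${hourFormat.format(instant)}"
    ).mkString(separator)
  }
}
```

A function like PartitionSketch.partitionFunc("/", () => System.currentTimeMillis()) 
has the shape T => String, so it could be passed to .by(...), and the returned 
string is what the .withNaming lambda receives as partitionFolder.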

On Wed, Mar 3, 2021 at 5:36 PM Tao Li <t...@zillow.com> wrote:
Hi Beam community,

I have a streaming app that writes every hour’s data to a folder named with 
this hour. With Flink (for example), we can leverage “Bucketing File Sink”: 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/filesystem_sink.html

However, I do not see that Beam FileIO’s writeDynamic API supports specifying 
different output paths for different groups: 
https://beam.apache.org/releases/javadoc/2.28.0/index.html?org/apache/beam/sdk/io/FileIO.html

It seems that writeDynamic() only supports specifying different naming strategies.

How can I specify different hourly output paths for hourly data with Beam 
writeDynamic? Please advise. Thanks!




--
Yours Sincerely
Kobe Feng
