Hi Zack,

> I want to customize this job to "explode" the map as column names and
values

You can do this in a select statement extracting manually the map values
using the map access built-in
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/#collection-functions>,
e.g.:

SELECT mymap['a'] AS a, mymap['b'] AS b

> specifically the BucketAssigner and the CheckpointRollingPolicy both
appear to be required to have a bucketId of a String.

I wonder if what you're looking for is the PARTITIONED BY feature:

CREATE TABLE MySinkTable (
  ...) PARTITIONED BY (partitionKey1, partitionKey2)

Does this solves your use case?

FG


On Tue, Nov 30, 2021 at 7:13 AM Zack Loebel <zloe...@gmail.com> wrote:

> Hey all,
>
> I have a job which writes data that is a similar shape to a location in
> s3. Currently it writes a map of data with each row. I want to customize
> this job to "explode" the map as column names and values, these are
> consistent for a single bucket. Is there any way to do this? Provide a
> custom parquet schema per bucket within a single dynamic sink?
>
> I've started looking at the changes within the main codebase to make this
> feasible. It seems straightforward to provide the bucketId to the
> writerFactory, and the bucketId could be a type containing the relevant
> schema information.
> Although it appears that the BulkFormatBuilder has several spots where
> BucketId appears to be required to be a String: specifically
> the BucketAssigner and the CheckpointRollingPolicy both appear to be
> required to have a bucketId of a String.
>
> I'm curious if this is a change the community would be open to, and or if
> there is another way to accomplish what I'm looking for that I've missed.
>
> Thanks,
> Zack
>
>

Reply via email to