Hi Pranjal,

If there is a bug in Beam, this is a good list to contact. If there is a
problem with a GCP service, then GCP support is better.

I see the code you shared, but what error or difficulty are you
encountering?

Kenn

On Mon, Oct 21, 2024 at 2:33 PM Pranjal Pandit <pran...@eka.care> wrote:

> Hi Kenneth / Yi Hun,
>
> For the past few days I have been struggling with a pipeline that runs as
> a batch job and loads data from GCS into BigQuery.
>
> While looking for a solution I found a similar issue and posted what I am
> observing in this GitHub issue:
> https://github.com/apache/beam/issues/28219
>
> We are using the GCP Dataflow runner, so I have also written to GCP
> support, but without much luck.
>
> I will briefly explain my problem again here. I would really appreciate
> any leads on how to resolve it.
>
> I am trying to use dynamic destinations to load data from GCS into
> BigQuery, writing to multiple tables with multiple schemas.
>
> Below are all the relevant classes I am using:
>
>
> PCollection<KV<String, TableRow>> kvRows = decompressedLines
>         .apply("Convert To BigQueryRow", ParDo.of(new Utility.ConvertToBigQueryRow(ekaUUID)));
>
> // Group by TableId to manage different schemas per table type
>
> PCollectionView<Map<String, String>> schemaView =
>         Utility.GetSchemaViewFromBigQueryRows.createSchemasView(kvRows);
>
> WriteResult result = kvRows.apply("WriteToBigQuery",
>         BigQueryIO.<KV<String, TableRow>>write()
>                 .to(new BatchDynamicDestination(bigqueryDatasetName, schemaView))
>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>                 .withFormatFunction(stringTableRowKV -> stringTableRowKV != null ? stringTableRowKV.getValue() : null)
> );
>
>
> public static class ConvertToBigQueryRow extends DoFn<KV<String, String>, KV<String, TableRow>> {
>
>     private static final Logger LOG = LoggerFactory.getLogger(ConvertToBigQueryRow.class);
>
>     String uuidKeyName;
>
>     public ConvertToBigQueryRow(String uuidKeyName){
>         this.uuidKeyName = uuidKeyName;
>     }
>
>     @ProcessElement
>     public void processElement(ProcessContext c) throws JsonProcessingException {
>         ObjectMapper mapper = new ObjectMapper();
>         String value = Objects.requireNonNull(c.element().getValue());
>         String topic = Objects.requireNonNull(c.element().getKey());
>         // System.out.println("Data in ProcessElem   " + value);
>         // Parse the entire value as a JSON tree
>         JsonNode rootNode = mapper.readTree(value);
>
>         // Extract the "after" field as a JSON node (not as a string)
>         JsonNode afterNode = rootNode.get("after");
>
>         // System.out.println("Data in ProcessElem  after  " + afterNode.asText());
>         // Check that the "after" field exists and is not JSON null.
>         // rootNode.get() returns Java null when the field is absent.
>         if (afterNode != null && !afterNode.isNull()) {
>
>             // String form of the "after" node, used when it holds embedded JSON text
>             String afterJsonString = afterNode.asText();
>             // Parse the string into a JsonNode
>             JsonNode afterJsonNode = mapper.readTree(afterJsonString);
>
>             Map<String, Object> afterMap = new HashMap<>();
>             try {
>                 afterMap = mapper.convertValue(afterNode, new TypeReference<Map<String, Object>>() {
>                 });
>             } catch (IllegalArgumentException e) {
>                 afterMap = mapper.convertValue(afterJsonNode, new TypeReference<Map<String, Object>>() {
>                 });
>             }
>
>             if (afterMap != null) {
>                 // System.out.println("Data in ProcessElem  afterMap  " + afterMap);
>                 TableRow row = new TableRow();
>                 for (Map.Entry<String, Object> entry : afterMap.entrySet()) {
>                     row.set(entry.getKey(), entry.getValue());
>                 }
>
>                 // Insert eka UUID to our table
>                 UUID uuid = UUID.randomUUID();
>                 row.set(this.uuidKeyName, uuid.toString());
>
>
>                 LOG.info("T: {}D: {}", topic, row.toString());
>                 c.output(KV.of(topic, row));
>             }else{
>                 LOG.error("Data in ProcessElem  afterMap mostly null ");
>             }
>         } else {
>             LOG.error("The 'after' field is null.");
>         }
>     }
> }
>
>
>
> public static class GetSchemaViewFromBigQueryRows {
>
>     private static final Logger LOG = LoggerFactory.getLogger(GetSchemaViewFromBigQueryRows.class);
>     private static final Map<String, TableSchema> schemaCache = new ConcurrentHashMap<>();
>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>
>     private static String schemaToJson(TableSchema schema) {
>         try {
>             return OBJECT_MAPPER.writeValueAsString(schema);
>         } catch (JsonProcessingException e) {
>             throw new RuntimeException("Error converting schema to JSON", e);
>         }
>     }
>
>     // Method to generate a schema for the given type
>     public static String getSchemaForType(String type, TableRow tableRow) {
>
>         TableSchema schema = schemaCache.get(type);
>
>         if (schema == null) {
>             schema = new TableSchema().setFields(new ArrayList<>());
>             schemaCache.put(type, schema);
>         }
>
>         // old fields
>         // Create a set of existing field names to avoid duplicates
>         Set<String> existingFieldNames = new HashSet<>();
>         for (TableFieldSchema field : schema.getFields()) {
>             existingFieldNames.add(field.getName());
>         }
>
>         // set to new fields
>         TableSchema finalSchema = schema;
>         tableRow.forEach((fieldName, fieldValue) -> {
>             if (!existingFieldNames.contains(fieldName)) {
>                 TableFieldSchema fieldSchema = new TableFieldSchema()
>                         .setName(fieldName)
>                         .setType(determineFieldType(fieldValue));
>                 finalSchema.getFields().add(fieldSchema);
>             }
>         });
>
>         // TODO: Add cases for handling schema conflicts also here
>
>         // update final schema in map
>         schemaCache.put(type, finalSchema);
>         return schemaToJson(finalSchema);
>     }
>
>     // Method to create a view of schemas as a PCollectionView<Map<String, String>>
>     // (topic -> schema serialized as JSON)
>     public static PCollectionView<Map<String, String>> createSchemasView(
>             PCollection<KV<String, TableRow>> input) {
>
>         // Map each input element to KV<topic, schema JSON>
>         PCollection<KV<String, String>> schemas = input.apply("MapElements for converting KV",
>                 MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
>                         .via(data -> {
>                             assert data != null;
>                             return KV.of(data.getKey(), getSchemaForType(data.getKey(), data.getValue()));
>                         })
>         );
>
>         // Deduplicate by key (topic) using Combine.perKey() and select any one schema for each key
>         PCollection<KV<String, String>> uniqueSchemas = schemas
>                 .apply("DeduplicateSchemas", Combine.perKey((schema1, schema2) -> schema1));  // Keep one schema per topic
>
>         // Use View.asMap() to create a side input for schemas
>         return uniqueSchemas.apply("ToSchemaView", View.asMap());
>     }
> }
>
> public class BatchDynamicDestination extends DynamicDestinations<KV<String, TableRow>, DestinationInfo> {
>
>     private static final Logger LOG = LoggerFactory.getLogger(BatchDynamicDestination.class);
>
>     private final String datasetName;
>     private final PCollectionView<Map<String, String>> schemaView;
>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>
>     public BatchDynamicDestination(String datasetName, PCollectionView<Map<String, String>> schemaView) {
>         this.datasetName = datasetName;
>         this.schemaView = schemaView;
>     }
>
>     @Override
>     public Coder<DestinationInfo> getDestinationCoder() {
>         // Use the custom DestinationInfoCoder for DestinationInfo
>         return DestinationInfoCoder.of();
>     }
>
>     @Override
>     public java.util.List<PCollectionView<?>> getSideInputs() {
>         // Register the schemaView as a side input
>         return Collections.singletonList(schemaView);
>     }
>
>
>     @Override
>     public DestinationInfo getDestination(ValueInSingleWindow<KV<String, TableRow>> element) {
>
>
>         Map<String, String> schemas = sideInput(schemaView);
>         String topic = element.getValue().getKey();
>
>         String tableSchema = schemas.get(topic);
>         if (tableSchema == null) {
>             throw new RuntimeException("Schema not found for topic: " + topic);
>         }
>
>         TableSchema tableSchemaObj;
>         try {
>             // Deserialize JSON to TableSchema
>             tableSchemaObj = OBJECT_MAPPER.readValue(tableSchema, TableSchema.class);
>         } catch (IOException e) {
>             throw new RuntimeException("Error parsing schema JSON", e);
>         }
>
>         LOG.info("datasetname: {}, topic: {}, tableSchema: {}", datasetName, topic, tableSchema);
>
>         return new DestinationInfo("livewel-prod", datasetName, topic, tableSchemaObj);
>     }
>
>     @Override
>     public TableDestination getTable(DestinationInfo destination) {
>         return new TableDestination(destination.toTableSpec(), "Dynamically generated table");
>     }
>
>     @Override
>     public TableSchema getSchema(DestinationInfo destination) {
>         return destination.getTableSchema();
>     }
>
>
> }
>
>

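The quoted code calls two helpers that are not included in the message: determineFieldType(...) and DestinationInfo.toTableSpec(). The sketch below is only a guess at what they might look like, for readers trying to follow the pipeline. The class names FieldTypeSketch and DestinationInfoSketch are hypothetical, the type mapping covers scalar values only, and the real DestinationInfo also carries a TableSchema, which is left out here so the example has no Beam or BigQuery dependencies.

```java
import java.util.Objects;

// Hypothetical stand-in for the determineFieldType helper used in
// getSchemaForType. Assumption: only scalar values are mapped; nested
// objects and arrays would need RECORD / REPEATED handling, not shown here.
final class FieldTypeSketch {
    private FieldTypeSketch() {}

    static String determineFieldType(Object value) {
        if (value instanceof Boolean) {
            return "BOOLEAN";
        }
        if (value instanceof Byte || value instanceof Short
                || value instanceof Integer || value instanceof Long) {
            return "INTEGER";
        }
        if (value instanceof Number) {
            // Double, Float, BigDecimal and similar: treated as FLOAT (assumption)
            return "FLOAT";
        }
        // null and everything else fall back to STRING (assumption)
        return "STRING";
    }
}

// Hypothetical, dependency-free stand-in for DestinationInfo.
// The schema field of the real class is intentionally omitted.
final class DestinationInfoSketch {
    private final String project;
    private final String dataset;
    private final String table;

    DestinationInfoSketch(String project, String dataset, String table) {
        this.project = Objects.requireNonNull(project);
        this.dataset = Objects.requireNonNull(dataset);
        this.table = Objects.requireNonNull(table);
    }

    // Builds a table spec of the form project:dataset.table
    String toTableSpec() {
        return project + ":" + dataset + "." + table;
    }

    // Value-based equality, since the destination is used as a grouping key
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof DestinationInfoSketch)) {
            return false;
        }
        DestinationInfoSketch that = (DestinationInfoSketch) other;
        return project.equals(that.project)
                && dataset.equals(that.dataset)
                && table.equals(that.table);
    }

    @Override
    public int hashCode() {
        return Objects.hash(project, dataset, table);
    }
}
```

If the real classes differ from this, in particular in how values of mixed type for the same field are resolved or in how the schema takes part in equality, that difference may be relevant to the problem and would be worth including in a follow-up.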