Are you using runner v2? The issue you linked to implies that this only
happened on runner v2.
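
If you want to test that theory, runner v2 can be toggled on Dataflow with
the experiments flag; a minimal sketch (option/args names here are just
illustrative):

    DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    // Force the legacy harness to see whether runner v2 is the trigger;
    // pass "use_runner_v2" instead to opt in explicitly.
    ExperimentalOptions.addExperiment(options.as(ExperimentalOptions.class), "disable_runner_v2");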

On Fri, Oct 25, 2024 at 8:26 AM Kenneth Knowles <k...@apache.org> wrote:

> Hi Pranjal,
>
> If there is a bug in Beam, this is a good list to contact. If there is a
> problem with a GCP service, then GCP support is better.
>
> I see the code you shared, but what error or difficulty are you
> encountering?
>
> Kenn
>
> On Mon, Oct 21, 2024 at 2:33 PM Pranjal Pandit <pran...@eka.care> wrote:
>
>> Hi Kenneth / Yi Hun,
>>
>> I have been grappling for the past few days with a batch pipeline that
>> loads data from GCS to BigQuery.
>>
>>
>> While looking for a solution I found a similar issue and have posted
>> what I am observing in this GitHub issue (
>> https://github.com/apache/beam/issues/28219).
>>
>> We are using the GCP Dataflow runner, so I have also written to GCP
>> support, but without much luck there.
>>
>> I will briefly explain the problem I am facing here again.
>>
>> I would really appreciate any leads on how to resolve this issue.
>>
>>
>> I am trying to use dynamic destinations to load data from GCS to
>> BigQuery into multiple tables with multiple schemas.
>>
>>
>> I have put down all the required classes I am using below.
>>
>>
>> PCollection<KV<String, TableRow>> kvRows = decompressedLines
>>         .apply("Convert To BigQueryRow", ParDo.of(new Utility.ConvertToBigQueryRow(ekaUUID)));
>>
>> // Build a side input mapping each table type (topic) to its JSON schema
>> PCollectionView<Map<String, String>> schemaView =
>>         Utility.GetSchemaViewFromBigQueryRows.createSchemasView(kvRows);
>>
>> WriteResult result = kvRows.apply("WriteToBigQuery",
>>         BigQueryIO.<KV<String, TableRow>>write()
>>                 .to(new BatchDynamicDestination(bigqueryDatasetName, schemaView))
>>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>                 .withFormatFunction(kv -> kv != null ? kv.getValue() : null));
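>>
>> (Side note: since the schemas already live in a PCollectionView<Map<String,
>> String>>, BigQueryIO's built-in withSchemaFromView may be worth trying
>> instead of threading the view through DynamicDestinations by hand. A minimal
>> sketch, assuming the view were re-keyed by table spec rather than topic;
>> tableSpecForTopic and schemaViewByTableSpec are hypothetical names:
>>
>> WriteResult result = kvRows.apply("WriteToBigQuery",
>>         BigQueryIO.<KV<String, TableRow>>write()
>>                 // withSchemaFromView expects map keys of the form "project:dataset.table"
>>                 .to(v -> new TableDestination(tableSpecForTopic(v.getValue().getKey()), null))
>>                 .withSchemaFromView(schemaViewByTableSpec)
>>                 .withFormatFunction(kv -> kv != null ? kv.getValue() : null)
>>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));)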
>>
>>
>> public static class ConvertToBigQueryRow extends DoFn<KV<String, String>, KV<String, TableRow>> {
>>
>>     private static final Logger LOG = LoggerFactory.getLogger(ConvertToBigQueryRow.class);
>>     // ObjectMapper is thread-safe, so share one instance instead of building one per element
>>     private static final ObjectMapper MAPPER = new ObjectMapper();
>>
>>     private final String uuidKeyName;
>>
>>     public ConvertToBigQueryRow(String uuidKeyName) {
>>         this.uuidKeyName = uuidKeyName;
>>     }
>>
>>     @ProcessElement
>>     public void processElement(ProcessContext c) throws JsonProcessingException {
>>         String value = Objects.requireNonNull(c.element().getValue());
>>         String topic = Objects.requireNonNull(c.element().getKey());
>>
>>         // Parse the entire value as a JSON tree
>>         JsonNode rootNode = MAPPER.readTree(value);
>>
>>         // Extract the "after" field as a JSON node (not as a string)
>>         JsonNode afterNode = rootNode.get("after");
>>
>>         // Check that the "after" field exists and is not null; get() returns
>>         // null (not a NullNode) when the field is absent, so check both
>>         if (afterNode != null && !afterNode.isNull()) {
>>             // "after" may arrive as an embedded JSON string; parse that string into a node
>>             String afterJsonString = afterNode.asText();
>>             JsonNode afterJsonNode = MAPPER.readTree(afterJsonString);
>>
>>             Map<String, Object> afterMap;
>>             try {
>>                 afterMap = MAPPER.convertValue(afterNode, new TypeReference<Map<String, Object>>() {});
>>             } catch (IllegalArgumentException e) {
>>                 // "after" was a textual node, not an object; use the re-parsed node instead
>>                 afterMap = MAPPER.convertValue(afterJsonNode, new TypeReference<Map<String, Object>>() {});
>>             }
>>
>>             if (afterMap != null) {
>>                 TableRow row = new TableRow();
>>                 for (Map.Entry<String, Object> entry : afterMap.entrySet()) {
>>                     row.set(entry.getKey(), entry.getValue());
>>                 }
>>
>>                 // Insert eka UUID into our table
>>                 UUID uuid = UUID.randomUUID();
>>                 row.set(this.uuidKeyName, uuid.toString());
>>
>>                 LOG.info("T: {} D: {}", topic, row);
>>                 c.output(KV.of(topic, row));
>>             } else {
>>                 LOG.error("Data in processElement: afterMap is null");
>>             }
>>         } else {
>>             LOG.error("The 'after' field is missing or null.");
>>         }
>>     }
>> }
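>>
>> For context, this DoFn assumes each element's value is a CDC-style envelope
>> whose "after" field carries the row either as a nested object or as an
>> embedded JSON string, e.g. (field names made up):
>>
>> {"after": "{\"id\": 42, \"name\": \"foo\"}"}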
>>
>>
>>
>> public static class GetSchemaViewFromBigQueryRows {
>>
>>     private static final Logger LOG = LoggerFactory.getLogger(GetSchemaViewFromBigQueryRows.class);
>>     private static final Map<String, TableSchema> schemaCache = new ConcurrentHashMap<>();
>>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>>
>>     private static String schemaToJson(TableSchema schema) {
>>         try {
>>             return OBJECT_MAPPER.writeValueAsString(schema);
>>         } catch (JsonProcessingException e) {
>>             throw new RuntimeException("Error converting schema to JSON", e);
>>         }
>>     }
>>
>>     // Generate (or extend) the cached schema for the given type
>>     public static String getSchemaForType(String type, TableRow tableRow) {
>>
>>         TableSchema schema = schemaCache.computeIfAbsent(type,
>>                 t -> new TableSchema().setFields(new ArrayList<>()));
>>
>>         // Collect existing field names to avoid duplicates
>>         Set<String> existingFieldNames = new HashSet<>();
>>         for (TableFieldSchema field : schema.getFields()) {
>>             existingFieldNames.add(field.getName());
>>         }
>>
>>         // Add any new fields seen in this row
>>         tableRow.forEach((fieldName, fieldValue) -> {
>>             if (!existingFieldNames.contains(fieldName)) {
>>                 schema.getFields().add(new TableFieldSchema()
>>                         .setName(fieldName)
>>                         .setType(determineFieldType(fieldValue)));
>>             }
>>         });
>>
>>         // TODO: Add cases for handling schema conflicts here as well
>>
>>         // Update the final schema in the cache
>>         schemaCache.put(type, schema);
>>         return schemaToJson(schema);
>>     }
>>
>>     // Create a side-input view of schemas as a PCollectionView<Map<String, String>>
>>     public static PCollectionView<Map<String, String>> createSchemasView(
>>             PCollection<KV<String, TableRow>> input) {
>>
>>         // Map each element to KV<topic, schema JSON>
>>         PCollection<KV<String, String>> schemas = input.apply("MapElements for converting KV",
>>                 MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
>>                         .via(data -> {
>>                             assert data != null;
>>                             return KV.of(data.getKey(), getSchemaForType(data.getKey(), data.getValue()));
>>                         }));
>>
>>         // Deduplicate by key (topic) using Combine.perKey(), keeping any one schema per key
>>         PCollection<KV<String, String>> uniqueSchemas = schemas
>>                 .apply("DeduplicateSchemas", Combine.perKey((schema1, schema2) -> schema1));
>>
>>         // Use View.asMap() to create a side input for schemas
>>         return uniqueSchemas.apply("ToSchemaView", View.asMap());
>>     }
>> }
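>>
>> determineFieldType was not included above; a minimal sketch of what it is
>> assumed to do (map Java value types to BigQuery column types):
>>
>> // Hypothetical sketch; the original helper was not shared
>> private static String determineFieldType(Object fieldValue) {
>>     if (fieldValue instanceof Integer || fieldValue instanceof Long) {
>>         return "INTEGER";
>>     } else if (fieldValue instanceof Float || fieldValue instanceof Double) {
>>         return "FLOAT";
>>     } else if (fieldValue instanceof Boolean) {
>>         return "BOOLEAN";
>>     } else if (fieldValue instanceof Map) {
>>         return "RECORD";
>>     }
>>     return "STRING";
>> }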
>>
>> public class BatchDynamicDestination extends DynamicDestinations<KV<String, TableRow>, DestinationInfo> {
>>
>>     private static final Logger LOG = LoggerFactory.getLogger(BatchDynamicDestination.class);
>>
>>     private final String datasetName;
>>     private final PCollectionView<Map<String, String>> schemaView;
>>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>>
>>     public BatchDynamicDestination(String datasetName, PCollectionView<Map<String, String>> schemaView) {
>>         this.datasetName = datasetName;
>>         this.schemaView = schemaView;
>>     }
>>
>>     @Override
>>     public Coder<DestinationInfo> getDestinationCoder() {
>>         // Use the custom coder for DestinationInfo
>>         return DestinationInfoCoder.of();
>>     }
>>
>>     @Override
>>     public java.util.List<PCollectionView<?>> getSideInputs() {
>>         // Register the schemaView as a side input
>>         return Collections.singletonList(schemaView);
>>     }
>>
>>     @Override
>>     public DestinationInfo getDestination(ValueInSingleWindow<KV<String, TableRow>> element) {
>>
>>         Map<String, String> schemas = sideInput(schemaView);
>>         String topic = element.getValue().getKey();
>>
>>         String tableSchema = schemas.get(topic);
>>         if (tableSchema == null) {
>>             throw new RuntimeException("Schema not found for topic: " + topic);
>>         }
>>
>>         TableSchema tableSchemaObj;
>>         try {
>>             // Deserialize the JSON back into a TableSchema
>>             tableSchemaObj = OBJECT_MAPPER.readValue(tableSchema, TableSchema.class);
>>         } catch (IOException e) {
>>             throw new RuntimeException("Error parsing schema JSON", e);
>>         }
>>
>>         LOG.info("datasetName: {}, topic: {}, tableSchema: {}", datasetName, topic, tableSchema);
>>
>>         return new DestinationInfo("livewel-prod", datasetName, topic, tableSchemaObj);
>>     }
>>
>>     @Override
>>     public TableDestination getTable(DestinationInfo destination) {
>>         return new TableDestination(destination.toTableSpec(), "Dynamically generated table");
>>     }
>>
>>     @Override
>>     public TableSchema getSchema(DestinationInfo destination) {
>>         return destination.getTableSchema();
>>     }
>> }
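>>
>> DestinationInfo and DestinationInfoCoder were also not included above; a
>> minimal sketch of DestinationInfo as the code above assumes it (the
>> constructor order matches the call site; the table-spec format is a guess):
>>
>> // Hypothetical sketch; encoded by the custom DestinationInfoCoder rather
>> // than Java serialization, since TableSchema is not Serializable
>> public static class DestinationInfo {
>>     private final String project;
>>     private final String dataset;
>>     private final String table;
>>     private final TableSchema tableSchema;
>>
>>     public DestinationInfo(String project, String dataset, String table, TableSchema tableSchema) {
>>         this.project = project;
>>         this.dataset = dataset;
>>         this.table = table;
>>         this.tableSchema = tableSchema;
>>     }
>>
>>     public String toTableSpec() {
>>         // "project:dataset.table", the form TableDestination accepts
>>         return project + ":" + dataset + "." + table;
>>     }
>>
>>     public TableSchema getTableSchema() {
>>         return tableSchema;
>>     }
>> }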
>>
>>
