From the issue report, it looks like it is happening both with and without
Runner v2:
https://github.com/apache/beam/issues/28219#issuecomment-2423624052

On Fri, Oct 25, 2024 at 12:21 PM Reuven Lax via dev <dev@beam.apache.org>
wrote:

> Are you using runner v2? The issue you linked to implies that this only
> happened on runner v2.
>
> On Fri, Oct 25, 2024 at 8:26 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> Hi Pranjal,
>>
>> If there is a bug in Beam, this is a good list to contact. If there is a
>> problem with a GCP service, then GCP support is better.
>>
>> I see the code you shared, but what error or difficulty are you
>> encountering?
>>
>> Kenn
>>
>> On Mon, Oct 21, 2024 at 2:33 PM Pranjal Pandit <pran...@eka.care> wrote:
>>
>>> Hi Kenneth / Yi Hun,
>>>
>>> I have been grappling for the past few days with a pipeline that runs in
>>> batch mode and loads data from GCS to BigQuery.
>>>
>>>
>>> While looking for a solution I found a similar issue and have posted what
>>> I am observing in the GitHub issue (
>>> https://github.com/apache/beam/issues/28219).
>>>
>>> We are using the GCP Dataflow runner, so I have written to GCP support,
>>> but without much luck there.
>>>
>>> I will try to briefly explain the problem I am facing here again.
>>>
>>> I would really appreciate any leads on how I can resolve this issue.
>>>
>>>
>>> I am trying to use dynamic destinations to load data from GCS to BigQuery
>>> into multiple tables with multiple schemas.
>>>
>>>
>>> I have put down all the required classes I am using below.
>>>
>>>
>>> PCollection<KV<String, TableRow>> kvRows = decompressedLines
>>>         .apply("Convert To BigQueryRow",
>>>                 ParDo.of(new Utility.ConvertToBigQueryRow(ekaUUID)));
>>>
>>> // Build a side-input view of the schemas, keyed by table type
>>> PCollectionView<Map<String, String>> schemaView =
>>>         Utility.GetSchemaViewFromBigQueryRows.createSchemasView(kvRows);
>>>
>>> WriteResult result = kvRows.apply("WriteToBigQuery",
>>>         BigQueryIO.<KV<String, TableRow>>write()
>>>                 .to(new BatchDynamicDestination(bigqueryDatasetName, schemaView))
>>>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>                 .withFormatFunction(stringTableRowKV ->
>>>                         stringTableRowKV != null ? stringTableRowKV.getValue() : null));
>>>
>>>
>>> public static class ConvertToBigQueryRow
>>>         extends DoFn<KV<String, String>, KV<String, TableRow>> {
>>>
>>>     private static final Logger LOG =
>>>             LoggerFactory.getLogger(ConvertToBigQueryRow.class);
>>>     // Static so the mapper is shared per JVM and not serialized with the DoFn
>>>     private static final ObjectMapper MAPPER = new ObjectMapper();
>>>
>>>     private final String uuidKeyName;
>>>
>>>     public ConvertToBigQueryRow(String uuidKeyName) {
>>>         this.uuidKeyName = uuidKeyName;
>>>     }
>>>
>>>     @ProcessElement
>>>     public void processElement(ProcessContext c) throws JsonProcessingException {
>>>         String value = Objects.requireNonNull(c.element().getValue());
>>>         String topic = Objects.requireNonNull(c.element().getKey());
>>>
>>>         // Parse the entire value as a JSON tree
>>>         JsonNode rootNode = MAPPER.readTree(value);
>>>
>>>         // Guard against "after" being absent as well as JSON null
>>>         JsonNode afterNode = rootNode.get("after");
>>>         if (afterNode == null || afterNode.isNull()) {
>>>             LOG.error("The 'after' field is missing or null.");
>>>             return;
>>>         }
>>>
>>>         // "after" may arrive as an embedded JSON object or as a JSON string
>>>         // containing the object, so handle both cases
>>>         JsonNode afterJsonNode =
>>>                 afterNode.isTextual() ? MAPPER.readTree(afterNode.asText()) : afterNode;
>>>
>>>         Map<String, Object> afterMap = MAPPER.convertValue(
>>>                 afterJsonNode, new TypeReference<Map<String, Object>>() {});
>>>
>>>         if (afterMap == null) {
>>>             LOG.error("Could not convert the 'after' field to a map.");
>>>             return;
>>>         }
>>>
>>>         TableRow row = new TableRow();
>>>         for (Map.Entry<String, Object> entry : afterMap.entrySet()) {
>>>             row.set(entry.getKey(), entry.getValue());
>>>         }
>>>
>>>         // Insert eka UUID into our table
>>>         row.set(this.uuidKeyName, UUID.randomUUID().toString());
>>>
>>>         LOG.info("T: {} D: {}", topic, row);
>>>         c.output(KV.of(topic, row));
>>>     }
>>> }
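>>>
>>> For context, each incoming value is a CDC-style envelope whose payload sits
>>> under "after"; sometimes "after" arrives as a nested object and sometimes
>>> as an escaped JSON string, which is why the DoFn above handles both. The
>>> field names below are made up purely for illustration:
>>>
>>> {"after": {"id": 42, "name": "example"}}
>>> {"after": "{\"id\": 42, \"name\": \"example\"}"}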
>>>
>>>
>>>
>>> public static class GetSchemaViewFromBigQueryRows {
>>>
>>>     private static final Logger LOG =
>>>             LoggerFactory.getLogger(GetSchemaViewFromBigQueryRows.class);
>>>     // Per-JVM cache of the schema accumulated so far for each table type
>>>     private static final Map<String, TableSchema> schemaCache = new ConcurrentHashMap<>();
>>>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>>>
>>>     private static String schemaToJson(TableSchema schema) {
>>>         try {
>>>             return OBJECT_MAPPER.writeValueAsString(schema);
>>>         } catch (JsonProcessingException e) {
>>>             throw new RuntimeException("Error converting schema to JSON", e);
>>>         }
>>>     }
>>>
>>>     // Merge the fields seen in this row into the cached schema for the type
>>>     public static String getSchemaForType(String type, TableRow tableRow) {
>>>
>>>         TableSchema schema = schemaCache.computeIfAbsent(
>>>                 type, t -> new TableSchema().setFields(new ArrayList<>()));
>>>
>>>         // Collect existing field names to avoid duplicates
>>>         Set<String> existingFieldNames = new HashSet<>();
>>>         for (TableFieldSchema field : schema.getFields()) {
>>>             existingFieldNames.add(field.getName());
>>>         }
>>>
>>>         // Add any fields in this row that the cached schema has not seen yet
>>>         tableRow.forEach((fieldName, fieldValue) -> {
>>>             if (!existingFieldNames.contains(fieldName)) {
>>>                 schema.getFields().add(new TableFieldSchema()
>>>                         .setName(fieldName)
>>>                         .setType(determineFieldType(fieldValue)));
>>>             }
>>>         });
>>>
>>>         // TODO: also handle schema conflicts here
>>>
>>>         // schemaCache already holds this instance; fields were added in place
>>>         return schemaToJson(schema);
>>>     }
>>>
>>>     // Create a side-input view mapping each table type to its schema JSON
>>>     public static PCollectionView<Map<String, String>> createSchemasView(
>>>             PCollection<KV<String, TableRow>> input) {
>>>
>>>         // Map each element to KV<type, schema JSON>
>>>         PCollection<KV<String, String>> schemas = input.apply(
>>>                 "MapElements for converting KV",
>>>                 MapElements.into(TypeDescriptors.kvs(
>>>                                 TypeDescriptors.strings(), TypeDescriptors.strings()))
>>>                         .via(data -> {
>>>                             assert data != null;
>>>                             return KV.of(data.getKey(),
>>>                                     getSchemaForType(data.getKey(), data.getValue()));
>>>                         }));
>>>
>>>         // Deduplicate by key (topic), keeping any one schema per topic
>>>         PCollection<KV<String, String>> uniqueSchemas = schemas.apply(
>>>                 "DeduplicateSchemas", Combine.perKey((schema1, schema2) -> schema1));
>>>
>>>         // Use View.asMap() to create a side input for schemas
>>>         return uniqueSchemas.apply("ToSchemaView", View.asMap());
>>>     }
>>> }
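>>>
>>> (determineFieldType is called above but I have not pasted it; roughly, it
>>> maps the Java type of each parsed JSON value to a BigQuery type name. A
>>> minimal sketch, assuming only the basic JSON-derived types need handling:)
>>>
>>> // Hypothetical sketch; the real implementation is not shown here
>>> private static String determineFieldType(Object fieldValue) {
>>>     if (fieldValue instanceof Integer || fieldValue instanceof Long) {
>>>         return "INTEGER";
>>>     } else if (fieldValue instanceof Float || fieldValue instanceof Double) {
>>>         return "FLOAT";
>>>     } else if (fieldValue instanceof Boolean) {
>>>         return "BOOLEAN";
>>>     } else if (fieldValue instanceof Map) {
>>>         return "RECORD";
>>>     }
>>>     // Fall back to STRING for nulls and anything unrecognized
>>>     return "STRING";
>>> }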
>>>
>>> public class BatchDynamicDestination
>>>         extends DynamicDestinations<KV<String, TableRow>, DestinationInfo> {
>>>
>>>     private static final Logger LOG =
>>>             LoggerFactory.getLogger(BatchDynamicDestination.class);
>>>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>>>
>>>     private final String datasetName;
>>>     private final PCollectionView<Map<String, String>> schemaView;
>>>
>>>     public BatchDynamicDestination(String datasetName,
>>>             PCollectionView<Map<String, String>> schemaView) {
>>>         this.datasetName = datasetName;
>>>         this.schemaView = schemaView;
>>>     }
>>>
>>>     @Override
>>>     public Coder<DestinationInfo> getDestinationCoder() {
>>>         // DestinationInfo has a dedicated coder
>>>         return DestinationInfoCoder.of();
>>>     }
>>>
>>>     @Override
>>>     public java.util.List<PCollectionView<?>> getSideInputs() {
>>>         // Register the schemaView as a side input
>>>         return Collections.singletonList(schemaView);
>>>     }
>>>
>>>     @Override
>>>     public DestinationInfo getDestination(
>>>             ValueInSingleWindow<KV<String, TableRow>> element) {
>>>
>>>         Map<String, String> schemas = sideInput(schemaView);
>>>         String topic = element.getValue().getKey();
>>>
>>>         String tableSchema = schemas.get(topic);
>>>         if (tableSchema == null) {
>>>             throw new RuntimeException("Schema not found for topic: " + topic);
>>>         }
>>>
>>>         TableSchema tableSchemaObj;
>>>         try {
>>>             // Deserialize the schema JSON back into a TableSchema
>>>             tableSchemaObj = OBJECT_MAPPER.readValue(tableSchema, TableSchema.class);
>>>         } catch (IOException e) {
>>>             throw new RuntimeException("Error parsing schema JSON", e);
>>>         }
>>>
>>>         LOG.info("datasetName: {}, topic: {}, tableSchema: {}",
>>>                 datasetName, topic, tableSchema);
>>>
>>>         return new DestinationInfo("livewel-prod", datasetName, topic, tableSchemaObj);
>>>     }
>>>
>>>     @Override
>>>     public TableDestination getTable(DestinationInfo destination) {
>>>         return new TableDestination(destination.toTableSpec(),
>>>                 "Dynamically generated table");
>>>     }
>>>
>>>     @Override
>>>     public TableSchema getSchema(DestinationInfo destination) {
>>>         return destination.getTableSchema();
>>>     }
>>> }
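>>>
>>> (DestinationInfo and DestinationInfoCoder are also not pasted above; a
>>> minimal sketch of what they could look like, assuming DestinationInfo only
>>> needs to carry project, dataset, table, and schema. Encoding the fields
>>> explicitly keeps the coder deterministic, which matters because BigQueryIO
>>> groups elements by destination using this coder:)
>>>
>>> // Hypothetical sketch; the real classes are not shown in this thread
>>> public class DestinationInfo implements Serializable {
>>>     private final String projectId;
>>>     private final String datasetName;
>>>     private final String tableName;
>>>     private final TableSchema tableSchema;
>>>
>>>     public DestinationInfo(String projectId, String datasetName,
>>>                            String tableName, TableSchema tableSchema) {
>>>         this.projectId = projectId;
>>>         this.datasetName = datasetName;
>>>         this.tableName = tableName;
>>>         this.tableSchema = tableSchema;
>>>     }
>>>
>>>     public String getProjectId() { return projectId; }
>>>     public String getDatasetName() { return datasetName; }
>>>     public String getTableName() { return tableName; }
>>>     public TableSchema getTableSchema() { return tableSchema; }
>>>
>>>     // project:dataset.table, the format TableDestination accepts
>>>     public String toTableSpec() {
>>>         return projectId + ":" + datasetName + "." + tableName;
>>>     }
>>> }
>>>
>>> public class DestinationInfoCoder extends AtomicCoder<DestinationInfo> {
>>>
>>>     private static final DestinationInfoCoder INSTANCE = new DestinationInfoCoder();
>>>     private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
>>>     private static final ObjectMapper MAPPER = new ObjectMapper();
>>>
>>>     public static DestinationInfoCoder of() {
>>>         return INSTANCE;
>>>     }
>>>
>>>     @Override
>>>     public void encode(DestinationInfo value, OutputStream outStream) throws IOException {
>>>         STRING_CODER.encode(value.getProjectId(), outStream);
>>>         STRING_CODER.encode(value.getDatasetName(), outStream);
>>>         STRING_CODER.encode(value.getTableName(), outStream);
>>>         // The TableSchema round-trips as JSON
>>>         STRING_CODER.encode(MAPPER.writeValueAsString(value.getTableSchema()), outStream);
>>>     }
>>>
>>>     @Override
>>>     public DestinationInfo decode(InputStream inStream) throws IOException {
>>>         String projectId = STRING_CODER.decode(inStream);
>>>         String datasetName = STRING_CODER.decode(inStream);
>>>         String tableName = STRING_CODER.decode(inStream);
>>>         TableSchema schema =
>>>                 MAPPER.readValue(STRING_CODER.decode(inStream), TableSchema.class);
>>>         return new DestinationInfo(projectId, datasetName, tableName, schema);
>>>     }
>>> }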
>>>
>>>
