Hi Pranjal,

If there is a bug in Beam, this is a good list to contact. If the problem is with a GCP service, then GCP support is a better place to ask.
I see the code you shared, but what error or difficulty are you encountering?

Kenn

On Mon, Oct 21, 2024 at 2:33 PM Pranjal Pandit <pran...@eka.care> wrote:
> Hi Kenneth / Yi Hun,
>
> For the past few days I have been grappling with a batch pipeline that
> loads data from GCS to BigQuery.
>
> While searching for a solution I found a similar issue and have posted
> what I am observing in this GitHub issue
> (https://github.com/apache/beam/issues/28219).
>
> We are using the GCP Dataflow runner, so I have also written to GCP
> support, but without much luck there.
>
> I will try to briefly explain the problem I am facing here again.
>
> I would really appreciate any leads on how I can resolve this issue.
>
> I am trying to use dynamic destinations to load data from GCS into
> BigQuery, writing to multiple tables with multiple schemas.
>
> Below are all the classes I am using:
>
> PCollection<KV<String, TableRow>> kvRows = decompressedLines
>     .apply("Convert To BigQueryRow", ParDo.of(new Utility.ConvertToBigQueryRow(ekaUUID)));
>
> // Group by TableId to manage different schemas per table type
>
> PCollectionView<Map<String, String>> schemaView =
>     Utility.GetSchemaViewFromBigQueryRows.createSchemasView(kvRows);
>
> WriteResult result = kvRows.apply("WriteToBigQuery",
>     BigQueryIO.<KV<String, TableRow>>write()
>         .to(new BatchDynamicDestination(bigqueryDatasetName, schemaView))
>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>         .withFormatFunction(stringTableRowKV -> stringTableRowKV != null ? stringTableRowKV.getValue() : null)
> );
>
> public static class ConvertToBigQueryRow extends DoFn<KV<String, String>, KV<String, TableRow>> {
>
>     private static final Logger LOG = LoggerFactory.getLogger(ConvertToBigQueryRow.class);
>
>     String uuidKeyName;
>
>     public ConvertToBigQueryRow(String uuidKeyName){
>         this.uuidKeyName = uuidKeyName;
>     }
>
>     @ProcessElement
>     public void processElement(ProcessContext c) throws JsonProcessingException {
>         ObjectMapper mapper = new ObjectMapper();
>         String value = Objects.requireNonNull(c.element().getValue());
>         String topic = Objects.requireNonNull(c.element().getKey());
>         // System.out.println("Data in ProcessElem " + value);
>         // Parse the entire value as a JSON tree
>         JsonNode rootNode = mapper.readTree(value);
>
>         // Extract the "after" field as a JSON node (not as a string)
>         JsonNode afterNode = rootNode.get("after");
>
>         // System.out.println("Data in ProcessElem after " + afterNode.asText());
>         // Check if "after" field exists and is not null
>         if (!afterNode.isNull()) {
>
>             String afterJsonString = afterNode.asText(); // Extract the string
>             // representation for json node for after
>             JsonNode afterJsonNode = mapper.readTree(afterJsonString); // Parse the string into a JsonNode
>
>             Map<String, Object> afterMap = new HashMap<>();
>             try {
>                 afterMap = mapper.convertValue(afterNode, new TypeReference<Map<String, Object>>() {
>                 });
>             } catch (IllegalArgumentException e) {
>                 afterMap = mapper.convertValue(afterJsonNode, new TypeReference<Map<String, Object>>() {
>                 });
>             }
>
>             if (afterMap != null) {
>                 // System.out.println("Data in ProcessElem afterMap " + afterMap);
>                 TableRow row = new TableRow();
>                 for (Map.Entry<String, Object> entry : afterMap.entrySet()) {
>                     row.set(entry.getKey(), entry.getValue());
>                 }
>
>                 // Insert eka UUID to our table
>                 UUID uuid = UUID.randomUUID();
>                 row.set(this.uuidKeyName, uuid.toString());
>
>                 LOG.info("T: {}D: {}", topic, row.toString());
>                 c.output(KV.of(topic, row));
>             } else {
>                 LOG.error("Data in ProcessElem afterMap mostly null ");
>             }
>         } else {
>             LOG.error("The 'after' field is null.");
>         }
>     }
> }
>
> public static class GetSchemaViewFromBigQueryRows {
>
>     private static final Logger LOG = LoggerFactory.getLogger(GetSchemaViewFromBigQueryRows.class);
>     private static final Map<String, TableSchema> schemaCache = new ConcurrentHashMap<>();
>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>
>     private static String schemaToJson(TableSchema schema) {
>         try {
>             return OBJECT_MAPPER.writeValueAsString(schema);
>         } catch (JsonProcessingException e) {
>             throw new RuntimeException("Error converting schema to JSON", e);
>         }
>     }
>
>     // Method to generate a schema for the given type
>     public static String getSchemaForType(String type, TableRow tableRow) {
>
>         TableSchema schema = schemaCache.get(type);
>
>         if (schema == null) {
>             schema = new TableSchema().setFields(new ArrayList<>());
>             schemaCache.put(type, schema);
>         }
>
>         // old fields
>         // Create a set of existing field names to avoid duplicates
>         Set<String> existingFieldNames = new HashSet<>();
>         for (TableFieldSchema field : schema.getFields()) {
>             existingFieldNames.add(field.getName());
>         }
>
>         // set to new fields
>         TableSchema finalSchema = schema;
>         tableRow.forEach((fieldName, fieldValue) -> {
>             if (!existingFieldNames.contains(fieldName)) {
>                 TableFieldSchema fieldSchema = new TableFieldSchema()
>                     .setName(fieldName)
>                     .setType(determineFieldType(fieldValue));
>                 finalSchema.getFields().add(fieldSchema);
>             }
>         });
>
>         // TODO: Add cases for handling schema conflicts also here
>
>         // update final schema in map
>         schemaCache.put(type, finalSchema);
>         return schemaToJson(finalSchema);
>     }
>
>     // Method to create a view of schemas as a PCollectionView<Map<String, TableSchema>>
>     public static PCollectionView<Map<String, String>> createSchemasView(
>             PCollection<KV<String, TableRow>> input) {
>
>         // Map InputData to KV<String, TableSchema>
>         PCollection<KV<String, String>> schemas = input.apply("MapElements for converting KV",
>             MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
>                 .via(data -> {
>                     assert data != null;
>                     return KV.of(data.getKey(), getSchemaForType(data.getKey(), data.getValue()));
>                 })
>         );
>
>         // Deduplicate by key (topic) using Combine.perKey() and select any one schema for each key
>         PCollection<KV<String, String>> uniqueSchemas = schemas
>             .apply("DeduplicateSchemas", Combine.perKey((schema1, schema2) -> schema1)); // Keep one schema per topic
>
>         // Use View.asMap() to create a side input for schemas
>         return uniqueSchemas.apply("ToSchemaView", View.asMap());
>     }
> }
>
> public class BatchDynamicDestination extends DynamicDestinations<KV<String, TableRow>, DestinationInfo> {
>
>     private static final Logger LOG = LoggerFactory.getLogger(BatchDynamicDestination.class);
>
>     private final String datasetName;
>     private final PCollectionView<Map<String, String>> schemaView;
>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>
>     public BatchDynamicDestination(String datasetName, PCollectionView<Map<String, String>> schemaView) {
>         this.datasetName = datasetName;
>         this.schemaView = schemaView;
>     }
>
>     @Override
>     public Coder<DestinationInfo> getDestinationCoder() {
>         // Using SerializableCoder because DestinationInfo implements Serializable
>         return DestinationInfoCoder.of();
>     }
>
>     @Override
>     public java.util.List<PCollectionView<?>> getSideInputs() {
>         // Register the schemaView as a side input
>         return Collections.singletonList(schemaView);
>     }
>
>     @Override
>     public DestinationInfo getDestination(ValueInSingleWindow<KV<String, TableRow>> element) {
>
>         Map<String, String> schemas = sideInput(schemaView);
>         String topic = element.getValue().getKey();
>
>         String tableSchema = schemas.get(topic);
>         if (tableSchema == null) {
>             throw new RuntimeException("Schema not found for topic: " + topic);
>         }
>
>         TableSchema tableSchemaObj;
>         try {
>             tableSchemaObj = OBJECT_MAPPER.readValue(tableSchema, TableSchema.class); // Deserialize JSON to TableSchema
>         } catch (IOException e) {
>             throw new RuntimeException("Error parsing schema JSON", e);
>         }
>
>         LOG.info("datasetname: {}, topic: {}, tableSchema: {}" , datasetName, topic, tableSchema);
>
>         return new DestinationInfo("livewel-prod", datasetName, topic, tableSchemaObj);
>     }
>
>     @Override
>     public TableDestination getTable(DestinationInfo destination) {
>         return new TableDestination(destination.toTableSpec(), "Dynamically generated table");
>     }
>
>     @Override
>     public TableSchema getSchema(DestinationInfo destination) {
>         return destination.getTableSchema();
>     }
> }