Are you using runner v2? The issue you linked to implies that this only happened on runner v2.
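For reference, on Dataflow the runner version is controlled through the "experiments" pipeline option, so one way to narrow this down is to toggle it and rerun the same job. A minimal, untested sketch (whether opting out of runner v2 is still allowed depends on the SDK version and job type):

    // Sketch: toggling Dataflow Runner v2 via the experiments option.
    DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(DataflowPipelineOptions.class);

    // Opt in explicitly (equivalent to passing --experiments=use_runner_v2):
    ExperimentalOptions.addExperiment(options.as(ExperimentalOptions.class), "use_runner_v2");
    // Or, where the SDK/job type still allows it, opt out:
    // ExperimentalOptions.addExperiment(options.as(ExperimentalOptions.class), "disable_runner_v2");

    Pipeline pipeline = Pipeline.create(options);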
On Fri, Oct 25, 2024 at 8:26 AM Kenneth Knowles <k...@apache.org> wrote:

> Hi Pranjal,
>
> If there is a bug in Beam, this is a good list to contact. If there is a
> problem with a GCP service, then GCP support is better.
>
> I see the code you shared, but what error or difficulty are you
> encountering?
>
> Kenn
>
> On Mon, Oct 21, 2024 at 2:33 PM Pranjal Pandit <pran...@eka.care> wrote:
>
>> Hi Kenneth / Yi Hun,
>>
>> I have been grappling for the past few days with a pipeline that runs in
>> batch mode and loads data from GCS to BigQuery.
>>
>> While looking for a solution I found a similar issue and have posted what
>> I am observing in this GitHub issue:
>> https://github.com/apache/beam/issues/28219
>>
>> We are using the GCP Dataflow runner, so I have also written to GCP
>> support, but not much luck there.
>>
>> I will try to briefly explain the problem I am facing here again. I would
>> really appreciate any leads on how I can resolve this issue.
>>
>> I am trying to use dynamic destinations to load data from GCS into
>> BigQuery, writing to multiple tables with multiple schemas.
>>
>> I have tried to put down all the required classes I am using below.
>>
>> PCollection<KV<String, TableRow>> kvRows = decompressedLines
>>     .apply("Convert To BigQueryRow", ParDo.of(new Utility.ConvertToBigQueryRow(ekaUUID)));
>>
>> // Group by TableId to manage different schemas per table type
>> PCollectionView<Map<String, String>> schemaView =
>>     Utility.GetSchemaViewFromBigQueryRows.createSchemasView(kvRows);
>>
>> WriteResult result = kvRows.apply("WriteToBigQuery",
>>     BigQueryIO.<KV<String, TableRow>>write()
>>         .to(new BatchDynamicDestination(bigqueryDatasetName, schemaView))
>>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>         .withFormatFunction(stringTableRowKV ->
>>             stringTableRowKV != null ? stringTableRowKV.getValue() : null));
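For comparison, if the custom DynamicDestinations exists only to carry the per-table schemas, BigQueryIO also offers a table-function overload of to() together with withSchemaFromView. The sketch below is untested and assumes a hypothetical side input named schemaBySpecView whose keys are full table specs ("project:dataset.table") rather than topics, which is the keying withSchemaFromView expects:

    // Sketch: same write expressed without a custom DynamicDestinations subclass.
    // schemaBySpecView is assumed to map "project:dataset.table" -> JSON TableSchema.
    WriteResult result = kvRows.apply("WriteToBigQuery",
        BigQueryIO.<KV<String, TableRow>>write()
            .to(elem -> new TableDestination(
                    "livewel-prod:" + bigqueryDatasetName + "." + elem.getValue().getKey(),
                    "Dynamically generated table"))
            .withSchemaFromView(schemaBySpecView)
            .withFormatFunction(kv -> kv != null ? kv.getValue() : null)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));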
>> public static class ConvertToBigQueryRow extends DoFn<KV<String, String>, KV<String, TableRow>> {
>>
>>     private static final Logger LOG = LoggerFactory.getLogger(ConvertToBigQueryRow.class);
>>
>>     String uuidKeyName;
>>
>>     public ConvertToBigQueryRow(String uuidKeyName) {
>>         this.uuidKeyName = uuidKeyName;
>>     }
>>
>>     @ProcessElement
>>     public void processElement(ProcessContext c) throws JsonProcessingException {
>>         ObjectMapper mapper = new ObjectMapper();
>>         String value = Objects.requireNonNull(c.element().getValue());
>>         String topic = Objects.requireNonNull(c.element().getKey());
>>
>>         // Parse the entire value as a JSON tree
>>         JsonNode rootNode = mapper.readTree(value);
>>
>>         // Extract the "after" field as a JSON node (not as a string)
>>         JsonNode afterNode = rootNode.get("after");
>>
>>         // Guard against a missing "after" key (get() returns null) as well as an explicit null value
>>         if (afterNode != null && !afterNode.isNull()) {
>>             String afterJsonString = afterNode.asText(); // String representation of the "after" node
>>             JsonNode afterJsonNode = mapper.readTree(afterJsonString); // Parse the string into a JsonNode
>>
>>             Map<String, Object> afterMap;
>>             try {
>>                 afterMap = mapper.convertValue(afterNode, new TypeReference<Map<String, Object>>() {});
>>             } catch (IllegalArgumentException e) {
>>                 afterMap = mapper.convertValue(afterJsonNode, new TypeReference<Map<String, Object>>() {});
>>             }
>>
>>             if (afterMap != null) {
>>                 TableRow row = new TableRow();
>>                 for (Map.Entry<String, Object> entry : afterMap.entrySet()) {
>>                     row.set(entry.getKey(), entry.getValue());
>>                 }
>>
>>                 // Insert eka UUID into our table
>>                 UUID uuid = UUID.randomUUID();
>>                 row.set(this.uuidKeyName, uuid.toString());
>>
>>                 LOG.info("T: {} D: {}", topic, row.toString());
>>                 c.output(KV.of(topic, row));
>>             } else {
>>                 LOG.error("Data in ProcessElem afterMap mostly null");
>>             }
>>         } else {
>>             LOG.error("The 'after' field is missing or null.");
>>         }
>>     }
>> }
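One small thing in ConvertToBigQueryRow, unrelated to the dynamic-destinations issue: a new ObjectMapper is built for every element. A hypothetical variant (sketch only; the parsing logic is not repeated here) that builds the mapper once per DoFn instance via @Setup:

    // Sketch: create the Jackson ObjectMapper once per DoFn instance instead of per element.
    public static class ConvertToBigQueryRowReusingMapper
            extends DoFn<KV<String, String>, KV<String, TableRow>> {

        private final String uuidKeyName;
        private transient ObjectMapper mapper;  // rebuilt on each worker, not serialized with the DoFn

        public ConvertToBigQueryRowReusingMapper(String uuidKeyName) {
            this.uuidKeyName = uuidKeyName;
        }

        @Setup
        public void setup() {
            mapper = new ObjectMapper();
        }

        @ProcessElement
        public void processElement(ProcessContext c) throws JsonProcessingException {
            // Same body as ConvertToBigQueryRow.processElement above, using the shared
            // `mapper` field instead of constructing a new ObjectMapper here.
        }
    }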
>> public static class GetSchemaViewFromBigQueryRows {
>>
>>     private static final Logger LOG = LoggerFactory.getLogger(GetSchemaViewFromBigQueryRows.class);
>>     private static final Map<String, TableSchema> schemaCache = new ConcurrentHashMap<>();
>>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>>
>>     private static String schemaToJson(TableSchema schema) {
>>         try {
>>             return OBJECT_MAPPER.writeValueAsString(schema);
>>         } catch (JsonProcessingException e) {
>>             throw new RuntimeException("Error converting schema to JSON", e);
>>         }
>>     }
>>
>>     // Method to generate a schema for the given type
>>     public static String getSchemaForType(String type, TableRow tableRow) {
>>
>>         TableSchema schema = schemaCache.get(type);
>>         if (schema == null) {
>>             schema = new TableSchema().setFields(new ArrayList<>());
>>             schemaCache.put(type, schema);
>>         }
>>
>>         // Create a set of existing field names to avoid duplicates
>>         Set<String> existingFieldNames = new HashSet<>();
>>         for (TableFieldSchema field : schema.getFields()) {
>>             existingFieldNames.add(field.getName());
>>         }
>>
>>         // Add any new fields seen in this row
>>         TableSchema finalSchema = schema;
>>         tableRow.forEach((fieldName, fieldValue) -> {
>>             if (!existingFieldNames.contains(fieldName)) {
>>                 TableFieldSchema fieldSchema = new TableFieldSchema()
>>                         .setName(fieldName)
>>                         .setType(determineFieldType(fieldValue));
>>                 finalSchema.getFields().add(fieldSchema);
>>             }
>>         });
>>
>>         // TODO: Add cases for handling schema conflicts also here
>>
>>         // Update the final schema in the map
>>         schemaCache.put(type, finalSchema);
>>         return schemaToJson(finalSchema);
>>     }
>>
>>     // Method to create a view of schemas as a PCollectionView<Map<String, String>>
>>     public static PCollectionView<Map<String, String>> createSchemasView(
>>             PCollection<KV<String, TableRow>> input) {
>>
>>         // Map input data to KV<topic, schema JSON>
>>         PCollection<KV<String, String>> schemas = input.apply("MapElements for converting KV",
>>             MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
>>                 .via(data -> {
>>                     assert data != null;
>>                     return KV.of(data.getKey(), getSchemaForType(data.getKey(), data.getValue()));
>>                 })
>>         );
>>
>>         // Deduplicate by key (topic) using Combine.perKey() and select any one schema for each key
>>         PCollection<KV<String, String>> uniqueSchemas = schemas
>>             .apply("DeduplicateSchemas", Combine.perKey((schema1, schema2) -> schema1)); // Keep one schema per topic
>>
>>         // Use View.asMap() to create a side input for schemas
>>         return uniqueSchemas.apply("ToSchemaView", View.asMap());
>>     }
>> }
>>
>> public class BatchDynamicDestination extends DynamicDestinations<KV<String, TableRow>, DestinationInfo> {
>>
>>     private static final Logger LOG = LoggerFactory.getLogger(BatchDynamicDestination.class);
>>
>>     private final String datasetName;
>>     private final PCollectionView<Map<String, String>> schemaView;
>>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>>
>>     public BatchDynamicDestination(String datasetName, PCollectionView<Map<String, String>> schemaView) {
>>         this.datasetName = datasetName;
>>         this.schemaView = schemaView;
>>     }
>>
>>     @Override
>>     public Coder<DestinationInfo> getDestinationCoder() {
>>         // Using SerializableCoder because DestinationInfo implements Serializable
>>         return DestinationInfoCoder.of();
>>     }
>>
>>     @Override
>>     public java.util.List<PCollectionView<?>> getSideInputs() {
>>         // Register the schemaView as a side input
>>         return Collections.singletonList(schemaView);
>>     }
>>
>>     @Override
>>     public DestinationInfo getDestination(ValueInSingleWindow<KV<String, TableRow>> element) {
>>
>>         Map<String, String> schemas = sideInput(schemaView);
>>         String topic = element.getValue().getKey();
>>
>>         String tableSchema = schemas.get(topic);
>>         if (tableSchema == null) {
>>             throw new RuntimeException("Schema not found for topic: " + topic);
>>         }
>>
>>         TableSchema tableSchemaObj;
>>         try {
>>             tableSchemaObj = OBJECT_MAPPER.readValue(tableSchema, TableSchema.class); // Deserialize JSON to TableSchema
>>         } catch (IOException e) {
>>             throw new RuntimeException("Error parsing schema JSON", e);
>>         }
>>
>>         LOG.info("datasetname: {}, topic: {}, tableSchema: {}", datasetName, topic, tableSchema);
>>
>>         return new DestinationInfo("livewel-prod", datasetName, topic, tableSchemaObj);
>>     }
>>
>>     @Override
>>     public TableDestination getTable(DestinationInfo destination) {
>>         return new TableDestination(destination.toTableSpec(), "Dynamically generated table");
>>     }
>>
>>     @Override
>>     public TableSchema getSchema(DestinationInfo destination) {
>>         return destination.getTableSchema();
>>     }
>> }
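One more observation, separate from whatever is failing in the load jobs: the static schemaCache lives per worker JVM, and Combine.perKey((schema1, schema2) -> schema1) keeps an arbitrary schema per topic, so the schema that wins can be missing fields that only appeared in rows processed on other workers. A sketch of a merging combine instead (assuming the strings are the JSON produced by schemaToJson above; conflict handling remains the open TODO):

    // Sketch: merge two serialized TableSchemas by taking the union of their fields,
    // instead of keeping an arbitrary one per topic.
    private static final SerializableFunction<Iterable<String>, String> MERGE_SCHEMAS =
        schemas -> {
            ObjectMapper mapper = new ObjectMapper();
            Map<String, TableFieldSchema> fieldsByName = new LinkedHashMap<>();
            try {
                for (String json : schemas) {
                    TableSchema schema = mapper.readValue(json, TableSchema.class);
                    if (schema.getFields() == null) {
                        continue;
                    }
                    for (TableFieldSchema field : schema.getFields()) {
                        fieldsByName.putIfAbsent(field.getName(), field);
                    }
                }
                TableSchema merged = new TableSchema().setFields(new ArrayList<>(fieldsByName.values()));
                return mapper.writeValueAsString(merged);
            } catch (IOException e) {
                throw new RuntimeException("Error merging schemas", e);
            }
        };

which would then replace the current deduplication step:

    .apply("DeduplicateSchemas", Combine.perKey(MERGE_SCHEMAS));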