From the issue report, it looks like it is happening both with and without Runner v2: https://github.com/apache/beam/issues/28219#issuecomment-2423624052
On Fri, Oct 25, 2024 at 12:21 PM Reuven Lax via dev <dev@beam.apache.org> wrote:

Are you using Runner v2? The issue you linked to implies that this only happened on Runner v2.

On Fri, Oct 25, 2024 at 8:26 AM Kenneth Knowles <k...@apache.org> wrote:

Hi Pranjal,

If there is a bug in Beam, this is a good list to contact. If there is a problem with a GCP service, then GCP support is better.

I see the code you shared, but what error or difficulty are you encountering?

Kenn

On Mon, Oct 21, 2024 at 2:33 PM Pranjal Pandit <pran...@eka.care> wrote:

Hi Kenneth / Yi Hun,

For the past few days I have been grappling with a pipeline that runs in batch mode and loads data from GCS into BigQuery.

While searching for a solution I found a similar issue and have posted what I am observing in the GitHub issue (https://github.com/apache/beam/issues/28219).

We are using the GCP Dataflow runner, so I have also written to GCP support, but not had much luck there.

I will try to briefly explain the problem I am facing. I would really appreciate any leads on how to resolve it.

I am trying to use dynamic destinations to load data from GCS into BigQuery across multiple tables with multiple schemas. I have put all the required classes below.

    PCollection<KV<String, TableRow>> kvRows = decompressedLines
        .apply("Convert To BigQueryRow", ParDo.of(new Utility.ConvertToBigQueryRow(ekaUUID)));

    // Build a side-input view of the schema for each table type (keyed by topic)
    PCollectionView<Map<String, String>> schemaView =
        Utility.GetSchemaViewFromBigQueryRows.createSchemasView(kvRows);

    WriteResult result = kvRows.apply("WriteToBigQuery",
        BigQueryIO.<KV<String, TableRow>>write()
            .to(new BatchDynamicDestination(bigqueryDatasetName, schemaView))
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withFormatFunction(stringTableRowKV ->
                stringTableRowKV != null ? stringTableRowKV.getValue() : null));
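(Since the schemas are already computed inside the pipeline, one alternative I am considering is BigQueryIO's withSchemaFromView, which, if I am reading the docs right, takes a side-input map of table spec to JSON-serialized TableSchema. A minimal sketch of that route, replacing the DynamicDestinations pairing; the re-keying by full table spec and everything not shown above is illustrative:)

    // Sketch: withSchemaFromView expects map keys that are full table specs
    // ("project:dataset.table"), so re-key the per-topic schemas accordingly.
    PCollectionView<Map<String, String>> schemaBySpec = kvRows
        .apply("SchemaPerSpec", MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via((KV<String, TableRow> kv) -> KV.of(
                "livewel-prod:" + bigqueryDatasetName + "." + kv.getKey(),
                Utility.GetSchemaViewFromBigQueryRows.getSchemaForType(kv.getKey(), kv.getValue()))))
        .apply("OneSchemaPerSpec", Combine.perKey((a, b) -> a))
        .apply("ToSpecSchemaView", View.asMap());

    WriteResult result = kvRows.apply("WriteToBigQuery",
        BigQueryIO.<KV<String, TableRow>>write()
            // A table function instead of a DynamicDestinations subclass.
            .to(element -> new TableDestination(
                "livewel-prod:" + bigqueryDatasetName + "." + element.getValue().getKey(),
                "Dynamically generated table"))
            .withSchemaFromView(schemaBySpec)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withFormatFunction(kv -> kv != null ? kv.getValue() : null));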
    public static class ConvertToBigQueryRow extends DoFn<KV<String, String>, KV<String, TableRow>> {

        private static final Logger LOG = LoggerFactory.getLogger(ConvertToBigQueryRow.class);
        // Jackson mappers are thread-safe; reuse one instead of allocating per element.
        private static final ObjectMapper MAPPER = new ObjectMapper();

        private final String uuidKeyName;

        public ConvertToBigQueryRow(String uuidKeyName) {
            this.uuidKeyName = uuidKeyName;
        }

        @ProcessElement
        public void processElement(ProcessContext c) throws JsonProcessingException {
            String value = Objects.requireNonNull(c.element().getValue());
            String topic = Objects.requireNonNull(c.element().getKey());

            // Parse the entire value as a JSON tree and extract the "after" field.
            JsonNode rootNode = MAPPER.readTree(value);
            JsonNode afterNode = rootNode.get("after");

            // get() returns null when the field is absent, so guard both cases.
            if (afterNode == null || afterNode.isNull()) {
                LOG.error("The 'after' field is missing or null.");
                return;
            }

            // "after" may be an embedded JSON object, or a JSON string containing
            // the object; handle both.
            Map<String, Object> afterMap;
            try {
                afterMap = MAPPER.convertValue(afterNode, new TypeReference<Map<String, Object>>() {});
            } catch (IllegalArgumentException e) {
                JsonNode afterJsonNode = MAPPER.readTree(afterNode.asText());
                afterMap = MAPPER.convertValue(afterJsonNode, new TypeReference<Map<String, Object>>() {});
            }

            if (afterMap == null) {
                LOG.error("Data in processElement: afterMap is null");
                return;
            }

            TableRow row = new TableRow();
            for (Map.Entry<String, Object> entry : afterMap.entrySet()) {
                row.set(entry.getKey(), entry.getValue());
            }

            // Attach our own UUID to the row.
            row.set(this.uuidKeyName, UUID.randomUUID().toString());

            LOG.info("T: {} D: {}", topic, row);
            c.output(KV.of(topic, row));
        }
    }
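(For what it's worth, this is roughly how I exercise the DoFn locally on the DirectRunner; the topic "patients", the column "eka_uuid", and the sample envelope are made up for illustration:)

    // Illustrative local run: one record whose "after" field is an embedded
    // JSON string, as produced by a Debezium-style envelope.
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<KV<String, TableRow>> rows = p
        .apply(Create.of(KV.of("patients", "{\"after\": \"{\\\"id\\\": 1}\"}")))
        .apply(ParDo.of(new Utility.ConvertToBigQueryRow("eka_uuid")));
    PAssert.that(rows).satisfies(iter -> {
        for (KV<String, TableRow> kv : iter) {
            // Every output row should carry the original field plus the added UUID.
            if (kv.getValue().get("id") == null || kv.getValue().get("eka_uuid") == null) {
                throw new AssertionError("Unexpected row: " + kv.getValue());
            }
        }
        return null;
    });
    p.run().waitUntilFinish();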
    public static class GetSchemaViewFromBigQueryRows {

        private static final Logger LOG = LoggerFactory.getLogger(GetSchemaViewFromBigQueryRows.class);
        // Note: this cache is per JVM, so each worker accumulates its own view of the schema.
        private static final Map<String, TableSchema> schemaCache = new ConcurrentHashMap<>();
        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

        private static String schemaToJson(TableSchema schema) {
            try {
                return OBJECT_MAPPER.writeValueAsString(schema);
            } catch (JsonProcessingException e) {
                throw new RuntimeException("Error converting schema to JSON", e);
            }
        }

        // Generate (and incrementally extend) the schema for the given type.
        public static String getSchemaForType(String type, TableRow tableRow) {
            TableSchema schema =
                schemaCache.computeIfAbsent(type, t -> new TableSchema().setFields(new ArrayList<>()));

            // Collect existing field names to avoid duplicates.
            Set<String> existingFieldNames = new HashSet<>();
            for (TableFieldSchema field : schema.getFields()) {
                existingFieldNames.add(field.getName());
            }

            // Add any field seen in this row that the schema does not have yet.
            TableSchema finalSchema = schema;
            tableRow.forEach((fieldName, fieldValue) -> {
                if (!existingFieldNames.contains(fieldName)) {
                    finalSchema.getFields().add(new TableFieldSchema()
                        .setName(fieldName)
                        .setType(determineFieldType(fieldValue)));
                }
            });

            // TODO: Add cases for handling schema conflicts here as well.

            schemaCache.put(type, finalSchema);
            return schemaToJson(finalSchema);
        }

        // Create a side-input view mapping topic -> JSON-serialized TableSchema.
        public static PCollectionView<Map<String, String>> createSchemasView(
                PCollection<KV<String, TableRow>> input) {

            PCollection<KV<String, String>> schemas = input.apply("MapElements for converting KV",
                MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                    .via(data -> {
                        assert data != null;
                        return KV.of(data.getKey(), getSchemaForType(data.getKey(), data.getValue()));
                    }));

            // Deduplicate by key (topic), keeping one schema per topic.
            PCollection<KV<String, String>> uniqueSchemas = schemas
                .apply("DeduplicateSchemas", Combine.perKey((schema1, schema2) -> schema1));

            // View.asMap() turns the deduplicated schemas into a side input.
            return uniqueSchemas.apply("ToSchemaView", View.asMap());
        }
    }
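(One thing I am unsure about in DeduplicateSchemas: since schemaCache is per JVM, different workers can emit different partial schemas for the same topic, and "keep the first" then picks one arbitrarily. A sketch of the merge I am considering instead; mergeSchemaJson is a new helper inside GetSchemaViewFromBigQueryRows, not in the code above:)

    // Hypothetical replacement for the (schema1, schema2) -> schema1 combiner:
    // union the field lists so the winner does not depend on which worker's
    // partial schema arrives first.
    private static String mergeSchemaJson(String a, String b) {
        try {
            TableSchema sa = OBJECT_MAPPER.readValue(a, TableSchema.class);
            TableSchema sb = OBJECT_MAPPER.readValue(b, TableSchema.class);
            Map<String, TableFieldSchema> byName = new LinkedHashMap<>();
            for (TableFieldSchema f : sa.getFields()) byName.put(f.getName(), f);
            for (TableFieldSchema f : sb.getFields()) byName.putIfAbsent(f.getName(), f);
            return OBJECT_MAPPER.writeValueAsString(
                new TableSchema().setFields(new ArrayList<>(byName.values())));
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Error merging schema JSON", e);
        }
    }

    // ...and in createSchemasView:
    PCollection<KV<String, String>> uniqueSchemas = schemas
        .apply("MergeSchemas", Combine.perKey((a, b) -> mergeSchemaJson(a, b)));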
    public class BatchDynamicDestination
            extends DynamicDestinations<KV<String, TableRow>, DestinationInfo> {

        private static final Logger LOG = LoggerFactory.getLogger(BatchDynamicDestination.class);
        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

        private final String datasetName;
        private final PCollectionView<Map<String, String>> schemaView;

        public BatchDynamicDestination(String datasetName,
                                       PCollectionView<Map<String, String>> schemaView) {
            this.datasetName = datasetName;
            this.schemaView = schemaView;
        }

        @Override
        public Coder<DestinationInfo> getDestinationCoder() {
            // A custom coder for DestinationInfo.
            return DestinationInfoCoder.of();
        }

        @Override
        public java.util.List<PCollectionView<?>> getSideInputs() {
            // Register schemaView so sideInput() can access it in getDestination().
            return Collections.singletonList(schemaView);
        }

        @Override
        public DestinationInfo getDestination(ValueInSingleWindow<KV<String, TableRow>> element) {
            Map<String, String> schemas = sideInput(schemaView);
            String topic = element.getValue().getKey();

            String tableSchema = schemas.get(topic);
            if (tableSchema == null) {
                throw new RuntimeException("Schema not found for topic: " + topic);
            }

            TableSchema tableSchemaObj;
            try {
                // Deserialize the JSON side-input value back into a TableSchema.
                tableSchemaObj = OBJECT_MAPPER.readValue(tableSchema, TableSchema.class);
            } catch (IOException e) {
                throw new RuntimeException("Error parsing schema JSON", e);
            }

            LOG.info("datasetName: {}, topic: {}, tableSchema: {}", datasetName, topic, tableSchema);

            return new DestinationInfo("livewel-prod", datasetName, topic, tableSchemaObj);
        }

        @Override
        public TableDestination getTable(DestinationInfo destination) {
            return new TableDestination(destination.toTableSpec(), "Dynamically generated table");
        }

        @Override
        public TableSchema getSchema(DestinationInfo destination) {
            return destination.getTableSchema();
        }
    }
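(DestinationInfo and DestinationInfoCoder are referenced above but I did not paste them. For completeness, a minimal sketch of the shape DestinationInfo would need; this is assumed, not the actual class. The schema travels as a JSON string because, as far as I can tell, TableSchema itself is not Serializable:)

    // Assumed sketch of DestinationInfo (the original class was not shared).
    public class DestinationInfo implements Serializable {

        private static final ObjectMapper MAPPER = new ObjectMapper();

        private final String project;
        private final String dataset;
        private final String table;
        private final String schemaJson;

        public DestinationInfo(String project, String dataset, String table, TableSchema schema) {
            this.project = project;
            this.dataset = dataset;
            this.table = table;
            try {
                this.schemaJson = MAPPER.writeValueAsString(schema);
            } catch (JsonProcessingException e) {
                throw new RuntimeException("Error serializing schema", e);
            }
        }

        public String toTableSpec() {
            return project + ":" + dataset + "." + table;
        }

        public TableSchema getTableSchema() {
            try {
                return MAPPER.readValue(schemaJson, TableSchema.class);
            } catch (IOException e) {
                throw new RuntimeException("Error deserializing schema", e);
            }
        }

        // BigQueryIO groups elements by destination, so equality must be value-based.
        @Override
        public boolean equals(Object o) {
            return o instanceof DestinationInfo
                && ((DestinationInfo) o).toTableSpec().equals(toTableSpec());
        }

        @Override
        public int hashCode() {
            return toTableSpec().hashCode();
        }
    }

(With the schema held as a string, getDestinationCoder() could also just return SerializableCoder.of(DestinationInfo.class) instead of a hand-written DestinationInfoCoder.)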