Hi Kenneth / Reuven,

I spent some more time digging into the root cause of what might be the actual problem here.
Here's what I found: the 409 "duplicate job" errors from BigQuery might just be a side effect of something else related to the schema. What I have come to observe is that the behaviour depends on how I implement DynamicDestinations (see *Working* / *Not working* below). When I return a DestinationInfo object from getDestination(), the correct schema somehow does not make it downstream in the pipeline while writing to BigQuery. However, if I return a plain String from getDestination() and construct the actual schema in getSchema(), it works correctly. I have also shared DestinationInfo's coder, in case the problem is there.

This looks like it may be a bug in Beam? I have been trying to get through to the GCP product / engineering teams, but not much luck there :( If this is a bug, where should I report it? Maybe I can contribute a fix if I have understood the actual problem correctly.

As a side effect, the job shows as successful on GCP Dataflow, but only a very small portion of the data gets written to BigQuery. At the very least the pipeline should throw some error about the schema, if that is the actual problem.

Reuven, while working on fixing my BigQuery writes I tried combinations of the legacy runner / runner v2 and several Beam versions (2.52.0, 2.53.0, and the latest 2.60.0), but the problem persisted. So the root cause seems to be independent of the runner and the version.

*Working*

public class BatchDynamicDestination extends DynamicDestinations<KV<String, TableRow>, String> {

    private static final Logger LOG = LoggerFactory.getLogger(BatchDynamicDestination.class);
    private static final Map<String, TableSchema> SCHEMA_MAP = new HashMap<>();

    private final String datasetName;
    private final PCollectionView<Map<String, String>> schemaView;

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    public BatchDynamicDestination(String datasetName, PCollectionView<Map<String, String>> schemaView) {
        this.datasetName = datasetName;
        this.schemaView = schemaView;
    }

    @Override
    public java.util.List<PCollectionView<?>> getSideInputs() {
        // Register the schemaView as a side input
        return Collections.singletonList(schemaView);
    }

    @Override
    public String getDestination(ValueInSingleWindow<KV<String, TableRow>> element) {
        String topic = element.getValue().getKey();
        return String.format("livewel-prod:%s.%s", datasetName, topic);
    }

    @Override
    public TableDestination getTable(String destination) {
        String tableName = destination.toLowerCase();
        return new TableDestination(tableName, "Table for category ");
    }

    @Override
    public TableSchema getSchema(String destination) {
        Map<String, String> schemasMap = sideInput(schemaView);
        LOG.info("destination in getSchema: {}", destination);

        StringBuilder sb = new StringBuilder();
        schemasMap.entrySet().stream().forEach(entry -> {
            sb.append(entry.getKey()).append(" : ");
        });
        LOG.info("destination in getSchema map: {}", sb.toString());

        String[] parts = destination.split("\\.");
        if (parts.length < 2) {
            throw new RuntimeException("parts.length in BatchDynamicDestinations is less than 2: " + destination);
        }

        String tableSchema = schemasMap.get(parts[1]);
        TableSchema tableSchemaObj;
        try {
            // Deserialize JSON to TableSchema
            tableSchemaObj = OBJECT_MAPPER.readValue(tableSchema, TableSchema.class);
        } catch (IOException e) {
            throw new RuntimeException("Error parsing schema JSON", e);
        }
        return tableSchemaObj;
    }
}

*Not working*

public class StreamDynamicDestination extends DynamicDestinations<BigQueryRow, DestinationInfo> {

    private final String bigQueryDataset;

    public StreamDynamicDestination(String bigQueryDataset) {
        this.bigQueryDataset = bigQueryDataset;
    }

    @Override
    public Coder<DestinationInfo> getDestinationCoder() {
        // Custom coder for DestinationInfo (shared below)
        return DestinationInfoCoder.of();
    }

    @Override
    public DestinationInfo getDestination(ValueInSingleWindow<BigQueryRow> element) {
        String tablePrefix = "testvitalsdb";
        BigQueryRow bigQueryRow = element.getValue();
        assert bigQueryRow != null;
        TableSchema tableSchema = bigQueryRow.generateSchemaFromRow();
        //return new DestinationInfo("livewel-prod", dataset, String.format("%s_%d", tablePrefix, BigQueryRow.getTimeStampMod()), tableSchema);
        return new DestinationInfo("livewel-prod", bigQueryDataset, tablePrefix, tableSchema);
    }

    @Override
    public TableDestination getTable(DestinationInfo destination) {
        return new TableDestination(destination.toTableSpec(), "Dynamically generated table");
    }

    @Override
    public TableSchema getSchema(DestinationInfo destination) {
        return destination.getTableSchema();
    }
}
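For reference, DestinationInfo itself is just a small Serializable holder for the table spec parts plus the schema. Here is a trimmed-down version of it (only the members that the coder and the destination logic above actually touch):

public class DestinationInfo implements Serializable {

    private final String project;
    private final String dataset;
    private final String table;
    private final TableSchema tableSchema;

    public DestinationInfo(String project, String dataset, String table, TableSchema tableSchema) {
        this.project = project;
        this.dataset = dataset;
        this.table = table;
        this.tableSchema = tableSchema;
    }

    public String getProject() { return project; }
    public String getDataset() { return dataset; }
    public String getTable() { return table; }
    public TableSchema getTableSchema() { return tableSchema; }

    // Table spec in the "project:dataset.table" form passed to TableDestination
    public String toTableSpec() {
        return String.format("%s:%s.%s", project, dataset, table);
    }
}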
*My DestinationInfoCoder*

public class DestinationInfoCoder extends AtomicCoder<DestinationInfo> {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void encode(DestinationInfo value, OutputStream outStream) throws CoderException, IOException {
        try {
            // Encode the String fields
            StringUtf8Coder.of().encode(value.getProject(), outStream);
            StringUtf8Coder.of().encode(value.getDataset(), outStream);
            StringUtf8Coder.of().encode(value.getTable(), outStream);

            // Encode the TableSchema as a JSON string
            String schemaJson = objectMapper.writeValueAsString(value.getTableSchema());
            StringUtf8Coder.of().encode(schemaJson, outStream);
        } catch (Exception e) {
            throw new CoderException("Failed to encode DestinationInfo", e);
        }
    }

    @Override
    public DestinationInfo decode(InputStream inStream) throws CoderException, IOException {
        String project;
        String dataset;
        String table;
        TableSchema tableSchema;
        try {
            // Decode the String fields
            project = StringUtf8Coder.of().decode(inStream);
            dataset = StringUtf8Coder.of().decode(inStream);
            table = StringUtf8Coder.of().decode(inStream);

            // Decode the TableSchema from the JSON string
            String schemaJson = StringUtf8Coder.of().decode(inStream);
            tableSchema = objectMapper.readValue(schemaJson, TableSchema.class);
        } catch (Exception e) {
            throw new CoderException("Failed to decode DestinationInfo", e);
        }
        return new DestinationInfo(project, dataset, table, tableSchema);
    }

    public static DestinationInfoCoder of() {
        return new DestinationInfoCoder();
    }
}
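For context, this destination is plugged into the BigQuery write roughly like below (simplified; the variable names and the format function are only stand-ins for our actual BigQueryRow handling):

WriteResult result = bigQueryRows.apply("WriteToBigQuery",
        BigQueryIO.<BigQueryRow>write()
                .to(new StreamDynamicDestination(bigqueryDatasetName))
                // stand-in: the real pipeline converts our BigQueryRow wrapper into a TableRow here
                .withFormatFunction(bigQueryRow -> bigQueryRow.toTableRow())
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));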
On Fri, Oct 25, 2024 at 9:50 PM Reuven Lax <re...@google.com> wrote:

> Are you using runner v2? The issue you linked to implies that this only
> happened on runner v2.
>
> On Fri, Oct 25, 2024 at 8:26 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> Hi Pranjal,
>>
>> If there is a bug in Beam, this is a good list to contact. If there is a
>> problem with a GCP service, then GCP support is better.
>>
>> I see the code you shared, but what error or difficulty are you
>> encountering?
>>
>> Kenn
>>
>> On Mon, Oct 21, 2024 at 2:33 PM Pranjal Pandit <pran...@eka.care> wrote:
>>
>>> Hi Kenneth / Yi Hun,
>>>
>>> I have been grappling for the past few days with a pipeline that runs
>>> as a batch pipeline and loads data from GCS to BigQuery.
>>>
>>> While trying to find a solution I came across a similar issue and have
>>> posted what I am observing in this GitHub issue:
>>> https://github.com/apache/beam/issues/28219
>>>
>>> We are using the GCP Dataflow runner, so I have written to GCP support,
>>> but not much luck there.
>>>
>>> I will try to briefly explain the problem I am facing here again.
>>> Really appreciate any leads on how I can resolve this issue.
>>>
>>> I am trying to use dynamic destinations to load data from GCS into
>>> BigQuery across multiple tables with multiple schemas.
>>>
>>> I have tried to put down all the relevant classes I am using below.
>>>
>>> PCollection<KV<String, TableRow>> kvRows = decompressedLines
>>>         .apply("Convert To BigQueryRow", ParDo.of(new Utility.ConvertToBigQueryRow(ekaUUID)));
>>>
>>> // Group by TableId to manage different schemas per table type
>>> PCollectionView<Map<String, String>> schemaView =
>>>         Utility.GetSchemaViewFromBigQueryRows.createSchemasView(kvRows);
>>>
>>> WriteResult result = kvRows.apply("WriteToBigQuery",
>>>         BigQueryIO.<KV<String, TableRow>>write()
>>>                 .to(new BatchDynamicDestination(bigqueryDatasetName, schemaView))
>>>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>                 .withFormatFunction(stringTableRowKV -> stringTableRowKV != null ? stringTableRowKV.getValue() : null)
>>> );
>>>
>>> public static class ConvertToBigQueryRow extends DoFn<KV<String, String>, KV<String, TableRow>> {
>>>
>>>     private static final Logger LOG = LoggerFactory.getLogger(ConvertToBigQueryRow.class);
>>>
>>>     String uuidKeyName;
>>>
>>>     public ConvertToBigQueryRow(String uuidKeyName) {
>>>         this.uuidKeyName = uuidKeyName;
>>>     }
>>>
>>>     @ProcessElement
>>>     public void processElement(ProcessContext c) throws JsonProcessingException {
>>>         ObjectMapper mapper = new ObjectMapper();
>>>         String value = Objects.requireNonNull(c.element().getValue());
>>>         String topic = Objects.requireNonNull(c.element().getKey());
>>>
>>>         // Parse the entire value as a JSON tree
>>>         JsonNode rootNode = mapper.readTree(value);
>>>
>>>         // Extract the "after" field as a JSON node (not as a string)
>>>         JsonNode afterNode = rootNode.get("after");
>>>
>>>         // Check if the "after" field exists and is not null
>>>         if (!afterNode.isNull()) {
>>>             String afterJsonString = afterNode.asText(); // string representation of the "after" node
>>>             JsonNode afterJsonNode = mapper.readTree(afterJsonString); // parse the string into a JsonNode
>>>
>>>             Map<String, Object> afterMap;
>>>             try {
>>>                 afterMap = mapper.convertValue(afterNode, new TypeReference<Map<String, Object>>() {});
>>>             } catch (IllegalArgumentException e) {
>>>                 afterMap = mapper.convertValue(afterJsonNode, new TypeReference<Map<String, Object>>() {});
>>>             }
>>>
>>>             if (afterMap != null) {
>>>                 TableRow row = new TableRow();
>>>                 for (Map.Entry<String, Object> entry : afterMap.entrySet()) {
>>>                     row.set(entry.getKey(), entry.getValue());
>>>                 }
>>>
>>>                 // Insert eka UUID into our table
>>>                 UUID uuid = UUID.randomUUID();
>>>                 row.set(this.uuidKeyName, uuid.toString());
>>>
>>>                 LOG.info("T: {} D: {}", topic, row.toString());
>>>                 c.output(KV.of(topic, row));
>>>             } else {
>>>                 LOG.error("Data in ProcessElem afterMap mostly null");
>>>             }
>>>         } else {
>>>             LOG.error("The 'after' field is null.");
>>>         }
>>>     }
>>> }
>>>
>>> public static class GetSchemaViewFromBigQueryRows {
>>>
>>>     private static final Logger LOG = LoggerFactory.getLogger(GetSchemaViewFromBigQueryRows.class);
>>>     private static final Map<String, TableSchema> schemaCache = new ConcurrentHashMap<>();
>>>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>>>
>>>     private static String schemaToJson(TableSchema schema) {
>>>         try {
>>>             return OBJECT_MAPPER.writeValueAsString(schema);
>>>         } catch (JsonProcessingException e) {
>>>             throw new RuntimeException("Error converting schema to JSON", e);
>>>         }
>>>     }
>>>
>>>     // Method to generate a schema for the given type
>>>     public static String getSchemaForType(String type, TableRow tableRow) {
>>>         TableSchema schema = schemaCache.get(type);
>>>         if (schema == null) {
>>>             schema = new TableSchema().setFields(new ArrayList<>());
>>>             schemaCache.put(type, schema);
>>>         }
>>>
>>>         // Create a set of existing field names to avoid duplicates
>>>         Set<String> existingFieldNames = new HashSet<>();
>>>         for (TableFieldSchema field : schema.getFields()) {
>>>             existingFieldNames.add(field.getName());
>>>         }
>>>
>>>         // Add any fields from this row that are not in the schema yet
>>>         TableSchema finalSchema = schema;
>>>         tableRow.forEach((fieldName, fieldValue) -> {
>>>             if (!existingFieldNames.contains(fieldName)) {
>>>                 TableFieldSchema fieldSchema = new TableFieldSchema()
>>>                         .setName(fieldName)
>>>                         .setType(determineFieldType(fieldValue));
>>>                 finalSchema.getFields().add(fieldSchema);
>>>             }
>>>         });
>>>
>>>         // TODO: Also handle schema conflicts here
>>>
>>>         // Update the final schema in the map
>>>         schemaCache.put(type, finalSchema);
>>>         return schemaToJson(finalSchema);
>>>     }
>>>
>>>     // Method to create a view of schemas as a PCollectionView<Map<String, String>>
>>>     public static PCollectionView<Map<String, String>> createSchemasView(
>>>             PCollection<KV<String, TableRow>> input) {
>>>
>>>         // Map input data to KV<topic, schema JSON>
>>>         PCollection<KV<String, String>> schemas = input.apply("MapElements for converting KV",
>>>                 MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
>>>                         .via(data -> {
>>>                             assert data != null;
>>>                             return KV.of(data.getKey(), getSchemaForType(data.getKey(), data.getValue()));
>>>                         })
>>>         );
>>>
>>>         // Deduplicate by key (topic) using Combine.perKey() and keep one schema per topic
>>>         PCollection<KV<String, String>> uniqueSchemas = schemas
>>>                 .apply("DeduplicateSchemas", Combine.perKey((schema1, schema2) -> schema1));
>>>
>>>         // Use View.asMap() to create a side input for schemas
>>>         return uniqueSchemas.apply("ToSchemaView", View.asMap());
>>>     }
>>> }
>>>
>>> public class BatchDynamicDestination extends DynamicDestinations<KV<String, TableRow>, DestinationInfo> {
>>>
>>>     private static final Logger LOG = LoggerFactory.getLogger(BatchDynamicDestination.class);
>>>
>>>     private final String datasetName;
>>>     private final PCollectionView<Map<String, String>> schemaView;
>>>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>>>
>>>     public BatchDynamicDestination(String datasetName, PCollectionView<Map<String, String>> schemaView) {
>>>         this.datasetName = datasetName;
>>>         this.schemaView = schemaView;
>>>     }
>>>
>>>     @Override
>>>     public Coder<DestinationInfo> getDestinationCoder() {
>>>         // Custom coder for DestinationInfo
>>>         return DestinationInfoCoder.of();
>>>     }
>>>
>>>     @Override
>>>     public java.util.List<PCollectionView<?>> getSideInputs() {
>>>         // Register the schemaView as a side input
>>>         return Collections.singletonList(schemaView);
>>>     }
>>>
>>>     @Override
>>>     public DestinationInfo getDestination(ValueInSingleWindow<KV<String, TableRow>> element) {
>>>         Map<String, String> schemas = sideInput(schemaView);
>>>         String topic = element.getValue().getKey();
>>>
>>>         String tableSchema = schemas.get(topic);
>>>         if (tableSchema == null) {
>>>             throw new RuntimeException("Schema not found for topic: " + topic);
>>>         }
>>>
>>>         TableSchema tableSchemaObj;
>>>         try {
>>>             // Deserialize JSON to TableSchema
>>>             tableSchemaObj = OBJECT_MAPPER.readValue(tableSchema, TableSchema.class);
>>>         } catch (IOException e) {
>>>             throw new RuntimeException("Error parsing schema JSON", e);
>>>         }
>>>
>>>         LOG.info("datasetname: {}, topic: {}, tableSchema: {}", datasetName, topic, tableSchema);
>>>
>>>         return new DestinationInfo("livewel-prod", datasetName, topic, tableSchemaObj);
>>>     }
>>>
>>>     @Override
>>>     public TableDestination getTable(DestinationInfo destination) {
>>>         return new TableDestination(destination.toTableSpec(), "Dynamically generated table");
>>>     }
>>>
>>>     @Override
>>>     public TableSchema getSchema(DestinationInfo destination) {
>>>         return destination.getTableSchema();
>>>     }
>>> }