Ah, I suspect I know what's going on. What you're doing won't work unless the DestinationInfoCoder is a deterministic coder (i.e., two equivalent objects always map to the same encoded representation). A JSON encoding often isn't deterministic (for one thing, it's legal to encode the JSON fields in any order).
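
If it helps, here's a rough, untested sketch of what a deterministic destination coder can look like: keep the TableSchema out of the destination key, encode only the identifying strings in a fixed order, and override verifyDeterministic(). The schema can then be produced in getSchema() (for example from your side input, as in the working BatchDynamicDestination variant). The names TableId and TableIdCoder below are only illustrative, not from your code.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Objects;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;

// A destination key with only the stable identifying fields.
// Value-based equals/hashCode matters because elements are grouped per destination.
class TableId implements Serializable {
  final String project;
  final String dataset;
  final String table;

  TableId(String project, String dataset, String table) {
    this.project = project;
    this.dataset = dataset;
    this.table = table;
  }

  String toTableSpec() {
    return String.format("%s:%s.%s", project, dataset, table);
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof TableId)) {
      return false;
    }
    TableId other = (TableId) o;
    return project.equals(other.project)
        && dataset.equals(other.dataset)
        && table.equals(other.table);
  }

  @Override
  public int hashCode() {
    return Objects.hash(project, dataset, table);
  }
}

// Deterministic by construction: fixed field order and a fixed string encoding,
// so equal TableId values always encode to the same bytes.
class TableIdCoder extends AtomicCoder<TableId> {

  static TableIdCoder of() {
    return new TableIdCoder();
  }

  @Override
  public void encode(TableId value, OutputStream outStream) throws CoderException, IOException {
    StringUtf8Coder.of().encode(value.project, outStream);
    StringUtf8Coder.of().encode(value.dataset, outStream);
    StringUtf8Coder.of().encode(value.table, outStream);
  }

  @Override
  public TableId decode(InputStream inStream) throws CoderException, IOException {
    return new TableId(
        StringUtf8Coder.of().decode(inStream),
        StringUtf8Coder.of().decode(inStream),
        StringUtf8Coder.of().decode(inStream));
  }

  @Override
  public void verifyDeterministic() {
    // Nothing to check: the encoding above is deterministic.
  }
}

If you really do need the TableSchema inside the destination object, then the schema part of the encoding has to be canonical as well (for example a fixed, sorted field order); otherwise two logically equivalent destinations can still encode to different bytes and get treated as different destinations.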
On Fri, Oct 25, 2024 at 2:22 PM Pranjal Pandit <pran...@eka.care> wrote: > Hi Kenneth / Reuven, > > Actually I did a little more observation on the root cause on what might > be the actual problem here, > > Here's what I found: > > > 409s on Bigquery duplicate jobs might be the side effect of something else > related to schema, > What I have came to observe is that when I am giving DynamicDestinations > as below it works (*Working* / *Not Working*) > > Basically, upon returning DestinationInfo object it somehow is not able to > get correct schema downstream into the pipeline while writing to bigquery > However if I return a normal string in getDestination() and construct the > actual schema in getSchema() then it works correctly > > I have also shared DestinationInfo's coder (if there might be something > there) > > Seems like this may be a bug in Beam ? I am trying to get past GCP product > / engg teams but not much luch there :( > > If this is bug, where I should report it, > May be I can give a fix if I have understood the actual problem correctly, > > Now, as a side effect, what's happening is that, Job shows successful on > GCP Dataflow, but only a very small portion of data gets written to BQ, > At the least the pipeline should throw some error on schema if that's the > actual problem, > > Reuven, > While working on fixing my write to BQ's I tried a combination of GCP > runner legacy / GCP runner V2 / older versions of beam 2.52.0, 2.53.0 , > latest (2.60.0), but problem still persisted > Seems to me root cause is independent on anything on runner / versions, > > > > *Working* > > public class BatchDynamicDestination extends DynamicDestinations<KV<String, > TableRow>, String> { > > private static final Logger LOG = > LoggerFactory.getLogger(BatchDynamicDestination.class); > > private static final Map<String, TableSchema> SCHEMA_MAP = new > HashMap<>(); > > private final String datasetName; > private final PCollectionView<Map<String, String>> schemaView; > private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); > > public BatchDynamicDestination(String datasetName, > PCollectionView<Map<String, String>> schemaView) { > this.datasetName = datasetName; > this.schemaView = schemaView; > } > > @Override > public java.util.List<PCollectionView<?>> getSideInputs() { > // Register the schemaView as a side input > return Collections.singletonList(schemaView); > } > > > @Override > public String getDestination(ValueInSingleWindow<KV<String, TableRow>> > element) { > String topic = element.getValue().getKey(); > return String.format("livewel-prod:%s.%s", datasetName, topic); > } > > @Override > public TableDestination getTable(String destination) { > String tableName = destination.toLowerCase(); > return new TableDestination(tableName, "Table for category "); > } > > @Override > public TableSchema getSchema(String destination) { > Map<String, String> schemasMap = sideInput(schemaView); > LOG.info("destination in getSchema: {}", destination); > StringBuilder sb = new StringBuilder(); > schemasMap.entrySet().stream().forEach(entry -> { > sb.append(entry.getKey()).append(" : "); > }); > LOG.info("destination in getSchema map: {}", sb.toString()); > String[] parts = destination.split("\\."); > > //assert parts.length > 1; > > if (parts.length < 2) { > throw new RuntimeException("parts.length in > BatchDynamicDestinations is less than 2: " + destination); > } > String tableSchema = schemasMap.get(parts[1]); > TableSchema tableSchemaObj; > try { > tableSchemaObj = 
OBJECT_MAPPER.readValue(tableSchema, > TableSchema.class); // Deserialize JSON to TableSchema > } catch (IOException e) { > throw new RuntimeException("Error parsing schema JSON", e); > } > > return tableSchemaObj; > } > > } > > > > *Not working* > > public class StreamDynamicDestination extends > DynamicDestinations<BigQueryRow, DestinationInfo> { > > private final String bigQueryDataset; > > public StreamDynamicDestination(String bigQueryDataset){ > this.bigQueryDataset = bigQueryDataset; > } > > @Override > public Coder<DestinationInfo> getDestinationCoder() { > // Using SerializableCoder because DestinationInfo implements > Serializable > return DestinationInfoCoder.of(); > } > > @Override > public DestinationInfo getDestination(ValueInSingleWindow<BigQueryRow> > element) { > String tablePrefix = "testvitalsdb"; > BigQueryRow bigQueryRow = element.getValue(); > assert bigQueryRow != null; > TableSchema tableSchema = bigQueryRow.generateSchemaFromRow(); > //return new DestinationInfo("livewel-prod", dataset, > String.format("%s_%d", tablePrefix, BigQueryRow.getTimeStampMod()), > tableSchema); > return new DestinationInfo("livewel-prod", bigQueryDataset, > tablePrefix, tableSchema); > } > > @Override > public TableDestination getTable(DestinationInfo destination) { > return new TableDestination(destination.toTableSpec(), "Dynamically > generated table"); > } > > @Override > public TableSchema getSchema(DestinationInfo destination) { > return destination.getTableSchema(); > } > > } > > > > *My DestinationInfoCoder* > > public class DestinationInfoCoder extends AtomicCoder<DestinationInfo> { > > private static final ObjectMapper objectMapper = new ObjectMapper(); > > @Override > public void encode(DestinationInfo value, OutputStream outStream) throws > CoderException, IOException { > // Encode the String fields > try { > StringUtf8Coder.of().encode(value.getProject(), outStream); > StringUtf8Coder.of().encode(value.getDataset(), outStream); > StringUtf8Coder.of().encode(value.getTable(), outStream); > > // Encode TableSchema as JSON string > String schemaJson = > objectMapper.writeValueAsString(value.getTableSchema()); > StringUtf8Coder.of().encode(schemaJson, outStream); > }catch (Exception e) { > throw new CoderException("Failed to decode project field", e); > } > } > > @Override > public DestinationInfo decode(InputStream inStream) throws > CoderException, IOException { > // Decode the String fields > String project; > String dataset; > String table; > TableSchema tableSchema; > try{ > project = StringUtf8Coder.of().decode(inStream); > dataset = StringUtf8Coder.of().decode(inStream); > table = StringUtf8Coder.of().decode(inStream); > > // Decode TableSchema from JSON string > String schemaJson = StringUtf8Coder.of().decode(inStream); > tableSchema = objectMapper.readValue(schemaJson, > TableSchema.class); > } catch (Exception e) { > throw new CoderException("Failed to decode project field", e); > } > return new DestinationInfo(project, dataset, table, tableSchema); > } > > public static DestinationInfoCoder of() { > return new DestinationInfoCoder(); > } > } > > > On Fri, Oct 25, 2024 at 9:50 PM Reuven Lax <re...@google.com> wrote: > >> Are you using runner v2? The issue you linked to implies that this only >> happened on runner v2. >> >> On Fri, Oct 25, 2024 at 8:26 AM Kenneth Knowles <k...@apache.org> wrote: >> >>> Hi Pranjal, >>> >>> If there is a bug in Beam, this is a good list to contact. If there is a >>> problem with a GCP service, then GCP support is better. 
>>> >>> I see the code you shared, but what error or difficulty are you >>> encountering? >>> >>> Kenn >>> >>> On Mon, Oct 21, 2024 at 2:33 PM Pranjal Pandit <pran...@eka.care> wrote: >>> >>>> Hi Kenneth / Yi Hun, >>>> >>>> I have been grappling since past few days to solve for a pipeline which >>>> runs as batch pipeline and loads data from GCS to BigQuery, >>>> >>>> >>>> While striving to find solution I found out similar issue and have >>>> posted what I am observing her in github issues link ( >>>> https://github.com/apache/beam/issues/28219), >>>> >>>> We are using GCP Dataflow runner as such I have written to GCP >>>> support but not much luck there, >>>> >>>> I will try to briefly explain my problem I am facing here again , >>>> >>>> Really appreciate any leads on how I can resolve this issue, >>>> >>>> >>>> Trying to use dynamic destinations to load data from GCS to Bigquery to >>>> multiple tables / multiple schemas , >>>> >>>> >>>> Tried to put down all required classes I am using here, >>>> >>>> >>>> PCollection<KV<String, TableRow>> kvRows = decompressedLines >>>> .apply("Convert To BigQueryRow", ParDo.of(new >>>> Utility.ConvertToBigQueryRow(ekaUUID))); >>>> >>>> // Group by TableId to manage different schemas per table type >>>> >>>> PCollectionView<Map<String, String>> schemaView = >>>> Utility.GetSchemaViewFromBigQueryRows.createSchemasView(kvRows); >>>> >>>> WriteResult result = kvRows.apply("WriteToBigQuery", >>>> BigQueryIO.<KV<String, TableRow>>write() >>>> .to(new BatchDynamicDestination(bigqueryDatasetName, >>>> schemaView)) >>>> >>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) >>>> >>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) >>>> .withFormatFunction(stringTableRowKV -> stringTableRowKV >>>> != null ? 
stringTableRowKV.getValue() : null) >>>> ); >>>> >>>> >>>> public static class ConvertToBigQueryRow extends DoFn<KV<String, String>, >>>> KV<String, TableRow>> { >>>> >>>> private static final Logger LOG = >>>> LoggerFactory.getLogger(ConvertToBigQueryRow.class); >>>> >>>> String uuidKeyName; >>>> >>>> public ConvertToBigQueryRow(String uuidKeyName){ >>>> this.uuidKeyName = uuidKeyName; >>>> } >>>> >>>> @ProcessElement >>>> public void processElement(ProcessContext c) throws >>>> JsonProcessingException { >>>> ObjectMapper mapper = new ObjectMapper(); >>>> String value = Objects.requireNonNull(c.element().getValue()); >>>> String topic = Objects.requireNonNull(c.element().getKey()); >>>> // System.out.println("Data in ProcessElem " + value); >>>> // Parse the entire value as a JSON tree >>>> JsonNode rootNode = mapper.readTree(value); >>>> >>>> // Extract the "after" field as a JSON node (not as a string) >>>> JsonNode afterNode = rootNode.get("after"); >>>> >>>> // System.out.println("Data in ProcessElem after " + >>>> afterNode.asText()); >>>> // Check if "after" field exists and is not null >>>> if (!afterNode.isNull()) { >>>> >>>> String afterJsonString = afterNode.asText(); // Extract the >>>> string >>>> // representation for json node for after >>>> JsonNode afterJsonNode = mapper.readTree(afterJsonString); // >>>> Parse the string into a JsonNode >>>> >>>> Map<String, Object> afterMap = new HashMap<>(); >>>> try { >>>> afterMap = mapper.convertValue(afterNode, new >>>> TypeReference<Map<String, Object>>() { >>>> }); >>>> } catch (IllegalArgumentException e) { >>>> afterMap = mapper.convertValue(afterJsonNode, new >>>> TypeReference<Map<String, Object>>() { >>>> }); >>>> } >>>> >>>> if (afterMap != null) { >>>> // System.out.println("Data in ProcessElem afterMap " + >>>> afterMap); >>>> TableRow row = new TableRow(); >>>> for (Map.Entry<String, Object> entry : >>>> afterMap.entrySet()) { >>>> row.set(entry.getKey(), entry.getValue()); >>>> } >>>> >>>> // Insert eka UUID to our table >>>> UUID uuid = UUID.randomUUID(); >>>> row.set(this.uuidKeyName, uuid.toString()); >>>> >>>> >>>> LOG.info("T: {}D: {}", topic, row.toString()); >>>> c.output(KV.of(topic, row)); >>>> }else{ >>>> LOG.error("Data in ProcessElem afterMap mostly null "); >>>> } >>>> } else { >>>> LOG.error("The 'after' field is null."); >>>> } >>>> } >>>> } >>>> >>>> >>>> >>>> public static class GetSchemaViewFromBigQueryRows { >>>> >>>> private static final Logger LOG = >>>> LoggerFactory.getLogger(GetSchemaViewFromBigQueryRows.class); >>>> private static final Map<String, TableSchema> schemaCache = new >>>> ConcurrentHashMap<>(); >>>> private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); >>>> >>>> private static String schemaToJson(TableSchema schema) { >>>> try { >>>> return OBJECT_MAPPER.writeValueAsString(schema); >>>> } catch (JsonProcessingException e) { >>>> throw new RuntimeException("Error converting schema to JSON", >>>> e); >>>> } >>>> } >>>> >>>> // Method to generate a schema for the given type >>>> public static String getSchemaForType(String type, TableRow tableRow) { >>>> >>>> TableSchema schema = schemaCache.get(type); >>>> >>>> if (schema == null) { >>>> schema = new TableSchema().setFields(new ArrayList<>()); >>>> schemaCache.put(type, schema); >>>> } >>>> >>>> // old fields >>>> // Create a set of existing field names to avoid duplicates >>>> Set<String> existingFieldNames = new HashSet<>(); >>>> for (TableFieldSchema field : schema.getFields()) { >>>> 
existingFieldNames.add(field.getName()); >>>> } >>>> >>>> // set to new fields >>>> TableSchema finalSchema = schema; >>>> tableRow.forEach((fieldName, fieldValue) -> { >>>> if (!existingFieldNames.contains(fieldName)) { >>>> TableFieldSchema fieldSchema = new TableFieldSchema() >>>> .setName(fieldName) >>>> .setType(determineFieldType(fieldValue)); >>>> finalSchema.getFields().add(fieldSchema); >>>> } >>>> }); >>>> >>>> // TODO: Add cases for handling schema conflicts also here >>>> >>>> // update final schema in map >>>> schemaCache.put(type, finalSchema); >>>> return schemaToJson(finalSchema); >>>> } >>>> >>>> // Method to create a view of schemas as a PCollectionView<Map<String, >>>> TableSchema>> >>>> public static PCollectionView<Map<String, String>> createSchemasView( >>>> PCollection<KV<String, TableRow>> input) { >>>> >>>> // Map InputData to KV<String, TableSchema> >>>> PCollection<KV<String, String>> schemas = input.apply("MapElements >>>> for converting KV", >>>> >>>> MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), >>>> TypeDescriptors.strings())) >>>> .via(data -> { >>>> assert data != null; >>>> return KV.of(data.getKey(), >>>> getSchemaForType(data.getKey(), data.getValue())); >>>> }) >>>> ); >>>> >>>> // Deduplicate by key (topic) using Combine.perKey() and select >>>> any one schema for each key >>>> PCollection<KV<String, String>> uniqueSchemas = schemas >>>> .apply("DeduplicateSchemas", Combine.perKey((schema1, >>>> schema2) -> schema1)); // Keep one schema per topic >>>> >>>> // Use View.asMap() to create a side input for schemas >>>> return uniqueSchemas.apply("ToSchemaView", View.asMap()); >>>> } >>>> } >>>> >>>> public class BatchDynamicDestination extends >>>> DynamicDestinations<KV<String, TableRow>, DestinationInfo> { >>>> >>>> private static final Logger LOG = >>>> LoggerFactory.getLogger(BatchDynamicDestination.class); >>>> >>>> private final String datasetName; >>>> private final PCollectionView<Map<String, String>> schemaView; >>>> private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); >>>> >>>> public BatchDynamicDestination(String datasetName, >>>> PCollectionView<Map<String, String>> schemaView) { >>>> this.datasetName = datasetName; >>>> this.schemaView = schemaView; >>>> } >>>> >>>> @Override >>>> public Coder<DestinationInfo> getDestinationCoder() { >>>> // Using SerializableCoder because DestinationInfo implements >>>> Serializable >>>> return DestinationInfoCoder.of(); >>>> } >>>> >>>> @Override >>>> public java.util.List<PCollectionView<?>> getSideInputs() { >>>> // Register the schemaView as a side input >>>> return Collections.singletonList(schemaView); >>>> } >>>> >>>> >>>> @Override >>>> public DestinationInfo getDestination(ValueInSingleWindow<KV<String, >>>> TableRow>> element) { >>>> >>>> >>>> Map<String, String> schemas = sideInput(schemaView); >>>> String topic = element.getValue().getKey(); >>>> >>>> String tableSchema = schemas.get(topic); >>>> if (tableSchema == null) { >>>> throw new RuntimeException("Schema not found for topic: " + >>>> topic); >>>> } >>>> >>>> TableSchema tableSchemaObj; >>>> try { >>>> tableSchemaObj = OBJECT_MAPPER.readValue(tableSchema, >>>> TableSchema.class); // Deserialize JSON to TableSchema >>>> } catch (IOException e) { >>>> throw new RuntimeException("Error parsing schema JSON", e); >>>> } >>>> >>>> LOG.info("datasetname: {}, topic: {}, tableSchema: {}" , >>>> datasetName, topic, tableSchema); >>>> >>>> return new DestinationInfo("livewel-prod", datasetName, topic, >>>> 
tableSchemaObj); >>>> } >>>> >>>> @Override >>>> public TableDestination getTable(DestinationInfo destination) { >>>> return new TableDestination(destination.toTableSpec(), >>>> "Dynamically generated table"); >>>> } >>>> >>>> @Override >>>> public TableSchema getSchema(DestinationInfo destination) { >>>> return destination.getTableSchema(); >>>> } >>>> >>>> >>>> } >>>> >>>>