One more thing - in addition to a deterministic coder, DestinationInfo needs to have proper equals() and hashCode() methods defined.
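For illustration, here is a minimal sketch of such a pair, assuming the four fields your DestinationInfoCoder encodes (project, dataset, table, tableSchema) - the constructor is a stand-in for your actual class, and getters are omitted:

import com.google.api.services.bigquery.model.TableSchema;
import java.io.Serializable;
import java.util.Objects;

public class DestinationInfo implements Serializable {

    private final String project;
    private final String dataset;
    private final String table;
    private final TableSchema tableSchema;

    public DestinationInfo(String project, String dataset, String table, TableSchema tableSchema) {
        this.project = project;
        this.dataset = dataset;
        this.table = table;
        this.tableSchema = tableSchema;
    }

    // Equality must cover every field the coder encodes; TableSchema is
    // map-backed, so Objects.equals compares its contents.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof DestinationInfo)) return false;
        DestinationInfo that = (DestinationInfo) o;
        return Objects.equals(project, that.project)
                && Objects.equals(dataset, that.dataset)
                && Objects.equals(table, that.table)
                && Objects.equals(tableSchema, that.tableSchema);
    }

    @Override
    public int hashCode() {
        return Objects.hash(project, dataset, table, tableSchema);
    }
}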
On Fri, Oct 25, 2024 at 10:30 PM Reuven Lax <re...@google.com> wrote:

> Ah, I suspect I know what's going on. What you're doing won't work unless
> the DestinationInfoCoder is a deterministic coder (i.e. two equivalent
> objects map to the same encoded representation). A json encoding often
> isn't deterministic (for one reason, it's legal to encode the json fields
> in any order).
>
> On Fri, Oct 25, 2024 at 2:22 PM Pranjal Pandit <pran...@eka.care> wrote:
>
>> Hi Kenneth / Reuven,
>>
>> I did some more digging into what the actual root cause might be here.
>>
>> Here's what I found:
>>
>> The 409s on duplicate BigQuery jobs might be a side effect of something
>> else related to the schema. What I have observed is that the two
>> DynamicDestinations implementations below behave differently (labelled
>> *Working* and *Not Working*).
>>
>> Basically, when I return a DestinationInfo object, the correct schema
>> somehow does not make it downstream to the BigQuery write. However, if I
>> return a plain String from getDestination() and construct the actual
>> schema in getSchema(), then it works correctly.
>>
>> I have also shared DestinationInfo's coder (in case the problem is
>> there).
>>
>> Could this be a bug in Beam? I have tried to reach the GCP product /
>> engineering teams, but not much luck there :(
>>
>> If this is a bug, where should I report it? Maybe I can contribute a
>> fix if I have understood the actual problem correctly.
>>
>> As a side effect, the job shows as successful on GCP Dataflow, but only
>> a very small portion of the data gets written to BQ. At the very least,
>> the pipeline should throw some error about the schema if that is the
>> actual problem.
>>
>> Reuven,
>> While trying to fix my BQ writes I tried combinations of the legacy
>> Dataflow runner, runner v2, and several Beam versions (2.52.0, 2.53.0,
>> and the latest, 2.60.0), but the problem persisted. The root cause seems
>> to be independent of runner and version.
>>
>>
>> *Working*
>>
>> public class BatchDynamicDestination extends DynamicDestinations<KV<String, TableRow>, String> {
>>
>>     private static final Logger LOG = LoggerFactory.getLogger(BatchDynamicDestination.class);
>>
>>     private static final Map<String, TableSchema> SCHEMA_MAP = new HashMap<>();
>>
>>     private final String datasetName;
>>     private final PCollectionView<Map<String, String>> schemaView;
>>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>>
>>     public BatchDynamicDestination(String datasetName, PCollectionView<Map<String, String>> schemaView) {
>>         this.datasetName = datasetName;
>>         this.schemaView = schemaView;
>>     }
>>
>>     @Override
>>     public java.util.List<PCollectionView<?>> getSideInputs() {
>>         // Register the schemaView as a side input
>>         return Collections.singletonList(schemaView);
>>     }
>>
>>     @Override
>>     public String getDestination(ValueInSingleWindow<KV<String, TableRow>> element) {
>>         String topic = element.getValue().getKey();
>>         return String.format("livewel-prod:%s.%s", datasetName, topic);
>>     }
>>
>>     @Override
>>     public TableDestination getTable(String destination) {
>>         String tableName = destination.toLowerCase();
>>         return new TableDestination(tableName, "Table for category ");
>>     }
>>
>>     @Override
>>     public TableSchema getSchema(String destination) {
>>         Map<String, String> schemasMap = sideInput(schemaView);
>>         LOG.info("destination in getSchema: {}", destination);
>>         StringBuilder sb = new StringBuilder();
>>         schemasMap.entrySet().stream().forEach(entry -> {
>>             sb.append(entry.getKey()).append(" : ");
>>         });
>>         LOG.info("destination in getSchema map: {}", sb.toString());
>>         String[] parts = destination.split("\\.");
>>
>>         // assert parts.length > 1;
>>
>>         if (parts.length < 2) {
>>             throw new RuntimeException("parts.length in BatchDynamicDestinations is less than 2: " + destination);
>>         }
>>         String tableSchema = schemasMap.get(parts[1]);
>>         TableSchema tableSchemaObj;
>>         try {
>>             tableSchemaObj = OBJECT_MAPPER.readValue(tableSchema, TableSchema.class); // Deserialize JSON to TableSchema
>>         } catch (IOException e) {
>>             throw new RuntimeException("Error parsing schema JSON", e);
>>         }
>>
>>         return tableSchemaObj;
>>     }
>> }
>>
>>
>> *Not working*
>>
>> public class StreamDynamicDestination extends DynamicDestinations<BigQueryRow, DestinationInfo> {
>>
>>     private final String bigQueryDataset;
>>
>>     public StreamDynamicDestination(String bigQueryDataset) {
>>         this.bigQueryDataset = bigQueryDataset;
>>     }
>>
>>     @Override
>>     public Coder<DestinationInfo> getDestinationCoder() {
>>         // Custom coder for DestinationInfo (defined below)
>>         return DestinationInfoCoder.of();
>>     }
>>
>>     @Override
>>     public DestinationInfo getDestination(ValueInSingleWindow<BigQueryRow> element) {
>>         String tablePrefix = "testvitalsdb";
>>         BigQueryRow bigQueryRow = element.getValue();
>>         assert bigQueryRow != null;
>>         TableSchema tableSchema = bigQueryRow.generateSchemaFromRow();
>>         // return new DestinationInfo("livewel-prod", dataset, String.format("%s_%d", tablePrefix, BigQueryRow.getTimeStampMod()), tableSchema);
>>         return new DestinationInfo("livewel-prod", bigQueryDataset, tablePrefix, tableSchema);
>>     }
>>
>>     @Override
>>     public TableDestination getTable(DestinationInfo destination) {
>>         return new TableDestination(destination.toTableSpec(), "Dynamically generated table");
>>     }
>>
>>     @Override
>>     public TableSchema getSchema(DestinationInfo destination) {
>>         return destination.getTableSchema();
>>     }
>> }
>>
>>
>> *My DestinationInfoCoder*
>>
>> public class DestinationInfoCoder extends AtomicCoder<DestinationInfo> {
>>
>>     private static final ObjectMapper objectMapper = new ObjectMapper();
>>
>>     @Override
>>     public void encode(DestinationInfo value, OutputStream outStream) throws CoderException, IOException {
>>         try {
>>             // Encode the String fields
>>             StringUtf8Coder.of().encode(value.getProject(), outStream);
>>             StringUtf8Coder.of().encode(value.getDataset(), outStream);
>>             StringUtf8Coder.of().encode(value.getTable(), outStream);
>>
>>             // Encode TableSchema as a JSON string
>>             String schemaJson = objectMapper.writeValueAsString(value.getTableSchema());
>>             StringUtf8Coder.of().encode(schemaJson, outStream);
>>         } catch (Exception e) {
>>             throw new CoderException("Failed to encode DestinationInfo", e);
>>         }
>>     }
>>
>>     @Override
>>     public DestinationInfo decode(InputStream inStream) throws CoderException, IOException {
>>         // Decode the String fields
>>         String project;
>>         String dataset;
>>         String table;
>>         TableSchema tableSchema;
>>         try {
>>             project = StringUtf8Coder.of().decode(inStream);
>>             dataset = StringUtf8Coder.of().decode(inStream);
>>             table = StringUtf8Coder.of().decode(inStream);
>>
>>             // Decode TableSchema from the JSON string
>>             String schemaJson = StringUtf8Coder.of().decode(inStream);
>>             tableSchema = objectMapper.readValue(schemaJson, TableSchema.class);
>>         } catch (Exception e) {
>>             throw new CoderException("Failed to decode DestinationInfo", e);
>>         }
>>         return new DestinationInfo(project, dataset, table, tableSchema);
>>     }
>>
>>     public static DestinationInfoCoder of() {
>>         return new DestinationInfoCoder();
>>     }
>> }
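One way to remove the field-ordering nondeterminism from the JSON step in that coder (a sketch, untested): configure the ObjectMapper to always write map entries in sorted key order. TableSchema is map-backed, so this should make two equal schemas produce byte-identical JSON:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

// Sorts map entries (including nested ones) by key during serialization,
// so equal TableSchema objects encode to the same JSON string.
private static final ObjectMapper objectMapper =
        new ObjectMapper().configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);

An alternative that avoids JSON entirely is to encode the schema's fields one by one, in a fixed order, with StringUtf8Coder.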
>> On Fri, Oct 25, 2024 at 9:50 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Are you using runner v2? The issue you linked to implies that this only
>>> happened on runner v2.
>>>
>>> On Fri, Oct 25, 2024 at 8:26 AM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> Hi Pranjal,
>>>>
>>>> If there is a bug in Beam, this is a good list to contact. If there is
>>>> a problem with a GCP service, then GCP support is better.
>>>>
>>>> I see the code you shared, but what error or difficulty are you
>>>> encountering?
>>>>
>>>> Kenn
>>>>
>>>> On Mon, Oct 21, 2024 at 2:33 PM Pranjal Pandit <pran...@eka.care> wrote:
>>>>
>>>>> Hi Kenneth / Yi Hun,
>>>>>
>>>>> For the past few days I have been grappling with a batch pipeline
>>>>> that loads data from GCS to BigQuery.
>>>>>
>>>>> While searching for a solution I found a similar issue and have
>>>>> posted my observations in the GitHub issue
>>>>> (https://github.com/apache/beam/issues/28219).
>>>>>
>>>>> We are using the GCP Dataflow runner, so I have also written to GCP
>>>>> support, but not much luck there.
>>>>>
>>>>> I will try to briefly explain the problem I am facing here again.
>>>>> I would really appreciate any leads on how I can resolve this issue.
>>>>>
>>>>> I am trying to use dynamic destinations to load data from GCS into
>>>>> multiple BigQuery tables with multiple schemas.
>>>>>
>>>>> I have tried to put down all the required classes I am using below.
>>>>>
>>>>> PCollection<KV<String, TableRow>> kvRows = decompressedLines
>>>>>     .apply("Convert To BigQueryRow", ParDo.of(new Utility.ConvertToBigQueryRow(ekaUUID)));
>>>>>
>>>>> // Group by TableId to manage different schemas per table type
>>>>> PCollectionView<Map<String, String>> schemaView =
>>>>>     Utility.GetSchemaViewFromBigQueryRows.createSchemasView(kvRows);
>>>>>
>>>>> WriteResult result = kvRows.apply("WriteToBigQuery",
>>>>>     BigQueryIO.<KV<String, TableRow>>write()
>>>>>         .to(new BatchDynamicDestination(bigqueryDatasetName, schemaView))
>>>>>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>>         .withFormatFunction(stringTableRowKV -> stringTableRowKV != null ? stringTableRowKV.getValue() : null)
>>>>> );
>>>>>
>>>>> public static class ConvertToBigQueryRow extends DoFn<KV<String, String>, KV<String, TableRow>> {
>>>>>
>>>>>     private static final Logger LOG = LoggerFactory.getLogger(ConvertToBigQueryRow.class);
>>>>>
>>>>>     String uuidKeyName;
>>>>>
>>>>>     public ConvertToBigQueryRow(String uuidKeyName) {
>>>>>         this.uuidKeyName = uuidKeyName;
>>>>>     }
>>>>>
>>>>>     @ProcessElement
>>>>>     public void processElement(ProcessContext c) throws JsonProcessingException {
>>>>>         ObjectMapper mapper = new ObjectMapper();
>>>>>         String value = Objects.requireNonNull(c.element().getValue());
>>>>>         String topic = Objects.requireNonNull(c.element().getKey());
>>>>>
>>>>>         // Parse the entire value as a JSON tree
>>>>>         JsonNode rootNode = mapper.readTree(value);
>>>>>
>>>>>         // Extract the "after" field as a JSON node (not as a string)
>>>>>         JsonNode afterNode = rootNode.get("after");
>>>>>
>>>>>         // Check if the "after" field exists and is not null
>>>>>         if (!afterNode.isNull()) {
>>>>>             String afterJsonString = afterNode.asText(); // String representation of the "after" node
>>>>>             JsonNode afterJsonNode = mapper.readTree(afterJsonString); // Parse the string into a JsonNode
>>>>>
>>>>>             Map<String, Object> afterMap;
>>>>>             try {
>>>>>                 afterMap = mapper.convertValue(afterNode, new TypeReference<Map<String, Object>>() {});
>>>>>             } catch (IllegalArgumentException e) {
>>>>>                 afterMap = mapper.convertValue(afterJsonNode, new TypeReference<Map<String, Object>>() {});
>>>>>             }
>>>>>
>>>>>             if (afterMap != null) {
>>>>>                 TableRow row = new TableRow();
>>>>>                 for (Map.Entry<String, Object> entry : afterMap.entrySet()) {
>>>>>                     row.set(entry.getKey(), entry.getValue());
>>>>>                 }
>>>>>
>>>>>                 // Insert eka UUID into our table
>>>>>                 UUID uuid = UUID.randomUUID();
>>>>>                 row.set(this.uuidKeyName, uuid.toString());
>>>>>
>>>>>                 LOG.info("T: {} D: {}", topic, row.toString());
>>>>>                 c.output(KV.of(topic, row));
>>>>>             } else {
>>>>>                 LOG.error("Data in ProcessElem afterMap mostly null");
>>>>>             }
>>>>>         } else {
>>>>>             LOG.error("The 'after' field is null.");
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> public static class GetSchemaViewFromBigQueryRows {
>>>>>
>>>>>     private static final Logger LOG = LoggerFactory.getLogger(GetSchemaViewFromBigQueryRows.class);
>>>>>     private static final Map<String, TableSchema> schemaCache = new ConcurrentHashMap<>();
>>>>>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>>>>>
>>>>>     private static String schemaToJson(TableSchema schema) {
>>>>>         try {
>>>>>             return OBJECT_MAPPER.writeValueAsString(schema);
>>>>>         } catch (JsonProcessingException e) {
>>>>>             throw new RuntimeException("Error converting schema to JSON", e);
>>>>>         }
>>>>>     }
>>>>>
>>>>>     // Method to generate a schema for the given type
>>>>>     public static String getSchemaForType(String type, TableRow tableRow) {
>>>>>
>>>>>         TableSchema schema = schemaCache.get(type);
>>>>>
>>>>>         if (schema == null) {
>>>>>             schema = new TableSchema().setFields(new ArrayList<>());
>>>>>             schemaCache.put(type, schema);
>>>>>         }
>>>>>
>>>>>         // Create a set of existing field names to avoid duplicates
>>>>>         Set<String> existingFieldNames = new HashSet<>();
>>>>>         for (TableFieldSchema field : schema.getFields()) {
>>>>>             existingFieldNames.add(field.getName());
>>>>>         }
>>>>>
>>>>>         // Add any new fields
>>>>>         TableSchema finalSchema = schema;
>>>>>         tableRow.forEach((fieldName, fieldValue) -> {
>>>>>             if (!existingFieldNames.contains(fieldName)) {
>>>>>                 TableFieldSchema fieldSchema = new TableFieldSchema()
>>>>>                         .setName(fieldName)
>>>>>                         .setType(determineFieldType(fieldValue));
>>>>>                 finalSchema.getFields().add(fieldSchema);
>>>>>             }
>>>>>         });
>>>>>
>>>>>         // TODO: Add cases for handling schema conflicts also here
>>>>>
>>>>>         // Update the final schema in the map
>>>>>         schemaCache.put(type, finalSchema);
>>>>>         return schemaToJson(finalSchema);
>>>>>     }
>>>>>
>>>>>     // Method to create a side-input view of schemas as a PCollectionView<Map<String, String>>
>>>>>     public static PCollectionView<Map<String, String>> createSchemasView(
>>>>>             PCollection<KV<String, TableRow>> input) {
>>>>>
>>>>>         // Map input data to KV of topic -> schema JSON
>>>>>         PCollection<KV<String, String>> schemas = input.apply("MapElements for converting KV",
>>>>>             MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
>>>>>                 .via(data -> {
>>>>>                     assert data != null;
>>>>>                     return KV.of(data.getKey(), getSchemaForType(data.getKey(), data.getValue()));
>>>>>                 })
>>>>>         );
>>>>>
>>>>>         // Deduplicate by key (topic) using Combine.perKey(), keeping one schema per topic
>>>>>         PCollection<KV<String, String>> uniqueSchemas = schemas
>>>>>             .apply("DeduplicateSchemas", Combine.perKey((schema1, schema2) -> schema1));
>>>>>
>>>>>         // Use View.asMap() to create a side input for schemas
>>>>>         return uniqueSchemas.apply("ToSchemaView", View.asMap());
>>>>>     }
>>>>> }
>>>>>
>>>>> public class BatchDynamicDestination extends DynamicDestinations<KV<String, TableRow>, DestinationInfo> {
>>>>>
>>>>>     private static final Logger LOG = LoggerFactory.getLogger(BatchDynamicDestination.class);
>>>>>
>>>>>     private final String datasetName;
>>>>>     private final PCollectionView<Map<String, String>> schemaView;
>>>>>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>>>>>
>>>>>     public BatchDynamicDestination(String datasetName, PCollectionView<Map<String, String>> schemaView) {
>>>>>         this.datasetName = datasetName;
>>>>>         this.schemaView = schemaView;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public Coder<DestinationInfo> getDestinationCoder() {
>>>>>         // Custom coder for DestinationInfo
>>>>>         return DestinationInfoCoder.of();
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public java.util.List<PCollectionView<?>> getSideInputs() {
>>>>>         // Register the schemaView as a side input
>>>>>         return Collections.singletonList(schemaView);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public DestinationInfo getDestination(ValueInSingleWindow<KV<String, TableRow>> element) {
>>>>>
>>>>>         Map<String, String> schemas = sideInput(schemaView);
>>>>>         String topic = element.getValue().getKey();
>>>>>
>>>>>         String tableSchema = schemas.get(topic);
>>>>>         if (tableSchema == null) {
>>>>>             throw new RuntimeException("Schema not found for topic: " + topic);
>>>>>         }
>>>>>
>>>>>         TableSchema tableSchemaObj;
>>>>>         try {
>>>>>             tableSchemaObj = OBJECT_MAPPER.readValue(tableSchema, TableSchema.class); // Deserialize JSON to TableSchema
>>>>>         } catch (IOException e) {
>>>>>             throw new RuntimeException("Error parsing schema JSON", e);
>>>>>         }
>>>>>
LOG.info("datasetname: {}, topic: {}, tableSchema: {}" , >>>>> datasetName, topic, tableSchema); >>>>> >>>>> return new DestinationInfo("livewel-prod", datasetName, topic, >>>>> tableSchemaObj); >>>>> } >>>>> >>>>> @Override >>>>> public TableDestination getTable(DestinationInfo destination) { >>>>> return new TableDestination(destination.toTableSpec(), >>>>> "Dynamically generated table"); >>>>> } >>>>> >>>>> @Override >>>>> public TableSchema getSchema(DestinationInfo destination) { >>>>> return destination.getTableSchema(); >>>>> } >>>>> >>>>> >>>>> } >>>>> >>>>>