menna224 opened a new issue, #8018:
URL: https://github.com/apache/hudi/issues/8018

   we have a glue streaming job that writes to hudi table, we try to do schema 
evolution, when we add a new col to any record, it works fine and the new col 
is shown when querying the table, the thing is we expect that it should not 
evolute the schema because we didn't set the config 
hoodie.schema.on.read.enable, and as we understand that this config is set by 
default to false, and as per hudi docs:
   
   "Enables support for Schema Evolution feature
   Default Value: false (Optional)
   Config Param: SCHEMA_EVOLUTION_ENABLE"
   
   so when didn't define it on our config, it shouldn't allow for the schema 
evolution and adding of the new columns, right?
   we even tried to explicitly set it to false in our connection options, but 
still , when we add a new col it's shown to our table
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. run the glue streaming job
   2. add a record with new col/ attribute( attribute in case of dynamodb)
   3. query the hudi table
   
   
   **Expected behavior**
   
   it shouldn't show the added cols/attributes as we disabled schema evolution 
and the col/attribute shouldn't be existing also in the schema of the table in 
the datalake.
   
   **Environment Description**
   
   * Hudi version : .12
   
   * Spark version : 3
   
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   * glue version: 4
   
   
   **Additional context**
   
   our connection options are:
   hudiWriteConfig = {
       'className': 'org.apache.hudi',
       'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
       'hoodie.table.name': hudi_table_name,
       'hoodie.datasource.write.table.name': hudi_table_name,
       'hoodie.datasource.write.precombine.field': 'timestamp',
       'hoodie.datasource.write.recordkey.field': 'user_id',
       'hoodie.datasource.write.operation': 'upsert',
       #"hoodie.compact.schedule.inline":"true",
       'hoodie.datasource.hive_sync.use_jdbc':'false',
       'hoodie.datasource.hive_sync.mode':'hms',
       "hoodie.compact.inline": "true",
       "hoodie.compact.inline.max.delta.commits":"3",
       "hoodie.schema.on.read.enable":"false",
       
"hoodie.deltastreamer.schemaprovider.source.schema.file":"s3://hudi-test-table/menna/src.acsv",
       
"hoodie.deltastreamer.schemaprovider.target.schema.file":"s3://hudi-test-table/menna/target.acsv"
       #'hoodie.datasource.write.partitionpath.field': 
'year:SIMPLE,month:SIMPLE,day:SIMPLE',
       #'hoodie.datasource.write.keygenerator.class': 
'org.apache.hudi.keygen.CustomKeyGenerator',
       #'hoodie.deltastreamer.keygen.timebased.timestamp.type': 'DATE_STRING',
       #'hoodie.deltastreamer.keygen.timebased.input.dateformat': 'yyyy-mm-dd',
       #'hoodie.deltastreamer.keygen.timebased.output.dateformat': 'yyyy/MM/dd'
   }
   
   hudiGlueConfig = {
       'hoodie.datasource.hive_sync.enable': 'true',
       'hoodie.datasource.hive_sync.sync_as_datasource': 'true',
       'hoodie.datasource.hive_sync.database': database_name,
       'hoodie.datasource.hive_sync.table': hudi_table_name,
       'hoodie.datasource.hive_sync.use_jdbc': 'false',
       'hoodie.datasource.write.hive_style_partitioning': 'false',
       'hoodie.datasource.hive_sync.partition_extractor_class': 
'org.apache.hudi.hive.MultiPartKeysValueExtractor',
       #'hoodie.datasource.hive_sync.partition_fields': 'year,month,day'
   }
   
   commonConfig = {
       'path': s3_path_hudi_table
   }
   
   combinedConf = {
       **commonConfig,
       **hudiWriteConfig,
       **hudiGlueConfig
   }
   
   in glue streaming job we use:
   glueContext.forEachBatch(
       frame=data_frame_DataSource0,
       batch_function=processBatch,
       options={
           "windowSize": window_size,
           "checkpointLocation": s3_path_spark_checkpoints
       }
   )
   
   and:
   data_frame_DataSource0 = glueContext.create_data_frame.from_catalog(
       database=database_name,
       table_name=kinesis_table_name,
       transformation_ctx="DataSource0",
       additional_options={"inferSchema": "true", "startingPosition": 
starting_position_of_kinesis_iterator}
   )
   
   and the way we write our hudi table is:
    
kinesis_data_frame.write.format("hudi").options(**combinedConf).mode("append").save()
   
   sometimes we write it as follows but it gives the same behaviour:
    glueContext.write_dynamic_frame.from_options(
               frame=DynamicFrame.fromDF(kinesis_data_frame, glueContext, 
"evolved_kinesis_data_frame"),
               connection_type="custom.spark",
               connection_options=combinedConf
           )
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to