soumilshah1995 opened a new issue, #11183:
URL: https://github.com/apache/hudi/issues/11183
# Writer
```
try:
    import os
    import sys
    import uuid
    import random
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.types import StructType, StructField, StringType
    from datetime import datetime
    from faker import Faker
    import pandas as pd  # Pandas for pretty printing

    print("Imports loaded")
except Exception as e:
    print("error", e)

HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hadoop:hadoop-aws:3.3.2,org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.executor.memory', '4g') \
    .config('spark.driver.memory', '4g') \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

# Point the S3A filesystem at the local MinIO instance
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://127.0.0.1:9000/")
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", "admin")
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "password")
spark._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")
spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

faker = Faker()

def get_customer_data(total_customers=2):
    """Generate fake customer records with Faker."""
    customers_array = []
    for i in range(0, total_customers):
        customer_data = {
            "customer_id": str(uuid.uuid4()),
            "name": faker.name(),
            "created_at": datetime.now().isoformat(),
            "address": faker.address(),
            "state": str(faker.state_abbr()),  # used as the partition column
            "salary": faker.random_int(min=30000, max=100000)
        }
        customers_array.append(customer_data)
    return customers_array

total_customers = 10000
customer_data = get_customer_data(total_customers=total_customers)

# Build a Spark DataFrame from the generated records
spark_df = spark.createDataFrame(pd.DataFrame(customer_data))

database = "default"
table_name = "customers_t1"
path = f"s3a://warehouse/{database}/{table_name}"

hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.table.name': 'customers',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.recordkey.field': 'customer_id',
    'hoodie.datasource.write.precombine.field': 'created_at',
    "hoodie.datasource.write.partitionpath.field": "state",
    "hoodie.enable.data.skipping": "true",
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.index.column.stats.enable": "true",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.hive_sync.metastore.uris": "thrift://localhost:9083",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.database": "default",
    "hoodie.datasource.hive_sync.table": table_name,
}

print("\n")
print(path)
print("\n")

spark_df.write.format("hudi") \
    .options(**hudi_options) \
    .mode("append") \
    .save(path)
```
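
As a quick sanity check (not part of the original snippet), the table can be read back in the same Spark session; a minimal sketch reusing `spark` and `path` from above:

```
# Snapshot-read the Hudi table back from MinIO to verify the write;
# `spark` and `path` are as defined in the writer script above.
df = spark.read.format("hudi").load(path)
df.select("customer_id", "name", "state", "salary").show(5, truncate=False)
print("row count:", df.count())
```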
# Docker compose
```
version: "3"

services:
#  mysql:
#    image: quay.io/debezium/example-mysql:2.1
#    container_name: mysql
#    ports:
#      - "3306:3306"
#    environment:
#      MYSQL_ROOT_PASSWORD: debezium
#      MYSQL_USER: mysqluser
#      MYSQL_PASSWORD: mysqlpw
#    restart: always
#
#  fast-data-dev:
#    image: dougdonohoe/fast-data-dev
#    ports:
#      - "3181:3181"
#      - "3040:3040"
#      - "7081:7081"
#      - "7082:7082"
#      - "7083:7083"
#      - "7092:7092"
#      - "8081:8081"
#    environment:
#      - ZK_PORT=3181
#      - WEB_PORT=3040
#      - REGISTRY_PORT=8081
#      - REST_PORT=7082
#      - CONNECT_PORT=7083
#      - BROKER_PORT=7092
#      - ADV_HOST=127.0.0.1

  trino-coordinator:
    image: 'trinodb/trino:428'
    hostname: trino-coordinator
    ports:
      - '8080:8080'
    volumes:
      - ./trino/etc:/etc/trino

  presto:
    container_name: presto
    ports:
      - '8082:8082'
    image: 'prestodb/presto:0.285'
    volumes:
      - ./presto/catalog:/opt/presto-server/etc/catalog
      - ./presto/config.properties:/opt/presto-server/etc/config.properties
      - ./presto/jvm.config:/opt/presto-server/etc/jvm.config
      - ./presto/node.properties:/opt/presto-server/etc/node.properties
      - ./presto/minio.properties:/usr/lib/presto/etc/catalog/minio.properties

  metastore_db:
    image: postgres:11
    hostname: metastore_db
    ports:
      - 5432:5432
    environment:
      POSTGRES_USER: hive
      POSTGRES_PASSWORD: hive
      POSTGRES_DB: metastore
    command: ["postgres", "-c", "wal_level=logical"]
    healthcheck:
      test: ["CMD", "psql", "-U", "hive", "-c", "SELECT 1"]
      interval: 10s
      timeout: 5s
      retries: 5
    volumes:
      - ./postgresscripts:/docker-entrypoint-initdb.d

  hive-metastore:
    hostname: hive-metastore
    image: 'starburstdata/hive:3.1.2-e.18'
    ports:
      - '9083:9083' # Metastore Thrift
    environment:
      HIVE_METASTORE_DRIVER: org.postgresql.Driver
      HIVE_METASTORE_JDBC_URL: jdbc:postgresql://metastore_db:5432/metastore
      HIVE_METASTORE_USER: hive
      HIVE_METASTORE_PASSWORD: hive
      HIVE_METASTORE_WAREHOUSE_DIR: s3://datalake/
      S3_ENDPOINT: http://minio:9000
      S3_ACCESS_KEY: admin
      S3_SECRET_KEY: password
      S3_PATH_STYLE_ACCESS: "true"
      REGION: ""
      GOOGLE_CLOUD_KEY_FILE_PATH: ""
      AZURE_ADL_CLIENT_ID: ""
      AZURE_ADL_CREDENTIAL: ""
      AZURE_ADL_REFRESH_URL: ""
      AZURE_ABFS_STORAGE_ACCOUNT: ""
      AZURE_ABFS_ACCESS_KEY: ""
      AZURE_WASB_STORAGE_ACCOUNT: ""
      AZURE_ABFS_OAUTH: ""
      AZURE_ABFS_OAUTH_TOKEN_PROVIDER: ""
      AZURE_ABFS_OAUTH_CLIENT_ID: ""
      AZURE_ABFS_OAUTH_SECRET: ""
      AZURE_ABFS_OAUTH_ENDPOINT: ""
      AZURE_WASB_ACCESS_KEY: ""
      HIVE_METASTORE_USERS_IN_ADMIN_ROLE: "admin"
    depends_on:
      - metastore_db
    healthcheck:
      test: bash -c "exec 6<> /dev/tcp/localhost/9083"

  minio:
    image: minio/minio
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    networks:
      default:
        aliases:
          - warehouse.minio
    ports:
      - 9001:9001
      - 9000:9000
    command: ["server", "/data", "--console-address", ":9001"]

  mc:
    depends_on:
      - minio
    image: minio/mc
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/warehouse;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc policy set public minio/warehouse;
      tail -f /dev/null
      "

volumes:
  hive-metastore-postgresql:

networks:
  default:
    name: hudi
```
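
Before running the writer it can help to confirm the exposed ports are reachable from the host. A small sketch (not part of the original setup; hostnames and ports taken from the compose file above):

```
import socket

# Service endpoints published by the compose file above.
services = {
    "minio": ("localhost", 9000),
    "hive-metastore": ("localhost", 9083),
    "trino": ("localhost", 8080),
    "presto": ("localhost", 8082),
}

for name, (host, port) in services.items():
    try:
        with socket.create_connection((host, port), timeout=3):
            print(f"{name}: reachable on {host}:{port}")
    except OSError as e:
        print(f"{name}: NOT reachable on {host}:{port} ({e})")
```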
catalog/hive.properties
```
connector.name=hive
hive.metastore.uri=thrift://hive-metastore:9083
```
hudi.properties
```
connector.name=hudi
hive.metastore.uri=thrift://hive-metastore:9083
hive.s3.aws-access-key=admin
hive.s3.aws-secret-key=password
hive.s3.endpoint=http://minio:9000/
hive.s3.path-style-access=true
hive.s3.ssl.enabled=false
```
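
With this catalog in place, the synced table should be queryable from Trino. A minimal sketch using the `trino` PyPI client (not part of the original report; connection parameters follow the compose file above, and the query itself is illustrative):

```
import trino

# Connect to the Trino coordinator exposed on localhost:8080;
# "hudi" is the catalog defined by hudi.properties above.
conn = trino.dbapi.connect(
    host="localhost",
    port=8080,
    user="admin",
    catalog="hudi",
    schema="default",
)
cur = conn.cursor()
cur.execute("SELECT customer_id, name, state FROM customers_t1 LIMIT 10")
for row in cur.fetchall():
    print(row)
```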
config.properties
```
#single node install config
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
discovery-server.enabled=true
discovery.uri=http://localhost:8080
```
jvm.config
```
-server
-Xmx1G
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+UseGCOverheadLimit
-XX:+ExitOnOutOfMemoryError
-XX:ReservedCodeCacheSize=256M
-Djdk.attach.allowAttachSelf=true
-Djdk.nio.maxCachedBufferSize=2000000
```
node.properties
```
node.environment=docker
node.data-dir=/data/trino
plugin.dir=/usr/lib/trino/plugin
```

# Query output
