Hello,
I have been trying to send JSON string data to AWS Firehose using PyFlink, but
because of Serialization issues, it is not able to parse JSON input properly.
This is the logic am using -
redis_output_str = """
{
"entities": [
{
"metadata": {
"entityType": ["KMAP"]
},
"entityName": "artificial intelligence",
"timestamp": 1747647770938
}
],
"is_entity_present": true,
"user_id": 935884520,
"timestamp": 1747647770938,
"status": "success"
}"""
redis_output = json.loads(redis_output_str)
entities_list = []
for entity in redis_output["entities"]:
metadata_row = Row(entity["metadata"]["entityType"])
entity_row = Row(metadata_row, entity["entityName"],
entity["timestamp"])
entities_list.append(entity_row)
firehose_input = Row(
entities_list,
redis_output["is_entity_present"],
redis_output["user_id"],
redis_output["timestamp"],
redis_output["status"],
)
## below not working so changed with above
# metadata_row = Row(
# entityType=entity["metadata"]["entityType"]
# ) # entityType: [str]
# print("metadata_row")
# print(metadata_row)
# print(type(metadata_row.entityType))
# print(type(metadata_row.entityType[0]))
# entity_row = Row(metadata_row, entity["entityName"],
entity["timestamp"])
# print("entity_row")
# print(entity_row)
# entities_list.append(entity_row)
# print("entities_list")
# print(entities_list)
# firehose_input = Row(
# entities=entities_list,
# is_entity_present=redis_output["is_entity_present"],
# user_id=redis_output["user_id"],
# timestamp=redis_output["timestamp"],
# status=redis_output["status"],
# )
print("firehose_input")
print(firehose_input)
# print(firehose_input.entities[0].metadata.entityType)
# print(type(firehose_input.entities[0].metadata.entityType))
# print(type(firehose_input.entities[0].metadata.entityType[0]))
# This structure MUST match the JSON you want to produce.
metadata_type_info_for_firehose = Types.ROW_NAMED(
["entityType"], [Types.LIST(Types.STRING())]
)
entity_type_info_for_firehose = Types.ROW_NAMED(
["metadata", "entityName", "timestamp"],
[metadata_type_info_for_firehose, Types.STRING(), Types.LONG()],
)
firehose_output_row_type = Types.ROW_NAMED(
["entities", "is_entity_present", "user_id", "timestamp", "status"],
[
Types.LIST(entity_type_info_for_firehose),
Types.BOOLEAN(),
Types.LONG(),
Types.LONG(),
Types.STRING(),
],
)
# Wrap with RowTypeInfo to expose .get_java_type_info()
firehose_output_row_type_info = RowTypeInfo(
firehose_output_row_type.get_field_types(),
firehose_output_row_type.get_field_names(),
)
print("firehose_output_row_type_info")
print(firehose_output_row_type_info)
# **NEW**: Create the JsonRowSerializationSchema for the Firehose sink
firehose_serialization_schema = (
JsonRowSerializationSchema.Builder()
.with_type_info(firehose_output_row_type_info)
.build()
)
# serialization_schema = firehose_serialization_schema._j_builder.build()
# Define Firehose sink properties
sink_properties = {"aws.region": "us-east-1"}
# Define Firehose sink
kdf_sink = (
KinesisFirehoseSink.builder()
.set_firehose_client_properties(sink_properties)
.set_serialization_schema(firehose_serialization_schema)
# .set_serialization_schema(json_serialization_schema)
# .set_serialization_schema(JsonSerializationSchema())
# .set_serialization_schema(SimpleStringSchema())
.set_delivery_stream_name(firehose_stream_name)
.set_fail_on_error(False)
.set_max_batch_size(200)
.set_max_in_flight_requests(100)
.set_max_buffered_requests(20000)
.set_max_batch_size_in_bytes(2 * 1024 * 1024)
.set_max_time_in_buffer_ms(200)
.set_max_record_size_in_bytes(1024 * 1024)
.build()
)
try:
env.from_collection(
collection=[firehose_input], type_info=firehose_output_row_type_info
).sink_to(kdf_sink)
env.execute(job_name)
But have been getting too many issues because of JsonRowSerializationSchema.
Is there a possible solution/suggestion from your end to fix the issues and
move ahead with it.
Looking forward to getting a response at your earliest convenience.
Regards,
Sanchay
________________________________
If you are not the intended recipient or have received this message in error,
please notify the sender and permanently delete this message and any
attachments.