OK
This is the equivalent Python code:
from datetime import datetime, timezone

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, when
from pyspark.sql.types import StructType, StructField, LongType, TimestampType

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("StreamingSparkPartitioned") \
    .getOrCreate()

# Label each row according to value % 3
expression = when(expr("value % 3 = 1"), "stupid_event") \
    .otherwise(when(expr("value % 3 = 2"), "smart_event")
               .otherwise("neutral_event"))

# Schema the rate-micro-batch source produces (timestamp is a TimestampType column).
# For reference only: rate sources do not take a user-specified schema, so it is not passed to readStream.
schema = StructType([StructField("timestamp", TimestampType()),
                     StructField("value", LongType())])

checkpoint_path = "file:///ssd/hduser/randomdata/chkpt"

# 2024-01-28 00:00:00 UTC as a Unix timestamp in milliseconds
# (an explicit tzinfo avoids depending on the machine's local time zone)
start_timestamp = int(datetime(2024, 1, 28, tzinfo=timezone.utc).timestamp() * 1000)

streamingDF = spark.readStream \
    .format("rate-micro-batch") \
    .option("rowsPerBatch", "100") \
    .option("startTimestamp", start_timestamp) \
    .option("numPartitions", 1) \
    .load() \
    .withColumn("event_type", expression)

query = (
    streamingDF.writeStream
    .outputMode("append")
    .format("console")
    .trigger(processingTime="1 second")
    .option("checkpointLocation", checkpoint_path)
    .start()
)

query.awaitTermination()
This is the error I am getting:

  File "/home/hduser/dba/bin/python/DSBQ/src/StreamingSparkPartitioned.py", line 38, in <module>
    query.awaitTermination()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming/query.py", line 201, in awaitTermination
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 175, in deco
pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = db2fd1cc-cc72-439e-9dcb-8acfd2e9a61e, runId = f71cd26f-7e86-491c-bcf2-ab2e3dc63eca] terminated with exception: No usable value for offset
Did not find value which can be converted into long
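
The last two lines of the trace read like a JSON deserialization failure: Spark read back a checkpointed offset that it could not map to an object with a long `offset` field. One thing worth ruling out (an assumption, not a confirmed diagnosis) is that `checkpoint_path` still holds offsets from an earlier run, for example one that used a different source, because a restarted query resumes from its checkpoint rather than starting fresh. The sketch below is hypothetical: `latest_source_offset` and `looks_like_rate_micro_batch_offset` are not Spark APIs, and it assumes the on-disk layout `<checkpoint>/offsets/<batchId>` (a version line, a metadata line, then one offset line per source) and that rate-micro-batch writes its offset as JSON with integer `offset` and `timestamp` fields.

```python
import json
import os
from typing import Optional


def latest_source_offset(checkpoint_dir: str) -> Optional[str]:
    """Return the last non-empty line of the newest offsets file, or None.

    ASSUMPTION: one file per batch under <checkpoint>/offsets/<batchId>,
    holding a version line, a metadata JSON line and one line per source.
    """
    offsets_dir = os.path.join(checkpoint_dir, "offsets")
    if not os.path.isdir(offsets_dir):
        return None  # no checkpointed batches: the query would start fresh
    batch_ids = [int(name) for name in os.listdir(offsets_dir) if name.isdigit()]
    if not batch_ids:
        return None
    with open(os.path.join(offsets_dir, str(max(batch_ids))), encoding="utf-8") as f:
        lines = [line.strip() for line in f if line.strip()]
    return lines[-1] if lines else None


def looks_like_rate_micro_batch_offset(offset_line: str) -> bool:
    """ASSUMPTION: a rate-micro-batch offset is a JSON object with integer
    "offset" and "timestamp" fields; a bare number or other shape is not."""
    try:
        parsed = json.loads(offset_line)
    except ValueError:
        return False
    return (isinstance(parsed, dict)
            and isinstance(parsed.get("offset"), int)
            and isinstance(parsed.get("timestamp"), int))
```

Pass a local path such as `/ssd/hduser/randomdata/chkpt` rather than the `file://` URI. If the check returns `False`, rerunning with `checkpointLocation` pointed at a new, empty directory should show whether the old checkpoint is the problem.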
It seems there might be an issue with the *rate-micro-batch* source when the
*startTimestamp* option is used.
For testing purposes, you could try the socket source instead.
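
To check what *startTimestamp* ought to produce, here is a pure-Python model of the options as I read the Structured Streaming guide: `startTimestamp` (default 0) is the generated time of the first batch, and `advanceMillisPerBatch` (default 1000) is added for each later batch. `expected_batches` is a hypothetical helper, and it assumes that every row in a batch carries that batch's timestamp and that the option only takes effect when the query starts without an existing checkpoint; neither point is confirmed in this thread.

```python
from datetime import datetime, timezone


def expected_batches(start_timestamp_ms, rows_per_batch, num_batches,
                     advance_millis_per_batch=1000):
    """Model of the rate-micro-batch options (ASSUMED semantics, see above).

    Batch n covers values [n * rows_per_batch, (n + 1) * rows_per_batch) and
    its rows carry start_timestamp_ms + n * advance_millis_per_batch.
    """
    batches = []
    for n in range(num_batches):
        ts_ms = start_timestamp_ms + n * advance_millis_per_batch
        first_value = n * rows_per_batch
        batches.append({
            "batch": n,
            "timestamp": datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc),
            "values": (first_value, first_value + rows_per_batch - 1),
        })
    return batches


if __name__ == "__main__":
    # Same start time and batch size as the script above
    start = int(datetime(2024, 1, 28, tzinfo=timezone.utc).timestamp() * 1000)
    for b in expected_batches(start, rows_per_batch=100, num_batches=3):
        print(b["batch"], b["timestamp"], b["values"])
        # first line printed: 0 2024-01-28 00:00:00+00:00 (0, 99)
```

Comparing this against the console sink output should help tell whether the option is ignored outright or the query is resuming from an older checkpoint.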
HTH
Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom
On Sun, 28 Jan 2024 at 22:00, Perfect Stranger <paulpaul1...@gmail.com> wrote:
> I described the issue here:
>
> https://stackoverflow.com/questions/77893939/how-does-starttimestamp-option-work-for-the-rate-micro-batch-format
>
> Could someone please respond?
>
> The rate-micro-batch format doesn't seem to respect the startTimestamp option.
>
> Thanks.