Something like this?
# Standard library imports
import json
import multiprocessing
import os
import random
import re
import sys
# Third-party imports
import numpy as np
import pandas as pd
import pyarrow
# Pyspark imports
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.functions import (
col, concat, concat_ws, expr, lit, trim, udf
)
from pyspark.sql.types import (
IntegerType, StringType, StructField, StructType,
DoubleType, TimestampType
)
from pyspark import pandas as ps
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
number_cores = int(multiprocessing.cpu_count())
mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # total physical memory in bytes, e.g. 4015976448
memory_gb = int(mem_bytes / (1024.0**3))  # truncated to whole GB, e.g. 3
def get_spark_session(app_name: str, conf: SparkConf):
    # Run locally on all cores and give the driver the available memory.
    conf.setMaster("local[{}]".format(number_cores))
    conf.set("spark.driver.memory", "{}g".format(memory_gb))
    conf.set("spark.sql.repl.eagerEval.enabled", "True")
    conf.set("spark.sql.repl.eagerEval.maxNumRows", "10000")
    conf.set("spark.sql.adaptive.enabled", "True")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    # The log level is not a Spark conf key; it is set on the SparkContext
    # right after the session is created below.
    return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
spark = get_spark_session("My super app", SparkConf())
spark.sparkContext.setLogLevel("ERROR")
def generate_ip():
return ".".join(str(random.randint(0, 255)) for _ in range(4))
def generate_timestamp():
return pd.Timestamp(
year=random.randint(2021, 2023),
month=random.randint(1, 12),
day=random.randint(1, 28),
hour=random.randint(0, 23),
minute=random.randint(0, 59),
second=random.randint(0, 59)
)
def random_gbps():
return random.uniform(0, 10)
# Number of demo rows; a real table would be read from storage instead.
n = 20
data = [{"incoming_ips": generate_ip(), "gbps": random_gbps(),
         "date_time": generate_timestamp()} for _ in range(n)]
df = spark.createDataFrame(pd.DataFrame(data))
df.show()
# Total traffic per IP, ranked busiest first; percent_rank() is 0.0 for the
# most active IP and 1.0 for the least active one.
agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))
windowRank = Window.orderBy(F.col("total_gbps").desc())
agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))

# Keep the top 80% of IPs (drop the 20% least active) and join back to the
# original rows so only traffic from those IPs remains.
top_80_ips = agg_df.filter(F.col("rank") <= 0.80)
result = df.join(top_80_ips, on="incoming_ips", how="inner").select(
    "incoming_ips", "gbps", "date_time"
)
result.show()
print(df.count())
print(result.count())
+---------------+-------------------+-------------------+
| incoming_ips| gbps| date_time|
+---------------+-------------------+-------------------+
| 66.186.8.130| 5.074283124722104|2022-03-12 05:09:16|
| 155.45.76.235| 0.6736194760917324|2021-06-19 03:36:28|
| 237.51.137.200|0.43334812775057685|2022-04-27 08:08:47|
| 78.4.48.171| 7.5675453578753435|2022-08-21 18:55:48|
| 241.84.163.17| 3.5681655964070815|2021-01-24 20:39:50|
|130.255.202.138| 6.066112278135983|2023-07-07 22:26:15|
| 198.33.206.140| 1.9147905257021836|2023-03-01 04:44:14|
| 84.183.253.20| 7.707176860385722|2021-08-26 23:24:31|
|218.163.165.232| 9.458673015973213|2021-02-22 12:13:15|
| 62.57.20.153| 1.5764916247359229|2021-11-06 12:41:59|
| 245.24.168.152|0.07452805411698016|2021-06-04 16:14:36|
| 98.171.202.249| 3.546118349483626|2022-07-05 10:55:26|
| 210.5.246.85|0.02430730260109759|2022-04-08 17:26:04|
| 13.236.170.177| 2.41361938344535|2021-08-11 02:19:06|
|180.140.248.193| 0.9512956363005021|2021-06-27 18:16:58|
| 26.140.88.127| 7.51335778127692|2023-06-02 14:13:30|
| 7.118.207.252| 6.450499049816286|2022-12-11 06:36:20|
| 11.8.10.136| 8.750329246667354|2023-02-03 05:33:16|
| 232.140.56.86| 4.289740988237201|2023-02-22 20:10:09|
| 68.117.9.255| 5.384340363304169|2022-12-03 09:55:26|
+---------------+-------------------+-------------------+
+---------------+------------------+-------------------+
| incoming_ips| gbps| date_time|
+---------------+------------------+-------------------+
| 66.186.8.130| 5.074283124722104|2022-03-12 05:09:16|
| 241.84.163.17|3.5681655964070815|2021-01-24 20:39:50|
| 78.4.48.171|7.5675453578753435|2022-08-21 18:55:48|
|130.255.202.138| 6.066112278135983|2023-07-07 22:26:15|
| 198.33.206.140|1.9147905257021836|2023-03-01 04:44:14|
| 84.183.253.20| 7.707176860385722|2021-08-26 23:24:31|
|218.163.165.232| 9.458673015973213|2021-02-22 12:13:15|
| 62.57.20.153|1.5764916247359229|2021-11-06 12:41:59|
| 98.171.202.249| 3.546118349483626|2022-07-05 10:55:26|
|180.140.248.193|0.9512956363005021|2021-06-27 18:16:58|
| 13.236.170.177| 2.41361938344535|2021-08-11 02:19:06|
| 26.140.88.127| 7.51335778127692|2023-06-02 14:13:30|
| 7.118.207.252| 6.450499049816286|2022-12-11 06:36:20|
| 11.8.10.136| 8.750329246667354|2023-02-03 05:33:16|
| 232.140.56.86| 4.289740988237201|2023-02-22 20:10:09|
| 68.117.9.255| 5.384340363304169|2022-12-03 09:55:26|
+---------------+------------------+-------------------+
20
16
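
Another way, if you would rather not rank every IP, is to take the 20th
percentile of total_gbps with approxQuantile and filter on that threshold.
This is just a sketch reusing agg_df from above (the names threshold,
active_ips and result2 are only for illustration); the cutoff is
approximate, so the exact number of IPs kept can differ slightly from the
percent_rank version.

# Sketch: same 80/20 split, but with an approximate percentile threshold
# instead of a window; the third argument is the allowed relative error.
threshold = agg_df.approxQuantile("total_gbps", [0.20], 0.01)[0]
active_ips = agg_df.filter(F.col("total_gbps") >= threshold)
result2 = df.join(active_ips, on="incoming_ips", how="inner").select(
    "incoming_ips", "gbps", "date_time"
)
result2.show()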
On Fri, 15 Sep 2023 at 20:14, [email protected]
<[email protected]> wrote:
> Hi team,
>
> I am using PySpark 3.4
>
> I have a table of a million rows with a few columns, among them incoming
> ips, what is known as gbps (Gigabytes per second), and the date and time
> of each incoming ip.
>
> I want to filter out the 20% least active ips and work on the remainder of
> the data. How can I do this in PySpark?
>
> Thanks
--
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge
+47 480 94 297