for performing your task of converting to
json and more.
Thus, performance-wise, you can batch the records per user and also share
the same connection within each partition if needed.
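A minimal sketch of that pattern, with placeholder names (Conn, openConnection, send and the "user_id" column stand in for whatever sink and schema the job actually uses):

import org.apache.spark.sql.DataFrame

// Placeholder sink: Conn, openConnection and send represent whatever the job writes to.
trait Conn { def send(userId: String, payload: Seq[String]): Unit; def close(): Unit }
def openConnection(): Conn = ???

def writePerUser(df: DataFrame): Unit =
  df.rdd.foreachPartition { rows =>
    val conn = openConnection()                 // one connection shared by the whole partition
    try {
      rows.toSeq
        .groupBy(_.getAs[String]("user_id"))    // batch the records per user ("user_id" is assumed)
        .foreach { case (userId, userRows) =>
          conn.send(userId, userRows.map(_.mkString(",")))  // stand-in for the JSON conversion
        }
    } finally conn.close()
  }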
Hope this will help.
Regards
Amit
On Wed, 26 Apr, 2023, 15:58 Marco Costantini, <
marco.costant...@rocketfncl.
Hi Team,
I am running a query in Spark 3.2.
val df1 = sc.parallelize(List((1,2,3,4,5),(1,2,3,4,5)))
  .toDF("id", "col2", "col3", "col4", "col5")
val op_cols_same_case = List("id", "col2", "col3", "col4", "col5", "id")
val df2 = df1.select(op_cols_same_case.head, op_cols_same_case.tail: _*)
df2.select("
.
Thanks
Amit
On Wed, Sep 14, 2022 at 8:14 PM Sid wrote:
> Hello experts,
>
> I know that Gzip and Snappy files are not splittable, i.e. the data won't be
> distributed across multiple blocks; rather, Spark will try to load the data into a
> single partition/block
>
> So, my question i
https://github.com/allwefantasy/spark-binlog
Sent from my iPhone
> On 19 Aug 2022, at 5:45 PM, sandra sukumaran
> wrote:
>
>
> Dear Sir,
>
>
>
> Is there any possible method to fetch the MySQL database binlog with the
> help of Spark Streaming?
> Kafka streaming is not applicable
should be all
possible values that you used to create the salt.
I hope you understand.
Thanks
On Sun, Jul 31, 2022 at 10:02 AM Sid wrote:
> Hi Amit,
>
> Thanks for your reply. However, your answer doesn't seem different from
> what I have explained.
>
> My question is af
values of the salt.
join_col
x1_1
x1_2
x2_1
x2_2
And then join it like
table1.join(table2, tableA.new_join_col == tableB.join_col)
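A minimal Scala sketch of this salting idea, assuming both tables join on "join_col" and the salt has two possible values (matching the x?_1 / x?_2 example above):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

def saltedJoin(large: DataFrame, small: DataFrame, n: Int = 2): DataFrame = {
  // skewed side: append a random salt suffix to the join key
  val saltedLarge = large.withColumn(
    "new_join_col",
    concat(col("join_col"), lit("_"), (rand() * n).cast("int").cast("string")))

  // other side: explode every possible salt value so each salted key finds a match
  val saltedSmall = small.withColumn(
    "new_join_col",
    explode(array((0 until n).map(i => concat(col("join_col"), lit(s"_$i"))): _*)))

  saltedLarge.join(saltedSmall, Seq("new_join_col"))
}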
Let me know if you have any questions.
Regards
Amit Joshi
On Sat, Jul 30, 2022 at 7:16 PM Sid wrote:
> Hi Team,
>
> I was trying to understand th
Thanks Sean/Martin, my bad, the Spark version was 3.0.1, so after using json4s
3.6.6 it fixed the issue.
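In sbt terms, the fix described above amounts to pinning json4s to the version that worked with Spark 3.0.1 (module name taken from the later snippet in this thread):

libraryDependencies += "org.json4s" %% "json4s-native" % "3.6.6"  // 3.6.6 resolved the ShortTypeHints NoSuchMethodError here; 3.7.0-M5 did not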
Thanks
Amit
On Fri, Feb 4, 2022 at 3:37 PM Sean Owen wrote:
> My guess is that something else you depend on is actually bringing in a
> different json4s, or you're otherwise mixing li
Martin, Sean: I changed it to 3.7.0-M5 and am still getting the same issue, with
the below error.
Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError:
org.json4s.ShortTypeHints$.apply$default$2()Ljava/lang/String;
Thanks
Amit
On Fri, Feb 4, 2022 at 9:03
Hello, everyone. I am migrating my Spark streaming job to Spark version 3.1. I
also upgraded the json4s version as below:
libraryDependencies += "org.json4s" %% "json4s-native" % "3.7.0-M5"
While running the job I am getting an error for the below code, where I am
serializing the given inputs.
implicit val for
Thanks Mich. The link you shared has only two options, Kafka and Socket.
Thanks
Amit
On Sat, Jan 29, 2022 at 3:49 AM Mich Talebzadeh
wrote:
> So you have a classic architecture with spark receiving events through a
> kafka topic via kafka-spark-connector, do something with it and sen
Hello everyone, we have a Spark streaming application. We send requests to the
stream through an Akka actor using a Kafka topic, and we wait for the response as it is
real time. Just want a suggestion: is there any better option, like Livy,
where we can send requests to and receive responses from Spark streaming?
Thanks
Amit
))
Please suggest. This has blocked my production release.
Thanks
Amit
Amit
name := """cft-common"""
ThisBuild / version := "0.0.1-SNAPSHOT"
ThisBuild / scalaVersion := "2.12.15"
Test / fork := true
Test / envVars := Map("env" -> "qa")
val jacksonVersion = "2.13.1"
val AkkaVe
tion: com.codahale.metrics.JmxReporter
at
java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at
java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
On Thu, Jan 20, 2022 at 5:1
ava:581)
at
java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
Thanks
Amit
lass
Any idea how to resolve these?
Thanks
Amit
Hi Spark users,
Can anyone please provide your views on the topic?
Regards
Amit Joshi
On Sunday, October 3, 2021, Amit Joshi wrote:
> Hi Spark-Users,
>
> Hope you are doing good.
>
> I have been working on cases where a dataframe is joined with more than
> one data fr
"key2")
I was thinking of bucketing as a solution to speed up the joins. But if I
bucket df1 on key1, then join2 may not benefit, and vice versa (if I
bucket df1 on key2).
Or should we bucket df1 twice, once on key1 and once on key2?
Is there a strategy to make both joins faster?
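One option, sketched below purely as an illustration (table name and bucket count are placeholders), is to persist df1 bucketed on the key of the more expensive join:

df1.write
  .bucketBy(200, "key1")        // bucket count is an arbitrary placeholder
  .sortBy("key1")
  .saveAsTable("df1_bucketed_by_key1")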
Regards
Amit Joshi
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Can anyone help here and let me know why this happened and what the
resolution is?
--
Thanks & Regards,
Amit Sharma
nks
Amit
Hi Mich,
Thanks for your email.
I have tried it in batch mode;
I am still looking to try it in streaming mode.
Will update you accordingly.
Regards
Amit Joshi
On Thu, Jun 17, 2021 at 1:07 PM Mich Talebzadeh
wrote:
> OK let us start with the basic cube
>
> create a DF first
>
> scal
s as well?
I hope I was able to make my point clear.
Regards
Amit Joshi
On Wed, Jun 16, 2021 at 11:36 PM Mich Talebzadeh
wrote:
>
>
> Hi,
>
> Just to clarify
>
> Are we talking about *rollup* as a subset of a cube that computes
> hierarchical subtotals from left to ri
I would appreciate it if someone could give some pointers on the question below.
-- Forwarded message -
From: Amit Joshi
Date: Tue, Jun 15, 2021 at 12:19 PM
Subject: [Spark]Does Rollups work with spark structured streaming with
state.
To: spark-user
Hi Spark-Users,
Hope you are all
saved.
If rollups are not supported, then what is the standard way to handle this?
Regards
Amit Joshi
provide enough memory in
the spark cluster to run both.
Regards
Amit Joshi
On Sat, May 22, 2021 at 5:41 AM wrote:
> Hi Amit;
>
>
>
> Thank you for your prompt reply and kind help. Wonder how to set the
> scheduler to FAIR mode in python. The following code does not seem to work
Hi Jian,
You have to use the same Spark session to run all the queries.
And use the following to wait for termination:
q1 = writeStream1.start()
q2 = writeStream2.start()
spark.streams.awaitAnyTermination()
And also set the scheduler in the Spark config to the FAIR scheduler.
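A runnable Scala sketch of that pattern, using placeholder rate/console sources and sinks (the same spark.scheduler.mode setting applies from PySpark):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("two-streams")
  .config("spark.scheduler.mode", "FAIR")   // FAIR scheduling, as mentioned above
  .getOrCreate()

// two placeholder streaming sources; a real job would read from Kafka etc.
val s1 = spark.readStream.format("rate").load()
val s2 = spark.readStream.format("rate").load()

val q1 = s1.writeStream.format("console").start()
val q2 = s2.writeStream.format("console").start()

spark.streams.awaitAnyTermination()   // returns when any one of the queries stops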
Regards
Amit Joshi
On
worker nodes.
Use this command to pass it to the driver:
--files /appl/common/ftp/conf.json --conf
spark.driver.extraJavaOptions="-Dconfig.file=conf.json"
And make sure you are able to access the file location from the worker nodes.
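On the driver side, -Dconfig.file suggests the Typesafe Config library; a minimal sketch of reading the shipped file (the "ftp.host" key is hypothetical):

import com.typesafe.config.ConfigFactory

val conf = ConfigFactory.load()            // honours the -Dconfig.file option set above
val ftpHost = conf.getString("ftp.host")   // "ftp.host" is a hypothetical key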
Regards
Amit Joshi
On Sat, May 15, 2021 at 5:14 AM KhajaAsmath Moh
ope this helps.
Regards
Amit Joshi
On Wed, Apr 7, 2021 at 8:14 PM Mich Talebzadeh
wrote:
>
> Did some tests. The concern is SSS job running under YARN
>
>
> *Scenario 1)* use spark-sql-kafka-0-10_2.12-3.1.0.jar
>
>- Removed spark-sql-kafka-0-10_2.12-3.1.0.jar from any
Hi, can we write unit tests for Spark code? Is there any specific framework?
Thanks
Amit
Hi Boris,
Thanks for your code block.
I understood what you are trying to achieve in the code.
But the content in the file is JSON records separated by newlines.
And we have to make a DataFrame out of it, as some processing has to be
done on it.
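Since Spark's JSON reader expects exactly this newline-delimited layout, a minimal sketch (spark is the active SparkSession; the path is hypothetical) would be:

val df = spark.read.json("/path/to/incoming/json")   // one JSON record per line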
Regards
Amit
On Monday, January 18, 2021, Boris
Hi Boris,
I need to do processing on the data present in the path.
That is the reason I am trying to make the dataframe.
Can you please provide an example of your solution?
Regards
Amit
On Mon, Jan 18, 2021 at 7:15 PM Boris Litvak wrote:
> Hi Amit,
>
>
>
> Why won’t you ju
is approach is fine? Specifically,
if there is some problem with creating the dataframe after calling collect.
If there is any better approach, please let me know.
Regards
Amit Joshi
I believe it's a Spark UI issue which does not display the correct value. I
believe it is resolved in Spark 3.0.
Thanks
Amit
On Fri, Jan 8, 2021 at 4:00 PM Luca Canali wrote:
> You report 'Storage Memory': 3.3TB/ 598.5 GB -> The first number is the
> memory used for storage, th
Hi All,
Can someone please help with this?
Thanks
On Tuesday, December 8, 2020, Amit Joshi wrote:
> Hi Gabor,
>
> Pls find the logs attached. These are truncated logs.
>
> Command used :
> spark-submit --verbose --packages org.apache.spark:spark-sql-
> kafka-0-10_2.12:3.0.
-cores 1 --executor-memory 4g --files
gs://x/jars_application.conf,gs://x/log4j.properties
gs://x/a-synch-r-1.0-SNAPSHOT.jar
For this I used a snapshot jar, not a fat jar.
Regards
Amit
On Mon, Dec 7, 2020 at 10:15 PM Gabor Somogyi
wrote:
> Well, I can't do miracle without cluster and log
Any suggestion, please?
Thanks
Amit
On Fri, Dec 4, 2020 at 2:27 PM Amit Sharma wrote:
> Is there any memory leak in Spark version 2.3.3, as mentioned in the Jira
> below?
> https://issues.apache.org/jira/browse/SPARK-29055.
>
> Please let me know how to solve it.
>
> Thanks
>
Jayesh, but during logical planning Spark would know that the same DF is used
twice, so it will optimize the query.
Thanks
Amit
On Mon, Dec 7, 2020 at 1:16 PM Lalwani, Jayesh wrote:
> Since DF2 is dependent on DF1, and DF3 is dependent on both DF1 and DF2,
> without caching, Spark will re
Sean, you mean if a DF is used more than once in transformations then use
cache. But frankly, that is also not always true, because in many places, even if
the DF is used only once, it gives the same result with caching and without caching.
How do we decide whether we should use cache or not?
Thanks
Amit
On Mon, Dec 7, 2020
dataframe multiple times for transformations,
use of caching should be based on actions. In my case the only action is one save call
on DF3. Please correct me if I am wrong.
Thanks
Amit
On Mon, Dec 7, 2020 at 11:54 AM Theodoros Gkountouvas <
theo.gkountou...@futurewei.com> wrote:
> Hi Amit,
>
>
Amit
Hi Gabor,
The code is a very simple Kafka consumption of data.
I guess it may be the cluster.
Can you please point out the possible problems to look for in the cluster?
Regards
Amit
On Monday, December 7, 2020, Gabor Somogyi
wrote:
> + Adding back user list.
>
> I've had a look at
s that you are overriding the kafka-clients that comes
>> with spark-sql-kafka-0-10_2.12
>>
>>
>> I'd try removing the kafka-clients and see if it works
>>
>>
>> On Sun, 6 Dec 2020 at 08:01, Amit Joshi
>> wrote:
>>
>>> Hi All,
fault
value. at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
I have tried setting the "partition.assignment.strategy", but it is still
not working.
Please help.
Regards
Amit Joshi
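Following the suggestion quoted above about removing the explicit kafka-clients dependency, a minimal sbt sketch (the version is a placeholder based on the spark-submit line in this thread):

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.0.1"
// ...and remove any explicit "org.apache.kafka" % "kafka-clients" % "..." line,
// so the connector's own kafka-clients version is used.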
Is there any memory leak in Spark version 2.3.3, as mentioned in the Jira below?
https://issues.apache.org/jira/browse/SPARK-29055.
Please let me know how to solve it.
Thanks
Amit
On Fri, Dec 4, 2020 at 1:55 PM Amit Sharma wrote:
> Can someone help me on this please.
>
>
> Thanks
>
Can someone please help me with this?
Thanks
Amit
On Wed, Dec 2, 2020 at 11:52 AM Amit Sharma wrote:
> Hi, I have a Spark streaming job. When I check the Executors tab,
> there is a Storage Memory column. It displays used memory / total memory.
> What is used memory? Is it memor
Hi, I have a Spark streaming job. When I check the Executors tab,
there is a Storage Memory column. It displays used memory / total memory.
What is used memory? Is it memory currently in use, or memory used so far? How would
I know how much memory is unused at a given point in time?
Thanks
Amit
Please find attached the screenshot of no active tasks but memory still in
use.
[image: image.png]
On Sat, Nov 21, 2020 at 4:25 PM Amit Sharma wrote:
> I am using df.cache and also unpersisting it. But when I check the Spark UI
> Storage tab I still see cache memory usage. Do I need to do any
usages should be 0.
Thanks
Amit
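For what it's worth, a minimal sketch related to the cache question above: unpersist can be made blocking so the release shows up in the Storage tab right away (df is the cached DataFrame in question):

df.unpersist(blocking = true)   // blocks until the cached blocks are actually dropped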
Russell, I increased the RPC timeout to 240 seconds, but I am still getting
this issue once in a while, and after this issue my Spark streaming job gets stuck
and does not process any requests, so I need to restart it every time. Any
suggestions, please?
Thanks
Amit
On Wed, Nov 18, 2020 at 12:05 PM Amit
Please help.
Thanks
Amit
On Mon, Nov 9, 2020 at 4:18 PM Amit Sharma wrote:
> Please find below the exact exception
>
> Exception in thread "streaming-job-executor-3" java.lang.OutOfMemoryError:
> Java heap space
> at java.util.Arrays.copyOf(Array
Please help.
Thanks
Amit
On Wed, Nov 18, 2020 at 12:05 PM Amit Sharma wrote:
> Hi, we are running a spark streaming job and sometimes it throws below
> two exceptions . I am not understanding what is the difference between
> these two exception for one timeout is 120 seconds and a
Hi, we are running a Spark streaming job and sometimes it throws the below two
exceptions. I do not understand the difference between these
two exceptions: one timeout is 120 seconds and the other is 600 seconds.
What could be the reason for these?
Error running job streaming job 160570
am caching some RDD inside a method and uncaching
it.
2. Similarly, the Executors tab displays 'Storage Memory' used and
available; does 'used' mean currently in use, or memory used on that
executor at some point in time (maximum memory used so far)?
Thanks
Amit
nfun$map$1.apply(Try.scala:237)
at scala.util.Try$.apply(Try.scala:192)
at scala.util.Success.map(Try.scala:237)
On Sun, Nov 8, 2020 at 1:35 PM Amit Sharma wrote:
> Hi , I am using 16 nodes spark cluster with below config
> 1. Executor memory 8 GB
> 2. 5 cores per executor
Can you please help.
Thanks
Amit
On Sun, Nov 8, 2020 at 1:35 PM Amit Sharma wrote:
> Hi , I am using 16 nodes spark cluster with below config
> 1. Executor memory 8 GB
> 2. 5 cores per executor
> 3. Driver memory 12 GB.
>
>
> We have streaming job. We do not see problem
receive a request and process it, but suddenly it starts
giving out-of-memory errors. It will throw the exception for one executor, then
throw it for other executors too, and it stops processing requests.
Thanks
Amit
Hi, I have a question: when we are reading from Cassandra, should we use only the
partition key in the where clause from a performance perspective, or does it
not matter from the Spark perspective because it always allows filtering?
Thanks
Amit
Can you please post the schema of both tables?
On Wednesday, September 30, 2020, Lakshmi Nivedita
wrote:
> Thank you for the clarification. I would like to know how I can proceed for
> this kind of scenario in pyspark.
>
> I have a scenario subtracting the total number of days with the number of
> ho
's eventual consistency.
Regards
Amit
On Sunday, September 27, 2020, Debabrata Ghosh
wrote:
> Hi,
> I had a query around Spark checkpoints - Can I store the checkpoints
> in NoSQL or Kafka instead of Filesystem ?
>
> Regards,
>
> Debu
>
possible this may not be the
issue. You can check for the same problem at your end.
https://github.com/apache/spark/blame/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L219
And can you please tell me what issue was solved in Spark 3 that you are
referring to?
Regards
Amit
.
Regards,
Amit
On Thursday, September 17, 2020, Rishi Shah
wrote:
> Hello All,
>
> Hope this email finds you well. I have a dataframe of size 8TB (parquet
> snappy compressed), however I group it by a column and get a much smaller
> aggregated dataframe of size 700 rows (just two columns
Hi,
There is another option, Apache Livy, which lets you submit the job using a
REST API.
Another option is using AWS Data Pipeline to configure your job as an EMR
activity.
To activate the pipeline, you need the console or a program.
Regards
Amit
On Thursday, September 3, 2020, Eric Beabes
wrote
Hi Jungtaek,
Thanks for the input. I tried it and it worked.
I got confused earlier after reading some blogs.
Regards
Amit
On Friday, August 28, 2020, Jungtaek Lim
wrote:
> Hi Amit,
>
> if I remember correctly, you don't need to restart the query to reflect
> the newl
Any pointers will be appreciated.
On Thursday, August 27, 2020, Amit Joshi wrote:
> Hi All,
>
> I am trying to understand the effect of adding topics and partitions to a
> topic in kafka, which is being consumed by spark structured streaming
> applications.
>
> Do we have
spark structured streaming application to read
from a newly added partition of a topic?
Kafka consumers have a metadata refresh property that works without
restarting.
Thanks in advance.
Regards
Amit Joshi
executors, or is it
independent?
Thanks
Amit
Can you keep the fields as Option in your case class?
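A minimal sketch of what that could look like (field names are hypothetical):

// Fields that may be missing in any of the three datasets are modelled as Option,
// so a merged record can carry whichever values are available.
case class MergedRecord(
  id: String,
  name: Option[String] = None,
  address: Option[String] = None,
  phone: Option[String] = None
)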
Thanks
Amit
On Thu, Aug 13, 2020 at 12:47 PM manjay kumar
wrote:
> Hi ,
>
> I have a use case,
>
> where I need to merge three data sets and build one wherever data is
> available.
>
> And my dataset is a complex objec
Hi,
I have a scenario where a Kafka topic is being written with different types
of JSON records.
I have to regroup the records based on the type, then fetch the schema,
parse them, and write them as Parquet.
I have tried Structured Streaming, but the dynamic schema is a constraint.
So I have used DStream
Any help is appreciated. I have a Spark batch job; based on a condition, I would
like to start another batch job by invoking a .sh file. Just want to know: can
we achieve that?
Thanks
Amit
On Fri, Aug 7, 2020 at 3:58 PM Amit Sharma wrote:
> Hi, I want to write a batch job which would call another ba
Hi, I want to write a batch job which would call another batch job based on a
condition. Can I call one batch job from another in Scala, or can I only do it
with a Python script? An example would be really helpful.
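For illustration, a minimal Scala sketch of invoking a launcher script from a batch job (the script path and condition are placeholders):

import scala.sys.process._

val shouldRunSecondJob = true   // placeholder for the real condition
if (shouldRunSecondJob) {
  // run the launcher script and fail loudly if it does not exit cleanly
  val exitCode = Seq("/bin/bash", "/path/to/submit_second_job.sh").!
  require(exitCode == 0, s"second job submission failed with exit code $exitCode")
}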
Thanks
Amit
nk doesn't support multiple writers.
It assumes there is only one writer writing to the path. Each query needs
to use its own output directory.
Is there a way to write the output to the same path from both queries, as I
need the output at the same path?
Regards
Amit Joshi
"empl" => emplSchema
}
getGenericInternalRow(schema)
}
val data = udf(getData)
Spark Version : 2.4.5
Please Help.
Regards
Amit Joshi
.
Please let me know if there is any way to use a Spark batch job to copy data
between two Cassandra clusters.
Thanks
Amit
Hi All, sometimes I get this error in the Spark logs. I notice a few executors
are shown as dead in the Executors tab during this error, although my job
succeeds. Please help me find the root cause of this issue. I have 3
workers with 30 cores and 64 GB RAM each. My job uses 3 cores per
executor an
Please help on this.
Thanks
Amit
On Fri, Jul 17, 2020 at 2:34 PM Amit Sharma wrote:
> Hi All, I am running the same batch job in my two separate Spark clusters.
> In one of the clusters it is showing a GC warning on the Spark UI under the
> Executors tab. Garbage collection is taking lo
Please help on this.
Thanks
Amit
On Fri, Jul 17, 2020 at 9:10 AM Amit Sharma wrote:
> Hi, sometimes my spark streaming job throw this exception Futures timed
> out after [300 seconds].
> I am not sure where is the default timeout configuration. Can i increase
> it.
spark-submit and using G1GC.
Please let me know what could be the reason for the GC slowness.
Thanks
Amit
Hi, sometimes my Spark streaming job throws this exception: Futures timed
out after [300 seconds].
I am not sure where the default timeout configuration is. Can I increase
it? Please help.
Thanks
Amit
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[300 seconds
Hi, I have to delete certain rows from Cassandra during my Spark batch
process. Is there any way to delete rows using the Spark Cassandra connector?
Thanks
Amit
table at step 2, I will lose the data in
DF1 as it shows up empty.
I have two solutions:
1. Store DF1 in another temp table before truncating table A.
2. Cache DF1 before truncating.
Do we have any better solution?
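A minimal sketch of option 2 above (cache DF1 and force it to materialize before the truncate):

val df1Cached = df1.cache()   // DF1 from the steps above
df1Cached.count()             // action that populates the cache while table A still holds the data
// ...now truncate table A, then write df1Cached back...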
Thanks
Amit
I have set 5 cores per executor. Is there any formula to determine the best
combination of executors, cores, and memory per core for better
performance? Also, when I run a local Spark instance in my web jar, I am
getting better speed than when running in the cluster.
Thanks
Amit
Hi Team,
I am trying to use regexp_replace in Spark SQL and it is throwing an error:
expected , but found Scalar
in 'reader', line 9, column 45:
... select translate(payload, '"', '"') as payload
I am trying to replace all \\\" characters with "
Please update me if anyone knows about it.
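Assuming the goal is to turn the two-character sequence backslash + double quote into a plain quote, a minimal Scala sketch on the "payload" column (column name taken from the query above; df is the DataFrame holding it):

import org.apache.spark.sql.functions.{col, regexp_replace}

// In the regex a literal backslash must itself be escaped, hence the "\\\\\"" pattern.
val cleaned = df.withColumn("payload",
  regexp_replace(col("payload"), "\\\\\"", "\""))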
Thanks
Amit
On Thu, Oct 10, 2019 at 3:49 PM Amit Sharma wrote:
> Hi , we have spark streaming job to which we send a request through our UI
> using kafka. It process and returned the response. We are getting below
> error and this staremi
Hi Team,
I have Kafka messages where the JSON is coming as a string. How can I create a
table after converting the JSON string to JSON using Spark SQL?
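A minimal sketch of one way to do this with from_json (bootstrap servers, topic and schema fields are placeholders; spark is the active SparkSession):

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Kafka source shown as a batch read for simplicity; the same parsing applies to readStream.
val kafkaDf = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("subscribe", "events")
  .load()

val schema = new StructType()
  .add("id", LongType)
  .add("name", StringType)

val parsed = kafkaDf
  .selectExpr("CAST(value AS STRING) AS value")
  .select(from_json(col("value"), schema).alias("data"))
  .select("data.*")

parsed.createOrReplaceTempView("events_json")   // now queryable with Spark SQL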
Hi, we have a Spark streaming job to which we send a request through our UI
using Kafka. It processes the request and returns the response. We are getting the below
error and the streaming job is not processing any requests.
Listener StreamingJobProgressListener threw an exception
java.util.NoSuchElementException: key
Thanks Andrew, but I am asking specifically about driver memory, not executor
memory. We have just one master, and if each job's driver.memory=4g and the
master node's total memory is 16 GB, then we cannot execute more than 4 jobs
at a time.
On Monday, October 7, 2019, Andrew Melo wrote:
> Hi Amit
>
.
Thanks
Amit
Try increasing your driver memory to 12g.
On Thursday, August 15, 2019, Dennis Suhari
wrote:
> Hi community,
>
> I am using Spark on YARN. When submitting a job after a long time I get an
> error message and a retry.
>
> It happens when I want to store the dataframe to a table.
>
> spark_df.write.option
.
Thanks
Amit
.
(org.apache.spark.scheduler.AsyncEventQueue).
Please suggest how to debug this issue.
Thanks
Amit
-node 14=1, like that. Is there any conf property I need to
change? I know with dynamic allocation we can use the below, but without dynamic
allocation is there any?
--conf "spark.dynamicAllocation.maxExecutors=2"
Thanks
Amit
Please update me if anyone knows how to handle it.
On Sun, Jul 21, 2019 at 7:18 PM Amit Sharma wrote:
> Hi , I wrote a code in future block which read data from dataset and cache
> it which is used later in the code. I faced a issue that data.cached() data
> will be replaced by c
Hi, I wrote code in a Future block which reads data from a dataset and caches
it, which is used later in the code. I faced an issue that the data.cached() data
will be replaced by a concurrently running thread. Is there any way we can
avoid this condition?
val dailyData = callDetailsDS.collect.toList
val adju
Do you have dynamic resource allocation enabled?
On Wednesday, July 17, 2019, zenglong chen wrote:
> Hi,all,
> My standalone mode has two slaves. When I submit my job, the
> localhost slave works well, but the second slave keeps adding and removing
> executors! The logs are below:
>
Hi All, I have set the dynamic allocation property = true in my script file,
and also the shuffle property in the script as well as in the spark-env file on
all worker nodes. I am using Spark Kafka streaming. I checked that as a request
comes in, the number of allocated cores increases, but even after the request is completed
the no of
under the Executors tab I saw all the nodes there, but I do not see active
tasks while the task status is active. Please help me find the root cause.
Thanks
Amit
,
org.apache.spark.SparkException: Couldn't find leader offsets for
Set([techops-prod2,4],
[techops-prod2,0]))
Please let me know if we missed any setting, so that streaming does not
stop even if a couple of Kafka nodes are down.
Thanks
Amit
Please provide an update if anyone knows.
On Monday, June 10, 2019, Amit Sharma wrote:
>
> We have a Spark Kafka streaming job running on a standalone Spark cluster. We
> have the below Kafka architecture:
>
> 1. Two clusters running in two data centers.
> 2. There is an LTM on top of each