Re: how to use cluster sparkSession like localSession

2018-11-01 Thread
I think you should investigate Apache Zeppelin and Livy. 崔苗 (Data and AI Product Development Department) <0049003...@znv.com> wrote on Fri, Nov 2, 2018 at 11:01: > Hi, > we want to execute spark code without submitting an application.jar, like this code: > public static void main(String args[]) throws Exception{ > SparkSession spark =
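
For context, a minimal sketch (not from the thread; the app name and master URL are hypothetical) of what building a SparkSession directly against a cluster master looks like. The executors still need the application's classes and dependencies made available to them, which is why tools like Zeppelin and Livy, which host a long-running session and accept code over a notebook or REST interface, are the usual answer here.

```scala
import org.apache.spark.sql.SparkSession

object InlineSparkApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("inline-spark-app")           // hypothetical app name
      .master("spark://master-host:7077")    // hypothetical cluster master URL
      .getOrCreate()

    spark.range(10).show()                   // any Spark code can run here without a separate submit step
    spark.stop()
  }
}
```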

Why does SparkSQL change the table owner when performing alter table operations?

2018-03-12 Thread
Hi, When using spark.sql() to perform alter table operations I found that spark changes the table owner property to the execution user. Then I dug into the source code and found that in HiveClientImpl, the alterTable function sets the owner of the table to the current execution user. Besides, s
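
A hedged way to observe the reported behaviour (the database and table names below are hypothetical): run an ALTER TABLE through spark.sql() against a Hive-backed table and then inspect the Owner field in the detailed table description.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("alter-table-owner-check")   // hypothetical app name
  .enableHiveSupport()
  .getOrCreate()

// The ALTER TABLE goes through HiveClientImpl.alterTable which, per the report,
// rewrites the table's owner to the current execution user.
spark.sql("ALTER TABLE mydb.mytable SET TBLPROPERTIES ('comment' = 'updated')")

// Check the Owner field before and after the alter statement to confirm.
spark.sql("DESCRIBE FORMATTED mydb.mytable").show(200, truncate = false)
```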

spark.sql.adaptive.enabled has no effect

2018-01-30 Thread
Hi there, As far as I know, when *spark.sql.adaptive.enabled* is set to true, the number of post-shuffle partitions should change with the map output size. But in my application there is a stage reading 900 GB of shuffled files with only 200 partitions (which is the default number of *spark.sql.shuf
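
For reference, a sketch of the Spark 2.x settings involved; the target-size config name is the 2.x one and the values are illustrative, so treat this as an assumption to verify against your version. Note that, as far as I know, adaptive execution in 2.x only coalesces post-shuffle partitions toward the target size starting from spark.sql.shuffle.partitions; it does not increase the partition count, so 200 can still show up as an upper bound unless that setting is raised.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("adaptive-shuffle-example")                  // hypothetical app name
  .config("spark.sql.adaptive.enabled", "true")
  // Initial (and, in 2.x, maximum) number of post-shuffle partitions.
  .config("spark.sql.shuffle.partitions", "2000")
  // Post-shuffle partitions are merged toward roughly this many bytes each.
  .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", s"${128L * 1024 * 1024}")
  .getOrCreate()
```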

Re: JDBC to hive batch use case in spark

2017-12-09 Thread
If you don't mind, I think it will help if you post your code. Hokam Singh Chauhan wrote on Sat, Dec 9, 2017 at 8:02 PM: > Hi, > I have a use case in which I want to read data from a jdbc > source (Oracle) table and write it to a hive table on a periodic basis. I tried > this using the SQL context to read from Ora
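
Since the original code did not make it into the archive preview, here is a minimal sketch of the pattern being described, with hypothetical connection details and table names: read the Oracle table over JDBC and append it to a Hive table; the periodic part would be handled by whatever scheduler re-runs the job.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("jdbc-to-hive")
  .enableHiveSupport()
  .getOrCreate()

// Read the source table from Oracle over JDBC.
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:oracle:thin:@//db-host:1521/SERVICE")  // hypothetical URL
  .option("dbtable", "SCHEMA.SOURCE_TABLE")                   // hypothetical source table
  .option("user", "user")
  .option("password", "password")
  .option("driver", "oracle.jdbc.OracleDriver")
  .load()

// Append the snapshot to a Hive table.
jdbcDF.write
  .mode(SaveMode.Append)
  .saveAsTable("hive_db.target_table")                        // hypothetical Hive table
```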

How to kill a query job when using spark thrift-server?

2017-11-27 Thread
Hi, I intend to use the spark thrift-server as a service to support concurrent sql queries. But in our situation we need a way to kill an arbitrary query job; is there an API to use here?
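
There is no thrift-server-specific kill API mentioned in the thread, but a hedged sketch of the generic mechanism Spark exposes is below: tag jobs with a job group and cancel the group (the group id, table name, and threading setup are hypothetical). As far as I know, the web UI's kill links rely on the same job cancellation machinery.

```scala
import scala.concurrent.{ExecutionContext, Future}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cancellable-query").getOrCreate()
val sc = spark.sparkContext
implicit val ec: ExecutionContext = ExecutionContext.global

// Run the query on its own thread and tag its jobs with a cancellable group id.
val result = Future {
  sc.setJobGroup("query-42", "long-running ad-hoc query", interruptOnCancel = true)
  spark.sql("SELECT count(*) FROM some_big_table").collect()   // hypothetical table
}

// Later, from a controlling thread: cancel every active job in that group.
sc.cancelJobGroup("query-42")
```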

[Structured Streaming] How to compute the difference between two rows of a streaming dataframe?

2017-09-29 Thread
Hi, I want to compute the difference between two rows in a streaming dataframe; is there a feasible API? Maybe some function like the window function *lag* in a normal dataframe, but it seems that this function is unavailable on a streaming dataframe. Thanks.
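
For contrast, a short sketch of the lag-over-a-window approach on a batch DataFrame (sample data and column names are made up); this is the operation that is not supported on streaming DataFrames. One possible workaround on the streaming side, offered as an assumption rather than a recommendation from the thread, is arbitrary stateful processing with mapGroupsWithState/flatMapGroupsWithState, keeping the previous row per key in state and emitting the difference.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag}

val spark = SparkSession.builder().appName("row-diff-batch").getOrCreate()
import spark.implicits._

// Hypothetical batch data: (key, sequence number, measured value).
val batchDF = Seq(
  ("s1", 1L, 10.0),
  ("s1", 2L, 12.5),
  ("s1", 3L, 11.0)
).toDF("sensorId", "seq", "value")

val w = Window.partitionBy("sensorId").orderBy("seq")

// Difference between each row and the previous row within the same key.
val withDiff = batchDF
  .withColumn("prevValue", lag(col("value"), 1).over(w))
  .withColumn("diff", col("value") - col("prevValue"))

withDiff.show()
```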

How to use approx_count_distinct to count distinct numbers in a day but output the count of each hour?

2017-09-20 Thread
Hi, I'm using approx_count_distinct to count unique visitors but have encountered some problems. I want to count unique visitors over a day and output the results every hour, with no duplicate counts between the results of two hours. Notice that this is not the cumulative distinct count of the da
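
For reference, a plain sketch of approx_count_distinct per day and per hour (sample data and column names are made up). The per-hour figures here are exactly the non-additive kind described: a visitor seen in two different hours is counted in both, and the per-window sketches cannot say which visitors are new to the current hour, so producing non-overlapping hourly increments of a daily distinct count generally needs the set of already-seen ids to be tracked explicitly.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{approx_count_distinct, col, window}

val spark = SparkSession.builder().appName("uv-by-day-and-hour").getOrCreate()
import spark.implicits._

// Hypothetical sample of (eventTime, uuid).
val visits = Seq(
  (Timestamp.valueOf("2017-09-20 00:10:00"), "u1"),
  (Timestamp.valueOf("2017-09-20 01:20:00"), "u1"),   // same visitor, next hour
  (Timestamp.valueOf("2017-09-20 01:30:00"), "u2")
).toDF("eventTime", "uuid")

val dailyUv = visits
  .groupBy(window(col("eventTime"), "1 day"))
  .agg(approx_count_distinct("uuid").as("daily_uv"))    // 2 distinct visitors in the day

val hourlyUv = visits
  .groupBy(window(col("eventTime"), "1 hour"))
  .agg(approx_count_distinct("uuid").as("hourly_uv"))   // hour 0 -> 1, hour 1 -> 2 (u1 counted again)

dailyUv.show(truncate = false)
hourlyUv.show(truncate = false)
```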

Re: [SS] Any way to optimize memory consumption of SS?

2017-09-14 Thread
state of dropDuplicates. But I cannot find a way to alleviate the problem. Michael Armbrust wrote on Fri, Sep 15, 2017 at 3:35 AM: > How many UUIDs do you expect to have in a day? That is likely where all > the memory is being used. Does it work without that? > > On Tue, Sep 12, 2017 at 8:42 PM

Re: [SS]How to add a column with custom system time?

2017-09-12 Thread
dow", window(current_timestamp(), "15 minutes"))* throws no exception and works fine. I don't know if this is a problem that needs improvement. 张万新 wrote on Wed, Sep 13, 2017 at 11:43 AM: > and I use .withColumn("window", window(current_timestamp(), "15 > min

Re: [SS]How to add a column with custom system time?

2017-09-12 Thread
and I use .withColumn("window", window(current_timestamp(), "15 minutes")) after count 张万新 wrote on Wed, Sep 13, 2017 at 11:32 AM: > *Yes, my code is shown below* > /** > * input > */ > val logs = spark > .readStream > .format("kafka") >

Re: [SS] Any way to optimize memory consumption of SS?

2017-09-12 Thread
nction is like* def parseFunction(str: String): (Long, String) = { val json = Json.parse(str) val timestamp = (json \ "time").get.toString().toLong val date = (timestamp / (60 * 60 * 24) * 24 -8) * 60 * 60 val uuid = (json \ "uuid").get.toString() (date, uuid

Re: [SS]How to add a column with custom system time?

2017-09-12 Thread
json to parse input logs from kafka, the parse function is like* def parseFunction(str: String): (Long, String) = { val json = Json.parse(str) val timestamp = (json \ "time").get.toString().toLong val date = (timestamp / (60 * 60 * 24) * 24 -8) * 60 * 60 val uuid = (json

[SS] Any way to optimize memory consumption of SS?

2017-09-12 Thread
Hi, I'm using structured streaming to count unique visits to our website. I run spark on yarn with 4 executor instances, going from 2 cores * 5g memory to 4 cores * 10g memory per executor, but there are frequent full GCs, and once the count rises to more than about 4.5 million the applica
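
A hedged sketch of one direction for trimming the deduplication state (the kafka options, topic, and payload format are hypothetical): keep a watermark and count with approx_count_distinct per day window, which holds a bounded sketch per window instead of every UUID seen, at the cost of approximate counts.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{approx_count_distinct, col, window}

val spark = SparkSession.builder().appName("unique-visits").getOrCreate()
import spark.implicits._

case class Visit(eventTime: Timestamp, uuid: String)

val visits = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // hypothetical brokers
  .option("subscribe", "weblogs")                     // hypothetical topic
  .load()
  .selectExpr("CAST(value AS STRING) AS payload")
  .as[String]
  .map { payload =>
    // Placeholder parsing: the real job extracts (time, uuid) from JSON as in the thread.
    val Array(ts, id) = payload.split(",")
    Visit(new Timestamp(ts.toLong * 1000L), id)
  }

val dailyUv = visits
  .withWatermark("eventTime", "1 day")                // lets old per-window state be purged
  .groupBy(window(col("eventTime"), "1 day"))
  .agg(approx_count_distinct("uuid").as("uv"))
```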

Re: [SS]How to add a column with custom system time?

2017-09-12 Thread
The spark version is 2.2.0. Michael Armbrust wrote on Tue, Sep 12, 2017 at 12:32 PM: > Which version of spark? > > On Mon, Sep 11, 2017 at 8:27 PM, 张万新 wrote: > >> Thanks for the reply, but using this method I got an exception: >>

Re: [SS]How to add a column with custom system time?

2017-09-11 Thread
wrote on Tue, Sep 12, 2017 at 4:48 AM: > import org.apache.spark.sql.functions._ > > df.withColumn("window", window(current_timestamp(), "15 minutes")) > > On Mon, Sep 11, 2017 at 3:03 AM, 张万新 wrote: > >> Hi, >> >> In structured streaming how can I a

[SS]How to add a column with custom system time?

2017-09-11 Thread
Hi, In structured streaming, how can I add a column to a dataset with the current system time aligned to 15 minutes? Thanks.
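
A self-contained sketch of the approach that came up in this thread, using the built-in rate source just to make the snippet runnable (the source and option values are illustrative): window(current_timestamp(), "15 minutes") attaches a processing-time window column aligned to 15-minute boundaries.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{current_timestamp, window}

val spark = SparkSession.builder().appName("system-time-window").getOrCreate()

val stream = spark.readStream
  .format("rate")                       // built-in test source, one row per second below
  .option("rowsPerSecond", "1")
  .load()

// Each row gets a [start, end) window of the system clock, aligned to 15 minutes.
val withWindow = stream.withColumn("window", window(current_timestamp(), "15 minutes"))
```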

Will an input event older than watermark be dropped?

2017-09-06 Thread
Hi, I have been investigating the watermark for some time. According to the guide, if we specify a watermark on an event time column, the watermark will be used to drop old state data. Then, taking a window-based count for example, if an event whose time is older than the watermark comes, will it be simply dropp
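
A sketch of the setup being asked about, using the rate source's timestamp column to stand in for event time (the window and threshold values are arbitrary). As far as the Structured Streaming guide for 2.x states, the guarantee is strict only in one direction: data within the threshold is aggregated, while data older than the watermark is eligible to be dropped but may still be aggregated if its window's state has not yet been cleaned up.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

val spark = SparkSession.builder().appName("watermark-drop").getOrCreate()

// The rate source provides a "timestamp" column used here as the event time.
val events = spark.readStream.format("rate").option("rowsPerSecond", "5").load()

val counts = events
  .withWatermark("timestamp", "10 minutes")            // late threshold
  .groupBy(window(col("timestamp"), "5 minutes"))      // window-based count
  .count()
```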

Re: Different watermark for different kafka partitions in Structured Streaming

2017-09-04 Thread
way. > > On Fri, Sep 1, 2017 at 4:59 PM, 张万新 wrote: > >> Thanks, it's true that a looser watermark can guarantee that more data will not be >> dropped, but at the same time more state needs to be kept. I just wonder >> whether something like the kafka-partition-aware watermark in fli

Re: Different watermark for different kafka partitions in Structured Streaming

2017-09-01 Thread
Thanks, it's true that a looser watermark can guarantee that more data will not be dropped, but at the same time more state needs to be kept. I just wonder whether something like Flink's kafka-partition-aware watermark in SS might be a better solution. Tathagata Das wrote on Thu, Aug 31, 2017 at 9:13 AM: > Why not set the

Re: [Structured Streaming]Data processing and output trigger should be decoupled

2017-08-31 Thread
I think something like the state store can be used to keep the intermediate data. For aggregations, the engine keeps processing batches of data and updating the results in the state store (or something like this), and when a trigger begins the engine just fetches the current result from the state store and outputs it to

Re: Why do checkpoints work the way they do?

2017-08-30 Thread
So are there any documents demonstrating under what conditions my application can recover from the same checkpoint and under what conditions it cannot? Tathagata Das wrote on Wed, Aug 30, 2017 at 1:20 PM: > Hello, > > This is an unfortunate design on my part when I was building DStreams :) > > Fortunately, we learnt from our

Different watermark for different kafka partitions in Structured Streaming

2017-08-30 Thread
Hi, I'm working with Structured Streaming to process logs from kafka and use a watermark to handle late events. Currently the watermark is computed as (max event time seen by the engine - late threshold), and the same watermark is used for all partitions. But in a production environment it happens fr
