Thanks, those suggestions helped.
Dr Mich Talebzadeh
d_streaming.scala:172:
trait TableSink takes type parameters
[error] val sink: TableSink = new CsvTableSink(writeDirectory+fileName, fieldDelim = ",")
[error] ^
[error] 6 errors found
Maybe I am not importing the correct dependencies.
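For what it's worth, a minimal sketch of the kind of change that clears this particular error, assuming the Flink 1.5 Scala Table API and the writeDirectory/fileName values already defined in md_streaming.scala (the explicit type parameter is the point, the rest is unchanged):

  // TableSink is generic, so give it the row type (or drop the annotation and let it be inferred)
  import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
  import org.apache.flink.types.Row

  val sink: TableSink[Row] = new CsvTableSink(writeDirectory + fileName, fieldDelim = ",")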
Thanks
Dr Mich Talebzadeh
Hi Jorn,
Thanks. I uploaded the Scala code to my GitHub --> md_streaming.scala
https://github.com/michTalebzadeh/Flink
Regards,
Dr Mich Talebzadeh
"OP_TIME".getBytes(),
new String(System.currentTimeMillis.toString).getBytes())
HbaseTable.put(p)
HbaseTable.flushCommits()
However, I don't seem to be able to get the correct values for the columns!
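In case it helps later readers, a hedged sketch of the Put pattern with explicit family/qualifier/value bytes, reusing the PRICE_INFO and OPERATION families and the HbaseTable handle from this thread (the TICKER/TIMEISSUED/PRICE qualifiers and the key,ticker,timeissued,price record layout are assumptions based on the sample rows quoted elsewhere; storing every value as a UTF-8 string also keeps it readable in the hbase shell):

  import org.apache.hadoop.hbase.client.Put
  import org.apache.hadoop.hbase.util.Bytes

  // line is one comma-separated record: key,ticker,timeissued,price
  val Array(key, ticker, timeissued, price) = line.split(",")

  val p = new Put(Bytes.toBytes(key))
  p.addColumn(Bytes.toBytes("PRICE_INFO"), Bytes.toBytes("TICKER"), Bytes.toBytes(ticker))
  p.addColumn(Bytes.toBytes("PRICE_INFO"), Bytes.toBytes("TIMEISSUED"), Bytes.toBytes(timeissued))
  p.addColumn(Bytes.toBytes("PRICE_INFO"), Bytes.toBytes("PRICE"), Bytes.toBytes(price))
  p.addColumn(Bytes.toBytes("OPERATION"), Bytes.toBytes("OP_TYPE"), Bytes.toBytes("1"))
  p.addColumn(Bytes.toBytes("OPERATION"), Bytes.toBytes("OP_TIME"), Bytes.toBytes(System.currentTimeMillis.toString))
  HbaseTable.put(p)
  HbaseTable.flushCommits()

(addColumn is just the non-deprecated spelling of the add() calls above on HBase 1.x clients.)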
Dr Mich Talebzadeh
if(tableEnv.scan("priceTable").filter('ticker == "VOD" && 'price > 99.0))
{
sqltext = Calendar.getInstance.getTime.toString + ", Price on "+ticker+" hit " +price.toString
DataStream[Row]
val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
val timeissued = tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
val price = tableEnv.scan("priceTable").select('price).toDataStream[Row]
What alt
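A hedged sketch of one alternative that compiles against the Flink 1.5 Scala Table API, assuming the Scala StreamTableEnvironment (note === rather than == inside table expressions, and that a filtered Table is consumed as a stream rather than tested in an if):

  import java.util.Calendar
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.table.api.scala._
  import org.apache.flink.types.Row

  val highValue = tableEnv
    .scan("priceTable")
    .filter('ticker === "VOD" && 'price > 99.0)
    .select('ticker, 'timeissued, 'price)

  // one alert line per qualifying row
  val alerts: DataStream[String] = highValue.toAppendStream[Row].map { row =>
    Calendar.getInstance.getTime.toString + ", Price on " + row.getField(0) + " hit " + row.getField(2)
  }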
This worked
streamExecEnv.setParallelism(2)
Dr Mich Talebzadeh
Hi,
In my test environment I have two task managers.
I would like to increase the parallelism to 2 from the default of 1. Can it be
done through properties, e.g.
properties.setProperty("parallelism", "2")
However, that does not change anything.
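A brief sketch of where this setting actually lives, assuming the streamExecEnv environment used in the rest of the thread (parallelism is a property of the job and its operators, not of the Properties object handed to the Kafka consumer, which is why the line above has no effect):

  streamExecEnv.setParallelism(2)          // default parallelism for the whole job
  // or per operator, e.g. dataStream.map(parse).setParallelism(2)   (parse is a hypothetical function)
  // or at submission time: flink run -p 2 ...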
Thanks
Dr Mich Talebzadeh
OK gents, thanks for the clarification.
Regards,
Dr Mich Talebzadeh
[image: image.png]
I can verify that data is being streamed in, so there is no issue there.
However, I don't see any output, and the Flink GUI does not look healthy
(see the circles in the screenshot).
Appreciate any input.
Thanks,
Dr Mich Talebzadeh
Thanks Fabian,
I looked at the Maven entry and this is what it says: *provided*
[image: image.png]
However, this jar file is not shipped with Flink? Is this deliberate?
Thanks
Dr Mich Talebzadeh
+= "org.apache.flink" %% "flink-table" % "1.5.0" %
"provided"
HTH
Dr Mich Talebzadeh
Apologies, that should read Vino and Timo.
Dr Mich Talebzadeh
Thanks a lot Timo.
I will try the changes suggested.
Appreciated
Dr Mich Talebzadeh
I would appreciate it if anyone had a chance to look at the Scala code on
GitHub and advise:
https://github.com/michTalebzadeh/Flink
Regards,
Dr Mich Talebzadeh
failed
Dr Mich Talebzadeh
Tremendous. Many thanks.
I have put the sbt build file and the Scala code here:
https://github.com/michTalebzadeh/Flink
Regards,
Dr Mich Talebzadeh
assLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
Dr Mich Talebzadeh
pache.kafka" % "kafka-clients" % "0.11.0.0"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" %
"1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" %
"provided"
library
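For anyone reassembling the build from these fragments, a hedged sketch of the dependency block the snippets imply (versions are the ones quoted in the thread; the Kafka connector artifact is an assumption, since elsewhere in the thread the 0.9 connector jars are put on the classpath by hand instead):

  scalaVersion := "2.11.12"

  libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
  libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.5.0"
  libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % "provided"
  libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.5.0"
  libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"

flink-table is marked provided on the assumption that the cluster supplies it; in the 1.5 distribution that jar sits in opt/ and has to be copied into lib/ for that assumption to hold.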
'price)
[error]^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed Aug 1, 2018 11:40:47 PM
Thanks anyway.
Dr Mich Talebzadeh
ncremental) Compilation failed
[error] Total time: 3 s, completed Aug 1, 2018 11:02:33 PM
Completed compiling
which is really strange, given the errors above.
Dr Mich Talebzadeh
On Wed, 1 Aug 2018 at 10:03, Timo Walther wrote:
> If these two imports are the only imports that you added, then you did not
> follow Hequn's advice or the link that I sent you.
>
> You need to add the underscore imports to let Scala do its magic.
>
> Timo
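For reference, the underscore imports being referred to are, as a sketch for the Flink 1.5 Scala streaming plus Table API combination (the exact set depends on which APIs the job uses):

  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.table.api.scala._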
[error] [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)org.apache.flink.table.api.Table
[error] [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error] cannot be applied to (org.apache.flink.streaming.
[error] (compile:compileIncremental) Compilation failed
I suspect dataStream may not be compatible with this operation?
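A hedged reading of the error: the two overloads the compiler lists take the Java DataStream plus a field String, which suggests the Java StreamTableEnvironment is the one in scope. With that environment the call would be written as

  val tbl = tableEnv.fromDataStream(dataStream, "key, ticker, timeissued, price")

whereas it is the Scala org.apache.flink.table.api.scala.StreamTableEnvironment that accepts the 'symbol field expressions used elsewhere in this thread.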
Regards,
Dr Mich Talebzadeh
tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
The topic is very simple; it is comma-separated prices. I tried mapFunction
and flatMa
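Since the records are comma-separated strings, a hedged sketch of the usual two-step pattern, assuming the Scala StreamTableEnvironment (parse first, then register; the key, ticker, timeissued, price layout follows the sample rows quoted elsewhere in the thread):

  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.table.api.scala._

  // DataStream[String] from Kafka -> typed tuples
  val parsed: DataStream[(String, String, String, Double)] = dataStream.map { line =>
    val cols = line.split(",")
    (cols(0), cols(1), cols(2), cols(3).toDouble)
  }

  // the field expressions now line up with a concrete tuple type
  val priceTable = tableEnv.fromDataStream(parsed, 'key, 'ticker, 'timeissued, 'price)
  tableEnv.registerTable("priceTable", priceTable)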
Thanks
So the assumption is that one cannot perform split on DataStream[String]
directly?
Dr Mich Talebzadeh
dataStream.split(",")
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
What operation do I need to do on dataStream to make this split work?
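For reference, a small sketch of the distinction: DataStream.split routes a stream into named side outputs via an OutputSelector, while splitting the text of each record happens inside a map:

  // split each comma-separated record, not the stream
  val fields: DataStream[Array[String]] = dataStream.map(_.split(","))
  // e.g. the price is the fourth column in the sample records
  val prices: DataStream[Double] = fields.map(cols => cols(3).toDouble)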
Thanks
Dr Mich Talebzadeh
dd("PRICE_INFO".getBytes(), "CURRENCY".getBytes(),
new String(CURRENCY).getBytes())
p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(),
new String(1.toString).getBytes())
p.add("OPERATION".getBytes(), "
ction,
> filtering out records with a FilterFunction. You can also implement a
> FlatMapFunction to do both in one step.
>
> Once the stream is transformed and filtered, you can write it to HBase
> with a sink function.
>
>
> 2018-07-30 10:03 GMT+02:00 Mich Talebzadeh :
>
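A hypothetical sketch of the sink-function pattern described in the quoted advice (the class name, HBase table name and column qualifiers are assumptions rather than code from this thread, and it expects the parsed (key, ticker, timeissued, price) tuples used in the other sketches):

  import org.apache.flink.configuration.Configuration
  import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
  import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
  import org.apache.hadoop.hbase.util.Bytes
  import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}

  class HbasePriceSink extends RichSinkFunction[(String, String, String, Double)] {
    @transient private var connection: Connection = _
    @transient private var table: Table = _

    override def open(parameters: Configuration): Unit = {
      connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
      table = connection.getTable(TableName.valueOf("marketDataHbase"))
    }

    override def invoke(value: (String, String, String, Double)): Unit = {
      val (key, ticker, timeissued, price) = value
      val put = new Put(Bytes.toBytes(key))
      put.addColumn(Bytes.toBytes("PRICE_INFO"), Bytes.toBytes("TICKER"), Bytes.toBytes(ticker))
      put.addColumn(Bytes.toBytes("PRICE_INFO"), Bytes.toBytes("TIMEISSUED"), Bytes.toBytes(timeissued))
      put.addColumn(Bytes.toBytes("PRICE_INFO"), Bytes.toBytes("PRICE"), Bytes.toBytes(price.toString))
      table.put(put)
    }

    override def close(): Unit = {
      if (table != null) table.close()
      if (connection != null) connection.close()
    }
  }

  // wired in after the map/filter steps, e.g.
  // parsed.filter(_._4 > 99.0).addSink(new HbasePriceSink)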
a32f07,SAP,2018-07-28T20:38:44,56.94
81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33
Dr Mich Talebzadeh
What is the equivalent of
*for(line <- pricesRDD.collect.toArray)*
in Flink?
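A hedged sketch of the idiomatic Flink counterpart: instead of collecting records back to the driver as the Spark loop does, the per-line logic is expressed as an operator on the (unbounded) stream itself (processLine is a hypothetical stand-in for the loop body):

  // runs continuously on the cluster, one call per record
  dataStream.map { line => processLine(line) }
  // side effects such as writing to HBase belong in a sink, e.g. dataStream.addSink(...)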
Thanks
Dr Mich Talebzadeh
Thanks, I'll check them out.
Regards,
Dr Mich Talebzadeh
[error] ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed Jul 29, 2018 9:31:05 AM
Completed compiling
Sun Jul 29 09:31:05 BST 2018 , Running in **Standalone mode**
Could not build the program from JAR file.
I don't
Hi Vino,
Many thanks.
I can confirm that Flink version flink-1.5.0-bin-hadoop28-scala_2.11 works
fine with Hadoop 3.1.0. I did not need to build it from source.
Regards,
Dr Mich Talebzadeh
Hi,
I can run Flink without bundled Hadoop fine. I was wondering whether the
build of Flink bundled with Hadoop 2.8 works with Hadoop 3 as well?
Thanks,
Dr Mich Talebzadeh
Hi Deepak,
I will put it there once all the bits and pieces come together. At the
moment I am drawing the diagrams. I will let you know.
Definitely everyone's contribution is welcome.
Regards,
Dr Mich Talebzadeh
per). For HBase I specified a ZooKeeper instance running on another
host, and HBase works fine.
Anyway, I will provide further info and diagrams.
Cheers,
Dr Mich Talebzadeh
Thanks Jorn.
So I gather, as you correctly suggested, that microservices do provide value
in terms of modularisation. However, there will always "inevitably" be
scenarios where the receiving artefact, say Flink, needs communication
protocol changes?
thanks
Dr Mich Talebzadeh
properties))
So, in summary, some changes need to be made to Flink to be able to interact
with the new version of Kafka. And, more importantly, can one use an
abstract notion of a microservice here?
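For context, a hedged sketch of the call whose closing parentheses appear above. The Kafka version a Flink job can talk to is pinned by whichever flink-connector-kafka-0.x artifact is on the classpath, so a newer broker usually means swapping the connector rather than changing Flink itself (the connector class, topic name, broker address and group id below are placeholders):

  import java.util.Properties
  import org.apache.flink.api.common.serialization.SimpleStringSchema
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "localhost:9092")
  properties.setProperty("group.id", "md_streaming")

  val dataStream = streamExecEnv.addSource(
    new FlinkKafkaConsumer011[String]("md", new SimpleStringSchema(), properties))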
Dr Mich Talebzadeh
e for analytics.
HTH
Dr Mich Talebzadeh
databases
like Aerospike <https://www.aerospike.com/>, where the keys are kept in
memory and indexed, and the values are stored on SSD devices. I was
wondering how Flink in general can address this?
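A tangential sketch, in case it is useful: the closest knob within Flink itself is the choice of state backend, for example RocksDB keeping working state on local disk or SSD with checkpoints elsewhere (the path is a placeholder and the flink-statebackend-rocksdb dependency is assumed):

  import org.apache.flink.contrib.streaming.state.RocksDBStateBackend

  streamExecEnv.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"))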
Regards,
Dr Mich Talebzadeh
Dr Mich Talebzadeh
needs to
ensure that your micro-batching processing time is less than your batch
interval, i.e. the rate at which your producer sends data into Kafka. For
example, this is shown in the Spark GUI below for a batch interval of 2 seconds.
[image: image.png]
Is there such a procedure in Flink, please?
Thanks
Dr Mich Talebzadeh
Yes indeed, thanks. It is all working fine,
but only writing to a text file. I want to do the same with Flink as I do
with Spark Streaming: writing high-value events to HBase on HDFS.
Dr Mich Talebzadeh
-f403-4ea9-8d55-c3afd0eae110,BP,2018-07-03T09:50:10,506.4
23c07878-d64d-4d1e-84a4-c14c23357467,MKS,2018-07-03T09:50:10,473.06
kind regards,
Dr Mich Talebzadeh
ava:42)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243)
Dr Mich Talebzadeh
TE)
[error]^
[error] one error found
Dr Mich Talebzadeh
named (1/1) (bcb46879e709768c9160dd11e09ba05b).
2018-07-02 21:38:38,713 INFO  org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Custom Source -> Sink: Unnamed (1/1) (bcb46879e709768c9160d
Dr Mich Talebzadeh
org.apache.flink" %% "flink-scala" % "1.5.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.1.0"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" %
"1.
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243)
thanks
Dr Mich Talebzadeh
but did not work.
Thanks
Dr Mich Talebzadeh
curity.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFronte
e: 0.586 s
[INFO] Finished at: 2018-07-02T03:44:32+01:00
[INFO] Final Memory: 26M/962M
[INFO]
So I guess my pom.xml is incorrect!
Dr Mich Talebzadeh
e0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.java:613)
Any ideas?
Thanks
Dr Mich Talebzadeh
]
Completed compiling
Regards,
Dr Mich Talebzadeh
jar:1.5, org.apache.flink:flink-streaming-java_2.11:jar:1.5: Could not find artifact org.apache.flink:flink-java:jar:1.5 in central (https://repo.maven.apache.org/maven2)
FYI, I removed the ~/.m2 directory to get rid of anything cached!
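A hedged observation on the error above: the resolver is being asked for version 1.5, whereas the artifacts published to Maven Central use the three-part version 1.5.0, so the version string in the POM is the likely culprit. The working coordinates quoted elsewhere in this thread, in sbt form, are e.g.

  libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.5.0"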
Thanks
Dr Mich Talebzadeh
^
[ERROR] 9 errors found
[INFO]
----
[INFO] BUILD FAILURE
Dr Mich Talebzadeh
/myPackage/md_streaming.scala:18:
error: not found: value StreamExecutionEnvironment
[INFO] val env = StreamExecutionEnvironment.getExecutionEnvironment
Is there a basic Maven POM file for Flink? The default one from GitHub does
not seem to be working!
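For reference, the "not found: value StreamExecutionEnvironment" error above usually means either that the import is missing or that flink-streaming-scala is not on the compile classpath; a minimal sketch assuming the Scala DataStream API:

  import org.apache.flink.streaming.api.scala._
  // or, narrowly: import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

  val env = StreamExecutionEnvironment.getExecutionEnvironment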
Thanks
Dr Mich Talebzadeh
Thanks Rong
This worked.
$FLINK_HOME/bin/start-scala-shell.sh local --addclasspath
/home/hduser/jars/flink-connector-kafka-0.9_2.11-1.5.0.jar:/home/hduser/jars/flink-connector-kafka-base_2.11-1.5.0.jar
Regards,
Dr Mich Talebzadeh
"Flink Kafka Example")
}
}
warning: there was one deprecation warning; re-run with -deprecation for details
defined object Main
But I do not see any streaming output.
A naïve question. How do I execute the above compiled object in this shell?
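A hedged sketch of the two usual ways to run it from inside the shell (senv is the streaming environment the Flink Scala shell pre-binds):

  // 1) invoke the compiled object's main method directly
  Main.main(Array.empty[String])

  // 2) or skip the wrapper object and build the pipeline against senv,
  //    finishing with senv.execute("Flink Kafka Example")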
Thanks
Dr Mich Talebzadeh
meters to *start-scala-shell.sh local*
Thanks
Dr Mich Talebzadeh
and downloaded Flink built with Hadoop 2.8, and that worked.
Thanks
Dr Mich Talebzadeh
ram
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result.
Unfortunately I am getting the same error.
Cheers
Dr Mich Talebzadeh
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
Appreciate any suggestions.
Thanks
Dr Mich Talebzadeh
Thanks Chiwan. It worked.
Now I have a simple streaming program in Spark Scala that gets streaming
data via Kafka. Please see attached.
I am trying to make it work with Flink + Kafka.
Any hints will be appreciated.
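For completeness, a minimal hedged sketch of what the Flink-side skeleton tends to look like with Flink 1.5 and the Kafka 0.9 connector jars used elsewhere in this thread (the topic name, broker address and group id are placeholders, not values from the attachment):

  import java.util.Properties
  import org.apache.flink.api.common.serialization.SimpleStringSchema
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09

  object Main {
    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment

      val properties = new Properties()
      properties.setProperty("bootstrap.servers", "localhost:9092")
      properties.setProperty("group.id", "md_streaming")

      // one comma-separated price record per Kafka message
      val prices: DataStream[String] =
        env.addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties))

      prices.print()
      env.execute("Flink Kafka Example")
    }
  }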
Thanks
Dr Mich Talebzadeh
rs.kafka
Any ideas will be appreciated
^
Dr Mich Talebzadeh
Hi,
I just found out that it does:
Scala-Flink>
thanks
Dr Mich Talebzadeh
guide
https://ci.apache.org/projects/flink/flink-docs-release-0.8/setup_quickstart.html
2) Does Flink support the Scala language, or what is its default programming
language, if any?
Thanks
Dr Mich Talebzadeh