> to see more details about the first failed checkpoint.
>
> On 17 Mar 2022, at 9:41 AM, Vijayendra Yadav
> wrote:
>
>
> Hi Flink Team,
>
> I am using Flink 1.11 with a Kinesis consumer and S3 file streaming write
> with an S3 checkpoint backend. This is streaming ser
> What is the problem here, is there an Exception you can share?
>
> Thanks,
>
>
>> On Wed, Feb 23, 2022 at 11:25 AM Vijayendra Yadav
>> wrote:
>> Hi Team,
>>
>> I am running a Flink 1.11 Kinesis consumer with, say, N Kinesis shards, but I
>> want to increase/decrease shards to N+M or N-M.
Hi Team,
I am running a Flink 1.11 Kinesis consumer with, say, N Kinesis shards, but I want
to increase/decrease shards to N+M or N-M.
Once I do that, my Flink consumer needs to be restarted with changed parallelism,
but I am unable to restart from the existing checkpoint because of the change in the number
of shards.
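For reference, a minimal sketch of the restore invocation, assuming the checkpoint path, the new parallelism, and the jar path are all placeholders (same -s flag as shown further down this page):

  /usr/lib/flink/bin/flink run \
    -s s3://<bucket>/flink/checkpoint/<job-id>/chk-<n>/ \
    -p <N+M> \
    path/to/application.jar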
Hi Team,
https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.html
*" There is no perfect generic default assignment function. Default shard
to subtask assignment, which is based on hash code, may result in skew,
kinesis/FlinkKinesisConsumer.java#L158-L159
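A minimal sketch of the custom assigner hook that javadoc refers to, assuming the Scala API; the stream name and consumerConfig (a java.util.Properties) are placeholders:

  import org.apache.flink.api.common.serialization.SimpleStringSchema
  import org.apache.flink.streaming.connectors.kinesis.{FlinkKinesisConsumer, KinesisShardAssigner}
  import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle

  val consumer = new FlinkKinesisConsumer[String]("my-stream", new SimpleStringSchema(), consumerConfig)
  consumer.setShardAssigner(new KinesisShardAssigner {
    // Spread shards evenly by their numeric suffix instead of the default hash code.
    override def assign(shard: StreamShardHandle, numParallelSubtasks: Int): Int = {
      val shardId = shard.getShard.getShardId // e.g. "shardId-000000000042"
      (shardId.substring(shardId.lastIndexOf('-') + 1).toLong % numParallelSubtasks).toInt
    }
  })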
>
> Best,
> Yun Tang
> From: Vijayendra Yadav
> Sent: Wednesday, June 23, 2021 11:02
> To: user
> Subject: High Flink checkpoint Size
>
> Hi Team,
>
> I have two flink Streaming Jobs
> 1) Flink streaming from
Hi Team,
I have two flink Streaming Jobs
1) Flink streaming from KAFKA and writing to s3
2) Flink streaming from KINESIS (KDS) and writing to s3
Both Jobs have similar checkpoint duration.
Job #1 (KAFKA) checkpoint size is only 85KB
Job #2 (KINESIS) checkpoint size is 18MB
There are no checkpoi
Hi Team,
I am using streaming file sink and sinking data to s3. Also, using Graphite
exporter for metrics.
I can see correct counters for Source, Map, and Filter functions. But for the SINK,
only numRecordsIn is populating; I am looking to get numRecordsOut counts
also, but it's always staying 0 alth
> through the respective ./plugins/ directory as described in the docs [1].
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/plugins/
>
>> On Fri, May 28, 2021 at 7:07 AM Vijayendra Yadav
>> wro
Hi Team,
I am trying to find an alternate way to set the plugin directory (not by
manually creating it in the Flink installation), maybe by passing the plugins through the
dependency JAR on the app classpath?
Plugin directory containing:
1) flink-metrics-graphite-1.11.0.jar
2) flink-s3-fs-hadoop-1.11.0.jar
Than
.
>
> Thank you~
>
> Xintong Song
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-20681
> [2] https://issues.apache.org/jira/browse/FLINK-20811
> [3] https://issues.apache.org/jira/browse/FLINK-20867
>
> On Thu, May 27, 2021 at 12:23 AM Vijayendra Yadav
>
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#yarn-ship-archives
>
> On Tue, May 25, 2021 at 5:56 PM Vijayendra Yadav
> wrote:
>
>> Hi Piotr,
>>
>> I have been doing the same process as you mentioned so far, now I am
>> migrating the deployment proce
> simply download those things (whole directory containing
> those) to the machine that will be starting the Flink job?
>
> Best, Piotrek
>
> On Tue, 25 May 2021 at 07:50 Vijayendra Yadav
> wrote:
>
>> Hi Team,
>>
>> I am trying to find a way to ship files from a
Hi Team,
I am trying to find a way to ship files from aws s3 for a flink streaming job,
I am running on AWS EMR. What I need to ship is the following:
1) application jar
2) application property file
3) custom flink-conf.yaml
4) log4j application specific
Please let me know options.
Thanks,
Vijay
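One hedged approach, given that -yt/--yarnship only ships local directories: stage the S3 artifacts on the submitting host first, then ship the staged directory (all paths below are placeholders):

  aws s3 cp --recursive s3://<bucket>/flink-artifacts/ ./shipdir/
  /usr/lib/flink/bin/flink run -m yarn-cluster -yt ./shipdir/ ./shipdir/application.jar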
Hi Team,
While restarting the Flink application from a CHECKPOINT, I am facing the following
error (intermittently). It does not impact job submission or
functionality, but I am still wondering what the reason and solution could be.
RUN Command:
/usr/lib/flink/bin/flink run
\
-s
s3://bucket-app
state).
>
> If there is any failure and Flink restarts automatically, it will always pick
> up from the latest checkpoint [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#recovery
>
>> On Thu, Apr 8,
Thanks, it was working fine with: bin/flink run -s
s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/
\
On Thu, Apr 8, 2021 at 11:42 AM Vijayendra Yadav
wrote:
> Hi Arvid,
>
> Thanks for your response. I did not restart from the checkpoint. I assumed
>
em here?
> - How long did you wait with recovery?
>
>
>
> On Wed, Apr 7, 2021 at 8:03 PM Vijayendra Yadav
> wrote:
>
>> Hi Team,
>>
>> We are trying to make sure we are not losing data when KINESIS Consumer
>> is down.
>>
>> Kinesi
Hi Team,
We are trying to make sure we are not losing data when KINESIS Consumer is
down.
The Kinesis streaming job has the following checkpointing properties:
// checkpoint every X msecs
env.enableCheckpointing(Conf.getFlinkCheckpointInterval());
// enable externalized checkpoints whi
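For reference, a minimal sketch of the two settings those comments describe, assuming the Scala API (the interval value is a placeholder):

  import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
  import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(60000L) // checkpoint every X msecs
  // retain externalized checkpoints so a stopped job can later be restored with: flink run -s <path>
  env.getCheckpointConfig.enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)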
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter
>
> On Tue, Mar 23, 2021 at 2:09 AM Vijayendra Yadav
> wrote:
>
>> Hi Team,
>>
>> Could you provide a sample
Hi Team,
Could you provide a sample of how to increment a COUNTER from a Flink DataStream
source and sink, and how to then display the counter in a
local IDE?
Counter to display for #1 through #3.
1) DataStream messageStream = env.addSource(Kinesis Source);
2) DataStream outputStream =
messag
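A minimal sketch of such a counter in a rich function (the class and metric names are placeholders); locally the values can be printed via the Slf4jReporter linked in the reply above:

  import org.apache.flink.api.common.functions.RichMapFunction
  import org.apache.flink.configuration.Configuration
  import org.apache.flink.metrics.Counter

  class CountingMap extends RichMapFunction[String, String] {
    @transient private var recordsMapped: Counter = _

    override def open(parameters: Configuration): Unit = {
      // registered under the operator's metric group, so it is reported per subtask
      recordsMapped = getRuntimeContext.getMetricGroup.counter("recordsMapped")
    }

    override def map(value: String): String = {
      recordsMapped.inc()
      value
    }
  }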
Hi Smile,
Thanks for your clarification, it helped.
Thanks,
Vijay
> On Feb 28, 2021, at 7:06 PM, Smile wrote:
>
> Hi Vijay,
>
> Since version 1.7 Flink builds with Scala version 2.11 (default) and 2.12.
> Flink has APIs, libraries, and runtime modules written in Scala. Users of
> the Scala A
Hi Team,
While running a Java Flink project locally, I am facing the following issue: Could
not create actor system; Caused by: java.lang.NoSuchMethodError:
scala.Product.$init$(Lscala/Product;)V
Could you suggest whether a Flink Java project needs Scala at run time? What
versions might be incompatible?
> I don't think anyone was aware that the plugin directory is not populated
> on EMR.
>
> On 10/27/2020 9:53 PM, Vijayendra Yadav wrote:
>
> Perfect, after downloading it into the plugins directory it is working well. I am
> wondering why these jars have been removed from opt/ folder, earli
rectory.
>
> On 10/27/2020 7:04 PM, Vijayendra Yadav wrote:
>
> Also, you are right that the plugin did not have anything by default when
> we created EMR 5.31 with Flink 1.11.
>
> In opt/ I see:
>
> [hadoop@ip-10-223-71-70 flink]$ pwd
> /usr/lib/flink
> [hadoop@ip-
Vijayendra Yadav
wrote:
> Hi Chesnay,
>
> Steps to upgrade are as follows:
>
> 1) Created EMR 5.31 Cluster which comes with Flink 1.11
> 2) Copied flink-s3-fs-hadoop-1.11.0.jar to plugin folder for application.
>
>cd /usr/lib/flink/
>
> mkdir -p ./plugins/s
existing 1.10 distribution?
>
> The configuration is correct, but it appears as if the entire plugins
> directory is either a) empty or b) not shipped.
>
> On 10/27/2020 5:22 PM, Vijayendra Yadav wrote:
>
> Hi Robert and Chesnay,
>
> Only thing changed is I upgraded from
INFO
org.apache.flink.runtime.metrics.MetricRegistryImpl - No metrics reporter
configured, no metrics will be exposed/reported.
Regards,
Vijay
On Mon, Oct 26, 2020 at 2:34 PM Vijayendra Yadav
wrote:
> Hi Chesnay,
>
> I have the same, and I am exporting the flinkconf like below, whe
Chesnay Schepler wrote:
>
> Have you followed the documentation, specifically this bit?
>
> > In order to use this reporter you must copy
> /opt/flink-metrics-influxdb-1.11.2.jar into the plugins/influxdb folder
> of your Flink distribution.
>
> On 10/24/2020 12:17 AM, V
Hi Team,
For Flink 1.11 Graphite metrics, I see the following error in the log.
Any suggestions?
2020-10-23 21:55:14,652 ERROR
org.apache.flink.runtime.metrics.ReporterSetup - Could
not instantiate metrics reporter grph. Metrics might not be
exposed/reported.
java.lang.ClassNotFound
https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html
> )
> [3] Refined fallback filesystems to only handle specific filesystems (
> https://ci.apache.org/projects/flink/flink-docs-stable/release-notes/flink-1.11.html
> )
>
> On Wed, 14 Oct 2020 at 06
which might be causing
> this problem for you. Other than that, if you are already using Flink
> 1.10.1 (or newer), maybe please double check what class are you extending?
> The example provided by Ravi seems to be working for me.
>
> Piotrek
>
> On Tue, 13 Oct 2020 at 19:02 Vijayendra
Hi Team,
I have upgraded to Flink 1.11 (EMR 5.31) from Flink 1.10 (5.30.1).
I am facing the following error while running a Flink streaming job from the
command line.
Run command like: /usr/lib/flink/bin/flink run
What dependency might I be missing or conflicting?
> ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("-MM-dd-HH-mm-ss-SSS")))
> .withPartSuffixFunction(()=> ".ext")
>
>
> Regards,
> Ravi
>
> On Tue, Oct 13, 2020 at 6:05 AM Vijayendra Yadav
> w
Hi Team,
I have tried to assign a dynamic prefix for the file name, which contains
datetime components.
The problem is the job always takes the initial datetime from when the job first
starts and never refreshes it later.
How can I get the dynamic current datetime in the filename at sink time?
.withPartPrefix
(ZonedDateT
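One hedged workaround, assuming the datetime may live in the bucket path rather than in the part-file name: a bucket assigner is consulted for every record, unlike the part prefix, which is evaluated once at job start (output path and format string are placeholders):

  import org.apache.flink.api.common.serialization.SimpleStringEncoder
  import org.apache.flink.core.fs.Path
  import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
  import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner

  val sink: StreamingFileSink[String] = StreamingFileSink
    .forRowFormat(new Path("s3://bucket/out"), new SimpleStringEncoder[String]("UTF-8"))
    // datetime is re-evaluated per record, so new buckets appear as time advances
    .withBucketAssigner(new DateTimeBucketAssigner[String]("yyyy-MM-dd-HH-mm"))
    .build()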
Thank You Dawid.
Sent from my iPhone
> On Sep 7, 2020, at 9:03 AM, Dawid Wysakowicz wrote:
>
Hi Team,
I have a generic question.
Let's say I have 2 Actions to be taken on Flink DATASTREAM (Kafka).
1) Convert some data fields, and write to external Database
2) Transform #1 converted data fields in to different record format say AVRO
Here are two approaches that are possible.
a) One Ma
Have you already tried this with the latest
> Flink version?
> Also including Klou in this email, who might be able to confirm this.
>
> Cheers,
> Gordon
>
> [1] https://issues.apache.org/jira/browse/FLINK-17632
>
> On Fri, Sep 4, 2020 at 2:02 AM Vijayendra Yadav
> wrote:
>
>
Hi Team,
Is there any feature to be able to ship a directory to containers from an S3
directory instead of a local one?
-yt,--yarnship Ship files in the specified directory
(t for transfer)
Wed, Sep 2, 2020 at 3:34 AM Vijayendra Yadav
> wrote:
>
>> Thanks all, I could see the metrics.
>>
>> On Thu, Aug 27, 2020 at 7:51 AM Robert Metzger
>> wrote:
>>
>>> I don't think these error messages give us a hint why you can't see the
rd would be time of the longest blocking operation.
>
> Cheers,
> Till
>
>> On Wed, Sep 2, 2020 at 2:54 AM Vijayendra Yadav
>> wrote:
>> Hi Team,
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-
the right configuration parameters for Flink
> 1.10? That all required JARs are in the lib/ folder (on all machines) and
> that your graphite setup is working (have you confirmed that you can show
> any metrics in the Graphite UI (maybe from a Graphite demo thingy))?
>
>
> On Thu,
Hi Team,
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
Flink is chaining my tasks, like: stream.map().filter().map()
I think here the entire chain runs in the same slot.
Documentation says Flink does chaining for bet
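For reference, a sketch of the knobs that break the default chain, assuming stream is a DataStream[String] (operator bodies are placeholders):

  val out = stream
    .map(_.trim).disableChaining()      // no chaining into or out of this operator
    .filter(_.nonEmpty).startNewChain() // a fresh chain starts at this operator
    .map(_.toUpperCase).slotSharingGroup("heavy") // this and downstream ops use another slot group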
> The system will just waste resources to process the checkpoints.
>
> Best,
> Andrey
>
>> On Fri, Aug 28, 2020 at 9:52 PM Vijayendra Yadav
>> wrote:
>> Hi Andrey,
>>
>> Thanks,
>> what is the recommendation for:
>> env.getCheckpointConf
Please ignore the last email; it's working now after adding more parallelism.
On Fri, Aug 28, 2020 at 12:39 PM Vijayendra Yadav
wrote:
> Hi Team,
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#configuring-taskmanager-processing-slots
>
>
> In
when a checkpoint occurs to provide exactly-once guarantee.
>
> Best,
> Andrey
>
> On Mon, Aug 24, 2020 at 6:18 PM Vijayendra Yadav
> wrote:
>
>> Hi Team,
>>
>> Bulk Formats can only have `OnCheckpointRollingPolicy`, which rolls
>> (ONLY) on every checkpoi
Hi Team,
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#configuring-taskmanager-processing-slots
In my FlinkKafkaConsumer streaming process, I have Kafka with 3
partitions.
I am setting parallelism = 3 (-p 3). Here 3 TaskManagers are launched, but
each TaskManager g
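For reference, the slots-per-TaskManager knob (the value is a placeholder); with 3 slots configured on one TaskManager, -p 3 can be satisfied by a single TM instead of three:

  # in flink-conf.yaml
  taskmanager.numberOfTaskSlots: 3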
Hi Team,
For a regular unbounded streaming application reading from Kafka,
which does not use any event-time or window operations, does setting a
watermark strategy on the Kafka consumer (connector) help us in any way, such
as performance?
Regards,
Vijay
class:
> org.apache.flink.metrics.graphite.GraphiteReporter
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#graphite-orgapacheflinkmetricsgraphitegraphitereporter
>
> On 26/08/2020 16:40, Vijayendra Yadav wrote:
>
> Hi Dawid,
>
> I have 1.10.0 vers
https://issues.apache.org/jira/browse/FLINK-16965
>
> On 26/08/2020 08:04, Vijayendra Yadav wrote:
>> Hi Nikola,
>>
>> To rule out any other cluster issues, I have tried it locally now. Steps
>> as follows, but I don't see any metrics yet.
>>
>> 1) Set up
been initialized with the config you have pasted.
>
> Regards,
> Nikola Hrusov
>
>
> On Mon, Aug 24, 2020 at 5:00 AM Vijayendra Yadav
> wrote:
>
>> Hi Team,
>>
>> I am trying to export Flink stream default metrics using Graphite, but I
>> c
The default carbon port is 2003, and if you use the aggregator it is
> 2023.
>
> You should be able to see in both flink jobmanager and taskmanager that the
> metrics have been initialized with the config you have pasted.
>
> Regards,
> Nikola Hrusov
>
>
>> On
the configuration
> "taskmanager.memory.jvm-metaspace.size"? If it is
> too small, increasing it will help. Usually, 256m is enough for most cases.
>
>
> Best,
> Yang
>
> On Tue, Aug 25, 2020 at 4:51 AM, Vijayendra Yadav wrote:
>> Another one -
>>
>> Ex
Another one -
Exception in thread "FileCache shutdown hook"
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "FileCache shutdown hook"
Regards,
Vijay
On Mon, Aug 24, 2020 at 1:04 PM Vijayendra Yadav
wrote:
> Actually got thi
aw (2/3)"
java.lang.OutOfMemoryError: Metaspace
Exception in thread "FileCache shutdown hook"
java.lang.OutOfMemoryError: Metaspace
Any suggestions on how to fix it?
On Mon, Aug 24, 2020 at 12:53 PM Vijayendra Yadav
wrote:
> Hi Team,
>
> Running a flink job on Yarn, I am
Hi Team,
Running a Flink job on YARN, I am trying to make connections to a
Couchbase DB in one of my map functions in a Flink streaming job. But my
TaskManager containers keep failing,
and YARN keeps assigning new containers, not giving me an opportunity to get
any useful logs.
val cluster = Cluster.co
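A hedged sketch of the usual pattern here: open the connection once per task in open() and release it in close(), instead of connecting inside map(); the Couchbase SDK 3 calls and credentials are illustrative placeholders:

  import com.couchbase.client.java.Cluster
  import org.apache.flink.api.common.functions.RichMapFunction
  import org.apache.flink.configuration.Configuration

  class EnrichFromCouchbase extends RichMapFunction[String, String] {
    @transient private var cluster: Cluster = _

    override def open(parameters: Configuration): Unit = {
      // one connection per task slot, reused across records
      cluster = Cluster.connect("couchbase://host", "user", "pass") // placeholders
    }

    override def map(value: String): String = {
      // ... look up / enrich value via cluster here ...
      value
    }

    override def close(): Unit = if (cluster != null) cluster.disconnect()
  }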
Hi Team,
Bulk formats can only have `OnCheckpointRollingPolicy`, which rolls (ONLY)
on every checkpoint:
.withRollingPolicy(OnCheckpointRollingPolicy.build())
Question: What are the recommended values related to checkpointing to FsStateBackend?
Should it be more frequent checkpoints, or longer intervals,
Hi Team,
I am trying to export Flink stream default metrics using Graphite, but I
can't find it in the Graphite metrics console. Could you confirm the steps
below are correct?
1) Updated flink-conf.yaml
metrics.reporter.grph.factory.class:
org.apache.flink.metrics.graphite.GraphiteReporterFa
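For reference, a hedged completion of that config; host and port are placeholders (2003 is the carbon default mentioned later in this thread):

  metrics.reporter.grph.factory.class: org.apache.flink.metrics.graphite.GraphiteReporterFactory
  metrics.reporter.grph.host: localhost
  metrics.reporter.grph.port: 2003
  metrics.reporter.grph.protocol: TCP
  metrics.reporter.grph.interval: 60 SECONDS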
https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html
>
> Cheers,
> Till
>
> On Thu, Aug 20, 2020 at 7:06 PM Vijayendra Yadav
> wrote:
>
>> Hi Till/ Piotr,
>>
>> My process was working with FsStateBackend, but when I switched
>> to RocksD
issue - you have two
>>> conflicting versions of
>>> `org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest`
>>> on the class path. Or you built your jar with one version and trying to
>>> execute it with a different one.
>>>
>>> Till is
Hi Team,
Getting the following error when using RocksDBStateBackend on YARN/EMR. Am
I missing any dependencies?
2020-08-20 04:37:00,713 ERROR
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl -
Exception on heartbeat
java.lang.NoSuchMethodError:
org.apache.hadoop.yarn.api.protoco
Fri, Aug 14, 2020 at 2:04 PM Vijayendra Yadav
wrote:
> Hi Robert,
>
> Thanks for the information. Payloads so far are 400KB (each record).
> To achieve high parallelism at the downstream operator, do I rebalance the
> Kafka stream? Could you give me an example, please?
>
> Regards,
the files are where you would expect them.
>
>
> I hope this helps,
> Robert
>
> On Fri, Aug 14, 2020 at 6:57 AM Vijayendra Yadav
> wrote:
>
>> Hi Yangze,
>>
>> I tried the following: maybe I am missing something.
>> https://ci.apache.org/projects/flin
ator source, and write into S3.
>
> Do the math on your job: What's the theoretical limits of your job:
> https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines
>
> Hope this helps,
> Robert
>
>
> On Thu, Aug 13, 2020 at 11:25 PM Vi
onf by "--ship"
> command. Notice that this command only supports to ship folders now.
>
> Best,
> Yangze Guo
>
> On Fri, Aug 14, 2020 at 11:22 AM Vijayendra Yadav
> wrote:
> >
> > Any inputs ?
> >
> > On Tue, Aug 11, 2020 at 10:34 AM Vijayendra Ya
Any inputs?
On Tue, Aug 11, 2020 at 10:34 AM Vijayendra Yadav
wrote:
> Dawid, I was able to resolve the keytab issue by passing the service name,
> but now I am facing the KRB5 issue.
>
> Caused by: org.apache.kafka.common.errors.SaslAuthenticationException:
> Failed to create
Hi Team,
I am trying to increase the throughput of my Flink streaming job, which streams
from a Kafka source and sinks to S3. Currently it runs fine for small event
records, but records with large payloads are running extremely slowly, at a
rate of about 2 TPS.
Could you provide some best practices for tuning?
Also,
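Regarding the rebalance question asked earlier in this thread, a minimal sketch, assuming kafkaConsumer is the FlinkKafkaConsumer[String], env is the execution environment, and the downstream parallelism is a placeholder:

  val out = env
    .addSource(kafkaConsumer) // source parallelism is bounded by the partition count
    .rebalance                // round-robin records to all downstream subtasks
    .map(_.trim)              // placeholder transformation
    .setParallelism(12)       // placeholder downstream parallelism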
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#enabling-kerberos-authentication
>
>
> On 09/08/2020 20:39, Vijayendra Yadav wrote:
>
> Hi Team,
>
> I am trying to stream data from kafkaconsumer using:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/k
Hi Team,
I am trying to stream data from the Kafka consumer using:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
My Kafka is Kerberos-secured and SSL-enabled.
I am running my Flink streaming job in yarn-cluster mode on EMR 5.31.
I have tried to pass keytab/principal in
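A hedged sketch of the client side, assuming GSSAPI over SASL_SSL (the broker address is a placeholder); the keytab and principal themselves go through flink-conf.yaml's security.kerberos.login.* options rather than these properties:

  val properties = new java.util.Properties()
  properties.setProperty("bootstrap.servers", "broker1:9093") // placeholder
  properties.setProperty("security.protocol", "SASL_SSL")
  properties.setProperty("sasl.mechanism", "GSSAPI")
  properties.setProperty("sasl.kerberos.service.name", "kafka") // must match the broker's service name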
> Yangze Guo
>
>> On Fri, Aug 7, 2020 at 4:02 AM Vijayendra Yadav
>> wrote:
>>
>> Hi Team,
>>
>> How can I override Flink's default conf/flink-conf.yaml from the flink run command
>> with a custom alternative path?
>> Also, when we override flink-con
Hi Team,
How can I override Flink's default conf/flink-conf.yaml from the flink run
command with a custom alternative path?
Also, when we override flink-conf.yaml, should it contain all variables
present in the default conf/flink-conf.yaml, or can I just
override selective variables from user
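One hedged option: the CLI reads the directory pointed to by the FLINK_CONF_DIR environment variable, so an alternative conf directory can be passed per submission (the path is a placeholder); keys omitted from the custom file fall back to Flink's built-in defaults:

  export FLINK_CONF_DIR=/path/to/custom/conf   # must contain a flink-conf.yaml
  /usr/lib/flink/bin/flink run myapp.jar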
The javadoc for
> org.apache.flink.streaming.util.serialization.SimpleStringSchema says
> you should use
> org.apache.flink.api.common.serialization.SimpleStringSchema instead.
>
> Regards,
> Roman
>
>
> On Mon, Aug 3, 2020 at 5:31 PM Vijayendra Yadav
> wrote:
>
>> Hi Team,
>>
>
Hi Team,
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
new SimpleStringSchema() --> is showing as deprecated in my IntelliJ.
Although it's working fine, I wanted to check if there is a replacement for
it?
val properties = new Properties()
properties.setProperty
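For reference, the replacement named in the reply above is the same class name in a different package:

  // deprecated: org.apache.flink.streaming.util.serialization.SimpleStringSchema
  import org.apache.flink.api.common.serialization.SimpleStringSchema
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

  val consumer = new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)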
https://github.com/apache/flink/blob/5f0183fe79d10ac36101f60f2589062a39630f96/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala#L56-L82
>> . It's on table API but should be quite easy to translate to datastream API
>> if needed.
f/udfs/flink/src/test/java/nl/basjes/parse/useragent/flink/TestUserAgentAnalysisMapperClass.java#L107
>
>
> On Fri, 31 Jul 2020, 20:20 Vijayendra Yadav,
> wrote:
>
>> Hi Team,
>>
>> Looking for some help and reference code / material to implement unit
>> tests o
Hi Team,
Looking for some help and reference code/material to implement unit tests
of possible scenarios in Flink streaming code that should assert specific
cases.
Regards,
Vijay
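A minimal sketch of one common pattern, assuming the Scala API on Flink 1.10/1.11: execute the pipeline on the local environment and assert on results captured by a collecting sink (names are placeholders):

  import org.apache.flink.streaming.api.functions.sink.SinkFunction
  import org.apache.flink.streaming.api.scala._

  object CollectSink {
    // static so all parallel sink instances write into the same buffer
    val values: java.util.List[String] =
      java.util.Collections.synchronizedList(new java.util.ArrayList[String]())
  }

  class CollectSink extends SinkFunction[String] {
    override def invoke(value: String, context: SinkFunction.Context[_]): Unit =
      CollectSink.values.add(value)
  }

  // inside a test case:
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  CollectSink.values.clear()
  env.fromElements("a", "b").map(_.toUpperCase).addSink(new CollectSink)
  env.execute("unit-test")
  assert(CollectSink.values.contains("A"))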
zip2Codec
> xz - for xzCodec
>
>
> Regards,
> Ravi
>
> On Thu, Jul 30, 2020 at 8:21 AM Ravi Bhushan Ratnakar <
> ravibhushanratna...@gmail.com> wrote:
>
>> If it is possible, please share the sample output file.
>> Regards,
>> Ravi
>>
>> O
avro
Exception in thread "main" java.io.IOException: Not an Avro data file
Am I missing something?
Regards,
Vijay
On Wed, Jul 29, 2020 at 2:08 PM Vijayendra Yadav
wrote:
> Hi Ravi,
>
> Thanks for the details. CustomAvroWriter was working for now, although it's
> failing for
implementation of AvroWriter
>> where you can add features of compression. Please find a sample
>> customization for AvroWriters where you could use compression. You can use
>> the example below.
>>
>> codecName = org.apache.hadoop.io.compress.SnappyCodec
>>
>> Custom
Hi Team,
Could you please provide a sample for enabling compression (Snappy) of
Avro:
DataStream[GenericRecord]
AvroWriters.forGenericRecord(schema)
Regards,
Vijay
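A hedged sketch along the lines of the customization mentioned in the replies above: a BulkWriter.Factory that wraps Avro's DataFileWriter and sets a Snappy codec; the schema string is a placeholder, and snappy-java must be on the classpath. The factory could then be handed to StreamingFileSink.forBulkFormat:

  import org.apache.avro.Schema
  import org.apache.avro.file.{CodecFactory, DataFileWriter}
  import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
  import org.apache.flink.api.common.serialization.BulkWriter
  import org.apache.flink.core.fs.FSDataOutputStream

  class SnappyAvroWriterFactory(schemaString: String) extends BulkWriter.Factory[GenericRecord] {
    override def create(out: FSDataOutputStream): BulkWriter[GenericRecord] = {
      // parse inside create() so the factory itself stays serializable
      val schema = new Schema.Parser().parse(schemaString)
      val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](schema))
      writer.setCodec(CodecFactory.snappyCodec()) // compression is applied per Avro block
      writer.create(schema, out)
      new BulkWriter[GenericRecord] {
        override def addElement(element: GenericRecord): Unit = writer.append(element)
        override def flush(): Unit = writer.flush()
        override def finish(): Unit = writer.close()
      }
    }
  }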
Thank You.
On Wed, Jul 29, 2020 at 2:07 AM Ravi Bhushan Ratnakar <
ravibhushanratna...@gmail.com> wrote:
> Yes, flink-compress module is supported from 1.10.0 and onward.
>
> Regards,
> Ravi
>
> On Tue 28 Jul, 2020, 23:11 Vijayendra Yadav,
> wrote:
>
>> Tha
doesn't support compression.
>
>
> Regards,
> Ravi
>
> On Tue 28 Jul, 2020, 22:08 Vijayendra Yadav,
> wrote:
>
>> Hi Ravi,
>>
>> Thanks for your response. But your example is for forBulkFormat. How
>> about forRowFormat?
>>
>>
codecName)).build()
>
> Regards,
> Ravi
>
> On Tue, Jul 28, 2020 at 8:03 PM Vijayendra Yadav
> wrote:
>
>> Hi Team,
>>
>> Is there a way to enable compression in StreamingFileSink API for
>> Row-encoded formats?
>>
>> val sink: StreamingFil
Hi Team,
Is there a way to enable compression in the StreamingFileSink API for
row-encoded formats?
val sink: StreamingFileSink[String] = StreamingFileSink
.forRowFormat(new Path(outputPath), new
SimpleStringEncoder[String]("UTF-8"))
Regards,
Vijay
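forRowFormat itself exposes no codec hook; a hedged alternative using the flink-compress module mentioned in the replies above (the path is a placeholder, and the codec short name is resolved by Hadoop):

  import org.apache.flink.core.fs.Path
  import org.apache.flink.formats.compress.CompressWriters
  import org.apache.flink.formats.compress.extractor.DefaultExtractor
  import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

  val sink: StreamingFileSink[String] = StreamingFileSink
    .forBulkFormat(
      new Path("s3://bucket/out"), // placeholder
      CompressWriters.forExtractor(new DefaultExtractor[String])
        .withHadoopCompression("SnappyCodec")) // Hadoop codec class short name
    .build()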
Hi Flink Team,
FLINK Streaming: I have DataStream[String] from a Kafka consumer
DataStream stream = env
.addSource(new FlinkKafkaConsumer<>("topic", new
SimpleStringSchema(), properties));
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
I have to sink this st