Hi Contributors,

I have a very simple pipeline that just reads data from KafkaIO, then
prints the parsed data to the console. Below is the main function of my
program:

> public static void main(String[] args) {
>     // create pipeline
>     PipelineOptions pipelineOption = PipelineOptionsFactory.fromArgs(args)
>             .withoutStrictParsing()
>             .as(PipelineOptions.class);
>
>     Pipeline pipeline = Pipeline.create(pipelineOption);
>
>     // define input schema
>     Schema inputSchema = Schema.builder()
>             .addStringField("camera")
>             .addDateTimeField("event_time")
>             .addInt32Field("car")
>             .addInt32Field("person")
>             .build();
>
>     // generate stream source
>     PCollection<Row> rows = pipeline
>             .apply("read kafka", KafkaIO.<String, String>read()
>                     .withBootstrapServers("127.0.0.1:9092")
>                     .withTopic("beamKafkaTest")
>                     .withKeyDeserializer(StringDeserializer.class)
>                     .withValueDeserializer(StringDeserializer.class)
>                     .withReadCommitted()
>                     .commitOffsetsInFinalize()
>                     .withConsumerConfigUpdates(ImmutableMap.of("group.id", 
> "client-1"))
>                     .withoutMetadata()
>             )
>             // parse JSON
>             .apply("parse JSON", ParDo.of(new DoFn<KV<String, String>, Row>() 
> {
>                 @ProcessElement
>                 public void processElement(ProcessContext c) {
>                     String jsonData = c.element().getValue();
>
>                     // parse json
>                     JSONObject jsonObject = JSON.parseObject(jsonData);
>
>                     // build row
>                     List<Object> list = new ArrayList<>();
>                     list.add(jsonObject.get("camera"));
>                     list.add(dtf.parseDateTime((String) 
> jsonObject.get("event_time")));
>                     list.add(jsonObject.get("car"));
>                     list.add(jsonObject.get("person"));
>                     Row row = Row.withSchema(inputSchema)
>                             .addValues(list)
>                             .build();
>
>                     System.out.println(row);
>
>                     // emit row
>                     c.output(row);
>                 }
>             }))
>             // set input schema
>             .setRowSchema(inputSchema);
>
>     // define output schema
>     Schema outputSchema = Schema.builder()
>             .addStringField("camera")
>             .addDateTimeField("event_time")
>             .addInt32Field("car")
>             .addInt32Field("person")
>             .build();
>
>     // print results
>     rows
>             .apply(
>                     "log_result",
>                     MapElements.via(
>                             new SimpleFunction<Row, Row>() {
>                                 @Override
>                                 public Row apply(Row input) {
>                                     // expect output:
>                                     // RESULT: [row, 5.0]
>                                     System.out.println("RESULT: " + 
> input.getValues());
>                                     return input;
>                                 }
>                             }))
>             .setRowSchema(
>                     outputSchema
>             );
>
>     // run
>     pipeline.run();
> }
>
> This code works well with the direct runner and the Flink runner. But when
it comes to the Spark runner, it shut down immediately after submitted to
the spark cluster without any error messages. As expected it should keep
running and consuming messages until we stop it. But now it just performs
like a one-pass program. And I tested some of the official examples
(WordCount, WindowedWordCount) that goes with the bounded sources(TextIO),
they all worked well with the Spark Runner.

The project structure I used is generated from the maven archetype provided
on this page
<https://beam.apache.org/get-started/quickstart-java/#get-the-wordcount-code>.
The logs of my submitted spark application are attached at the bottom of
this email. Since it printed the consumer configurations, I guess it had
already initialized the Kafka consumer, but failed in the following steps.

So my question is: *Why does my KafkaIO program shut down immediately when
running with the Spark runner, but work well with the direct runner and
flink runner? Is there any existing example of using KafkaIO with the Spark
runner?*

Really appreciate your help and advice! Stay safe and happy!

Many thanks,
Minreng Wu


Top of Log
>
> Spark Executor Command: 
> "/Library/Java/JavaVirtualMachines/jdk1.8.0_211.jdk/Contents/Home/bin/java" 
> "-cp" 
> "/Users/wumrwds/Development/spark-2.4.5-bin-hadoop2.7/conf/:/Users/wumrwds/Development/spark-2.4.5-bin-hadoop2.7/jars/*"
>  "-Xmx1024M" "-Dspark.driver.port=53118" 
> "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" 
> "spark://[email protected]:53118" "--executor-id" "0" 
> "--hostname" "10.0.10.123" "--cores" "12" "--app-id" 
> "app-20200921011535-0003" "--worker-url" "spark://[email protected]:53084"
> ========================================
>
> 20/09/21 01:15:32 WARN Utils: Your hostname, Minrengs-MacBook-Pro.local 
> resolves to a loopback address: 127.0.0.1; using 10.0.10.123 instead (on 
> interface en0)
> 20/09/21 01:15:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to 
> another address
> 20/09/21 01:15:33 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> log4j:WARN No appenders could be found for logger 
> (org.apache.beam.sdk.options.PipelineOptionsFactory).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
> info.
> Using Spark's default log4j profile: 
> org/apache/spark/log4j-defaults.properties
> 20/09/21 01:15:35 WARN Checkpoint: Checkpoint directory 
> /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd/spark-checkpoint does not exist
> 20/09/21 01:15:35 INFO SparkRunnerStreamingContextFactory: Creating a new 
> Spark Streaming Context
> 20/09/21 01:15:35 INFO SparkRunnerStreamingContextFactory: Setting Spark 
> streaming batchDuration to 500 msec
> 20/09/21 01:15:35 INFO SparkContextFactory: Creating a brand new Spark 
> Context.
> 20/09/21 01:15:35 INFO SparkContext: Running Spark version 2.4.5
> 20/09/21 01:15:35 INFO SparkContext: Submitted application: BeamSqlApp2
> 20/09/21 01:15:35 INFO SecurityManager: Changing view acls to: wumrwds
> 20/09/21 01:15:35 INFO SecurityManager: Changing modify acls to: wumrwds
> 20/09/21 01:15:35 INFO SecurityManager: Changing view acls groups to:
> 20/09/21 01:15:35 INFO SecurityManager: Changing modify acls groups to:
> 20/09/21 01:15:35 INFO SecurityManager: SecurityManager: authentication 
> disabled; ui acls disabled; users  with view permissions: Set(wumrwds); 
> groups with view permissions: Set(); users  with modify permissions: 
> Set(wumrwds); groups with modify permissions: Set()
> 20/09/21 01:15:35 INFO Utils: Successfully started service 'sparkDriver' on 
> port 53118.
> 20/09/21 01:15:35 INFO SparkEnv: Registering MapOutputTracker
> 20/09/21 01:15:35 INFO SparkEnv: Registering BlockManagerMaster
> 20/09/21 01:15:35 INFO BlockManagerMasterEndpoint: Using 
> org.apache.spark.storage.DefaultTopologyMapper for getting topology 
> information
> 20/09/21 01:15:35 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint 
> up
> 20/09/21 01:15:35 INFO DiskBlockManager: Created local directory at 
> /private/var/folders/kl/98ycp0vj19z082xtn76h07r00000gn/T/blockmgr-e26bb6e9-1c3a-45ee-b176-abb7ca03ae4f
> 20/09/21 01:15:35 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
> 20/09/21 01:15:35 INFO SparkEnv: Registering OutputCommitCoordinator
> 20/09/21 01:15:35 INFO Utils: Successfully started service 'SparkUI' on port 
> 4040.
> 20/09/21 01:15:35 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at 
> http://10.0.10.123:4040
> 20/09/21 01:15:35 INFO SparkContext: Added JAR 
> file:/Users/wumrwds/Git/play/beam-play/word-count-beam/target/word-count-beam-bundled-0.1.jar
>  at spark://10.0.10.123:53118/jars/word-count-beam-bundled-0.1.jar with 
> timestamp 1600668935676
> 20/09/21 01:15:35 INFO StandaloneAppClient$ClientEndpoint: Connecting to 
> master spark://Minrengs-MacBook-Pro.local:7077...
> 20/09/21 01:15:35 INFO TransportClientFactory: Successfully created 
> connection to Minrengs-MacBook-Pro.local/127.0.0.1:7077 after 26 ms (0 ms 
> spent in bootstraps)
> 20/09/21 01:15:35 INFO StandaloneSchedulerBackend: Connected to Spark cluster 
> with app ID app-20200921011535-0003
> 20/09/21 01:15:35 INFO StandaloneAppClient$ClientEndpoint: Executor added: 
> app-20200921011535-0003/0 on worker-20200921011236-10.0.10.123-53084 
> (10.0.10.123:53084) with 12 core(s)
> 20/09/21 01:15:35 INFO StandaloneSchedulerBackend: Granted executor ID 
> app-20200921011535-0003/0 on hostPort 10.0.10.123:53084 with 12 core(s), 
> 1024.0 MB RAM
> 20/09/21 01:15:35 INFO Utils: Successfully started service 
> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 53120.
> 20/09/21 01:15:35 INFO NettyBlockTransferService: Server created on 
> 10.0.10.123:53120
> 20/09/21 01:15:35 INFO BlockManager: Using 
> org.apache.spark.storage.RandomBlockReplicationPolicy for block replication 
> policy
> 20/09/21 01:15:35 INFO BlockManagerMaster: Registering BlockManager 
> BlockManagerId(driver, 10.0.10.123, 53120, None)
> 20/09/21 01:15:35 INFO StandaloneAppClient$ClientEndpoint: Executor updated: 
> app-20200921011535-0003/0 is now RUNNING
> 20/09/21 01:15:35 INFO BlockManagerMasterEndpoint: Registering block manager 
> 10.0.10.123:53120 with 366.3 MB RAM, BlockManagerId(driver, 10.0.10.123, 
> 53120, None)
> 20/09/21 01:15:35 INFO BlockManagerMaster: Registered BlockManager 
> BlockManagerId(driver, 10.0.10.123, 53120, None)
> 20/09/21 01:15:35 INFO BlockManager: Initialized BlockManager: 
> BlockManagerId(driver, 10.0.10.123, 53120, None)
> 20/09/21 01:15:35 INFO StandaloneSchedulerBackend: SchedulerBackend is ready 
> for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
> 20/09/21 01:15:35 WARN Checkpoint$CheckpointDir: The specified checkpoint dir 
> /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd does not match a reliable 
> filesystem so in case of failures this job may not recover properly or even 
> at all.
> 20/09/21 01:15:35 INFO Checkpoint$CheckpointDir: Checkpoint dir set to: 
> /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd
> 20/09/21 01:15:36 INFO MetricsAccumulator: No metrics checkpoint found.
> 20/09/21 01:15:36 INFO MetricsAccumulator: Instantiated metrics accumulator: 
> MetricQueryResults()
> 20/09/21 01:15:36 WARN Checkpoint$CheckpointDir: The specified checkpoint dir 
> /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd does not match a reliable 
> filesystem so in case of failures this job may not recover properly or even 
> at all.
> 20/09/21 01:15:36 INFO Checkpoint$CheckpointDir: Checkpoint dir set to: 
> /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd
> 20/09/21 01:15:36 INFO AggregatorsAccumulator: No accumulator checkpoint 
> found.
> 20/09/21 01:15:36 INFO AggregatorsAccumulator: Instantiated aggregators 
> accumulator:
> 20/09/21 01:15:36 INFO SparkRunner$Evaluator: Evaluating 
> Read(KafkaUnboundedSource)
> 20/09/21 01:15:36 INFO PIDRateEstimator: Created PIDRateEstimator with 
> proportional = 1.0, integral = 0.2, derivative = 0.0, min rate = 100.0
> 20/09/21 01:15:36 INFO SourceDStream: Read duration set to: PT0.200S
> 20/09/21 01:15:36 INFO SourceDStream: Max records per batch has not been 
> limited by neither configuration nor the rate controller, and will remain 
> unlimited for the current batch (9223372036854775807).
> 20/09/21 01:15:36 INFO ConsumerConfig: ConsumerConfig values:
>       allow.auto.create.topics = true
>       auto.commit.interval.ms = 5000
>       auto.offset.reset = latest
>       bootstrap.servers = [10.0.10.123:9092]
>       check.crcs = true
>       client.dns.lookup = default
>       client.id =
>       client.rack =
>       connections.max.idle.ms = 540000
>       default.api.timeout.ms = 60000
>       enable.auto.commit = false
>       exclude.internal.topics = true
>       fetch.max.bytes = 52428800
>       fetch.max.wait.ms = 500
>       fetch.min.bytes = 1
>       group.id = client-1
>       group.instance.id = null
>       heartbeat.interval.ms = 3000
>       interceptor.classes = []
>       internal.leave.group.on.close = true
>       isolation.level = read_committed
>       key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>       max.partition.fetch.bytes = 1048576
>       max.poll.interval.ms = 300000
>       max.poll.records = 500
>       metadata.max.age.ms = 300000
>       metric.reporters = []
>       metrics.num.samples = 2
>       metrics.recording.level = INFO
>       metrics.sample.window.ms = 30000
>       partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.RangeAssignor]
>       receive.buffer.bytes = 524288
>       reconnect.backoff.max.ms = 1000
>       reconnect.backoff.ms = 50
>       request.timeout.ms = 30000
>       retry.backoff.ms = 100
>       sasl.client.callback.handler.class = null
>       sasl.jaas.config = null
>       sasl.kerberos.kinit.cmd = /usr/bin/kinit
>       sasl.kerberos.min.time.before.relogin = 60000
>       sasl.kerberos.service.name = null
>       sasl.kerberos.ticket.renew.jitter = 0.05
>       sasl.kerberos.ticket.renew.window.factor = 0.8
>       sasl.login.callback.handler.class = null
>       sasl.login.class = null
>       sasl.login.refresh.buffer.seconds = 300
>       sasl.login.refresh.min.period.seconds = 60
>       sasl.login.refresh.window.factor = 0.8
>       sasl.login.refresh.window.jitter = 0.05
>       sasl.mechanism = GSSAPI
>       security.protocol = PLAINTEXT
>       security.providers = null
>       send.buffer.bytes = 131072
>       session.timeout.ms = 10000
>       ssl.cipher.suites = null
>       ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>       ssl.endpoint.identification.algorithm = https
>       ssl.key.password = null
>       ssl.keymanager.algorithm = SunX509
>       ssl.keystore.location = null
>       ssl.keystore.password = null
>       ssl.keystore.type = JKS
>       ssl.protocol = TLS
>       ssl.provider = null
>       ssl.secure.random.implementation = null
>       ssl.trustmanager.algorithm = PKIX
>       ssl.truststore.location = null
>       ssl.truststore.password = null
>       ssl.truststore.type = JKS
>       value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>
> 20/09/21 01:15:36 INFO AppInfoParser: Kafka version: 2.4.1
> 20/09/21 01:15:36 INFO AppInfoParser: Kafka commitId: c57222ae8cd7866b
> 20/09/21 01:15:36 INFO AppInfoParser: Kafka startTimeMs: 1600668936638
> 20/09/21 01:15:37 INFO Metadata: [Consumer clientId=consumer-client-1-1, 
> groupId=client-1] Cluster ID: jYuebrB3RQKaA55bNA1Vsw
> 20/09/21 01:15:37 INFO KafkaUnboundedSource: Partitions assigned to split 0 
> (total 1): beamKafkaTest-0
> 20/09/21 01:15:37 INFO SparkRunner$Evaluator: Evaluating 
> org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1@63718b93
> 20/09/21 01:15:37 INFO SparkRunner$Evaluator: Evaluating 
> org.apache.beam.examples.BeamSqlApp2$1@4930539b
> 20/09/21 01:15:37 INFO SparkRunner$Evaluator: Evaluating 
> org.apache.beam.sdk.transforms.MapElements$1@37b72ea
> 20/09/21 01:15:37 WARN SparkContext: Spark is not running in local mode, 
> therefore the checkpoint directory must not be on the local filesystem. 
> Directory 
> 'file:/tmp/beamsqlapp2-wumrwds-0921061534-60408bcd/spark-checkpoint' appears 
> to be on the local filesystem.
> 20/09/21 01:15:37 INFO SparkRunner: Starting streaming pipeline execution.
> 20/09/21 01:15:37 INFO SourceDStream: Duration for remembering RDDs set to 
> 10000 ms for org.apache.beam.runners.spark.io.SourceDStream@49db1a95
> 20/09/21 01:15:37 INFO SourceDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO SourceDStream: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO SourceDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO SourceDStream: Remember interval = 10000 ms
> 20/09/21 01:15:37 INFO SourceDStream: Initialized and validated 
> org.apache.beam.runners.spark.io.SourceDStream@49db1a95
> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Storage level = Memory 
> Deserialized 1x Replicated
> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Checkpoint interval = 
> 5000 ms
> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Remember interval = 10000 
> ms
> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.InternalMapWithStateDStream@5b0151fb
> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Slide time = 500 ms
> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Storage level = Serialized 1x 
> Replicated
> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Checkpoint interval = null
> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Initialized and validated 
> org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@6ff117df
> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.MappedDStream@6275db71
> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Slide time = 
> 500 ms
> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Storage level 
> = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Checkpoint 
> interval = null
> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Remember 
> interval = 500 ms
> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Initialized 
> and validated 
> org.apache.beam.runners.spark.io.SparkUnboundedSource$ReadReportDStream@3ef0d3c5
> 20/09/21 01:15:37 INFO SourceDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO SourceDStream: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO SourceDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO SourceDStream: Remember interval = 10000 ms
> 20/09/21 01:15:37 INFO SourceDStream: Initialized and validated 
> org.apache.beam.runners.spark.io.SourceDStream@49db1a95
> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Storage level = Memory 
> Deserialized 1x Replicated
> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Checkpoint interval = 
> 5000 ms
> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Remember interval = 10000 
> ms
> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.InternalMapWithStateDStream@5b0151fb
> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Slide time = 500 ms
> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Storage level = Serialized 1x 
> Replicated
> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Checkpoint interval = null
> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Initialized and validated 
> org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@6ff117df
> 20/09/21 01:15:37 INFO FlatMappedDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO FlatMappedDStream: Storage level = Serialized 1x 
> Replicated
> 20/09/21 01:15:37 INFO FlatMappedDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO FlatMappedDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO FlatMappedDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.FlatMappedDStream@4c293f9c
> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.MappedDStream@3430988a
> 20/09/21 01:15:37 INFO TransformedDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO TransformedDStream: Storage level = Serialized 1x 
> Replicated
> 20/09/21 01:15:37 INFO TransformedDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO TransformedDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO TransformedDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.TransformedDStream@17ab8c32
> 20/09/21 01:15:37 INFO FilteredDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO FilteredDStream: Storage level = Serialized 1x 
> Replicated
> 20/09/21 01:15:37 INFO FilteredDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO FilteredDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO FilteredDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.FilteredDStream@14660807
> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.MappedDStream@8f826ef
> 20/09/21 01:15:37 INFO TransformedDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO TransformedDStream: Storage level = Serialized 1x 
> Replicated
> 20/09/21 01:15:37 INFO TransformedDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO TransformedDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO TransformedDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.TransformedDStream@7e734051
> 20/09/21 01:15:37 INFO FilteredDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO FilteredDStream: Storage level = Serialized 1x 
> Replicated
> 20/09/21 01:15:37 INFO FilteredDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO FilteredDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO FilteredDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.FilteredDStream@1be77a7
> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.MappedDStream@25bae116
> 20/09/21 01:15:37 INFO TransformedDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO TransformedDStream: Storage level = Serialized 1x 
> Replicated
> 20/09/21 01:15:37 INFO TransformedDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO TransformedDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO TransformedDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.TransformedDStream@6e41c8a5
> 20/09/21 01:15:37 INFO FilteredDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO FilteredDStream: Storage level = Serialized 1x 
> Replicated
> 20/09/21 01:15:37 INFO FilteredDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO FilteredDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO FilteredDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.FilteredDStream@362e3875
> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.MappedDStream@22aaf27c
> 20/09/21 01:15:37 INFO ForEachDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO ForEachDStream: Storage level = Serialized 1x 
> Replicated
> 20/09/21 01:15:37 INFO ForEachDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO ForEachDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO ForEachDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.ForEachDStream@3bd7e62b
> 20/09/21 01:15:37 INFO RecurringTimer: Started timer for JobGenerator at time 
> 1600668938000
> 20/09/21 01:15:37 INFO JobGenerator: Started JobGenerator at 1600668938000 ms
> 20/09/21 01:15:37 INFO JobScheduler: Started JobScheduler
> 20/09/21 01:15:37 INFO StreamingContext: StreamingContext started
> 20/09/21 01:15:37 INFO StreamingContext: Invoking stop(stopGracefully=false) 
> from shutdown hook
> 20/09/21 01:15:37 INFO BatchedWriteAheadLog: BatchedWriteAheadLog shutting 
> down at time: 1600668937717.
> 20/09/21 01:15:37 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer 
> queue interrupted.
> 20/09/21 01:15:37 INFO BatchedWriteAheadLog: BatchedWriteAheadLog Writer 
> thread exiting.
> 20/09/21 01:15:37 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Stopped 
> write ahead log manager
> 20/09/21 01:15:37 INFO ReceiverTracker: ReceiverTracker stopped
> 20/09/21 01:15:37 INFO JobGenerator: Stopping JobGenerator immediately
> 20/09/21 01:15:37 INFO RecurringTimer: Stopped timer for JobGenerator after 
> time -1
> 20/09/21 01:15:37 INFO CheckpointWriter: CheckpointWriter executor 
> terminated? true, waited for 0 ms.
> 20/09/21 01:15:37 INFO JobGenerator: Stopped JobGenerator
> 20/09/21 01:15:37 INFO JobScheduler: Stopped JobScheduler
> 20/09/21 01:15:37 INFO StreamingContext: StreamingContext stopped successfully
> 20/09/21 01:15:37 INFO SparkContext: Invoking stop() from shutdown hook
> 20/09/21 01:15:37 INFO SparkUI: Stopped Spark web UI at 
> http://10.0.10.123:4040
> 20/09/21 01:15:37 INFO StandaloneSchedulerBackend: Shutting down all executors
> 20/09/21 01:15:37 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking 
> each executor to shut down
> 20/09/21 01:15:37 INFO MapOutputTrackerMasterEndpoint: 
> MapOutputTrackerMasterEndpoint stopped!
> 20/09/21 01:15:37 INFO MemoryStore: MemoryStore cleared
> 20/09/21 01:15:37 INFO BlockManager: BlockManager stopped
> 20/09/21 01:15:37 INFO BlockManagerMaster: BlockManagerMaster stopped
> 20/09/21 01:15:37 INFO 
> OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: 
> OutputCommitCoordinator stopped!
> 20/09/21 01:15:37 INFO SparkContext: Successfully stopped SparkContext
> 20/09/21 01:15:37 INFO ShutdownHookManager: Shutdown hook called
> 20/09/21 01:15:37 INFO ShutdownHookManager: Deleting directory 
> /private/var/folders/kl/98ycp0vj19z082xtn76h07r00000gn/T/spark-fac2b91e-8bfc-4855-b0f5-bbb206bb678b
> 20/09/21 01:15:37 INFO ShutdownHookManager: Deleting directory 
> /private/var/folders/kl/98ycp0vj19z082xtn76h07r00000gn/T/spark-e3447ff1-ca22-4574-9864-834fac63eea3
>
>

Reply via email to