Hi all, I just found why my application shut down immediately: I didn't call `.waitUntilFinish()` on the result of `pipeline.run()`.
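For anyone who hits the same symptom: `pipeline.run()` only submits the pipeline and returns a `PipelineResult`; the driver has to block on `result.waitUntilFinish()` or `main()` returns and shutdown hooks tear the streaming job down. Here is a minimal plain-Java sketch of the same failure mode (no Beam involved; the `WaitDemo` class and its names are made up for illustration):

```java
// Plain-Java analogy for the bug in this thread: a "job" runs in the
// background, and the caller either waits for it (like
// PipelineResult.waitUntilFinish()) or returns immediately (like a bare
// pipeline.run()), in which case the work never gets observed.
public class WaitDemo {

    /** Launches a background "job" and optionally waits for it. */
    public static String launch(boolean wait) throws InterruptedException {
        StringBuilder log = new StringBuilder();
        Thread job = new Thread(() -> {
            try {
                Thread.sleep(100); // stands in for the streaming job's work
            } catch (InterruptedException e) {
                return;
            }
            synchronized (log) {
                log.append("batch processed");
            }
        });
        job.start();
        if (wait) {
            job.join(); // analogous to result.waitUntilFinish()
        }
        // without join(), we read the log before the job has done anything
        synchronized (log) {
            return log.toString();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("with join: " + launch(true));
        System.out.println("without join: " + launch(false));
    }
}
```

The analogy is loose (the Spark runner exits via shutdown hooks rather than a simple early return), but the fix is the same shape: block the driver until the pipeline terminates.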
Sorry for bothering you all with such a silly mistake! Thanks a lot! Have a nice week!

Many thanks,
Minreng

On Mon, Sep 21, 2020 at 2:07 AM Minreng Wu <[email protected]> wrote:

> Hi Contributors,
>
> I have a very simple pipeline that just reads data from KafkaIO and
> prints the parsed data to the console. Below is the main function of my
> program:
>
>> public static void main(String[] args) {
>>     // create pipeline
>>     PipelineOptions pipelineOption = PipelineOptionsFactory.fromArgs(args)
>>             .withoutStrictParsing()
>>             .as(PipelineOptions.class);
>>
>>     Pipeline pipeline = Pipeline.create(pipelineOption);
>>
>>     // define input schema
>>     Schema inputSchema = Schema.builder()
>>             .addStringField("camera")
>>             .addDateTimeField("event_time")
>>             .addInt32Field("car")
>>             .addInt32Field("person")
>>             .build();
>>
>>     // generate stream source
>>     PCollection<Row> rows = pipeline
>>             .apply("read kafka", KafkaIO.<String, String>read()
>>                     .withBootstrapServers("127.0.0.1:9092")
>>                     .withTopic("beamKafkaTest")
>>                     .withKeyDeserializer(StringDeserializer.class)
>>                     .withValueDeserializer(StringDeserializer.class)
>>                     .withReadCommitted()
>>                     .commitOffsetsInFinalize()
>>                     .withConsumerConfigUpdates(ImmutableMap.of("group.id", "client-1"))
>>                     .withoutMetadata()
>>             )
>>             // parse JSON
>>             .apply("parse JSON", ParDo.of(new DoFn<KV<String, String>, Row>() {
>>                 @ProcessElement
>>                 public void processElement(ProcessContext c) {
>>                     String jsonData = c.element().getValue();
>>
>>                     // parse json
>>                     JSONObject jsonObject = JSON.parseObject(jsonData);
>>
>>                     // build row
>>                     List<Object> list = new ArrayList<>();
>>                     list.add(jsonObject.get("camera"));
>>                     list.add(dtf.parseDateTime((String) jsonObject.get("event_time")));
>>                     list.add(jsonObject.get("car"));
>>                     list.add(jsonObject.get("person"));
>>                     Row row = Row.withSchema(inputSchema)
>>                             .addValues(list)
>>                             .build();
>>
>>                     System.out.println(row);
>>
>>                     // emit row
>>                     c.output(row);
>>                 }
>>             }))
>>             // set input schema
>>             .setRowSchema(inputSchema);
>>
>>     // define output schema
>>     Schema outputSchema = Schema.builder()
>>             .addStringField("camera")
>>             .addDateTimeField("event_time")
>>             .addInt32Field("car")
>>             .addInt32Field("person")
>>             .build();
>>
>>     // print results
>>     rows.apply(
>>             "log_result",
>>             MapElements.via(
>>                     new SimpleFunction<Row, Row>() {
>>                         @Override
>>                         public Row apply(Row input) {
>>                             // expected output:
>>                             // RESULT: [row, 5.0]
>>                             System.out.println("RESULT: " + input.getValues());
>>                             return input;
>>                         }
>>                     }))
>>             .setRowSchema(outputSchema);
>>
>>     // run
>>     pipeline.run();
>> }
>
> This code works well with the direct runner and the Flink runner. But
> when it comes to the Spark runner, it shuts down immediately after being
> submitted to the Spark cluster, without any error messages. As expected it
> should keep running and consuming messages until we stop it, but instead it
> behaves like a one-pass program. I also tested some of the official examples
> (WordCount, WindowedWordCount) that use bounded sources (TextIO), and
> they all worked well with the Spark runner.
>
> The project structure I used is generated from the Maven archetype
> provided on this page
> <https://beam.apache.org/get-started/quickstart-java/#get-the-wordcount-code>.
> The logs of my submitted Spark application are attached at the bottom of
> this email. Since it printed the consumer configurations, I guess it had
> already initialized the Kafka consumer but failed in the following steps.
>
> So my question is: *Why does my KafkaIO program shut down immediately
> when running with the Spark runner, but work well with the direct runner
> and Flink runner? Is there any existing example of using KafkaIO with the
> Spark runner?*
>
> Really appreciate your help and advice! Stay safe and happy!
>
> Many thanks,
> Minreng Wu
>
> Top of Log
>>
>> Spark Executor Command: "/Library/Java/JavaVirtualMachines/jdk1.8.0_211.jdk/Contents/Home/bin/java" "-cp" "/Users/wumrwds/Development/spark-2.4.5-bin-hadoop2.7/conf/:/Users/wumrwds/Development/spark-2.4.5-bin-hadoop2.7/jars/*" "-Xmx1024M" "-Dspark.driver.port=53118" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://[email protected]:53118" "--executor-id" "0" "--hostname" "10.0.10.123" "--cores" "12" "--app-id" "app-20200921011535-0003" "--worker-url" "spark://[email protected]:53084"
>> ========================================
>>
>> 20/09/21 01:15:32 WARN Utils: Your hostname, Minrengs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.10.123 instead (on interface en0)
>> 20/09/21 01:15:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
>> 20/09/21 01:15:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>> log4j:WARN No appenders could be found for logger (org.apache.beam.sdk.options.PipelineOptionsFactory).
>> log4j:WARN Please initialize the log4j system properly.
>> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
>> Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
>> 20/09/21 01:15:35 WARN Checkpoint: Checkpoint directory /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd/spark-checkpoint does not exist
>> 20/09/21 01:15:35 INFO SparkRunnerStreamingContextFactory: Creating a new Spark Streaming Context
>> 20/09/21 01:15:35 INFO SparkRunnerStreamingContextFactory: Setting Spark streaming batchDuration to 500 msec
>> 20/09/21 01:15:35 INFO SparkContextFactory: Creating a brand new Spark Context.
>> 20/09/21 01:15:35 INFO SparkContext: Running Spark version 2.4.5
>> 20/09/21 01:15:35 INFO SparkContext: Submitted application: BeamSqlApp2
>> 20/09/21 01:15:35 INFO SecurityManager: Changing view acls to: wumrwds
>> 20/09/21 01:15:35 INFO SecurityManager: Changing modify acls to: wumrwds
>> 20/09/21 01:15:35 INFO SecurityManager: Changing view acls groups to:
>> 20/09/21 01:15:35 INFO SecurityManager: Changing modify acls groups to:
>> 20/09/21 01:15:35 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(wumrwds); groups with view permissions: Set(); users with modify permissions: Set(wumrwds); groups with modify permissions: Set()
>> 20/09/21 01:15:35 INFO Utils: Successfully started service 'sparkDriver' on port 53118.
>> 20/09/21 01:15:35 INFO SparkEnv: Registering MapOutputTracker
>> 20/09/21 01:15:35 INFO SparkEnv: Registering BlockManagerMaster
>> 20/09/21 01:15:35 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
>> 20/09/21 01:15:35 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
>> 20/09/21 01:15:35 INFO DiskBlockManager: Created local directory at /private/var/folders/kl/98ycp0vj19z082xtn76h07r00000gn/T/blockmgr-e26bb6e9-1c3a-45ee-b176-abb7ca03ae4f
>> 20/09/21 01:15:35 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
>> 20/09/21 01:15:35 INFO SparkEnv: Registering OutputCommitCoordinator
>> 20/09/21 01:15:35 INFO Utils: Successfully started service 'SparkUI' on port 4040.
>> 20/09/21 01:15:35 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.0.10.123:4040
>> 20/09/21 01:15:35 INFO SparkContext: Added JAR file:/Users/wumrwds/Git/play/beam-play/word-count-beam/target/word-count-beam-bundled-0.1.jar at spark://10.0.10.123:53118/jars/word-count-beam-bundled-0.1.jar with timestamp 1600668935676
>> 20/09/21 01:15:35 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://Minrengs-MacBook-Pro.local:7077...
>> 20/09/21 01:15:35 INFO TransportClientFactory: Successfully created connection to Minrengs-MacBook-Pro.local/127.0.0.1:7077 after 26 ms (0 ms spent in bootstraps)
>> 20/09/21 01:15:35 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20200921011535-0003
>> 20/09/21 01:15:35 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20200921011535-0003/0 on worker-20200921011236-10.0.10.123-53084 (10.0.10.123:53084) with 12 core(s)
>> 20/09/21 01:15:35 INFO StandaloneSchedulerBackend: Granted executor ID app-20200921011535-0003/0 on hostPort 10.0.10.123:53084 with 12 core(s), 1024.0 MB RAM
>> 20/09/21 01:15:35 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 53120.
>> 20/09/21 01:15:35 INFO NettyBlockTransferService: Server created on 10.0.10.123:53120
>> 20/09/21 01:15:35 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
>> 20/09/21 01:15:35 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.0.10.123, 53120, None)
>> 20/09/21 01:15:35 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20200921011535-0003/0 is now RUNNING
>> 20/09/21 01:15:35 INFO BlockManagerMasterEndpoint: Registering block manager 10.0.10.123:53120 with 366.3 MB RAM, BlockManagerId(driver, 10.0.10.123, 53120, None)
>> 20/09/21 01:15:35 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.0.10.123, 53120, None)
>> 20/09/21 01:15:35 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.0.10.123, 53120, None)
>> 20/09/21 01:15:35 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
>> 20/09/21 01:15:35 WARN Checkpoint$CheckpointDir: The specified checkpoint dir /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd does not match a reliable filesystem so in case of failures this job may not recover properly or even at all.
>> 20/09/21 01:15:35 INFO Checkpoint$CheckpointDir: Checkpoint dir set to: /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd
>> 20/09/21 01:15:36 INFO MetricsAccumulator: No metrics checkpoint found.
>> 20/09/21 01:15:36 INFO MetricsAccumulator: Instantiated metrics accumulator: MetricQueryResults()
>> 20/09/21 01:15:36 WARN Checkpoint$CheckpointDir: The specified checkpoint dir /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd does not match a reliable filesystem so in case of failures this job may not recover properly or even at all.
>> 20/09/21 01:15:36 INFO Checkpoint$CheckpointDir: Checkpoint dir set to: /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd
>> 20/09/21 01:15:36 INFO AggregatorsAccumulator: No accumulator checkpoint found.
>> 20/09/21 01:15:36 INFO AggregatorsAccumulator: Instantiated aggregators accumulator:
>> 20/09/21 01:15:36 INFO SparkRunner$Evaluator: Evaluating Read(KafkaUnboundedSource)
>> 20/09/21 01:15:36 INFO PIDRateEstimator: Created PIDRateEstimator with proportional = 1.0, integral = 0.2, derivative = 0.0, min rate = 100.0
>> 20/09/21 01:15:36 INFO SourceDStream: Read duration set to: PT0.200S
>> 20/09/21 01:15:36 INFO SourceDStream: Max records per batch has not been limited by neither configuration nor the rate controller, and will remain unlimited for the current batch (9223372036854775807).
>> 20/09/21 01:15:36 INFO ConsumerConfig: ConsumerConfig values:
>>     allow.auto.create.topics = true
>>     auto.commit.interval.ms = 5000
>>     auto.offset.reset = latest
>>     bootstrap.servers = [10.0.10.123:9092]
>>     check.crcs = true
>>     client.dns.lookup = default
>>     client.id =
>>     client.rack =
>>     connections.max.idle.ms = 540000
>>     default.api.timeout.ms = 60000
>>     enable.auto.commit = false
>>     exclude.internal.topics = true
>>     fetch.max.bytes = 52428800
>>     fetch.max.wait.ms = 500
>>     fetch.min.bytes = 1
>>     group.id = client-1
>>     group.instance.id = null
>>     heartbeat.interval.ms = 3000
>>     interceptor.classes = []
>>     internal.leave.group.on.close = true
>>     isolation.level = read_committed
>>     key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>>     max.partition.fetch.bytes = 1048576
>>     max.poll.interval.ms = 300000
>>     max.poll.records = 500
>>     metadata.max.age.ms = 300000
>>     metric.reporters = []
>>     metrics.num.samples = 2
>>     metrics.recording.level = INFO
>>     metrics.sample.window.ms = 30000
>>     partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
>>     receive.buffer.bytes = 524288
>>     reconnect.backoff.max.ms = 1000
>>     reconnect.backoff.ms = 50
>>     request.timeout.ms = 30000
>>     retry.backoff.ms = 100
>>     sasl.client.callback.handler.class = null
>>     sasl.jaas.config = null
>>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>     sasl.kerberos.min.time.before.relogin = 60000
>>     sasl.kerberos.service.name = null
>>     sasl.kerberos.ticket.renew.jitter = 0.05
>>     sasl.kerberos.ticket.renew.window.factor = 0.8
>>     sasl.login.callback.handler.class = null
>>     sasl.login.class = null
>>     sasl.login.refresh.buffer.seconds = 300
>>     sasl.login.refresh.min.period.seconds = 60
>>     sasl.login.refresh.window.factor = 0.8
>>     sasl.login.refresh.window.jitter = 0.05
>>     sasl.mechanism = GSSAPI
>>     security.protocol = PLAINTEXT
>>     security.providers = null
>>     send.buffer.bytes = 131072
>>     session.timeout.ms = 10000
>>     ssl.cipher.suites = null
>>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>     ssl.endpoint.identification.algorithm = https
>>     ssl.key.password = null
>>     ssl.keymanager.algorithm = SunX509
>>     ssl.keystore.location = null
>>     ssl.keystore.password = null
>>     ssl.keystore.type = JKS
>>     ssl.protocol = TLS
>>     ssl.provider = null
>>     ssl.secure.random.implementation = null
>>     ssl.trustmanager.algorithm = PKIX
>>     ssl.truststore.location = null
>>     ssl.truststore.password = null
>>     ssl.truststore.type = JKS
>>     value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>>
>> 20/09/21 01:15:36 INFO AppInfoParser: Kafka version: 2.4.1
>> 20/09/21 01:15:36 INFO AppInfoParser: Kafka commitId: c57222ae8cd7866b
>> 20/09/21 01:15:36 INFO AppInfoParser: Kafka startTimeMs: 1600668936638
>> 20/09/21 01:15:37 INFO Metadata: [Consumer clientId=consumer-client-1-1, groupId=client-1] Cluster ID: jYuebrB3RQKaA55bNA1Vsw
>> 20/09/21 01:15:37 INFO KafkaUnboundedSource: Partitions assigned to split 0 (total 1): beamKafkaTest-0
>> 20/09/21 01:15:37 INFO SparkRunner$Evaluator: Evaluating org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1@63718b93
>> 20/09/21 01:15:37 INFO SparkRunner$Evaluator: Evaluating org.apache.beam.examples.BeamSqlApp2$1@4930539b
>> 20/09/21 01:15:37 INFO SparkRunner$Evaluator: Evaluating org.apache.beam.sdk.transforms.MapElements$1@37b72ea
>> 20/09/21 01:15:37 WARN SparkContext: Spark is not running in local mode, therefore the checkpoint directory must not be on the local filesystem. Directory 'file:/tmp/beamsqlapp2-wumrwds-0921061534-60408bcd/spark-checkpoint' appears to be on the local filesystem.
>> 20/09/21 01:15:37 INFO SparkRunner: Starting streaming pipeline execution.
>> 20/09/21 01:15:37 INFO SourceDStream: Duration for remembering RDDs set to 10000 ms for org.apache.beam.runners.spark.io.SourceDStream@49db1a95
>> 20/09/21 01:15:37 INFO SourceDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO SourceDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO SourceDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO SourceDStream: Remember interval = 10000 ms
>> 20/09/21 01:15:37 INFO SourceDStream: Initialized and validated org.apache.beam.runners.spark.io.SourceDStream@49db1a95
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Storage level = Memory Deserialized 1x Replicated
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Checkpoint interval = 5000 ms
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Remember interval = 10000 ms
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Initialized and validated org.apache.spark.streaming.dstream.InternalMapWithStateDStream@5b0151fb
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Remember interval = 500 ms
>> 20/09/21 01:15:37
INFO MapWithStateDStreamImpl: Initialized and validated org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@6ff117df
>> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@6275db71
>> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Initialized and validated org.apache.beam.runners.spark.io.SparkUnboundedSource$ReadReportDStream@3ef0d3c5
>> 20/09/21 01:15:37 INFO SourceDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO SourceDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO SourceDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO SourceDStream: Remember interval = 10000 ms
>> 20/09/21 01:15:37 INFO SourceDStream: Initialized and validated org.apache.beam.runners.spark.io.SourceDStream@49db1a95
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Storage level = Memory Deserialized 1x Replicated
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Checkpoint interval = 5000 ms
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Remember interval = 10000 ms
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Initialized and validated
org.apache.spark.streaming.dstream.InternalMapWithStateDStream@5b0151fb
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Initialized and validated org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@6ff117df
>> 20/09/21 01:15:37 INFO FlatMappedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO FlatMappedDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO FlatMappedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO FlatMappedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@4c293f9c
>> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@3430988a
>> 20/09/21 01:15:37 INFO TransformedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO TransformedDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO TransformedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO TransformedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO TransformedDStream: Initialized and validated org.apache.spark.streaming.dstream.TransformedDStream@17ab8c32
>> 20/09/21 01:15:37 INFO FilteredDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO FilteredDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37
INFO FilteredDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO FilteredDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO FilteredDStream: Initialized and validated org.apache.spark.streaming.dstream.FilteredDStream@14660807
>> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@8f826ef
>> 20/09/21 01:15:37 INFO TransformedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO TransformedDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO TransformedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO TransformedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO TransformedDStream: Initialized and validated org.apache.spark.streaming.dstream.TransformedDStream@7e734051
>> 20/09/21 01:15:37 INFO FilteredDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO FilteredDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO FilteredDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO FilteredDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO FilteredDStream: Initialized and validated org.apache.spark.streaming.dstream.FilteredDStream@1be77a7
>> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@25bae116
>> 20/09/21 01:15:37 INFO TransformedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO TransformedDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO TransformedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO TransformedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO TransformedDStream: Initialized and validated org.apache.spark.streaming.dstream.TransformedDStream@6e41c8a5
>> 20/09/21 01:15:37 INFO FilteredDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO FilteredDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO FilteredDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO FilteredDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO FilteredDStream: Initialized and validated org.apache.spark.streaming.dstream.FilteredDStream@362e3875
>> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@22aaf27c
>> 20/09/21 01:15:37 INFO ForEachDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO ForEachDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO ForEachDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO ForEachDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@3bd7e62b
>> 20/09/21 01:15:37 INFO RecurringTimer: Started timer for JobGenerator at time 1600668938000
>> 20/09/21 01:15:37 INFO JobGenerator: Started JobGenerator at 1600668938000 ms
>> 20/09/21 01:15:37 INFO JobScheduler: Started JobScheduler
>> 20/09/21 01:15:37 INFO StreamingContext: StreamingContext started
>> 20/09/21 01:15:37 INFO StreamingContext: Invoking
stop(stopGracefully=false) from shutdown hook
>> 20/09/21 01:15:37 INFO BatchedWriteAheadLog: BatchedWriteAheadLog shutting down at time: 1600668937717.
>> 20/09/21 01:15:37 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer queue interrupted.
>> 20/09/21 01:15:37 INFO BatchedWriteAheadLog: BatchedWriteAheadLog Writer thread exiting.
>> 20/09/21 01:15:37 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Stopped write ahead log manager
>> 20/09/21 01:15:37 INFO ReceiverTracker: ReceiverTracker stopped
>> 20/09/21 01:15:37 INFO JobGenerator: Stopping JobGenerator immediately
>> 20/09/21 01:15:37 INFO RecurringTimer: Stopped timer for JobGenerator after time -1
>> 20/09/21 01:15:37 INFO CheckpointWriter: CheckpointWriter executor terminated? true, waited for 0 ms.
>> 20/09/21 01:15:37 INFO JobGenerator: Stopped JobGenerator
>> 20/09/21 01:15:37 INFO JobScheduler: Stopped JobScheduler
>> 20/09/21 01:15:37 INFO StreamingContext: StreamingContext stopped successfully
>> 20/09/21 01:15:37 INFO SparkContext: Invoking stop() from shutdown hook
>> 20/09/21 01:15:37 INFO SparkUI: Stopped Spark web UI at http://10.0.10.123:4040
>> 20/09/21 01:15:37 INFO StandaloneSchedulerBackend: Shutting down all executors
>> 20/09/21 01:15:37 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
>> 20/09/21 01:15:37 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
>> 20/09/21 01:15:37 INFO MemoryStore: MemoryStore cleared
>> 20/09/21 01:15:37 INFO BlockManager: BlockManager stopped
>> 20/09/21 01:15:37 INFO BlockManagerMaster: BlockManagerMaster stopped
>> 20/09/21 01:15:37 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
>> 20/09/21 01:15:37 INFO SparkContext: Successfully stopped SparkContext
>> 20/09/21 01:15:37 INFO ShutdownHookManager: Shutdown hook called
>> 20/09/21 01:15:37 INFO ShutdownHookManager: Deleting directory /private/var/folders/kl/98ycp0vj19z082xtn76h07r00000gn/T/spark-fac2b91e-8bfc-4855-b0f5-bbb206bb678b
>> 20/09/21 01:15:37 INFO ShutdownHookManager: Deleting directory /private/var/folders/kl/98ycp0vj19z082xtn76h07r00000gn/T/spark-e3447ff1-ca22-4574-9864-834fac63eea3
