Hello,
I am experimenting with the Python DataStream API in Flink 1.13 to confirm
that it is a viable fit for our needs, essentially trying to verify that
what can be done with the Java DataStream API also works in Python.
While testing a processing pipeline, I ran into a problem during job
initialization on my cluster.
Currently, I am running Flink on a local Docker cluster consisting of a
JobManager and a TaskManager (created from the same image) with the
following jars installed on the containers:

> flink-csv-1.13.0.jar
> flink-json-1.13.0.jar
> flink-sql-connector-kafka_2.12-1.13.0.jar
> flink-table_2.12-1.13.0.jar
> log4j-api-2.12.1.jar
> log4j-slf4j-impl-2.12.1.jar
> flink-dist_2.12-1.13.0.jar
> flink-shaded-zookeeper-3.4.14.jar
> flink-table-blink_2.12-1.13.0.jar
> log4j-1.2-api-2.12.1.jar
> log4j-core-2.12.1.jar


Whenever I try to submit the job through the JobManager, the same exception
stack trace is thrown:

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Failed to start remote bundle
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.startBundle(BeamPythonFunctionRunner.java:375)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.checkInvokeStartBundle(BeamPythonFunctionRunner.java:436)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.process(BeamPythonFunctionRunner.java:311)
    at org.apache.flink.streaming.api.operators.python.OneInputPythonFunctionOperator.processElement(OneInputPythonFunctionOperator.java:167)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: java.lang.RuntimeException: No client connected within timeout
    at org.apache.beam.runners.fnexecution.data.GrpcDataService.send(GrpcDataService.java:192)
    at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor.newBundle(SdkHarnessClient.java:287)
    at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor.newBundle(SdkHarnessClient.java:197)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.getBundle(DefaultJobBundleFactory.java:519)
    at org.apache.beam.runners.fnexecution.control.StageBundleFactory.getBundle(StageBundleFactory.java:87)
    at org.apache.beam.runners.fnexecution.control.StageBundleFactory.getBundle(StageBundleFactory.java:76)
    at org.apache.beam.runners.fnexecution.control.StageBundleFactory.getBundle(StageBundleFactory.java:40)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.startBundle(BeamPythonFunctionRunner.java:368)
    ... 17 more
Caused by: java.util.concurrent.TimeoutException: Waited 3 minutes for org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture@32c3875d[status=PENDING]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:471)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:90)
    at org.apache.beam.runners.fnexecution.data.GrpcDataService.send(GrpcDataService.java:187)
    ... 24 more

I try to execute the *datastream_consumer.py* job (sent as an attachment,
and included below) by running `flink run -py datastream_consumer.py` on
the JobManager container.
The pipeline consumes events from a Kafka topic, generates new events based
on the ones it reads, and writes those back to a different Kafka topic.
The input topic contains events in JSON format; a sample event is provided
in the *test_event.json* attachment.
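For reference, the events have roughly the following shape (field values
are made up to illustrate the row type declared in the script; the exact
event is in the attachment):

{
  "payload": {
    "Version": {
      "PolicyVersion": {
        "Document": {
          "Statement": [
            {"Action": "*", "Effect": "Allow", "Resource": "*"}
          ]
        }
      }
    },
    "Policy": {
      "Policy": {
        "PolicyId": "EXAMPLE_POLICY_ID",
        "AttachmentCount": 2,
        "IsAttachable": true
      }
    }
  }
}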

Am I doing something wrong, or do additional libraries need to be present
on the JobManager/TaskManager images? I would appreciate any help in
identifying the actual cause of the problem.

The source of datastream_consumer.py:

from pyflink.common import Row
from pyflink.common.serialization import JsonRowDeserializationSchema, JsonRowSerializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer

KAFKA_BOOTSTRAP = 'kafka.cluster:29030'


def has_too_many_permissions(event):
    policy = event['payload']['Policy']['Policy']
    # Ignore policies that are not attached anywhere or cannot be attached at all.
    if policy['AttachmentCount'] == 0 or not policy['IsAttachable']:
        return False

    # Flag the policy if any of its statements is an Allow on every resource ("*").
    for statement in event['payload']['Version']['PolicyVersion']['Document']['Statement']:
        if statement['Effect'] == 'Allow' and statement['Resource'] == "*":
            return True

    return False


def get_output(event):
    policy = event['payload']['Policy']['Policy']
    # Build a Row matching output_type: (type, policyName, noRolesAssignedTo).
    return Row('TooManyRightsRole', policy['PolicyId'], policy['AttachmentCount'])


def tutorial():
    env = StreamExecutionEnvironment.get_execution_environment()

    # Field names must match the JSON keys so the (de)serializers can map them.
    statement_type = Types.ROW_NAMED(
        ['Action', 'Effect', 'Resource'],
        [Types.STRING(), Types.STRING(), Types.STRING()])

    input_type = Types.ROW_NAMED(['payload'], [
        Types.ROW_NAMED(['Version', 'Policy'], [
            Types.ROW_NAMED(['PolicyVersion'], [
                Types.ROW_NAMED(['Document'], [
                    Types.ROW_NAMED(['Statement'], [Types.LIST(statement_type)])
                ])
            ]),
            Types.ROW_NAMED(['Policy'], [
                Types.ROW_NAMED(
                    ['PolicyId', 'AttachmentCount', 'IsAttachable'],
                    [Types.STRING(), Types.INT(), Types.BOOLEAN()])
            ])
        ])
    ])

    output_type = Types.ROW_NAMED(
        ['type', 'policyName', 'noRolesAssignedTo'],
        [Types.STRING(), Types.STRING(), Types.INT()])

    kafka_properties = {
        'bootstrap.servers': KAFKA_BOOTSTRAP,
        'group.id': 'test_group'
    }

    input_schema = JsonRowDeserializationSchema.builder().type_info(input_type).build()

    kafka_consumer = FlinkKafkaConsumer(topics='input_topic',
                                        deserialization_schema=input_schema,
                                        properties=kafka_properties)
    kafka_consumer.set_start_from_earliest()

    output_schema = JsonRowSerializationSchema.builder().with_type_info(output_type).build()
    kafka_producer = FlinkKafkaProducer(topic='output_topic',
                                        serialization_schema=output_schema,
                                        producer_config=kafka_properties)

    ds = env.add_source(kafka_consumer)
    ds.filter(has_too_many_permissions)\
        .map(get_output, output_type=output_type)\
        .add_sink(kafka_producer)

    env.execute('simple_job')


if __name__ == "__main__":
    tutorial()
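
For what it's worth, the two helper functions can be sanity-checked locally
with plain Python (no cluster involved); the event below is made up to match
the shape shown earlier, and the snippet assumes the script is importable as
datastream_consumer:

import json

from datastream_consumer import get_output, has_too_many_permissions

# Made-up event with one over-permissive statement, matching the shape above.
raw = '''{"payload": {
    "Version": {"PolicyVersion": {"Document": {"Statement": [
        {"Action": "*", "Effect": "Allow", "Resource": "*"}]}}},
    "Policy": {"Policy": {"PolicyId": "EXAMPLE_POLICY_ID",
                          "AttachmentCount": 2, "IsAttachable": true}}}}'''

event = json.loads(raw)
print(has_too_many_permissions(event))  # True: attachable, attached, Allow on "*"
print(get_output(event))                # the derived output record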

Attachment: test_event.json