Hey, Malcolm, Thanks for reporting this issue. Could you open a JIRA to track that?
Best! -Yi On Mon, Aug 29, 2022 at 5:53 PM Malcolm McFarland <mmcfarl...@cavulus.com> wrote: > Hey folks, > > I've recently been attempting to upgrade our legacy application from Samza > 1.5.1 to 1.7.0. With version 1.5.1, I've had no problems running the > application with this command: > > ./bin/run-app.sh --config-path=path/to/file.properties > > Starting in 1.6.0, this doesn't seem to work. As far as I can tell, the > application is starting fully up without errors and then is simply shutting > down, once again without error. Afaict it runs fine on YARN. Does Samza > v1.6.0+ support running local processes? I've tried this on both OS X and > Ubuntu, using Java 1.8. > > Here are the relevant portions of the properties file: > > task.class=com.cavulus.task.SimpleLegacyTask > job.factory.class=org.apache.samza.job.local.ThreadJobFactory > job.default.system=kafka > > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory > job.name=simple-legacy-task > task.inputs=kafka.event-input > > ...plus serdes, ZooKeeper configuration, etc, etc. Here are the last few > lines of logging output: > > 2022-08-29 17:19:42,842 DEBUG [org.apache.kafka.clients.NetworkClient] > [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, > groupId=simple-legacy-task-1] Sending metadata request > (type=MetadataRequest, topics=) to node localhost:9092 (id: -1 rack: null) > 2022-08-29 17:19:42,843 INFO [org.apache.kafka.clients.Metadata] > Cluster ID: fwnjhL2kQayFxN0xpatT-g > 2022-08-29 17:19:42,843 DEBUG [org.apache.kafka.clients.Metadata] > Updated cluster metadata version 2 to Cluster(id = fwnjhL2kQayFxN0xpatT-g, > nodes = [localhost:9092 (id: 0 rack: null)], partitions = [], controller = > localhost:9092 (id: 0 rack: null)) > 2022-08-29 17:19:42,843 DEBUG > [org.apache.samza.system.kafka.KafkaSystemAdmin] Stream > simple-legacy-task-broadcast-stream has partitions [Partition(topic = > simple-legacy-task-broadcast-stream, partition = 0, leader = 0, replicas = > [0], isr = [0], offlineReplicas = [])] > 2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.clients.NetworkClient] > [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, > groupId=simple-legacy-task-1] Initiating connection to node localhost:9092 > (id: 0 rack: null) > 2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.common.metrics.Metrics] > Added sensor with name node-0.bytes-sent > 2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.common.metrics.Metrics] > Added sensor with name node-0.bytes-received > 2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.common.metrics.Metrics] > Added sensor with name node-0.latency > 2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.common.network.Selector] > [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, > groupId=simple-legacy-task-1] Created socket with SO_RCVBUF = 342972, > SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node 0 > 2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.clients.NetworkClient] > [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, > groupId=simple-legacy-task-1] Completed connection to node 0. Fetching API > versions. > 2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.clients.NetworkClient] > [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, > groupId=simple-legacy-task-1] Initiating API versions fetch from node 0. > 2022-08-29 17:19:42,845 DEBUG [org.apache.kafka.clients.NetworkClient] > [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, > groupId=simple-legacy-task-1] Recorded API versions for node 0: > (Produce(0): 0 to 7 [usable: 6], Fetch(1): 0 to 11 [usable: 8], > ListOffsets(2): 0 to 5 [usable: 3], Metadata(3): 0 to 8 [usable: 6], > LeaderAndIsr(4): 0 to 2 [usable: 1], StopReplica(5): 0 to 1 [usable: 0], > UpdateMetadata(6): 0 to 5 [usable: 4], ControlledShutdown(7): 0 to 2 > [usable: 1], OffsetCommit(8): 0 to 7 [usable: 4], OffsetFetch(9): 0 to 5 > [usable: 4], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 5 > [usable: 3], Heartbeat(12): 0 to 3 [usable: 2], LeaveGroup(13): 0 to 2 > [usable: 2], SyncGroup(14): 0 to 3 [usable: 2], DescribeGroups(15): 0 to 3 > [usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1 > [usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3 > [usable: 3], DeleteTopics(20): 0 to 3 [usable: 2], DeleteRecords(21): 0 to > 1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1], > OffsetForLeaderEpoch(23): 0 to 3 [usable: 1], AddPartitionsToTxn(24): 0 to > 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 > [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to > 2 [usable: 1], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 > [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to > 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], > AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 > [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 0], > CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1 > [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], > ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): > 0 to 1 [usable: 1], DeleteGroups(42): 0 to 1 [usable: 1], UNKNOWN(43): 0, > UNKNOWN(44): 0) > 2022-08-29 17:19:42,846 DEBUG > [org.apache.samza.system.kafka.KafkaSystemAdmin] Stream event-input has > partitions [Partition(topic = event-input, partition = 0, leader = 0, > replicas = [0], isr = [0], offlineReplicas = [])] > 2022-08-29 17:19:42,846 INFO > [org.apache.samza.system.kafka.KafkaSystemAdmin] SystemStream partition > counts for system kafka: {event-input=SystemStreamMetadata > [streamName=event-input, partitionMetadata={Partition > [partition=0]=SystemStreamPartitionMetadata [oldestOffset=null, > newestOffset=null, upcomingOffset=null]}], > simple-legacy-task-broadcast-stream=SystemStreamMetadata > [streamName=simple-legacy-task-broadcast-stream, > partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata > [oldestOffset=null, newestOffset=null, upcomingOffset=null]}]} > 2022-08-29 17:19:42,850 DEBUG > [org.apache.samza.metrics.MetricsRegistryMap] Creating new gauge > job-coordinator kafka-event-input-partitionCount 0. > 2022-08-29 17:19:42,850 DEBUG > [org.apache.samza.metrics.MetricsRegistryMap] Creating new gauge > job-coordinator kafka-simple-legacy-task-broadcast-stream-partitionCount 0. > 2022-08-29 17:19:42,851 INFO > [org.apache.samza.zk.ScheduleAfterDebounceTime] Trying to cancel the > action: OnProcessorChange. > 2022-08-29 17:19:42,852 INFO > [org.apache.samza.zk.ScheduleAfterDebounceTime] Scheduled action: > OnProcessorChange to run after: 20000 milliseconds. > 2022-08-29 17:19:42,852 INFO [org.apache.samza.zk.ZkUtils] subscribing > for jm version change > > at:/app-simple-legacy-task-1/simple-legacy-task-1-2.0-coordinationData/jobModelGeneration/jobModelVersion > 2022-08-29 17:19:42,853 DEBUG [org.apache.zookeeper.ClientCnxn] Reading > reply sessionid:0x1000479a11c0066, packet:: clientPath:null serverPath:null > finished:false header:: 14,3 replyHeader:: 14,265013,0 request:: > > '/app-simple-legacy-task-1/simple-legacy-task-1-2.0-coordinationData/jobModelGeneration/jobModelVersion,T > response:: > s{176584,263880,1630709112307,1661453291684,84,0,0,0,2,0,176584} > 2022-08-29 17:19:42,853 DEBUG [org.I0Itec.zkclient.ZkClient] Subscribed > data changes for > > /app-simple-legacy-task-1/simple-legacy-task-1-2.0-coordinationData/jobModelGeneration/jobModelVersion > > At which point the application silently exits. > > Thanks in advance for any advice, ideas, things to check, etc. > > Cheers, > Malcolm McFarland > Cavulus >