sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r264475624
########## File path: samza-core/src/main/java/org/apache/samza/zk/ZkDistributedReadWriteLock.java ########## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.zk; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.I0Itec.zkclient.IZkStateListener; +import org.apache.samza.SamzaException; +import org.apache.samza.coordinator.DistributedReadWriteLock; +import org.apache.zookeeper.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Distributed lock primitive for Zookeeper. + */ +public class ZkDistributedReadWriteLock implements DistributedReadWriteLock { + + public static final Logger LOG = LoggerFactory.getLogger(ZkDistributedReadWriteLock.class); + private static final String PARTICIPANTS_PATH = "participants"; + private static final String PROCESSORS_PATH = "processors"; + private final ZkUtils zkUtils; + private final String lockPath; + private final String particpantsPath; + private final String processorsPath; + private final String participantId; + private final ZkKeyBuilder keyBuilder; + private final Random random = new Random(); + private String activeParticipantPath = null; + private String activeProcessorPath = null; + private Object mutex; + private Boolean isInCriticalSection = false; + private Boolean isStateLost = false; + + public ZkDistributedReadWriteLock(String participantId, ZkUtils zkUtils, String lockId) { + if (zkUtils == null) { + throw new RuntimeException("Cannot operate ZkDistributedReadWriteLock without ZkUtils."); + } + this.zkUtils = zkUtils; + this.participantId = participantId; + this.keyBuilder = zkUtils.getKeyBuilder(); + lockPath = String.format("%s/readWriteLock-%s", keyBuilder.getRootPath(),lockId); + particpantsPath = String.format("%s/%s", lockPath, PARTICIPANTS_PATH); + processorsPath = String.format("%s/%s", lockPath, PROCESSORS_PATH); + zkUtils.validatePaths(new String[] {lockPath, particpantsPath, processorsPath}); + mutex = new Object(); + zkUtils.getZkClient().subscribeChildChanges(particpantsPath, new ParticipantChangeHandler(zkUtils)); + zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener()); + } + + /** + * Tries to acquire a lock in order to generate run.id. On failure to acquire lock, it keeps trying until the lock times out. + * Creates a sequential ephemeral node under "participants" to acquire the lock. + * If the path of this node has the lowest sequence number, + * it creates a sequential ephemeral node under "processors" and checks the number of nodes under "processors" + * if there is only one node under "processors", a WRITE access lock is acquired + * else a READ access lock is acquired + * @param timeout Duration of lock acquiring timeout. + * @param unit Unit of the timeout defined above. + * @return AccessType.READ/WRITE if lock is acquired successfully, AccessType.NONE if it times out. + */ + @Override + public AccessType lock(long timeout, TimeUnit unit) + throws TimeoutException { + + activeParticipantPath = zkUtils.getZkClient().createEphemeralSequential(particpantsPath + "/", participantId); + + //Start timer for timeout + long startTime = System.currentTimeMillis(); + long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit); + + while((System.currentTimeMillis() - startTime) < lockTimeout) { + synchronized (mutex) { + AccessType accessType = checkAndAcquireLock(); + if(accessType != AccessType.NONE) { + isInCriticalSection = true; + if(isStateLost) { + throw new SamzaException("Lock's state lost due to connection expiry"); + } + return accessType; + } else { + if(isStateLost) { Review comment: can this be moved out of the if()? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services