Apache ZooKeeper Server Source Code

Apache ZooKeeper is an open-source server which enables highly reliable distributed coordination.

Apache ZooKeeper Server Source Code files are provided in the source package file, apache-zookeeper-3.8.0.tar.gz.

You can download apache-zookeeper-3.8.0.tar.gz as described in the previous tutorial and go to the "zookeeper-server" sub-folder to view Apache ZooKeeper Server Source Code files.

You can also browse Apache ZooKeeper Server Source Code below:

✍: FYIcenter.com

org/apache/zookeeper/server/quorum/QuorumPeer.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server.quorum;

import static org.apache.zookeeper.common.NetUtils.formatInetAddr;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.security.sasl.SaslException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException.BadArgumentsException;
import org.apache.zookeeper.common.AtomicFileOutputStream;
import org.apache.zookeeper.common.AtomicFileWritingIdiom;
import org.apache.zookeeper.common.AtomicFileWritingIdiom.WriterStatement;
import org.apache.zookeeper.common.QuorumX509Util;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.jmx.ZKMBeanInfo;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperThread;
import org.apache.zookeeper.server.admin.AdminServer;
import org.apache.zookeeper.server.admin.AdminServer.AdminServerException;
import org.apache.zookeeper.server.admin.AdminServerFactory;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.apache.zookeeper.server.quorum.auth.NullQuorumAuthLearner;
import org.apache.zookeeper.server.quorum.auth.NullQuorumAuthServer;
import org.apache.zookeeper.server.quorum.auth.QuorumAuth;
import org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner;
import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthLearner;
import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumOracleMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.ConfigUtils;
import org.apache.zookeeper.server.util.JvmPauseMonitor;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class manages the quorum protocol. There are three states this server
 * can be in:
 * <ol>
 * <li>Leader election - each server will elect a leader (proposing itself as a
 * leader initially).</li>
 * <li>Follower - the server will synchronize with the leader and replicate any
 * transactions.</li>
 * <li>Leader - the server will process requests and forward them to followers.
 * A majority of followers must log the request before it can be accepted.
 * </ol>
 *
 * This class will setup a datagram socket that will always respond with its
 * view of the current leader. The response will take the form of:
 *
 * <pre>
 * int xid;
 *
 * long myid;
 *
 * long leader_id;
 *
 * long leader_zxid;
 * </pre>
 *
 * The request for the current leader will consist solely of an xid: int xid;
 */
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {

    private static final Logger LOG = LoggerFactory.getLogger(QuorumPeer.class);

    public static final String CONFIG_KEY_KERBEROS_CANONICALIZE_HOST_NAMES = "zookeeper.kerberos.canonicalizeHostNames";
    public static final String CONFIG_DEFAULT_KERBEROS_CANONICALIZE_HOST_NAMES = "false";

    private QuorumBean jmxQuorumBean;
    LocalPeerBean jmxLocalPeerBean;
    private Map<Long, RemotePeerBean> jmxRemotePeerBean;
    LeaderElectionBean jmxLeaderElectionBean;

    // The QuorumCnxManager is held through an AtomicReference to ensure cross-thread visibility
    // of updates; see the implementation comment at setLastSeenQuorumVerifier().
    private AtomicReference<QuorumCnxManager> qcmRef = new AtomicReference<>();

    QuorumAuthServer authServer;
    QuorumAuthLearner authLearner;

    /**
     * ZKDatabase is a top level member of quorumpeer
     * which will be used in all the zookeeperservers
     * instantiated later. Also, it is created once on
     * bootup and only thrown away in case of a truncate
     * message from the leader
     */
    private ZKDatabase zkDb;

    private JvmPauseMonitor jvmPauseMonitor;

    public static final class AddressTuple {

        public final MultipleAddresses quorumAddr;
        public final MultipleAddresses electionAddr;
        public final InetSocketAddress clientAddr;

        public AddressTuple(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr) {
            this.quorumAddr = quorumAddr;
            this.electionAddr = electionAddr;
            this.clientAddr = clientAddr;
        }

    }

    private int observerMasterPort;

    public int getObserverMasterPort() {
        return observerMasterPort;
    }

    public void setObserverMasterPort(int observerMasterPort) {
        this.observerMasterPort = observerMasterPort;
    }

    public static final String CONFIG_KEY_MULTI_ADDRESS_ENABLED = "zookeeper.multiAddress.enabled";
    public static final String CONFIG_DEFAULT_MULTI_ADDRESS_ENABLED = "false";

    private boolean multiAddressEnabled = true;
    public boolean isMultiAddressEnabled() {
        return multiAddressEnabled;
    }

    public void setMultiAddressEnabled(boolean multiAddressEnabled) {
        this.multiAddressEnabled = multiAddressEnabled;
        LOG.info("multiAddress.enabled set to {}", multiAddressEnabled);
    }

    public static final String CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_TIMEOUT_MS = "zookeeper.multiAddress.reachabilityCheckTimeoutMs";

    private int multiAddressReachabilityCheckTimeoutMs = (int) MultipleAddresses.DEFAULT_TIMEOUT.toMillis();
    public int getMultiAddressReachabilityCheckTimeoutMs() {
        return multiAddressReachabilityCheckTimeoutMs;
    }

    public void setMultiAddressReachabilityCheckTimeoutMs(int multiAddressReachabilityCheckTimeoutMs) {
        this.multiAddressReachabilityCheckTimeoutMs = multiAddressReachabilityCheckTimeoutMs;
        LOG.info("multiAddress.reachabilityCheckTimeoutMs set to {}", multiAddressReachabilityCheckTimeoutMs);
    }

    public static final String CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_ENABLED = "zookeeper.multiAddress.reachabilityCheckEnabled";

    private boolean multiAddressReachabilityCheckEnabled = true;

    public boolean isMultiAddressReachabilityCheckEnabled() {
        return multiAddressReachabilityCheckEnabled;
    }

    public void setMultiAddressReachabilityCheckEnabled(boolean multiAddressReachabilityCheckEnabled) {
        this.multiAddressReachabilityCheckEnabled = multiAddressReachabilityCheckEnabled;
        LOG.info("multiAddress.reachabilityCheckEnabled set to {}", multiAddressReachabilityCheckEnabled);
    }

    public static class QuorumServer {

        public MultipleAddresses addr = new MultipleAddresses();

        public MultipleAddresses electionAddr = new MultipleAddresses();

        public InetSocketAddress clientAddr = null;

        public long id;

        public String hostname;

        public LearnerType type = LearnerType.PARTICIPANT;

        public boolean isClientAddrFromStatic = false;

        private List<InetSocketAddress> myAddrs;

        public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr) {
            this(id, addr, electionAddr, clientAddr, LearnerType.PARTICIPANT);
        }

        public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr) {
            this(id, addr, electionAddr, null, LearnerType.PARTICIPANT);
        }

        // VisibleForTesting
        public QuorumServer(long id, InetSocketAddress addr) {
            this(id, addr, null, null, LearnerType.PARTICIPANT);
        }

        public long getId() {
            return id;
        }

        /**
         * Performs a DNS lookup for server address and election address.
         *
         * If the DNS lookup fails, this.addr and electionAddr remain
         * unmodified.
         */
        public void recreateSocketAddresses() {
            if (this.addr.isEmpty()) {
                LOG.warn("Server address has not been initialized");
                return;
            }
            if (this.electionAddr.isEmpty()) {
                LOG.warn("Election address has not been initialized");
                return;
            }
            this.addr.recreateSocketAddresses();
            this.electionAddr.recreateSocketAddresses();
        }

        private LearnerType getType(String s) throws ConfigException {
            switch (s.trim().toLowerCase()) {
                case "observer":
                    return LearnerType.OBSERVER;
                case "participant":
                    return LearnerType.PARTICIPANT;
                default:
                    throw new ConfigException("Unrecognised peertype: " + s);
            }
        }

        public QuorumServer(long sid, String addressStr) throws ConfigException {
            this(sid, addressStr, QuorumServer::getInetAddress);
        }

        QuorumServer(long sid, String addressStr, Function<InetSocketAddress, InetAddress> getInetAddress) throws ConfigException {
            this.id = sid;
            initializeWithAddressString(addressStr, getInetAddress);
        }

        public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, LearnerType type) {
            this(id, addr, electionAddr, null, type);
        }

        public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr, LearnerType type) {
            this.id = id;
            if (addr != null) {
                this.addr.addAddress(addr);
            }
            if (electionAddr != null) {
                this.electionAddr.addAddress(electionAddr);
            }
            this.type = type;
            this.clientAddr = clientAddr;

            setMyAddrs();
        }

        private static final String wrongFormat =
            " does not have the form server_config or server_config;client_config"
            + " where server_config is the pipe separated list of host:port:port or host:port:port:type"
            + " and client_config is port or host:port";

        private void initializeWithAddressString(String addressStr, Function<InetSocketAddress, InetAddress> getInetAddress) throws ConfigException {
            LearnerType newType = null;
            String[] serverClientParts = addressStr.split(";");
            String[] serverAddresses = serverClientParts[0].split("\\|");

            if (serverClientParts.length == 2) {
                String[] clientParts = ConfigUtils.getHostAndPort(serverClientParts[1]);
                if (clientParts.length > 2) {
                    throw new ConfigException(addressStr + wrongFormat);
                }

                // is client_config a host:port or just a port
                String clientHostName = (clientParts.length == 2) ? clientParts[0] : "0.0.0.0";
                try {
                    clientAddr = new InetSocketAddress(clientHostName, Integer.parseInt(clientParts[clientParts.length - 1]));
                } catch (NumberFormatException e) {
                    throw new ConfigException("Address unresolved: " + hostname + ":" + clientParts[clientParts.length - 1]);
                }
            }

            boolean multiAddressEnabled = Boolean.parseBoolean(
                System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_ENABLED, QuorumPeer.CONFIG_DEFAULT_MULTI_ADDRESS_ENABLED));
            if (!multiAddressEnabled && serverAddresses.length > 1) {
                throw new ConfigException("Multiple address feature is disabled, but multiple addresses were specified for sid " + this.id);
            }

            boolean canonicalize = Boolean.parseBoolean(
                System.getProperty(
                    CONFIG_KEY_KERBEROS_CANONICALIZE_HOST_NAMES,
                    CONFIG_DEFAULT_KERBEROS_CANONICALIZE_HOST_NAMES));

            for (String serverAddress : serverAddresses) {
                String serverParts[] = ConfigUtils.getHostAndPort(serverAddress);
                if ((serverClientParts.length > 2) || (serverParts.length < 3)
                        || (serverParts.length > 4)) {
                    throw new ConfigException(addressStr + wrongFormat);
                }

                String serverHostName = serverParts[0];

                // server_config should be either host:port:port or host:port:port:type
                InetSocketAddress tempAddress;
                InetSocketAddress tempElectionAddress;
                try {
                    tempAddress = new InetSocketAddress(serverHostName, Integer.parseInt(serverParts[1]));
                    addr.addAddress(tempAddress);
                } catch (NumberFormatException e) {
                    throw new ConfigException("Address unresolved: " + serverHostName + ":" + serverParts[1]);
                }
                try {
                    tempElectionAddress = new InetSocketAddress(serverHostName, Integer.parseInt(serverParts[2]));
                    electionAddr.addAddress(tempElectionAddress);
                } catch (NumberFormatException e) {
                    throw new ConfigException("Address unresolved: " + serverHostName + ":" + serverParts[2]);
                }

                if (tempAddress.getPort() == tempElectionAddress.getPort()) {
                    throw new ConfigException("Client and election port must be different! Please update the "
                            + "configuration file on server." + this.id);
                }

                if (canonicalize) {
                    InetAddress ia = getInetAddress.apply(tempAddress);
                    if (ia == null) {
                        throw new ConfigException("Unable to canonicalize address " + serverHostName + " because it's not resolvable");
                    }

                    String canonicalHostName = ia.getCanonicalHostName();

                    if (!canonicalHostName.equals(serverHostName)
                        // Avoid using literal IP address when
                        // security check fails
                        && !canonicalHostName.equals(ia.getHostAddress())) {
                        LOG.info("Host name for quorum server {} "
                            + "canonicalized from {} to {}",
                            this.id, serverHostName, canonicalHostName);
                        serverHostName = canonicalHostName;
                    }
                }

                if (serverParts.length == 4) {
                    LearnerType tempType = getType(serverParts[3]);
                    if (newType == null) {
                        newType = tempType;
                    }

                    if (newType != tempType) {
                        throw new ConfigException("Multiple addresses should have similar roles: " + type + " vs " + tempType);
                    }
                }

                this.hostname = serverHostName;
            }

            if (newType != null) {
                type = newType;
            }

            setMyAddrs();
        }

        private static InetAddress getInetAddress(InetSocketAddress addr) {
            return addr.getAddress();
        }

        private void setMyAddrs() {
            this.myAddrs = new ArrayList<>();
            this.myAddrs.addAll(this.addr.getAllAddresses());
            this.myAddrs.add(this.clientAddr);
            this.myAddrs.addAll(this.electionAddr.getAllAddresses());
            this.myAddrs = excludedSpecialAddresses(this.myAddrs);
        }

        public static String delimitedHostString(InetSocketAddress addr) {
            String host = addr.getHostString();
            if (host.contains(":")) {
                return "[" + host + "]";
            } else {
                return host;
            }
        }

        public String toString() {
            StringWriter sw = new StringWriter();

            List<InetSocketAddress> addrList = new LinkedList<>(addr.getAllAddresses());
            List<InetSocketAddress> electionAddrList = new LinkedList<>(electionAddr.getAllAddresses());

            if (addrList.size() > 0 && electionAddrList.size() > 0) {
                addrList.sort(Comparator.comparing(InetSocketAddress::getHostString));
                electionAddrList.sort(Comparator.comparing(InetSocketAddress::getHostString));
                sw.append(IntStream.range(0, addrList.size()).mapToObj(i -> String.format("%s:%d:%d",
                        delimitedHostString(addrList.get(i)), addrList.get(i).getPort(), electionAddrList.get(i).getPort()))
                        .collect(Collectors.joining("|")));
            }

            if (type == LearnerType.OBSERVER) {
                sw.append(":observer");
            } else if (type == LearnerType.PARTICIPANT) {
                sw.append(":participant");
            }

            if (clientAddr != null && !isClientAddrFromStatic) {
                sw.append(";");
                sw.append(delimitedHostString(clientAddr));
                sw.append(":");
                sw.append(String.valueOf(clientAddr.getPort()));
            }

            return sw.toString();
        }

        public int hashCode() {
            assert false : "hashCode not designed";
            return 42; // any arbitrary constant will do
        }

        private boolean checkAddressesEqual(InetSocketAddress addr1, InetSocketAddress addr2) {
            return (addr1 != null || addr2 == null)
                   && (addr1 == null || addr2 != null)
                   && (addr1 == null || addr2 == null || addr1.equals(addr2));
        }

        public boolean equals(Object o) {
            if (!(o instanceof QuorumServer)) {
                return false;
            }
            QuorumServer qs = (QuorumServer) o;
            if ((qs.id != id) || (qs.type != type)) {
                return false;
            }
            if (!addr.equals(qs.addr)) {
                return false;
            }
            if (!electionAddr.equals(qs.electionAddr)) {
                return false;
            }
            return checkAddressesEqual(clientAddr, qs.clientAddr);
        }

        public void checkAddressDuplicate(QuorumServer s) throws BadArgumentsException {
            List<InetSocketAddress> otherAddrs = new ArrayList<>(s.addr.getAllAddresses());
            otherAddrs.add(s.clientAddr);
            otherAddrs.addAll(s.electionAddr.getAllAddresses());
            otherAddrs = excludedSpecialAddresses(otherAddrs);

            for (InetSocketAddress my : this.myAddrs) {

                for (InetSocketAddress other : otherAddrs) {
                    if (my.equals(other)) {
                        String error = String.format("%s of server.%d conflicts %s of server.%d", my, this.id, other, s.id);
                        throw new BadArgumentsException(error);
                    }
                }
            }
        }

        private List<InetSocketAddress> excludedSpecialAddresses(List<InetSocketAddress> addrs) {
            List<InetSocketAddress> included = new ArrayList<>();

            for (InetSocketAddress addr : addrs) {
                if (addr == null) {
                    continue;
                }
                InetAddress inetaddr = addr.getAddress();

                if (inetaddr == null || inetaddr.isAnyLocalAddress() // wildCard addresses (0.0.0.0 or [::])
                    || inetaddr.isLoopbackAddress()) { // loopback address(localhost/127.0.0.1)
                    continue;
                }
                included.add(addr);
            }
            return included;
        }

    }

    public enum ServerState {
        LOOKING,
        FOLLOWING,
        LEADING,
        OBSERVING
    }

    /**
     * (Used for monitoring) shows the current phase of
     * Zab protocol that peer is running.
     */
    public enum ZabState {
        ELECTION,
        DISCOVERY,
        SYNCHRONIZATION,
        BROADCAST
    }

    /**
     * (Used for monitoring) When peer is in synchronization phase, this shows
     * which synchronization mechanism is being used
     */
    public enum SyncMode {
        NONE,
        DIFF,
        SNAP,
        TRUNC
    }

    /*
     * A peer can either be participating, which implies that it is willing to
     * both vote in instances of consensus and to elect or become a Leader, or
     * it may be observing in which case it isn't.
     *
     * We need this distinction to decide which ServerState to move to when
     * conditions change (e.g. which state to become after LOOKING).
     */
    public enum LearnerType {
        PARTICIPANT,
        OBSERVER
    }

    /*
     * To enable observers to have no identifier, we need a generic identifier
     * at least for QuorumCnxManager. We use the following constant to as the
     * value of such a generic identifier.
     */

    static final long OBSERVER_ID = Long.MAX_VALUE;

    /*
     * Record leader election time
     */
    public long start_fle, end_fle; // fle = fast leader election
    public static final String FLE_TIME_UNIT = "MS";
    private long unavailableStartTime;

    /*
     * Default value of peer is participant
     */
    private LearnerType learnerType = LearnerType.PARTICIPANT;

    public LearnerType getLearnerType() {
        return learnerType;
    }

    /**
     * Sets the LearnerType
     */
    public void setLearnerType(LearnerType p) {
        learnerType = p;
    }

    protected synchronized void setConfigFileName(String s) {
        configFilename = s;
    }

    private String configFilename = null;

    public int getQuorumSize() {
        return getVotingView().size();
    }

    public void setJvmPauseMonitor(JvmPauseMonitor jvmPauseMonitor) {
        this.jvmPauseMonitor = jvmPauseMonitor;
    }

    /**
     * QuorumVerifier implementation; default (majority).
     */

    //last committed quorum verifier
    private QuorumVerifier quorumVerifier;

    //last proposed quorum verifier
    private QuorumVerifier lastSeenQuorumVerifier = null;

    // Lock object that guard access to quorumVerifier and lastSeenQuorumVerifier.
    final Object QV_LOCK = new Object();

    /**
     * My id
     */
    private long myid;

    /**
     * get the id of this quorum peer.
     */
    public long getId() {
        return myid;
    }

    // VisibleForTesting
    void setId(long id) {
        this.myid = id;
    }

    private boolean sslQuorum;
    private boolean shouldUsePortUnification;

    public boolean isSslQuorum() {
        return sslQuorum;
    }

    public boolean shouldUsePortUnification() {
        return shouldUsePortUnification;
    }

    private final QuorumX509Util x509Util;

    QuorumX509Util getX509Util() {
        return x509Util;
    }

    /**
     * This is who I think the leader currently is.
     */
    private volatile Vote currentVote;

    public synchronized Vote getCurrentVote() {
        return currentVote;
    }

    public synchronized void setCurrentVote(Vote v) {
        currentVote = v;
    }

    private volatile boolean running = true;

    private String initialConfig;

    /**
     * The number of milliseconds of each tick
     */
    protected int tickTime;

    /**
     * Whether learners in this quorum should create new sessions as local.
     * False by default to preserve existing behavior.
     */
    protected boolean localSessionsEnabled = false;

    /**
     * Whether learners in this quorum should upgrade local sessions to
     * global. Only matters if local sessions are enabled.
     */
    protected boolean localSessionsUpgradingEnabled = true;

    /**
     * Minimum number of milliseconds to allow for session timeout.
     * A value of -1 indicates unset, use default.
     */
    protected int minSessionTimeout = -1;

    /**
     * Maximum number of milliseconds to allow for session timeout.
     * A value of -1 indicates unset, use default.
     */
    protected int maxSessionTimeout = -1;

    /**
     * The ZooKeeper server's socket backlog length. The number of connections
     * that will be queued to be read before new connections are dropped. A
     * value of one indicates the default backlog will be used.
     */
    protected int clientPortListenBacklog = -1;

    /**
     * The number of ticks that the initial synchronization phase can take
     */
    protected volatile int initLimit;

    /**
     * The number of ticks that can pass between sending a request and getting
     * an acknowledgment
     */
    protected volatile int syncLimit;

    /**
     * The number of ticks that can pass before retrying to connect to learner master
     */
    protected volatile int connectToLearnerMasterLimit;

    /**
     * Enables/Disables sync request processor. This option is enabled
     * by default and is to be used with observers.
     */
    protected boolean syncEnabled = true;

    /**
     * The current tick
     */
    protected AtomicInteger tick = new AtomicInteger();

    /**
     * Whether or not to listen on all IPs for the two quorum ports
     * (broadcast and fast leader election).
     */
    protected boolean quorumListenOnAllIPs = false;

    /**
     * Keeps time taken for leader election in milliseconds. Sets the value to
     * this variable only after the completion of leader election.
     */
    private long electionTimeTaken = -1;

    /**
     * Enable/Disables quorum authentication using sasl. Defaulting to false.
     */
    protected boolean quorumSaslEnableAuth;

    /**
     * If this is false, quorum peer server will accept another quorum peer client
     * connection even if the authentication did not succeed. This can be used while
     * upgrading ZooKeeper server. Defaulting to false (required).
     */
    protected boolean quorumServerSaslAuthRequired;

    /**
     * If this is false, quorum peer learner will talk to quorum peer server
     * without authentication. This can be used while upgrading ZooKeeper
     * server. Defaulting to false (required).
     */
    protected boolean quorumLearnerSaslAuthRequired;

    /**
     * Kerberos quorum service principal. Defaulting to 'zkquorum/localhost'.
     */
    protected String quorumServicePrincipal;

    /**
     * Quorum learner login context name in jaas-conf file to read the kerberos
     * security details. Defaulting to 'QuorumLearner'.
     */
    protected String quorumLearnerLoginContext;

    /**
     * Quorum server login context name in jaas-conf file to read the kerberos
     * security details. Defaulting to 'QuorumServer'.
     */
    protected String quorumServerLoginContext;

    // TODO: need to tune the default value of thread size
    private static final int QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE = 20;
    /**
     * The maximum number of threads to allow in the connectionExecutors thread
     * pool which will be used to initiate quorum server connections.
     */
    protected int quorumCnxnThreadsSize = QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE;

    public static final String QUORUM_CNXN_TIMEOUT_MS = "zookeeper.quorumCnxnTimeoutMs";
    private static int quorumCnxnTimeoutMs;

    static {
        quorumCnxnTimeoutMs = Integer.getInteger(QUORUM_CNXN_TIMEOUT_MS, -1);
        LOG.info("{}={}", QUORUM_CNXN_TIMEOUT_MS, quorumCnxnTimeoutMs);
    }

    /**
     * @deprecated As of release 3.4.0, this class has been deprecated, since
     * it is used with one of the udp-based versions of leader election, which
     * we are also deprecating.
     *
     * This class simply responds to requests for the current leader of this
     * node.
     * <p>
     * The request contains just an xid generated by the requestor.
     * <p>
     * The response has the xid, the id of this server, the id of the leader,
     * and the zxid of the leader.
     *
     *
     */
    @Deprecated
    class ResponderThread extends ZooKeeperThread {

        ResponderThread() {
            super("ResponderThread");
        }

        volatile boolean running = true;

        @Override
        public void run() {
            try {
                byte[] b = new byte[36];
                ByteBuffer responseBuffer = ByteBuffer.wrap(b);
                DatagramPacket packet = new DatagramPacket(b, b.length);
                while (running) {
                    udpSocket.receive(packet);
                    if (packet.getLength() != 4) {
                        LOG.warn("Got more than just an xid! Len = {}", packet.getLength());
                    } else {
                        responseBuffer.clear();
                        responseBuffer.getInt(); // Skip the xid
                        responseBuffer.putLong(myid);
                        Vote current = getCurrentVote();
                        switch (getPeerState()) {
                        case LOOKING:
                            responseBuffer.putLong(current.getId());
                            responseBuffer.putLong(current.getZxid());
                            break;
                        case LEADING:
                            responseBuffer.putLong(myid);
                            try {
                                long proposed;
                                synchronized (leader) {
                                    proposed = leader.lastProposed;
                                }
                                responseBuffer.putLong(proposed);
                            } catch (NullPointerException npe) {
                                // This can happen in state transitions,
                                // just ignore the request
                            }
                            break;
                        case FOLLOWING:
                            responseBuffer.putLong(current.getId());
                            try {
                                responseBuffer.putLong(follower.getZxid());
                            } catch (NullPointerException npe) {
                                // This can happen in state transitions,
                                // just ignore the request
                            }
                            break;
                        case OBSERVING:
                            // Do nothing, Observers keep themselves to
                            // themselves.
                            break;
                        }
                        packet.setData(b);
                        udpSocket.send(packet);
                    }
                    packet.setLength(b.length);
                }
            } catch (RuntimeException e) {
                LOG.warn("Unexpected runtime exception in ResponderThread", e);
            } catch (IOException e) {
                LOG.warn("Unexpected IO exception in ResponderThread", e);
            } finally {
                LOG.warn("QuorumPeer responder thread exited");
            }
        }

    }

    private ServerState state = ServerState.LOOKING;

    private AtomicReference<ZabState> zabState = new AtomicReference<>(ZabState.ELECTION);
    private AtomicReference<SyncMode> syncMode = new AtomicReference<>(SyncMode.NONE);
    private AtomicReference<String> leaderAddress = new AtomicReference<String>("");
    private AtomicLong leaderId = new AtomicLong(-1);

    private boolean reconfigFlag = false; // indicates that a reconfig just committed

    public synchronized void setPeerState(ServerState newState) {
        state = newState;
        if (newState == ServerState.LOOKING) {
            setLeaderAddressAndId(null, -1);
            setZabState(ZabState.ELECTION);
        } else {
            LOG.info("Peer state changed: {}", getDetailedPeerState());
        }
    }

    public void setZabState(ZabState zabState) {
        if ((zabState == ZabState.BROADCAST) && (unavailableStartTime != 0)) {
            long unavailableTime = Time.currentElapsedTime() - unavailableStartTime;
            ServerMetrics.getMetrics().UNAVAILABLE_TIME.add(unavailableTime);
            if (getPeerState() == ServerState.LEADING) {
                ServerMetrics.getMetrics().LEADER_UNAVAILABLE_TIME.add(unavailableTime);
            }
            unavailableStartTime = 0;
        }
        this.zabState.set(zabState);
        LOG.info("Peer state changed: {}", getDetailedPeerState());
    }

    public void setSyncMode(SyncMode syncMode) {
        this.syncMode.set(syncMode);
        LOG.info("Peer state changed: {}", getDetailedPeerState());
    }

    public ZabState getZabState() {
        return zabState.get();
    }

    public SyncMode getSyncMode() {
        return syncMode.get();
    }

    public void setLeaderAddressAndId(MultipleAddresses addr, long newId) {
        if (addr != null) {
            leaderAddress.set(String.join("|", addr.getAllHostStrings()));
        } else {
            leaderAddress.set(null);
        }
        leaderId.set(newId);
    }

    public String getLeaderAddress() {
        return leaderAddress.get();
    }

    public long getLeaderId() {
        return leaderId.get();
    }

    public String getDetailedPeerState() {
        final StringBuilder sb = new StringBuilder(getPeerState().toString().toLowerCase());
        final ZabState zabState = getZabState();
        if (!ZabState.ELECTION.equals(zabState)) {
            sb.append(" - ").append(zabState.toString().toLowerCase());
        }
        final SyncMode syncMode = getSyncMode();
        if (!SyncMode.NONE.equals(syncMode)) {
            sb.append(" - ").append(syncMode.toString().toLowerCase());
        }
        return sb.toString();
    }

    public synchronized void reconfigFlagSet() {
        reconfigFlag = true;
    }
    public synchronized void reconfigFlagClear() {
        reconfigFlag = false;
    }
    public synchronized boolean isReconfigStateChange() {
        return reconfigFlag;
    }
    public synchronized ServerState getPeerState() {
        return state;
    }

    DatagramSocket udpSocket;

    private final AtomicReference<AddressTuple> myAddrs = new AtomicReference<>();

    /**
     * Resolves hostname for a given server ID.
     *
     * This method resolves hostname for a given server ID in both quorumVerifer
     * and lastSeenQuorumVerifier. If the server ID matches the local server ID,
     * it also updates myAddrs.
     */
    public void recreateSocketAddresses(long id) {
        QuorumVerifier qv = getQuorumVerifier();
        if (qv != null) {
            QuorumServer qs = qv.getAllMembers().get(id);
            if (qs != null) {
                qs.recreateSocketAddresses();
                if (id == getId()) {
                    setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
                }
            }
        }
        qv = getLastSeenQuorumVerifier();
        if (qv != null) {
            QuorumServer qs = qv.getAllMembers().get(id);
            if (qs != null) {
                qs.recreateSocketAddresses();
            }
        }
    }

    private AddressTuple getAddrs() {
        AddressTuple addrs = myAddrs.get();
        if (addrs != null) {
            return addrs;
        }
        try {
            synchronized (QV_LOCK) {
                addrs = myAddrs.get();
                while (addrs == null) {
                    QV_LOCK.wait();
                    addrs = myAddrs.get();
                }
                return addrs;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public MultipleAddresses getQuorumAddress() {
        return getAddrs().quorumAddr;
    }

    public MultipleAddresses getElectionAddress() {
        return getAddrs().electionAddr;
    }

    public InetSocketAddress getClientAddress() {
        final AddressTuple addrs = myAddrs.get();
        return (addrs == null) ? null : addrs.clientAddr;
    }

    private void setAddrs(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr) {
        synchronized (QV_LOCK) {
            myAddrs.set(new AddressTuple(quorumAddr, electionAddr, clientAddr));
            QV_LOCK.notifyAll();
        }
    }

    private int electionType;

    Election electionAlg;

    ServerCnxnFactory cnxnFactory;
    ServerCnxnFactory secureCnxnFactory;

    private FileTxnSnapLog logFactory = null;

    private final QuorumStats quorumStats;

    AdminServer adminServer;

    private final boolean reconfigEnabled;

    public static QuorumPeer testingQuorumPeer() throws SaslException {
        return new QuorumPeer();
    }

    public QuorumPeer() throws SaslException {
        super("QuorumPeer");
        quorumStats = new QuorumStats(this);
        jmxRemotePeerBean = new HashMap<Long, RemotePeerBean>();
        adminServer = AdminServerFactory.createAdminServer();
        x509Util = createX509Util();
        initialize();
        reconfigEnabled = QuorumPeerConfig.isReconfigEnabled();
    }

    // VisibleForTesting
    QuorumX509Util createX509Util() {
        return new QuorumX509Util();
    }

    /**
     * For backward compatibility purposes, we instantiate QuorumMaj by default.
     */

    public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, ServerCnxnFactory cnxnFactory) throws IOException {
        this(quorumPeers, dataDir, dataLogDir, electionType, myid, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, false, cnxnFactory, new QuorumMaj(quorumPeers));
    }

    public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, boolean quorumListenOnAllIPs, ServerCnxnFactory cnxnFactory, QuorumVerifier quorumConfig) throws IOException {
        this();
        this.cnxnFactory = cnxnFactory;
        this.electionType = electionType;
        this.myid = myid;
        this.tickTime = tickTime;
        this.initLimit = initLimit;
        this.syncLimit = syncLimit;
        this.connectToLearnerMasterLimit = connectToLearnerMasterLimit;
        this.quorumListenOnAllIPs = quorumListenOnAllIPs;
        this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
        this.zkDb = new ZKDatabase(this.logFactory);
        if (quorumConfig == null) {
            quorumConfig = new QuorumMaj(quorumPeers);
        }
        setQuorumVerifier(quorumConfig, false);
        adminServer = AdminServerFactory.createAdminServer();
    }

    public void initialize() throws SaslException {
        // init quorum auth server & learner
        if (isQuorumSaslAuthEnabled()) {
            Set<String> authzHosts = new HashSet<String>();
            for (QuorumServer qs : getView().values()) {
                authzHosts.add(qs.hostname);
            }
            authServer = new SaslQuorumAuthServer(isQuorumServerSaslAuthRequired(), quorumServerLoginContext, authzHosts);
            authLearner = new SaslQuorumAuthLearner(isQuorumLearnerSaslAuthRequired(), quorumServicePrincipal, quorumLearnerLoginContext);
        } else {
            authServer = new NullQuorumAuthServer();
            authLearner = new NullQuorumAuthLearner();
        }
    }

    QuorumStats quorumStats() {
        return quorumStats;
    }

    @Override
    public synchronized void start() {
        if (!getView().containsKey(myid)) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
        }
        loadDataBase();
        startServerCnxnFactory();
        try {
            adminServer.start();
        } catch (AdminServerException e) {
            LOG.warn("Problem starting AdminServer", e);
        }
        startLeaderElection();
        startJvmPauseMonitor();
        super.start();
    }

    private void loadDataBase() {
        try {
            zkDb.loadDataBase();

            // load the epochs
            long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
            long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
            try {
                currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
            } catch (FileNotFoundException e) {
                // pick a reasonable epoch number
                // this should only happen once when moving to a
                // new code version
                currentEpoch = epochOfZxid;
                LOG.info(
                    "{} not found! Creating with a reasonable default of {}. "
                        + "This should only happen when you are upgrading your installation",
                    CURRENT_EPOCH_FILENAME,
                    currentEpoch);
                writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
            }
            if (epochOfZxid > currentEpoch) {
                // acceptedEpoch.tmp file in snapshot directory
                File currentTmp = new File(getTxnFactory().getSnapDir(),
                    CURRENT_EPOCH_FILENAME + AtomicFileOutputStream.TMP_EXTENSION);
                if (currentTmp.exists()) {
                    long epochOfTmp = readLongFromFile(currentTmp.getName());
                    LOG.info("{} found. Setting current epoch to {}.", currentTmp, epochOfTmp);
                    setCurrentEpoch(epochOfTmp);
                } else {
                    throw new IOException(
                        "The current epoch, " + ZxidUtils.zxidToString(currentEpoch)
                            + ", is older than the last zxid, " + lastProcessedZxid);
                }
            }
            try {
                acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
            } catch (FileNotFoundException e) {
                // pick a reasonable epoch number
                // this should only happen once when moving to a
                // new code version
                acceptedEpoch = epochOfZxid;
                LOG.info(
                    "{} not found! Creating with a reasonable default of {}. "
                        + "This should only happen when you are upgrading your installation",
                    ACCEPTED_EPOCH_FILENAME,
                    acceptedEpoch);
                writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
            }
            if (acceptedEpoch < currentEpoch) {
                throw new IOException("The accepted epoch, "
                                      + ZxidUtils.zxidToString(acceptedEpoch)
                                      + " is less than the current epoch, "
                                      + ZxidUtils.zxidToString(currentEpoch));
            }
        } catch (IOException ie) {
            LOG.error("Unable to load database on disk", ie);
            throw new RuntimeException("Unable to run quorum server ", ie);
        }
    }

    ResponderThread responder;

    public synchronized void stopLeaderElection() {
        responder.running = false;
        responder.interrupt();
    }
    public synchronized void startLeaderElection() {
        try {
            if (getPeerState() == ServerState.LOOKING) {
                currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
            }
        } catch (IOException e) {
            RuntimeException re = new RuntimeException(e.getMessage());
            re.setStackTrace(e.getStackTrace());
            throw re;
        }

        this.electionAlg = createElectionAlgorithm(electionType);
    }

    private void startJvmPauseMonitor() {
        if (this.jvmPauseMonitor != null) {
            this.jvmPauseMonitor.serviceStart();
        }
    }

    /**
     * Count the number of nodes in the map that could be followers.
     * @param peers
     * @return The number of followers in the map
     */
    protected static int countParticipants(Map<Long, QuorumServer> peers) {
        int count = 0;
        for (QuorumServer q : peers.values()) {
            if (q.type == LearnerType.PARTICIPANT) {
                count++;
            }
        }
        return count;
    }

    /**
     * This constructor is only used by the existing unit test code.
     * It defaults to FileLogProvider persistence provider.
     */
    public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit) throws IOException {
        this(
            quorumPeers,
            snapDir,
            logDir,
            electionAlg,
            myid,
            tickTime,
            initLimit,
            syncLimit,
            connectToLearnerMasterLimit,
            false,
            ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1),
            new QuorumMaj(quorumPeers));
    }

    public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, String oraclePath) throws IOException {
        this(
                quorumPeers,
                snapDir,
                logDir,
                electionAlg,
                myid,
                tickTime,
                initLimit,
                syncLimit,
                connectToLearnerMasterLimit,
                false,
                ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1),
                new QuorumOracleMaj(quorumPeers, oraclePath));
    }

    /**
     * This constructor is only used by the existing unit test code.
     * It defaults to FileLogProvider persistence provider.
     */
    public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, QuorumVerifier quorumConfig) throws IOException {
        this(
            quorumPeers,
            snapDir,
            logDir,
            electionAlg,
            myid,
            tickTime,
            initLimit,
            syncLimit,
            connectToLearnerMasterLimit,
            false,
            ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1),
            quorumConfig);
    }

    private static InetSocketAddress getClientAddress(Map<Long, QuorumServer> quorumPeers, long myid, int clientPort) throws IOException {
        QuorumServer quorumServer = quorumPeers.get(myid);
        if (null == quorumServer) {
            throw new IOException("No QuorumServer correspoding to myid " + myid);
        }
        if (null == quorumServer.clientAddr) {
            return new InetSocketAddress(clientPort);
        }
        if (quorumServer.clientAddr.getPort() != clientPort) {
            throw new IOException("QuorumServer port "
                                  + quorumServer.clientAddr.getPort()
                                  + " does not match with given port "
                                  + clientPort);
        }
        return quorumServer.clientAddr;
    }

    /**
     * returns the highest zxid that this host has seen
     *
     * @return the highest zxid for this host
     */
    public long getLastLoggedZxid() {
        if (!zkDb.isInitialized()) {
            loadDataBase();
        }
        return zkDb.getDataTreeLastProcessedZxid();
    }

    public Follower follower;
    public Leader leader;
    public Observer observer;

    protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
        return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
    }

    protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
        return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
    }

    protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
        return new Observer(this, new ObserverZooKeeperServer(logFactory, this, this.zkDb));
    }

    @SuppressWarnings("deprecation")
    protected Election createElectionAlgorithm(int electionAlgorithm) {
        Election le = null;

        //TODO: use a factory rather than a switch
        switch (electionAlgorithm) {
        case 1:
            throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
        case 2:
            throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
        case 3:
            QuorumCnxManager qcm = createCnxnManager();
            QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
            if (oldQcm != null) {
                LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
                oldQcm.halt();
            }
            QuorumCnxManager.Listener listener = qcm.listener;
            if (listener != null) {
                listener.start();
                FastLeaderElection fle = new FastLeaderElection(this, qcm);
                fle.start();
                le = fle;
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            assert false;
        }
        return le;
    }

    @SuppressWarnings("deprecation")
    protected Election makeLEStrategy() {
        LOG.debug("Initializing leader election protocol...");
        return electionAlg;
    }

    protected synchronized void setLeader(Leader newLeader) {
        leader = newLeader;
    }

    protected synchronized void setFollower(Follower newFollower) {
        follower = newFollower;
    }

    protected synchronized void setObserver(Observer newObserver) {
        observer = newObserver;
    }

    public synchronized ZooKeeperServer getActiveServer() {
        if (leader != null) {
            return leader.zk;
        } else if (follower != null) {
            return follower.zk;
        } else if (observer != null) {
            return observer.zk;
        }
        return null;
    }

    boolean shuttingDownLE = false;

    @Override
    public void run() {
        updateThreadName();

        LOG.debug("Starting quorum peer");
        try {
            jmxQuorumBean = new QuorumBean(this);
            MBeanRegistry.getInstance().register(jmxQuorumBean, null);
            for (QuorumServer s : getView().values()) {
                ZKMBeanInfo p;
                if (getId() == s.id) {
                    p = jmxLocalPeerBean = new LocalPeerBean(this);
                    try {
                        MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                    } catch (Exception e) {
                        LOG.warn("Failed to register with JMX", e);
                        jmxLocalPeerBean = null;
                    }
                } else {
                    RemotePeerBean rBean = new RemotePeerBean(this, s);
                    try {
                        MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
                        jmxRemotePeerBean.put(s.id, rBean);
                    } catch (Exception e) {
                        LOG.warn("Failed to register with JMX", e);
                    }
                }
            }
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            jmxQuorumBean = null;
        }

        try {
            /*
             * Main loop
             */
            while (running) {
                if (unavailableStartTime == 0) {
                    unavailableStartTime = Time.currentElapsedTime();
                }

                switch (getPeerState()) {
                case LOOKING:
                    LOG.info("LOOKING");
                    ServerMetrics.getMetrics().LOOKING_COUNT.add(1);

                    if (Boolean.getBoolean("readonlymode.enabled")) {
                        LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                        // Create read-only server but don't start it immediately
                        final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);

                        // Instead of starting roZk immediately, wait some grace
                        // period before we decide we're partitioned.
                        //
                        // Thread is used here because otherwise it would require
                        // changes in each of election strategy classes which is
                        // unnecessary code coupling.
                        Thread roZkMgr = new Thread() {
                            public void run() {
                                try {
                                    // lower-bound grace period to 2 secs
                                    sleep(Math.max(2000, tickTime));
                                    if (ServerState.LOOKING.equals(getPeerState())) {
                                        roZk.startup();
                                    }
                                } catch (InterruptedException e) {
                                    LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                                } catch (Exception e) {
                                    LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                                }
                            }
                        };
                        try {
                            roZkMgr.start();
                            reconfigFlagClear();
                            if (shuttingDownLE) {
                                shuttingDownLE = false;
                                startLeaderElection();
                            }
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        } finally {
                            // If the thread is in the the grace period, interrupt
                            // to come out of waiting.
                            roZkMgr.interrupt();
                            roZk.shutdown();
                        }
                    } else {
                        try {
                            reconfigFlagClear();
                            if (shuttingDownLE) {
                                shuttingDownLE = false;
                                startLeaderElection();
                            }
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        }
                    }
                    break;
                case OBSERVING:
                    try {
                        LOG.info("OBSERVING");
                        setObserver(makeObserver(logFactory));
                        observer.observeLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                    } finally {
                        observer.shutdown();
                        setObserver(null);
                        updateServerState();

                        // Add delay jitter before we switch to LOOKING
                        // state to reduce the load of ObserverMaster
                        if (isRunning()) {
                            Observer.waitForObserverElectionDelay();
                        }
                    }
                    break;
                case FOLLOWING:
                    try {
                        LOG.info("FOLLOWING");
                        setFollower(makeFollower(logFactory));
                        follower.followLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                    } finally {
                        follower.shutdown();
                        setFollower(null);
                        updateServerState();
                    }
                    break;
                case LEADING:
                    LOG.info("LEADING");
                    try {
                        setLeader(makeLeader(logFactory));
                        leader.lead();
                        setLeader(null);
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                    } finally {
                        if (leader != null) {
                            leader.shutdown("Forcing shutdown");
                            setLeader(null);
                        }
                        updateServerState();
                    }
                    break;
                }
            }
        } finally {
            LOG.warn("QuorumPeer main thread exited");
            MBeanRegistry instance = MBeanRegistry.getInstance();
            instance.unregister(jmxQuorumBean);
            instance.unregister(jmxLocalPeerBean);

            for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {
                instance.unregister(remotePeerBean);
            }

            jmxQuorumBean = null;
            jmxLocalPeerBean = null;
            jmxRemotePeerBean = null;
        }
    }

    private synchronized void updateServerState() {
        if (!reconfigFlag) {
            setPeerState(ServerState.LOOKING);
            LOG.warn("PeerState set to LOOKING");
            return;
        }

        if (getId() == getCurrentVote().getId()) {
            setPeerState(ServerState.LEADING);
            LOG.debug("PeerState set to LEADING");
        } else if (getLearnerType() == LearnerType.PARTICIPANT) {
            setPeerState(ServerState.FOLLOWING);
            LOG.debug("PeerState set to FOLLOWING");
        } else if (getLearnerType() == LearnerType.OBSERVER) {
            setPeerState(ServerState.OBSERVING);
            LOG.debug("PeerState set to OBSERVER");
        } else { // currently shouldn't happen since there are only 2 learner types
            setPeerState(ServerState.LOOKING);
            LOG.debug("Should not be here");
        }
        reconfigFlag = false;
    }

    public void shutdown() {
        running = false;
        x509Util.close();
        if (leader != null) {
            leader.shutdown("quorum Peer shutdown");
        }
        if (follower != null) {
            follower.shutdown();
        }
        shutdownServerCnxnFactory();
        if (udpSocket != null) {
            udpSocket.close();
        }
        if (jvmPauseMonitor != null) {
            jvmPauseMonitor.serviceStop();
        }

        try {
            adminServer.shutdown();
        } catch (AdminServerException e) {
            LOG.warn("Problem stopping AdminServer", e);
        }

        if (getElectionAlg() != null) {
            this.interrupt();
            getElectionAlg().shutdown();
        }
        try {
            zkDb.close();
        } catch (IOException ie) {
            LOG.warn("Error closing logs ", ie);
        }
    }

    /**
     * A 'view' is a node's current opinion of the membership of the entire
     * ensemble.
     */
    public Map<Long, QuorumPeer.QuorumServer> getView() {
        return Collections.unmodifiableMap(getQuorumVerifier().getAllMembers());
    }

    /**
     * Observers are not contained in this view, only nodes with
     * PeerType=PARTICIPANT.
     */
    public Map<Long, QuorumPeer.QuorumServer> getVotingView() {
        return getQuorumVerifier().getVotingMembers();
    }

    /**
     * Returns only observers, no followers.
     */
    public Map<Long, QuorumPeer.QuorumServer> getObservingView() {
        return getQuorumVerifier().getObservingMembers();
    }

    public synchronized Set<Long> getCurrentAndNextConfigVoters() {
        Set<Long> voterIds = new HashSet<Long>(getQuorumVerifier().getVotingMembers().keySet());
        if (getLastSeenQuorumVerifier() != null) {
            voterIds.addAll(getLastSeenQuorumVerifier().getVotingMembers().keySet());
        }
        return voterIds;
    }

    /**
     * Check if a node is in the current view. With static membership, the
     * result of this check will never change; only when dynamic membership
     * is introduced will this be more useful.
     */
    public boolean viewContains(Long sid) {
        return this.getView().containsKey(sid);
    }

    /**
     * Only used by QuorumStats at the moment
     */
    public String[] getQuorumPeers() {
        List<String> l = new ArrayList<String>();
        synchronized (this) {
            if (leader != null) {
                for (LearnerHandler fh : leader.getLearners()) {
                    if (fh.getSocket() != null) {
                        String s = formatInetAddr((InetSocketAddress) fh.getSocket().getRemoteSocketAddress());
                        if (leader.isLearnerSynced(fh)) {
                            s += "*";
                        }
                        l.add(s);
                    }
                }
            } else if (follower != null) {
                l.add(formatInetAddr((InetSocketAddress) follower.sock.getRemoteSocketAddress()));
            }
        }
        return l.toArray(new String[0]);
    }

    public String getServerState() {
        switch (getPeerState()) {
        case LOOKING:
            return QuorumStats.Provider.LOOKING_STATE;
        case LEADING:
            return QuorumStats.Provider.LEADING_STATE;
        case FOLLOWING:
            return QuorumStats.Provider.FOLLOWING_STATE;
        case OBSERVING:
            return QuorumStats.Provider.OBSERVING_STATE;
        }
        return QuorumStats.Provider.UNKNOWN_STATE;
    }

    /**
     * set the id of this quorum peer.
     */
    public void setMyid(long myid) {
        this.myid = myid;
    }

    public void setInitialConfig(String initialConfig) {
        this.initialConfig = initialConfig;
    }

    public String getInitialConfig() {
        return initialConfig;
    }

    /**
     * Get the number of milliseconds of each tick
     */
    public int getTickTime() {
        return tickTime;
    }

    /**
     * Set the number of milliseconds of each tick
     */
    public void setTickTime(int tickTime) {
        LOG.info("tickTime set to {}", tickTime);
        this.tickTime = tickTime;
    }

    /** Maximum number of connections allowed from particular host (ip) */
    public int getMaxClientCnxnsPerHost() {
        if (cnxnFactory != null) {
            return cnxnFactory.getMaxClientCnxnsPerHost();
        }
        if (secureCnxnFactory != null) {
            return secureCnxnFactory.getMaxClientCnxnsPerHost();
        }
        return -1;
    }

    /** Whether local sessions are enabled */
    public boolean areLocalSessionsEnabled() {
        return localSessionsEnabled;
    }

    /** Whether to enable local sessions */
    public void enableLocalSessions(boolean flag) {
        LOG.info("Local sessions {}", (flag ? "enabled" : "disabled"));
        localSessionsEnabled = flag;
    }

    /** Whether local sessions are allowed to upgrade to global sessions */
    public boolean isLocalSessionsUpgradingEnabled() {
        return localSessionsUpgradingEnabled;
    }

    /** Whether to allow local sessions to upgrade to global sessions */
    public void enableLocalSessionsUpgrading(boolean flag) {
        LOG.info("Local session upgrading {}", (flag ? "enabled" : "disabled"));
        localSessionsUpgradingEnabled = flag;
    }

    /** minimum session timeout in milliseconds */
    public int getMinSessionTimeout() {
        return minSessionTimeout;
    }

    /** minimum session timeout in milliseconds */
    public void setMinSessionTimeout(int min) {
        LOG.info("minSessionTimeout set to {}", min);
        this.minSessionTimeout = min;
    }

    /** maximum session timeout in milliseconds */
    public int getMaxSessionTimeout() {
        return maxSessionTimeout;
    }

    /** maximum session timeout in milliseconds */
    public void setMaxSessionTimeout(int max) {
        LOG.info("maxSessionTimeout set to {}", max);
        this.maxSessionTimeout = max;
    }

    /** The server socket's listen backlog length */
    public int getClientPortListenBacklog() {
        return this.clientPortListenBacklog;
    }

    /** Sets the server socket's listen backlog length. */
    public void setClientPortListenBacklog(int backlog) {
        this.clientPortListenBacklog = backlog;
    }

    /**
     * Get the number of ticks that the initial synchronization phase can take
     */
    public int getInitLimit() {
        return initLimit;
    }

    /**
     * Set the number of ticks that the initial synchronization phase can take
     */
    public void setInitLimit(int initLimit) {
        LOG.info("initLimit set to {}", initLimit);
        this.initLimit = initLimit;
    }

    /**
     * Get the current tick
     */
    public int getTick() {
        return tick.get();
    }

    public QuorumVerifier configFromString(String s) throws IOException, ConfigException {
        Properties props = new Properties();
        props.load(new StringReader(s));
        return QuorumPeerConfig.parseDynamicConfig(props, electionType, false, false, getQuorumVerifier().getOraclePath());
    }

    /**
     * Return QuorumVerifier object for the last committed configuration.
     */
    public QuorumVerifier getQuorumVerifier() {
        synchronized (QV_LOCK) {
            return quorumVerifier;
        }
    }

    /**
     * Return QuorumVerifier object for the last proposed configuration.
     */
    public QuorumVerifier getLastSeenQuorumVerifier() {
        synchronized (QV_LOCK) {
            return lastSeenQuorumVerifier;
        }
    }

    public synchronized void restartLeaderElection(QuorumVerifier qvOLD, QuorumVerifier qvNEW) {
        if (qvOLD == null || !qvOLD.equals(qvNEW)) {
            LOG.warn("Restarting Leader Election");
            getElectionAlg().shutdown();
            shuttingDownLE = false;
            startLeaderElection();
        }
    }

    public String getNextDynamicConfigFilename() {
        if (configFilename == null) {
            LOG.warn("configFilename is null! This should only happen in tests.");
            return null;
        }
        return configFilename + QuorumPeerConfig.nextDynamicConfigFileSuffix;
    }

    // On entry to this method, qcm must be non-null and the locks on both qcm and QV_LOCK
    // must be held.  We don't want quorumVerifier/lastSeenQuorumVerifier to change out from
    // under us, so we have to hold QV_LOCK; and since the call to qcm.connectOne() will take
    // the lock on qcm (and take QV_LOCK again inside that), the caller needs to have taken
    // qcm outside QV_LOCK to avoid a deadlock against other callers of qcm.connectOne().
    private void connectNewPeers(QuorumCnxManager qcm) {
        if (quorumVerifier != null && lastSeenQuorumVerifier != null) {
            Map<Long, QuorumServer> committedView = quorumVerifier.getAllMembers();
            for (Entry<Long, QuorumServer> e : lastSeenQuorumVerifier.getAllMembers().entrySet()) {
                if (e.getKey() != getId() && !committedView.containsKey(e.getKey())) {
                    qcm.connectOne(e.getKey());
                }
            }
        }
    }

    public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) {
        if (!isReconfigEnabled()) {
            LOG.info("Dynamic reconfig is disabled, we don't store the last seen config.");
            return;
        }

        // If qcm is non-null, we may call qcm.connectOne(), which will take the lock on qcm
        // and then take QV_LOCK.  Take the locks in the same order to ensure that we don't
        // deadlock against other callers of connectOne().  If qcmRef gets set in another
        // thread while we're inside the synchronized block, that does no harm; if we didn't
        // take a lock on qcm (because it was null when we sampled it), we won't call
        // connectOne() on it.  (Use of an AtomicReference is enough to guarantee visibility
        // of updates that provably happen in another thread before entering this method.)
        QuorumCnxManager qcm = qcmRef.get();
        Object outerLockObject = (qcm != null) ? qcm : QV_LOCK;
        synchronized (outerLockObject) {
            synchronized (QV_LOCK) {
                if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) {
                    LOG.error("setLastSeenQuorumVerifier called with stale config "
                              + qv.getVersion()
                              + ". Current version: "
                              + quorumVerifier.getVersion());
                }
                // assuming that a version uniquely identifies a configuration, so if
                // version is the same, nothing to do here.
                if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() == qv.getVersion()) {
                    return;
                }
                lastSeenQuorumVerifier = qv;
                if (qcm != null) {
                    connectNewPeers(qcm);
                }

                if (writeToDisk) {
                    try {
                        String fileName = getNextDynamicConfigFilename();
                        if (fileName != null) {
                            QuorumPeerConfig.writeDynamicConfig(fileName, qv, true);
                        }
                    } catch (IOException e) {
                        LOG.error("Error writing next dynamic config file to disk", e);
                    }
                }
            }
        }
    }

    public QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) {
        synchronized (QV_LOCK) {
            if ((quorumVerifier != null) && (quorumVerifier.getVersion() >= qv.getVersion())) {
                // this is normal. For example - server found out about new config through FastLeaderElection gossiping
                // and then got the same config in UPTODATE message so its already known
                LOG.debug(
                    "{} setQuorumVerifier called with known or old config {}. Current version: {}",
                    getId(),
                    qv.getVersion(),
                    quorumVerifier.getVersion());
                return quorumVerifier;
            }
            QuorumVerifier prevQV = quorumVerifier;
            quorumVerifier = qv;
            if (lastSeenQuorumVerifier == null || (qv.getVersion() > lastSeenQuorumVerifier.getVersion())) {
                lastSeenQuorumVerifier = qv;
            }

            if (writeToDisk) {
                // some tests initialize QuorumPeer without a static config file
                if (configFilename != null) {
                    try {
                        String dynamicConfigFilename = makeDynamicConfigFilename(qv.getVersion());
                        QuorumPeerConfig.writeDynamicConfig(dynamicConfigFilename, qv, false);
                        QuorumPeerConfig.editStaticConfig(configFilename, dynamicConfigFilename, needEraseClientInfoFromStaticConfig());
                    } catch (IOException e) {
                        LOG.error("Error closing file", e);
                    }
                } else {
                    LOG.info("writeToDisk == true but configFilename == null");
                }
            }

            if (qv.getVersion() == lastSeenQuorumVerifier.getVersion()) {
                QuorumPeerConfig.deleteFile(getNextDynamicConfigFilename());
            }
            QuorumServer qs = qv.getAllMembers().get(getId());
            if (qs != null) {
                setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
            }
            updateObserverMasterList();
            return prevQV;
        }
    }

    private String makeDynamicConfigFilename(long version) {
        return configFilename + ".dynamic." + Long.toHexString(version);
    }

    private boolean needEraseClientInfoFromStaticConfig() {
        QuorumServer server = quorumVerifier.getAllMembers().get(getId());
        return (server != null && server.clientAddr != null && !server.isClientAddrFromStatic);
    }

    /**
     * Get an instance of LeaderElection
     */
    public Election getElectionAlg() {
        return electionAlg;
    }

    /**
     * Get the synclimit
     */
    public int getSyncLimit() {
        return syncLimit;
    }

    /**
     * Set the synclimit
     */
    public void setSyncLimit(int syncLimit) {
        LOG.info("syncLimit set to {}", syncLimit);
        this.syncLimit = syncLimit;
    }

    /**
     * Get the connectToLearnerMasterLimit
     */
    public int getConnectToLearnerMasterLimit() {
        return connectToLearnerMasterLimit;
    }

    /**
     * Set the connectToLearnerMasterLimit
     */
    public void setConnectToLearnerMasterLimit(int connectToLearnerMasterLimit) {
        LOG.info("connectToLearnerMasterLimit set to {}", connectToLearnerMasterLimit);
        this.connectToLearnerMasterLimit = connectToLearnerMasterLimit;
    }

    /**
     * The syncEnabled can also be set via a system property.
     */
    public static final String SYNC_ENABLED = "zookeeper.observer.syncEnabled";

    /**
     * Return syncEnabled.
     */
    public boolean getSyncEnabled() {
        if (System.getProperty(SYNC_ENABLED) != null) {
            LOG.info("{}={}", SYNC_ENABLED, Boolean.getBoolean(SYNC_ENABLED));
            return Boolean.getBoolean(SYNC_ENABLED);
        } else {
            return syncEnabled;
        }
    }

    /**
     * Set syncEnabled.
     *
     * @param syncEnabled
     */
    public void setSyncEnabled(boolean syncEnabled) {
        this.syncEnabled = syncEnabled;
    }

    /**
     * Gets the election type
     */
    public int getElectionType() {
        return electionType;
    }

    /**
     * Sets the election type
     */
    public void setElectionType(int electionType) {
        this.electionType = electionType;
    }

    public boolean getQuorumListenOnAllIPs() {
        return quorumListenOnAllIPs;
    }

    public void setQuorumListenOnAllIPs(boolean quorumListenOnAllIPs) {
        this.quorumListenOnAllIPs = quorumListenOnAllIPs;
    }

    public void setCnxnFactory(ServerCnxnFactory cnxnFactory) {
        this.cnxnFactory = cnxnFactory;
    }

    public void setSecureCnxnFactory(ServerCnxnFactory secureCnxnFactory) {
        this.secureCnxnFactory = secureCnxnFactory;
    }

    public void setSslQuorum(boolean sslQuorum) {
        if (sslQuorum) {
            LOG.info("Using TLS encrypted quorum communication");
        } else {
            LOG.info("Using insecure (non-TLS) quorum communication");
        }
        this.sslQuorum = sslQuorum;
    }

    public void setUsePortUnification(boolean shouldUsePortUnification) {
        LOG.info("Port unification {}", shouldUsePortUnification ? "enabled" : "disabled");
        this.shouldUsePortUnification = shouldUsePortUnification;
    }

    private void startServerCnxnFactory() {
        if (cnxnFactory != null) {
            cnxnFactory.start();
        }
        if (secureCnxnFactory != null) {
            secureCnxnFactory.start();
        }
    }

    private void shutdownServerCnxnFactory() {
        if (cnxnFactory != null) {
            cnxnFactory.shutdown();
        }
        if (secureCnxnFactory != null) {
            secureCnxnFactory.shutdown();
        }
    }

    // Leader and learner will control the zookeeper server and pass it into QuorumPeer.
    public void setZooKeeperServer(ZooKeeperServer zks) {
        if (cnxnFactory != null) {
            cnxnFactory.setZooKeeperServer(zks);
        }
        if (secureCnxnFactory != null) {
            secureCnxnFactory.setZooKeeperServer(zks);
        }
    }

    public void closeAllConnections() {
        if (cnxnFactory != null) {
            cnxnFactory.closeAll(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
        }
        if (secureCnxnFactory != null) {
            secureCnxnFactory.closeAll(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
        }
    }

    public int getClientPort() {
        if (cnxnFactory != null) {
            return cnxnFactory.getLocalPort();
        }
        return -1;
    }

    public void setTxnFactory(FileTxnSnapLog factory) {
        this.logFactory = factory;
    }

    public FileTxnSnapLog getTxnFactory() {
        return this.logFactory;
    }

    /**
     * set zk database for this node
     * @param database
     */
    public void setZKDatabase(ZKDatabase database) {
        this.zkDb = database;
    }

    protected ZKDatabase getZkDb() {
        return zkDb;
    }

    public synchronized void initConfigInZKDatabase() {
        if (zkDb != null) {
            zkDb.initConfigInZKDatabase(getQuorumVerifier());
        }
    }

    public boolean isRunning() {
        return running;
    }

    /**
     * get reference to QuorumCnxManager
     */
    public QuorumCnxManager getQuorumCnxManager() {
        return qcmRef.get();
    }
    private long readLongFromFile(String name) throws IOException {
        File file = new File(logFactory.getSnapDir(), name);
        BufferedReader br = new BufferedReader(new FileReader(file));
        String line = "";
        try {
            line = br.readLine();
            return Long.parseLong(line);
        } catch (NumberFormatException e) {
            throw new IOException("Found " + line + " in " + file);
        } finally {
            br.close();
        }
    }

    private long acceptedEpoch = -1;
    private long currentEpoch = -1;

    public static final String CURRENT_EPOCH_FILENAME = "currentEpoch";

    public static final String ACCEPTED_EPOCH_FILENAME = "acceptedEpoch";

    /**
     * Write a long value to disk atomically. Either succeeds or an exception
     * is thrown.
     * @param name file name to write the long to
     * @param value the long value to write to the named file
     * @throws IOException if the file cannot be written atomically
     */
    // visibleForTest
     void writeLongToFile(String name, final long value) throws IOException {
        File file = new File(logFactory.getSnapDir(), name);
        new AtomicFileWritingIdiom(file, new WriterStatement() {
            @Override
            public void write(Writer bw) throws IOException {
                bw.write(Long.toString(value));
            }
        });
    }

    public long getCurrentEpoch() throws IOException {
        if (currentEpoch == -1) {
            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
        }
        return currentEpoch;
    }

    public long getAcceptedEpoch() throws IOException {
        if (acceptedEpoch == -1) {
            acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
        }
        return acceptedEpoch;
    }

    public void setCurrentEpoch(long e) throws IOException {
        writeLongToFile(CURRENT_EPOCH_FILENAME, e);
        currentEpoch = e;

    }

    public void setAcceptedEpoch(long e) throws IOException {
        writeLongToFile(ACCEPTED_EPOCH_FILENAME, e);
        acceptedEpoch = e;
    }

    public boolean processReconfig(QuorumVerifier qv, Long suggestedLeaderId, Long zxid, boolean restartLE) {
        if (!isReconfigEnabled()) {
            LOG.debug("Reconfig feature is disabled, skip reconfig processing.");
            return false;
        }

        InetSocketAddress oldClientAddr = getClientAddress();

        // update last committed quorum verifier, write the new config to disk
        // and restart leader election if config changed.
        QuorumVerifier prevQV = setQuorumVerifier(qv, true);

        // There is no log record for the initial config, thus after syncing
        // with leader
        // /zookeeper/config is empty! it is also possible that last committed
        // config is propagated during leader election
        // without the propagation the corresponding log records.
        // so we should explicitly do this (this is not necessary when we're
        // already a Follower/Observer, only
        // for Learner):
        initConfigInZKDatabase();

        if (prevQV.getVersion() < qv.getVersion() && !prevQV.equals(qv)) {
            Map<Long, QuorumServer> newMembers = qv.getAllMembers();
            updateRemotePeerMXBeans(newMembers);
            if (restartLE) {
                restartLeaderElection(prevQV, qv);
            }

            QuorumServer myNewQS = newMembers.get(getId());
            if (myNewQS != null && myNewQS.clientAddr != null && !myNewQS.clientAddr.equals(oldClientAddr)) {
                cnxnFactory.reconfigure(myNewQS.clientAddr);
                updateThreadName();
            }

            boolean roleChange = updateLearnerType(qv);
            boolean leaderChange = false;
            if (suggestedLeaderId != null) {
                // zxid should be non-null too
                leaderChange = updateVote(suggestedLeaderId, zxid);
            } else {
                long currentLeaderId = getCurrentVote().getId();
                QuorumServer myleaderInCurQV = prevQV.getVotingMembers().get(currentLeaderId);
                QuorumServer myleaderInNewQV = qv.getVotingMembers().get(currentLeaderId);
                leaderChange = (myleaderInCurQV == null
                                || myleaderInCurQV.addr == null
                                || myleaderInNewQV == null
                                || !myleaderInCurQV.addr.equals(myleaderInNewQV.addr));
                // we don't have a designated leader - need to go into leader
                // election
                reconfigFlagClear();
            }

            return roleChange || leaderChange;
        }
        return false;

    }

    private void updateRemotePeerMXBeans(Map<Long, QuorumServer> newMembers) {
        Set<Long> existingMembers = new HashSet<Long>(newMembers.keySet());
        existingMembers.retainAll(jmxRemotePeerBean.keySet());
        for (Long id : existingMembers) {
            RemotePeerBean rBean = jmxRemotePeerBean.get(id);
            rBean.setQuorumServer(newMembers.get(id));
        }

        Set<Long> joiningMembers = new HashSet<Long>(newMembers.keySet());
        joiningMembers.removeAll(jmxRemotePeerBean.keySet());
        joiningMembers.remove(getId()); // remove self as it is local bean
        for (Long id : joiningMembers) {
            QuorumServer qs = newMembers.get(id);
            RemotePeerBean rBean = new RemotePeerBean(this, qs);
            try {
                MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
                jmxRemotePeerBean.put(qs.id, rBean);
            } catch (Exception e) {
                LOG.warn("Failed to register with JMX", e);
            }
        }

        Set<Long> leavingMembers = new HashSet<Long>(jmxRemotePeerBean.keySet());
        leavingMembers.removeAll(newMembers.keySet());
        for (Long id : leavingMembers) {
            RemotePeerBean rBean = jmxRemotePeerBean.remove(id);
            try {
                MBeanRegistry.getInstance().unregister(rBean);
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
        }
    }

    private ArrayList<QuorumServer> observerMasters = new ArrayList<>();
    private void updateObserverMasterList() {
        if (observerMasterPort <= 0) {
            return; // observer masters not enabled
        }
        observerMasters.clear();
        StringBuilder sb = new StringBuilder();
        for (QuorumServer server : quorumVerifier.getVotingMembers().values()) {
            InetAddress address = server.addr.getReachableOrOne().getAddress();
            InetSocketAddress addr = new InetSocketAddress(address, observerMasterPort);
            observerMasters.add(new QuorumServer(server.id, addr));
            sb.append(addr).append(",");
        }
        LOG.info("Updated learner master list to be {}", sb.toString());
        Collections.shuffle(observerMasters);
        // Reset the internal index of the observerMaster when
        // the observerMaster List is refreshed
        nextObserverMaster = 0;
    }

    private boolean useObserverMasters() {
        return getLearnerType() == LearnerType.OBSERVER && observerMasters.size() > 0;
    }

    private int nextObserverMaster = 0;
    private QuorumServer nextObserverMaster() {
        if (nextObserverMaster >= observerMasters.size()) {
            nextObserverMaster = 0;
            // Add a reconnect delay only after the observer
            // has exhausted trying to connect to all the masters
            // from the observerMasterList
            if (isRunning()) {
                Observer.waitForReconnectDelay();
            }
        }
        return observerMasters.get(nextObserverMaster++);
    }

    QuorumServer findLearnerMaster(QuorumServer leader) {
        if (useObserverMasters()) {
            return nextObserverMaster();
        } else {
            // Add delay jitter to reduce the load on the leader
            if (isRunning()) {
                Observer.waitForReconnectDelay();
            }
            return leader;
        }
    }

    /**
     * Vet a given learner master's information.
     * Allows specification by server id, ip  only, or ip and port
     */
    QuorumServer validateLearnerMaster(String desiredMaster) {
        if (useObserverMasters()) {
            Long sid;
            try {
                sid = Long.parseLong(desiredMaster);
            } catch (NumberFormatException e) {
                sid = null;
            }
            for (QuorumServer server : observerMasters) {
                if (sid == null) {
                    for (InetSocketAddress address : server.addr.getAllAddresses()) {
                        String serverAddr = address.getAddress().getHostAddress() + ':' + address.getPort();
                        if (serverAddr.startsWith(desiredMaster)) {
                            return server;
                        }
                    }
                } else {
                    if (sid.equals(server.id)) {
                        return server;
                    }
                }
            }
            if (sid == null) {
                LOG.info("could not find learner master address={}", desiredMaster);
            } else {
                LOG.warn("could not find learner master sid={}", sid);
            }
        } else {
            LOG.info("cannot validate request, observer masters not enabled");
        }
        return null;
    }

    private boolean updateLearnerType(QuorumVerifier newQV) {
        //check if I'm an observer in new config
        if (newQV.getObservingMembers().containsKey(getId())) {
            if (getLearnerType() != LearnerType.OBSERVER) {
                setLearnerType(LearnerType.OBSERVER);
                LOG.info("Becoming an observer");
                reconfigFlagSet();
                return true;
            } else {
                return false;
            }
        } else if (newQV.getVotingMembers().containsKey(getId())) {
            if (getLearnerType() != LearnerType.PARTICIPANT) {
                setLearnerType(LearnerType.PARTICIPANT);
                LOG.info("Becoming a voting participant");
                reconfigFlagSet();
                return true;
            } else {
                return false;
            }
        }
        // I'm not in the view
        if (getLearnerType() != LearnerType.PARTICIPANT) {
            setLearnerType(LearnerType.PARTICIPANT);
            LOG.info("Becoming a non-voting participant");
            reconfigFlagSet();
            return true;
        }
        return false;
    }

    private boolean updateVote(long designatedLeader, long zxid) {
        Vote currentVote = getCurrentVote();
        if (currentVote != null && designatedLeader != currentVote.getId()) {
            setCurrentVote(new Vote(designatedLeader, zxid));
            reconfigFlagSet();
            LOG.warn("Suggested leader: {}", designatedLeader);
            return true;
        }
        return false;
    }

    /**
     * Updates leader election info to avoid inconsistencies when
     * a new server tries to join the ensemble.
     *
     * Here is the inconsistency scenario we try to solve by updating the peer
     * epoch after following leader:
     *
     * Let's say we have an ensemble with 3 servers z1, z2 and z3.
     *
     * 1. z1, z2 were following z3 with peerEpoch to be 0xb8, the new epoch is
     *    0xb9, aka current accepted epoch on disk.
     * 2. z2 get restarted, which will use 0xb9 as it's peer epoch when loading
     *    the current accept epoch from disk.
     * 3. z2 received notification from z1 and z3, which is following z3 with
     *    epoch 0xb8, so it started following z3 again with peer epoch 0xb8.
     * 4. before z2 successfully connected to z3, z3 get restarted with new
     *    epoch 0xb9.
     * 5. z2 will retry around a few round (default 5s) before giving up,
     *    meanwhile it will report z3 as leader.
     * 6. z1 restarted, and looking with peer epoch 0xb9.
     * 7. z1 voted z3, and z3 was elected as leader again with peer epoch 0xb9.
     * 8. z2 successfully connected to z3 before giving up, but with peer
     *    epoch 0xb8.
     * 9. z1 get restarted, looking for leader with peer epoch 0xba, but cannot
     *    join, because z2 is reporting peer epoch 0xb8, while z3 is reporting
     *    0xb9.
     *
     * By updating the election vote after actually following leader, we can
     * avoid this kind of stuck happened.
     *
     * Btw, the zxid and electionEpoch could be inconsistent because of the same
     * reason, it's better to update these as well after syncing with leader, but
     * that required protocol change which is non trivial. This problem is worked
     * around by skipping comparing the zxid and electionEpoch when counting for
     * votes for out of election servers during looking for leader.
     *
     * See https://issues.apache.org/jira/browse/ZOOKEEPER-1732
     */
    protected void updateElectionVote(long newEpoch) {
        Vote currentVote = getCurrentVote();
        if (currentVote != null) {
            setCurrentVote(new Vote(currentVote.getId(), currentVote.getZxid(), currentVote.getElectionEpoch(), newEpoch, currentVote
                .getState()));
        }
    }

    private void updateThreadName() {
        String plain = cnxnFactory != null
                ? cnxnFactory.getLocalAddress() != null
                    ? formatInetAddr(cnxnFactory.getLocalAddress())
                    : "disabled"
                : "disabled";
        String secure = secureCnxnFactory != null ? formatInetAddr(secureCnxnFactory.getLocalAddress()) : "disabled";
        setName(String.format("QuorumPeer[myid=%d](plain=%s)(secure=%s)", getId(), plain, secure));
    }

    /**
     * Sets the time taken for leader election in milliseconds.
     *
     * @param electionTimeTaken time taken for leader election
     */
    void setElectionTimeTaken(long electionTimeTaken) {
        this.electionTimeTaken = electionTimeTaken;
    }

    /**
     * @return the time taken for leader election in milliseconds.
     */
    long getElectionTimeTaken() {
        return electionTimeTaken;
    }

    void setQuorumServerSaslRequired(boolean serverSaslRequired) {
        quorumServerSaslAuthRequired = serverSaslRequired;
        LOG.info("{} set to {}", QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, serverSaslRequired);
    }

    void setQuorumLearnerSaslRequired(boolean learnerSaslRequired) {
        quorumLearnerSaslAuthRequired = learnerSaslRequired;
        LOG.info("{} set to {}", QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, learnerSaslRequired);
    }

    void setQuorumSaslEnabled(boolean enableAuth) {
        quorumSaslEnableAuth = enableAuth;
        if (!quorumSaslEnableAuth) {
            LOG.info("QuorumPeer communication is not secured! (SASL auth disabled)");
        } else {
            LOG.info("{} set to {}", QuorumAuth.QUORUM_SASL_AUTH_ENABLED, enableAuth);
        }
    }

    void setQuorumServicePrincipal(String servicePrincipal) {
        quorumServicePrincipal = servicePrincipal;
        LOG.info("{} set to {}", QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL, quorumServicePrincipal);
    }

    void setQuorumLearnerLoginContext(String learnerContext) {
        quorumLearnerLoginContext = learnerContext;
        LOG.info("{} set to {}", QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT, quorumLearnerLoginContext);
    }

    void setQuorumServerLoginContext(String serverContext) {
        quorumServerLoginContext = serverContext;
        LOG.info("{} set to {}", QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT, quorumServerLoginContext);
    }

    void setQuorumCnxnThreadsSize(int qCnxnThreadsSize) {
        if (qCnxnThreadsSize > QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE) {
            quorumCnxnThreadsSize = qCnxnThreadsSize;
        }
        LOG.info("quorum.cnxn.threads.size set to {}", quorumCnxnThreadsSize);
    }

    boolean isQuorumSaslAuthEnabled() {
        return quorumSaslEnableAuth;
    }

    private boolean isQuorumServerSaslAuthRequired() {
        return quorumServerSaslAuthRequired;
    }

    private boolean isQuorumLearnerSaslAuthRequired() {
        return quorumLearnerSaslAuthRequired;
    }

    public QuorumCnxManager createCnxnManager() {
        int timeout = quorumCnxnTimeoutMs > 0 ? quorumCnxnTimeoutMs : this.tickTime * this.syncLimit;
        LOG.info("Using {}ms as the quorum cnxn socket timeout", timeout);
        return new QuorumCnxManager(
            this,
            this.getId(),
            this.getView(),
            this.authServer,
            this.authLearner,
            timeout,
            this.getQuorumListenOnAllIPs(),
            this.quorumCnxnThreadsSize,
            this.isQuorumSaslAuthEnabled());
    }

    boolean isLeader(long id) {
        Vote vote = getCurrentVote();
        return vote != null && id == vote.getId();
    }

    public boolean isReconfigEnabled() {
        return reconfigEnabled;
    }

    @InterfaceAudience.Private
    /**
     * This is a metric that depends on the status of the peer.
     */ public Integer getSynced_observers_metric() {
        if (leader != null) {
            return leader.getObservingLearners().size();
        } else if (follower != null) {
            return follower.getSyncedObserverSize();
        } else {
            return null;
        }
    }

    /**
     * Create a new QuorumPeer and apply all the values per the already-parsed config.
     *
     * @param config The appertained quorum peer config.
     * @return A QuorumPeer instantiated with specified peer config. Note this peer
     *         is not fully initialized; caller should finish initialization through
     *         additional configurations (connection factory settings, etc).
     *
     * @throws IOException
     */
    public static QuorumPeer createFromConfig(QuorumPeerConfig config) throws IOException {
        QuorumPeer quorumPeer = new QuorumPeer();
        quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
        quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();
        quorumPeer.setSslQuorum(config.isSslQuorum());
        quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        if (config.sslQuorumReloadCertFiles) {
            quorumPeer.getX509Util().enableCertFileReloading();
        }
        quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
        quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
        quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());

        // sets quorum sasl authentication configurations
        quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
        if (quorumPeer.isQuorumSaslAuthEnabled()) {
            quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
            quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
            quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
            quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
            quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
        }
        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);

        if (config.jvmPauseMonitorToRun) {
            quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
        }

        return quorumPeer;
    }

}

org/apache/zookeeper/server/quorum/QuorumPeer.java

 

Or download all of them as a single archive file:

File name: zookeeper-server-3.8.0-fyi.zip
File size: 885581 bytes
Release date: 2022-02-25
Download 

 

Apache ZooKeeper Jute Source Code

Download and Install Apache ZooKeeper Source Package

Downloading and Reviewing zookeeper.jar

⇑⇑ FAQ for Apache ZooKeeper

2022-11-16, 13112👍, 0💬