Apache ZooKeeper Server Source Code

Apache ZooKeeper is an open-source server which enables highly reliable distributed coordination.

Apache ZooKeeper Server Source Code files are provided in the source package file, apache-zookeeper-3.8.0.tar.gz.

You can download apache-zookeeper-3.8.0.tar.gz as described in the previous tutorial and go to the "zookeeper-server" sub-folder to view Apache ZooKeeper Server Source Code files.

You can also browse Apache ZooKeeper Server Source Code below:

✍: FYIcenter.com

org/apache/zookeeper/server/ZooKeeperServer.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import javax.security.sasl.SaslException;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.Environment;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.apache.zookeeper.Quotas;
import org.apache.zookeeper.StatsTrack;
import org.apache.zookeeper.Version;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.ZookeeperBanner;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.common.StringUtils;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.StatPersisted;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.metrics.MetricsContext;
import org.apache.zookeeper.proto.AuthPacket;
import org.apache.zookeeper.proto.ConnectRequest;
import org.apache.zookeeper.proto.ConnectResponse;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.proto.DeleteRequest;
import org.apache.zookeeper.proto.GetSASLRequest;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.RequestHeader;
import org.apache.zookeeper.proto.SetACLRequest;
import org.apache.zookeeper.proto.SetDataRequest;
import org.apache.zookeeper.proto.SetSASLResponse;
import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
import org.apache.zookeeper.server.RequestProcessor.RequestProcessorException;
import org.apache.zookeeper.server.ServerCnxn.CloseRequestException;
import org.apache.zookeeper.server.SessionTracker.Session;
import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
import org.apache.zookeeper.server.auth.ProviderRegistry;
import org.apache.zookeeper.server.auth.ServerAuthenticationProvider;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
import org.apache.zookeeper.server.util.JvmPauseMonitor;
import org.apache.zookeeper.server.util.OSMXBean;
import org.apache.zookeeper.server.util.QuotaMetricsUtils;
import org.apache.zookeeper.server.util.RequestPathMetricsCollector;
import org.apache.zookeeper.txn.CreateSessionTxn;
import org.apache.zookeeper.txn.TxnDigest;
import org.apache.zookeeper.txn.TxnHeader;
import org.apache.zookeeper.util.ServiceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class implements a simple standalone ZooKeeperServer. It sets up the
 * following chain of RequestProcessors to process requests:
 * PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
 */
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {

    // Class logger and the RateLogger built on top of it; both are assigned
    // in the static initializer below.
    protected static final Logger LOG;
    private static final RateLogger RATE_LOGGER;

    public static final String GLOBAL_OUTSTANDING_LIMIT = "zookeeper.globalOutstandingLimit";

    // Names of system properties read by the static initializer below.
    public static final String ENABLE_EAGER_ACL_CHECK = "zookeeper.enableEagerACLCheck";
    public static final String SKIP_ACL = "zookeeper.skipACL";
    public static final String ENFORCE_QUOTA = "zookeeper.enforceQuota";

    // When enabled, will check ACL constraints appertained to the requests first,
    // before sending the requests to the quorum.
    static final boolean enableEagerACLCheck;

    // True only when the SKIP_ACL system property is exactly "yes" (default "no").
    static final boolean skipACL;

    // Parsed from the ENFORCE_QUOTA system property; false when the property is unset.
    public static final boolean enforceQuota;

    public static final String SASL_SUPER_USER = "zookeeper.superUser";

    public static final String ALLOW_SASL_FAILED_CLIENTS = "zookeeper.allowSaslFailedClients";
    public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled";
    // Parsed from the ZOOKEEPER_DIGEST_ENABLED system property; true when the property is unset.
    private static boolean digestEnabled;

    // Add a enable/disable option for now, we should remove this one when
    // this feature is confirmed to be stable
    public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled";
    private static boolean closeSessionTxnEnabled = true;

    static {
        LOG = LoggerFactory.getLogger(ZooKeeperServer.class);

        RATE_LOGGER = new RateLogger(LOG);

        ZookeeperBanner.printBanner(LOG);

        Environment.logEnv("Server environment:", LOG);

        enableEagerACLCheck = Boolean.getBoolean(ENABLE_EAGER_ACL_CHECK);
        LOG.info("{} = {}", ENABLE_EAGER_ACL_CHECK, enableEagerACLCheck);

        skipACL = System.getProperty(SKIP_ACL, "no").equals("yes");
        if (skipACL) {
            LOG.info("{}==\"yes\", ACL checks will be skipped", SKIP_ACL);
        }

        enforceQuota = Boolean.parseBoolean(System.getProperty(ENFORCE_QUOTA, "false"));
        if (enforceQuota) {
            LOG.info("{} = {}, Quota Enforce enables", ENFORCE_QUOTA, enforceQuota);
        }

        digestEnabled = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DIGEST_ENABLED, "true"));
        LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled);

        closeSessionTxnEnabled = Boolean.parseBoolean(
                System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true"));
        LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled);
    }

    /**
     * Reports the current value of the close-session-txn flag.
     *
     * @return the flag initialised from {@link #CLOSE_SESSION_TXN_ENABLED} and
     *         possibly changed later through {@link #setCloseSessionTxnEnabled(boolean)}
     */
    public static boolean isCloseSessionTxnEnabled() {
        return closeSessionTxnEnabled;
    }

    /**
     * Overrides the close-session-txn flag at runtime and logs the value now in effect.
     *
     * @param enabled new value of the flag
     */
    public static void setCloseSessionTxnEnabled(boolean enabled) {
        ZooKeeperServer.closeSessionTxnEnabled = enabled;
        LOG.info("Update {} to {}", CLOSE_SESSION_TXN_ENABLED,
                ZooKeeperServer.closeSessionTxnEnabled);
    }

    // JMX beans created by registerJMX(); each is reset to null there if its registration fails.
    protected ZooKeeperServerBean jmxServerBean;
    protected DataTreeBean jmxDataTreeBean;

    public static final int DEFAULT_TICK_TIME = 3000;
    protected int tickTime = DEFAULT_TICK_TIME;
    public static final int DEFAULT_THROTTLED_OP_WAIT_TIME = 0; // disabled
    // Initial value comes from the zookeeper.throttled_op_wait_time system property.
    protected static volatile int throttledOpWaitTime =
        Integer.getInteger("zookeeper.throttled_op_wait_time", DEFAULT_THROTTLED_OP_WAIT_TIME);
    /** value of -1 indicates unset, use default */
    protected int minSessionTimeout = -1;
    /** value of -1 indicates unset, use default */
    protected int maxSessionTimeout = -1;
    /** Socket listen backlog. Value of -1 indicates unset */
    protected int listenBacklog = -1;
    protected SessionTracker sessionTracker;
    private FileTxnSnapLog txnLogFactory = null;
    private ZKDatabase zkDb;
    // Of the constructors in this class, only the one taking a reconfigEnabled flag creates these caches.
    private ResponseCache readResponseCache;
    private ResponseCache getChildrenResponseCache;
    // Current zxid counter; see getZxid(), getNextZxid() and setZxid().
    private final AtomicLong hzxid = new AtomicLong(0);
    public static final Exception ok = new Exception("No prob");
    protected RequestProcessor firstProcessor;
    protected JvmPauseMonitor jvmPauseMonitor;
    // Lifecycle state; written through setState().
    protected volatile State state = State.INITIAL;
    private boolean isResponseCachingEnabled = true;
    /* contains the configuration file content read at startup */
    protected String initialConfig;
    protected boolean reconfigEnabled;
    private final RequestPathMetricsCollector requestPathMetricsCollector;
    private static final int DEFAULT_SNAP_COUNT = 100000;
    private static final int DEFAULT_GLOBAL_OUTSTANDING_LIMIT = 1000;

    // Refreshed from the session tracker each time the startup sequence runs.
    private boolean localSessionEnabled = false;
    /**
     * Lifecycle states of the server. As described on {@code setState}: the
     * server is INITIAL during startup, RUNNING after a successful start,
     * ERROR after an internal error, and SHUTDOWN once it is no longer running.
     */
    protected enum State {
        INITIAL,
        RUNNING,
        SHUTDOWN,
        ERROR
    }

    /**
     * This is the secret that we use to generate passwords. For the moment,
     * it's more of a checksum that's used in reconnection, which carries no
     * security weight, and is treated internally as if it carries no
     * security weight.
     */
    private static final long superSecret = 0XB3415C00L;

    private final AtomicInteger requestsInProcess = new AtomicInteger(0);
    final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>();
    // this data structure must be accessed under the outstandingChanges lock
    final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<String, ChangeRecord>();

    protected ServerCnxnFactory serverCnxnFactory;
    protected ServerCnxnFactory secureServerCnxnFactory;

    private final ServerStats serverStats;
    private final ZooKeeperServerListener listener;
    // May be null; setState() only logs at debug level when no handler is registered.
    private ZooKeeperServerShutdownHandler zkShutdownHandler;
    // Server id handed to the SessionTrackerImpl built by createSessionTracker().
    private volatile int createSessionTrackerServerId = 1;

    // The three settings below are initialised from system properties in a static initializer.
    private static final String FLUSH_DELAY = "zookeeper.flushDelay";
    private static volatile long flushDelay;
    // NOTE(review): the constant is named ..._POLL_SIZE but the property it names is a poll *time*.
    private static final String MAX_WRITE_QUEUE_POLL_SIZE = "zookeeper.maxWriteQueuePollTime";
    private static volatile long maxWriteQueuePollTime;
    private static final String MAX_BATCH_SIZE = "zookeeper.maxBatchSize";
    private static volatile int maxBatchSize;

    /**
     * Starting size of read and write ByteArrayOutputStream buffers. Default is
     * {@link #DEFAULT_STARTING_BUFFER_SIZE} (1024) bytes; values below 32 are
     * rejected when the class is initialized.
     * Flag not used for small transfers like connectResponses.
     */
    public static final String INT_BUFFER_STARTING_SIZE_BYTES = "zookeeper.intBufferStartingSizeBytes";
    public static final int DEFAULT_STARTING_BUFFER_SIZE = 1024;
    public static final int intBufferStartingSizeBytes;

    // System property names for the sizes of the getData and getChildren response caches.
    public static final String GET_DATA_RESPONSE_CACHE_SIZE = "zookeeper.maxResponseCacheSize";
    public static final String GET_CHILDREN_RESPONSE_CACHE_SIZE = "zookeeper.maxGetChildrenResponseCacheSize";

    static {
        long configuredFlushDelay = Long.getLong(FLUSH_DELAY, 0);
        setFlushDelay(configuredFlushDelay);
        setMaxWriteQueuePollTime(Long.getLong(MAX_WRITE_QUEUE_POLL_SIZE, configuredFlushDelay / 3));
        setMaxBatchSize(Integer.getInteger(MAX_BATCH_SIZE, 1000));

        intBufferStartingSizeBytes = Integer.getInteger(INT_BUFFER_STARTING_SIZE_BYTES, DEFAULT_STARTING_BUFFER_SIZE);

        if (intBufferStartingSizeBytes < 32) {
            String msg = "Buffer starting size (" + intBufferStartingSizeBytes + ") must be greater than or equal to 32. "
                         + "Configure with \"-Dzookeeper.intBufferStartingSizeBytes=<size>\" ";
            LOG.error(msg);
            throw new IllegalArgumentException(msg);
        }

        LOG.info("{} = {}", INT_BUFFER_STARTING_SIZE_BYTES, intBufferStartingSizeBytes);
    }

    // Connection throttling
    private BlueThrottle connThrottle = new BlueThrottle();

    // Created and started by startRequestThrottler(); shut down in shutdown(boolean).
    @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification =
        "Internally the throttler has a BlockingQueue so "
        + "once the throttler is created and started, it is thread-safe")
    private RequestThrottler requestThrottler;
    public static final String SNAP_COUNT = "zookeeper.snapCount";

    /**
     * This setting sets a limit on the total number of large requests that
     * can be inflight and is designed to prevent ZooKeeper from accepting
     * too many large requests such that the JVM runs out of usable heap and
     * ultimately crashes.
     *
     * The limit is enforced by the {@link #checkRequestSize(int, boolean)}
     * method which is called by the connection layer ({@link NIOServerCnxn},
     * {@link NettyServerCnxn}) before allocating a byte buffer and pulling
     * data off the TCP socket. The limit is then checked again by the
     * ZooKeeper server in {@link #processPacket(ServerCnxn, ByteBuffer)} which
     * also atomically updates {@link #currentLargeRequestBytes}. The request is
     * then marked as a large request, with the request size stored in the Request
     * object so that it can later be decremented from {@link #currentLargeRequestBytes}.
     *
     * When a request is completed or dropped, the relevant code path calls the
     * {@link #requestFinished(Request)} method which performs the decrement if
     * needed.
     */
    private volatile int largeRequestMaxBytes = 100 * 1024 * 1024;

    /**
     * The size threshold after which a request is considered a large request
     * and is checked against the large request byte limit.
     */
    private volatile int largeRequestThreshold = -1;

    /** Running total of bytes held by in-flight large requests (see the description above). */
    private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0);

    private AuthenticationHelper authHelper;

    /**
     * Forwards the removal of a connection to the database.
     *
     * @param cnxn the connection to remove; passed straight to {@code zkDb.removeCnxn}
     */
    void removeCnxn(ServerCnxn cnxn) {
        zkDb.removeCnxn(cnxn);
    }

    /**
     * Creates a ZooKeeperServer instance. Nothing is setup, use the setX
     * methods to prepare the instance (eg datadir, datalogdir, ticktime,
     * builder, etc...)
     *
     * <p>Only the listener, server stats, request-path metrics collector and
     * authentication helper are created here; the transaction log factory,
     * database and response caches are left unset.
     */
    public ZooKeeperServer() {
        listener = new ZooKeeperServerListenerImpl(this);
        serverStats = new ServerStats(this);
        this.requestPathMetricsCollector = new RequestPathMetricsCollector();
        this.authHelper = new AuthenticationHelper();
    }

    /**
     * Keeping this constructor for backward compatibility.
     *
     * <p>Delegates to the constructor that takes an explicit {@code reconfigEnabled}
     * flag, supplying {@code QuorumPeerConfig.isReconfigEnabled()} for it.
     */
    public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) {
        this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig, QuorumPeerConfig.isReconfigEnabled());
    }

    /**
     * Creates a ZooKeeperServer instance. It sets everything up, but doesn't
     * actually start listening for clients; the request pipeline is only
     * started later by {@link #startup()}.
     *
     * @param txnLogFactory the transaction log / snapshot store; must not be null
     *                      because it is dereferenced immediately
     * @param tickTime the tick time for the server
     * @param minSessionTimeout minimum session timeout, passed to {@code setMinSessionTimeout}
     * @param maxSessionTimeout maximum session timeout, passed to {@code setMaxSessionTimeout}
     * @param clientPortListenBacklog socket listen backlog
     * @param zkDb the database backing this server
     * @param initialConfig configuration content kept for {@link #getInitialConfig()}
     * @param reconfigEnabled value stored in the {@code reconfigEnabled} field
     */
    public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig, boolean reconfigEnabled) {
        serverStats = new ServerStats(this);
        this.txnLogFactory = txnLogFactory;
        this.txnLogFactory.setServerStats(this.serverStats);
        this.zkDb = zkDb;
        this.tickTime = tickTime;
        setMinSessionTimeout(minSessionTimeout);
        setMaxSessionTimeout(maxSessionTimeout);
        this.listenBacklog = clientPortListenBacklog;
        this.reconfigEnabled = reconfigEnabled;

        listener = new ZooKeeperServerListenerImpl(this);

        // Cache sizes come from system properties, falling back to the ResponseCache default.
        readResponseCache = new ResponseCache(Integer.getInteger(
            GET_DATA_RESPONSE_CACHE_SIZE,
            ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE), "getData");

        getChildrenResponseCache = new ResponseCache(Integer.getInteger(
            GET_CHILDREN_RESPONSE_CACHE_SIZE,
            ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE), "getChildren");

        this.initialConfig = initialConfig;

        this.requestPathMetricsCollector = new RequestPathMetricsCollector();

        this.initLargeRequestThrottlingSettings();

        this.authHelper = new AuthenticationHelper();

        // The timeouts and backlog are logged through their getters rather than
        // from the raw constructor arguments.
        LOG.info(
            "Created server with"
                + " tickTime {} ms"
                + " minSessionTimeout {} ms"
                + " maxSessionTimeout {} ms"
                + " clientPortListenBacklog {}"
                + " datadir {}"
                + " snapdir {}",
            tickTime,
            getMinSessionTimeout(),
            getMaxSessionTimeout(),
            getClientPortListenBacklog(),
            txnLogFactory.getDataDir(),
            txnLogFactory.getSnapDir());
    }

    /**
     * @return the configuration content this server was constructed with
     *         (may be null when the no-arg constructor was used)
     */
    public String getInitialConfig() {
        return initialConfig;
    }

    /**
     * Adds JvmPauseMonitor and calls
     * {@link #ZooKeeperServer(FileTxnSnapLog, int, int, int, int, ZKDatabase, String, boolean)}
     * with {@code QuorumPeerConfig.isReconfigEnabled()} as the last argument.
     *
     * @param jvmPauseMonitor monitor to attach; may be null, in which case none is attached
     */
    public ZooKeeperServer(JvmPauseMonitor jvmPauseMonitor, FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) {
        this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig, QuorumPeerConfig.isReconfigEnabled());
        this.jvmPauseMonitor = jvmPauseMonitor;
        if (jvmPauseMonitor != null) {
            LOG.info("Added JvmPauseMonitor to server");
        }
    }

    /**
     * Creates a ZooKeeperServer instance backed by a new {@code ZKDatabase}
     * built on the given transaction log factory. Session timeouts and the
     * listen backlog are passed as -1.
     *
     * @param txnLogFactory the file transaction snapshot logging class
     * @param tickTime the ticktime for the server
     * @param initialConfig configuration content kept for {@link #getInitialConfig()}
     */
    public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, String initialConfig) {
        this(txnLogFactory, tickTime, -1, -1, -1, new ZKDatabase(txnLogFactory), initialConfig, QuorumPeerConfig.isReconfigEnabled());
    }

    /**
     * @return the statistics object created for this server in its constructor
     */
    public ServerStats serverStats() {
        return serverStats;
    }

    /**
     * @return the request-path metrics collector owned by this server
     */
    public RequestPathMetricsCollector getRequestPathMetricsCollector() {
        return requestPathMetricsCollector;
    }

    /**
     * @return the connection throttle owned by this server
     */
    public BlueThrottle connThrottle() {
        return connThrottle;
    }

    /**
     * Writes the server configuration to the given writer as {@code key=value}
     * lines, one setting per line.
     *
     * <p>Naming note: the line labelled {@code dataDir} is the snapshot
     * directory ({@code getSnapDir()}), and {@code dataLogDir} is the
     * directory returned by {@code getDataDir()}.
     *
     * @param pwriter destination for the configuration dump
     */
    public void dumpConf(PrintWriter pwriter) {
        pwriter.print("clientPort=");
        pwriter.println(getClientPort());
        pwriter.print("secureClientPort=");
        pwriter.println(getSecureClientPort());
        pwriter.print("dataDir=");
        pwriter.println(zkDb.snapLog.getSnapDir().getAbsolutePath());
        pwriter.print("dataDirSize=");
        pwriter.println(getDataDirSize());
        pwriter.print("dataLogDir=");
        pwriter.println(zkDb.snapLog.getDataDir().getAbsolutePath());
        pwriter.print("dataLogSize=");
        pwriter.println(getLogDirSize());
        pwriter.print("tickTime=");
        pwriter.println(getTickTime());
        pwriter.print("maxClientCnxns=");
        pwriter.println(getMaxClientCnxnsPerHost());
        pwriter.print("minSessionTimeout=");
        pwriter.println(getMinSessionTimeout());
        pwriter.print("maxSessionTimeout=");
        pwriter.println(getMaxSessionTimeout());
        pwriter.print("clientPortListenBacklog=");
        pwriter.println(getClientPortListenBacklog());

        pwriter.print("serverId=");
        pwriter.println(getServerId());
    }

    /**
     * Builds a snapshot of the current configuration values.
     *
     * <p>The two directory arguments are passed in the same order as they are
     * printed by {@code dumpConf}: the snapshot directory first, then the
     * directory returned by {@code getDataDir()}.
     *
     * @return a new {@code ZooKeeperServerConf} populated from this server's getters
     */
    public ZooKeeperServerConf getConf() {
        return new ZooKeeperServerConf(
            getClientPort(),
            zkDb.snapLog.getSnapDir().getAbsolutePath(),
            zkDb.snapLog.getDataDir().getAbsolutePath(),
            getTickTime(),
            getMaxClientCnxnsPerHost(),
            getMinSessionTimeout(),
            getMaxSessionTimeout(),
            getServerId(),
            getClientPortListenBacklog());
    }

    /**
     * This constructor is for backward compatibility with the existing unit
     * test code.
     * It defaults to FileLogProvider persistence provider.
     *
     * @param snapDir first argument handed to the new {@code FileTxnSnapLog}
     * @param logDir second argument handed to the new {@code FileTxnSnapLog}
     * @param tickTime the ticktime for the server
     * @throws IOException declared for the {@code FileTxnSnapLog} construction
     */
    public ZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException {
        this(new FileTxnSnapLog(snapDir, logDir), tickTime, "");
    }

    /**
     * Convenience constructor that relies on defaults for everything except the
     * transaction log factory: {@link #DEFAULT_TICK_TIME}, unset (-1) session
     * timeouts and backlog, a new {@code ZKDatabase} and an empty initial config.
     *
     * @param txnLogFactory the file transaction snapshot logging class
     * @throws IOException declared by this constructor's signature
     */
    public ZooKeeperServer(FileTxnSnapLog txnLogFactory) throws IOException {
        this(txnLogFactory, DEFAULT_TICK_TIME, -1, -1, -1, new ZKDatabase(txnLogFactory), "", QuorumPeerConfig.isReconfigEnabled());
    }

    /**
     * Get the ZooKeeper database for this server.
     *
     * @return the ZooKeeper database for this server; may be null if none has been set yet
     */
    public ZKDatabase getZKDatabase() {
        return this.zkDb;
    }

    /**
     * Set the ZooKeeper database for this server.
     *
     * @param zkDb the database this server should use from now on
     */
    public void setZKDatabase(ZKDatabase zkDb) {
        this.zkDb = zkDb;
    }

    /**
     * Restores sessions and data: adopts the database's zxid (loading the
     * database first if needed), kills sessions that have no recorded timeout,
     * and finally writes a snapshot.
     */
    public void loadData() throws IOException, InterruptedException {
        /*
         * A new leader calls this method from Leader#lead, but the database
         * was already initialized before leader election so that the server
         * could pick the zxid for its initial vote (QuorumPeer#getLastLoggedZxid).
         * Loading it a second time would be wasted work, which matters a lot
         * for applications hosting a large database, so an initialized
         * database is reused and only its last processed zxid is read.
         *
         * This method is also invoked by at least one other method:
         * ZooKeeperServer#startdata.
         *
         * See ZOOKEEPER-1642 for more detail.
         */
        final long restoredZxid = zkDb.isInitialized()
            ? zkDb.getDataTreeLastProcessedZxid()
            : zkDb.loadDataBase();
        setZxid(restoredZxid);

        // Clean up dead sessions
        zkDb.getSessions().stream()
            .filter(sessionId -> zkDb.getSessionWithTimeOuts().get(sessionId) == null)
            .forEach(sessionId -> killSession(sessionId, zkDb.getDataTreeLastProcessedZxid()));

        // Make a clean snapshot
        takeSnapshot();
    }

    /**
     * Takes a snapshot with {@code syncSnap} set to false.
     */
    public void takeSnapshot() {
        takeSnapshot(false);
    }

    /**
     * Saves the data tree and the session timeouts through the transaction log
     * factory, then logs and records how long the attempt took.
     *
     * @param syncSnap forwarded unchanged to {@code txnLogFactory.save}
     */
    public void takeSnapshot(boolean syncSnap) {
        final long startedAt = Time.currentElapsedTime();
        try {
            txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
        } catch (IOException ioe) {
            // Failing to write a snapshot is treated as unrecoverable: request process exit.
            LOG.error("Severe unrecoverable error, exiting", ioe);
            ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue());
        }
        final long tookMillis = Time.currentElapsedTime() - startedAt;
        LOG.info("Snapshot taken in {} ms", tookMillis);
        ServerMetrics.getMetrics().SNAPSHOT_TIME.add(tookMillis);
    }

    /**
     * @return whatever the transaction log factory answers to the same question
     */
    public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() {
        return txnLogFactory.shouldForceWriteInitialSnapshotAfterLeaderElection();
    }

    /**
     * Returns the total size, in bytes, of the directory this server reports
     * as {@code dataDir}.
     *
     * <p>{@code dumpConf} and {@code getConf} both report the snapshot
     * directory ({@code getSnapDir()}) as {@code dataDir}, so that is the
     * directory measured here. Measuring {@code getDataDir()} instead would
     * make {@code dataDirSize} describe the directory printed as
     * {@code dataLogDir}.
     *
     * @return the recursive size of the snapshot directory, or 0 when no
     *         database has been set
     */
    @Override
    public long getDataDirSize() {
        if (zkDb == null) {
            return 0L;
        }
        return getDirSize(zkDb.snapLog.getSnapDir());
    }

    /**
     * Returns the total size, in bytes, of the directory this server reports
     * as {@code dataLogDir}.
     *
     * <p>{@code dumpConf} and {@code getConf} both report
     * {@code getDataDir()} as {@code dataLogDir}, so that is the directory
     * measured here.
     *
     * @return the recursive size of the transaction log directory, or 0 when
     *         no database has been set
     */
    @Override
    public long getLogDirSize() {
        if (zkDb == null) {
            return 0L;
        }
        return getDirSize(zkDb.snapLog.getDataDir());
    }

    /**
     * Computes the size of a file, or the summed size of everything below a directory.
     *
     * @param file file or directory to measure
     * @return {@code file.length()} for a non-directory; otherwise the recursive
     *         sum over its entries (0 if the directory cannot be listed)
     */
    private long getDirSize(File file) {
        if (!file.isDirectory()) {
            return file.length();
        }
        final File[] entries = file.listFiles();
        if (entries == null) {
            // listFiles() yields null when the directory cannot be listed.
            return 0L;
        }
        long total = 0L;
        for (final File entry : entries) {
            total += getDirSize(entry);
        }
        return total;
    }

    /**
     * @return the current value of the zxid counter, without changing it
     */
    public long getZxid() {
        return hzxid.get();
    }

    /**
     * @return the session tracker, or null if none has been created yet
     */
    public SessionTracker getSessionTracker() {
        return sessionTracker;
    }

    /**
     * Atomically increments the zxid counter.
     *
     * @return the counter value after the increment
     */
    long getNextZxid() {
        return hzxid.incrementAndGet();
    }

    /**
     * Overwrites the zxid counter.
     *
     * @param zxid the new counter value
     */
    public void setZxid(long zxid) {
        hzxid.set(zxid);
    }

    /**
     * Submits a {@code closeSession} request for the given session. The request
     * carries no connection, payload or auth info, and uses cxid 0.
     *
     * @param sessionId id of the session to close
     */
    private void close(long sessionId) {
        submitRequest(new Request(null, sessionId, 0, OpCode.closeSession, null, null));
    }

    /**
     * Logs and immediately submits a close request for the given session.
     *
     * @param sessionId id of the session to close
     */
    public void closeSession(long sessionId) {
        LOG.info("Closing session 0x{}", Long.toHexString(sessionId));

        // we do not want to wait for a session close. send it as soon as we
        // detect it!
        close(sessionId);
    }

    /**
     * Kills a session in the database and, if a session tracker exists,
     * removes the session from it as well.
     *
     * @param sessionId id of the session to kill
     * @param zxid zxid passed to {@code zkDb.killSession}
     */
    protected void killSession(long sessionId, long zxid) {
        zkDb.killSession(sessionId, zxid);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(
                LOG,
                ZooTrace.SESSION_TRACE_MASK,
                "ZooKeeperServer --- killSession: 0x" + Long.toHexString(sessionId));
        }
        // The tracker may not exist yet, e.g. when loadData() runs before startup.
        if (sessionTracker != null) {
            sessionTracker.removeSession(sessionId);
        }
    }

    /**
     * Expires a session whose timeout was exceeded: logs the event and submits
     * a close request for it.
     *
     * @param session the session to expire
     */
    public void expire(Session session) {
        long sessionId = session.getSessionId();
        LOG.info(
            "Expiring session 0x{}, timeout of {}ms exceeded",
            Long.toHexString(sessionId),
            session.getTimeout());
        close(sessionId);
    }

    /**
     * Forcibly expires a session by id: logs the event and submits a close
     * request for it.
     *
     * @param sessionId id of the session to expire
     */
    public void expire(long sessionId) {
        LOG.info("forcibly expiring session 0x{}", Long.toHexString(sessionId));

        close(sessionId);
    }

    /**
     * Thrown by {@code touch} when the session tracker does not accept a touch
     * for the connection's session id.
     */
    public static class MissingSessionException extends IOException {

        private static final long serialVersionUID = 7467414635467261007L;

        /**
         * @param msg detail message describing the missing session
         */
        public MissingSessionException(String msg) {
            super(msg);
        }

    }

    /**
     * Touches the session behind a connection so the session tracker sees it as alive.
     *
     * @param cnxn the connection whose session should be touched; null is ignored
     * @throws MissingSessionException if the session tracker rejects the touch
     */
    void touch(ServerCnxn cnxn) throws MissingSessionException {
        if (cnxn == null) {
            return;
        }
        final long sessionId = cnxn.getSessionId();
        final int timeout = cnxn.getSessionTimeout();
        if (sessionTracker.touchSession(sessionId, timeout)) {
            return;
        }
        throw new MissingSessionException("No session with sessionid 0x"
                                          + Long.toHexString(sessionId)
                                          + " exists, probably expired and removed");
    }

    /**
     * Registers the server bean and, beneath it, the data tree bean with JMX.
     * Failures are logged as warnings and never propagated; the bean whose
     * registration failed is left as null.
     */
    protected void registerJMX() {
        // register with JMX
        try {
            jmxServerBean = new ZooKeeperServerBean(this);
            MBeanRegistry.getInstance().register(jmxServerBean, null);

            // The data tree bean has its own try block so that a failure here
            // does not discard the already registered server bean.
            try {
                jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree());
                MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
            } catch (Exception e) {
                LOG.warn("Failed to register with JMX", e);
                jmxDataTreeBean = null;
            }
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            jmxServerBean = null;
        }
    }

    /**
     * Makes sure a database exists and has been loaded: creates one from the
     * transaction log factory when none is set, then loads it unless it is
     * already initialized.
     */
    public void startdata() throws IOException, InterruptedException {
        if (zkDb == null) {
            zkDb = new ZKDatabase(this.txnLogFactory);
        }
        if (zkDb.isInitialized()) {
            // Already loaded; nothing more to do.
            return;
        }
        loadData();
    }

    /**
     * Runs the startup sequence and leaves the server in the RUNNING state.
     */
    public synchronized void startup() {
        startupWithServerState(State.RUNNING);
    }

    /**
     * Runs the startup sequence but leaves the server in the INITIAL state;
     * {@link #startServing()} switches it to RUNNING later.
     */
    public synchronized void startupWithoutServing() {
        startupWithServerState(State.INITIAL);
    }

    /**
     * Switches the server to the RUNNING state and wakes every thread waiting
     * on this object's monitor.
     */
    public synchronized void startServing() {
        setState(State.RUNNING);
        notifyAll();
    }

    /**
     * Shared startup sequence: session tracker, request processors, request
     * throttler, JMX, JVM pause monitor and metrics are started in that order
     * before the requested state is published.
     *
     * <p>Must be called while holding this object's monitor, because it ends
     * with {@code notifyAll()}; both callers in this class are synchronized.
     *
     * @param state the state to publish once the components are started
     */
    private void startupWithServerState(State state) {
        // A tracker may have been supplied already; only create one if missing.
        if (sessionTracker == null) {
            createSessionTracker();
        }
        startSessionTracker();
        setupRequestProcessors();

        startRequestThrottler();

        registerJMX();

        startJvmPauseMonitor();

        registerMetrics();

        setState(state);

        requestPathMetricsCollector.start();

        localSessionEnabled = sessionTracker.isLocalSessionsEnabled();

        // Wake threads waiting on this server's monitor.
        notifyAll();
    }

    /**
     * Starts the JVM pause monitor if one was attached to this server.
     */
    protected void startJvmPauseMonitor() {
        if (this.jvmPauseMonitor != null) {
            this.jvmPauseMonitor.serviceStart();
        }
    }

    /**
     * Creates a new request throttler for this server and starts it, replacing
     * any previously held throttler reference.
     */
    protected void startRequestThrottler() {
        requestThrottler = new RequestThrottler(this);
        requestThrottler.start();

    }

    /**
     * Builds and starts the processor chain
     * PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor.
     * The chain is assembled back to front; the sync stage is started before
     * the prep stage, and {@code firstProcessor} is assigned before the prep
     * stage is started.
     */
    protected void setupRequestProcessors() {
        final RequestProcessor finalStage = new FinalRequestProcessor(this);

        final SyncRequestProcessor syncStage = new SyncRequestProcessor(this, finalStage);
        syncStage.start();

        final RequestProcessor syncAsNext = syncStage;
        final PrepRequestProcessor prepStage = new PrepRequestProcessor(this, syncAsNext);
        firstProcessor = prepStage;
        prepStage.start();
    }

    /**
     * @return the listener created for this server in its constructor
     */
    public ZooKeeperServerListener getZooKeeperServerListener() {
        return listener;
    }

    /**
     * Change the server ID used by {@link #createSessionTracker()}. Must be called prior to
     * {@link #startup()} being called, since the ID is only read when the
     * session tracker is created. The default ID is 1.
     *
     * @param newId ID to use
     */
    public void setCreateSessionTrackerServerId(int newId) {
        createSessionTrackerServerId = newId;
    }

    /**
     * Creates the session tracker from the database's session timeouts, the
     * tick time, the configured tracker server ID and this server's listener.
     * This server is passed as the tracker's first constructor argument.
     */
    protected void createSessionTracker() {
        sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime, createSessionTrackerServerId, getZooKeeperServerListener());
    }

    /**
     * Starts the session tracker.
     *
     * <p>The tracker is cast to {@code SessionTrackerImpl}, so this
     * implementation requires the tracker to be of that type.
     */
    protected void startSessionTracker() {
        ((SessionTrackerImpl) sessionTracker).start();
    }

    /**
     * Records the new server state and forwards it to the registered shutdown
     * handler; when no handler is registered the change is only noted at debug
     * level.
     * <p>
     * Server state transitions:
     * <ul><li>INITIAL while the server is starting up.</li>
     * <li>RUNNING once the server has started successfully.</li>
     * <li>ERROR when the server hits an internal error.
     * {@link ZooKeeperServerListenerImpl} notifies any critical resource
     * error events, e.g., SyncRequestProcessor not being able to write a txn to
     * disk.</li>
     * <li>SHUTDOWN during shutdown, which corresponds to the server not
     * running.</li></ul>
     *
     * @param state new server state.
     */
    protected void setState(State state) {
        this.state = state;

        if (zkShutdownHandler == null) {
            LOG.debug(
                "ZKShutdownHandler is not registered, so ZooKeeper server"
                    + " won't take any action on ERROR or SHUTDOWN server state changes");
            return;
        }
        // Notify server state changes to the registered shutdown handler.
        zkShutdownHandler.handle(state);
    }

    /**
     * This can be used while shutting down the server to see whether the server
     * is already shutdown or not.
     *
     * @return true if the server is in the RUNNING or ERROR state, false
     *         otherwise (INITIAL or SHUTDOWN).
     */
    protected boolean canShutdown() {
        return state == State.RUNNING || state == State.ERROR;
    }

    /**
     * @return true if the server is in the RUNNING state, false otherwise.
     */
    public boolean isRunning() {
        return state == State.RUNNING;
    }

    /**
     * Shuts the server down without a full shutdown, i.e. calls
     * {@link #shutdown(boolean)} with {@code false}.
     */
    public void shutdown() {
        shutdown(false);
    }

    /**
     * Shut down the server instance.
     *
     * <p>If the server is neither RUNNING nor in ERROR the method returns
     * early, clearing the database first when a full shutdown was requested.
     * Otherwise it publishes the SHUTDOWN state, unregisters metrics, stops the
     * request throttler, session tracker, first request processor and JVM pause
     * monitor, then either clears or fast-forwards the database, and finally
     * shuts down the request-path metrics collector and unregisters JMX.
     *
     * @param fullyShutDown true if another server using the same database will not replace this one in the same process
     */
    public synchronized void shutdown(boolean fullyShutDown) {
        if (!canShutdown()) {
            if (fullyShutDown && zkDb != null) {
                zkDb.clear();
            }
            LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
            return;
        }
        LOG.info("shutting down");

        // new RuntimeException("Calling shutdown").printStackTrace();
        setState(State.SHUTDOWN);

        // unregister all metrics that are keeping a strong reference to this object
        // subclasses will do their specific clean up
        unregisterMetrics();

        if (requestThrottler != null) {
            requestThrottler.shutdown();
        }

        // Since sessionTracker and syncThreads poll we just have to
        // set running to false and they will detect it during the poll
        // interval.
        if (sessionTracker != null) {
            sessionTracker.shutdown();
        }
        if (firstProcessor != null) {
            firstProcessor.shutdown();
        }
        if (jvmPauseMonitor != null) {
            jvmPauseMonitor.serviceStop();
        }

        if (zkDb != null) {
            if (fullyShutDown) {
                zkDb.clear();
            } else {
                // else there is no need to clear the database
                //  * When a new quorum is established we can still apply the diff
                //    on top of the same zkDb data
                //  * If we fetch a new snapshot from leader, the zkDb will be
                //    cleared anyway before loading the snapshot
                try {
                    //This will fast forward the database to the latest recorded transactions
                    zkDb.fastForwardDataBase();
                } catch (IOException e) {
                    // Fast-forward failed, so fall back to clearing the database.
                    LOG.error("Error updating DB", e);
                    zkDb.clear();
                }
            }
        }

        requestPathMetricsCollector.shutdown();
        unregisterJMX();
    }

    /**
     * Unregisters the data tree bean and the server bean from JMX, if present,
     * and clears both references. A failure to unregister one bean is logged
     * and does not prevent the other from being unregistered.
     */
    protected void unregisterJMX() {
        if (jmxDataTreeBean != null) {
            try {
                MBeanRegistry.getInstance().unregister(jmxDataTreeBean);
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
        }
        if (jmxServerBean != null) {
            try {
                MBeanRegistry.getInstance().unregister(jmxServerBean);
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
        }
        // Both references are dropped regardless of whether unregistering succeeded.
        jmxServerBean = null;
        jmxDataTreeBean = null;
    }

    /**
     * Increments the counter of requests currently in process by one.
     */
    public void incInProcess() {
        this.requestsInProcess.incrementAndGet();
    }

    /**
     * Decrements the counter of requests currently in process by one and,
     * when a request throttler is present, wakes it.
     */
    public void decInProcess() {
        requestsInProcess.decrementAndGet();
        if (requestThrottler == null) {
            return;
        }
        requestThrottler.throttleWake();
    }

    /**
     * @return the current value of the in-process request counter.
     */
    public int getInProcess() {
        return this.requestsInProcess.get();
    }

    /**
     * @return the in-flight count reported by the request throttler, or 0
     *         when no throttler is present.
     */
    public int getInflight() {
        return this.requestThrottleInflight();
    }

    /**
     * Reads the in-flight count from the request throttler, treating a
     * missing throttler as zero in-flight requests.
     */
    private int requestThrottleInflight() {
        return requestThrottler != null ? requestThrottler.getInflight() : 0;
    }

    /**
     * Immutable pair of digest values: one for a node and one for the tree.
     */
    static class PrecalculatedDigest {

        /** Digest value associated with the node. */
        final long nodeDigest;

        /** Digest value associated with the tree. */
        final long treeDigest;

        PrecalculatedDigest(long nodeDigest, long treeDigest) {
            this.treeDigest = treeDigest;
            this.nodeDigest = nodeDigest;
        }

    }


    /**
     * This structure is used to facilitate information sharing between PrepRP
     * and FinalRP.
     */
    static class ChangeRecord {

        long zxid;

        String path;

        /* Make sure to create a new object when changing */
        StatPersisted stat;

        int childCount;

        /* Make sure to create a new object when changing */
        List<ACL> acl;

        PrecalculatedDigest precalculatedDigest;

        byte[] data;

        ChangeRecord(long zxid, String path, StatPersisted stat, int childCount, List<ACL> acl) {
            this.zxid = zxid;
            this.path = path;
            this.stat = stat;
            this.childCount = childCount;
            this.acl = acl;
        }

        /**
         * Creates a copy of this record under a different zxid. The stat and
         * the ACL list are copied into fresh objects (an empty stat / empty
         * list when this record has none); the precalculated digest and the
         * data array are shared with this record, not copied.
         *
         * @param zxid the zxid to assign to the copy
         * @return the new record
         */
        ChangeRecord duplicate(long zxid) {
            final StatPersisted statCopy = new StatPersisted();
            if (stat != null) {
                DataTree.copyStatPersisted(stat, statCopy);
            }
            final List<ACL> aclCopy = (acl != null) ? new ArrayList<>(acl) : new ArrayList<>();

            final ChangeRecord copy = new ChangeRecord(zxid, path, statCopy, childCount, aclCopy);
            copy.precalculatedDigest = precalculatedDigest;
            copy.data = data;
            return copy;
        }

    }

    /**
     * Derives the 16-byte password for a session id. The bytes come from a
     * {@link Random} seeded with {@code id ^ superSecret}, so the same id
     * always yields the same password.
     *
     * @param id the session id
     * @return a newly allocated 16-byte password
     */
    byte[] generatePasswd(long id) {
        final byte[] passwd = new byte[16];
        new Random(id ^ superSecret).nextBytes(passwd);
        return passwd;
    }

    /**
     * Checks a client-supplied password against the one derived for the session.
     *
     * @param sessionId the session id; 0 never matches
     * @param passwd the password presented by the client
     * @return true if the session id is non-zero and the password equals
     *         {@code generatePasswd(sessionId)}
     */
    protected boolean checkPasswd(long sessionId, byte[] passwd) {
        if (sessionId == 0) {
            return false;
        }
        final byte[] expected = generatePasswd(sessionId);
        return Arrays.equals(passwd, expected);
    }

    /**
     * Creates a new session through the session tracker, binds it to the
     * connection and submits a createSession request carrying the timeout.
     *
     * @param cnxn the connection the session belongs to
     * @param passwd password buffer from the connect request; may be null. It is
     *        overwritten in place with bytes derived from the new session id.
     * @param timeout the session timeout, written as the request payload
     * @return the id of the newly created session
     */
    long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
        // Possible since it's just deserialized from a packet on the wire.
        final byte[] passwdBuffer = (passwd != null) ? passwd : new byte[0];

        final long sessionId = sessionTracker.createSession(timeout);
        new Random(sessionId ^ superSecret).nextBytes(passwdBuffer);

        final ByteBuffer timeoutPayload = ByteBuffer.allocate(4);
        timeoutPayload.putInt(timeout);

        cnxn.setSessionId(sessionId);
        submitRequest(new Request(cnxn, sessionId, 0, OpCode.createSession, timeoutPayload, null));
        return sessionId;
    }

    /**
     * Sets the owner of the given session.
     * @param id the session id
     * @param owner the owner of the session
     * @throws SessionExpiredException
     */
    public void setOwner(long id, Object owner) throws SessionExpiredException {
        // Ownership is kept by the session tracker; this is a pure delegation.
        this.sessionTracker.setOwner(id, owner);
    }

    /**
     * Touches the session in the session tracker and completes session
     * initialization on the connection with the outcome.
     *
     * @param cnxn the connection being (re)attached to the session
     * @param sessionId the session to revalidate
     * @param sessionTimeout the timeout passed to the session tracker
     * @throws IOException declared for overriding implementations and callers
     */
    protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException {
        final boolean stillValid = sessionTracker.touchSession(sessionId, sessionTimeout);
        if (LOG.isTraceEnabled()) {
            final String traceMessage = "Session 0x" + Long.toHexString(sessionId) + " is valid: " + stillValid;
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, traceMessage);
        }
        finishSessionInit(cnxn, stillValid);
    }

    /**
     * Re-attaches a connection to an existing session. A wrong password ends
     * session initialization as invalid; a correct one proceeds to
     * {@code revalidateSession}.
     *
     * @param cnxn the reconnecting connection
     * @param sessionId the session the client wants to resume
     * @param passwd the password presented by the client
     * @param sessionTimeout the negotiated session timeout
     * @throws IOException propagated from {@code revalidateSession}
     */
    public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout) throws IOException {
        if (!checkPasswd(sessionId, passwd)) {
            LOG.warn(
                "Incorrect password from {} for session 0x{}",
                cnxn.getRemoteSocketAddress(),
                Long.toHexString(sessionId));
            finishSessionInit(cnxn, false);
            return;
        }
        revalidateSession(cnxn, sessionId, sessionTimeout);
    }

    /**
     * Completes session establishment on a connection: registers a valid
     * connection with the owning connection factory, sends the connect
     * response, and then either enables receiving (valid session) or sends
     * the close-connection buffer (invalid session).
     *
     * @param cnxn the connection being initialized
     * @param valid whether the session was accepted
     */
    public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
        // register with JMX
        try {
            if (valid) {
                // Register with whichever factory (plain or secure) actually holds this connection.
                if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) {
                    serverCnxnFactory.registerConnection(cnxn);
                } else if (secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn)) {
                    secureServerCnxnFactory.registerConnection(cnxn);
                }
            }
        } catch (Exception e) {
            // A registration failure is logged only; the response below is still sent.
            LOG.warn("Failed to register with JMX", e);
        }

        try {
            // For an invalid session the timeout and session id are 0 and the password is 16 zero bytes.
            ConnectResponse rsp = new ConnectResponse(
                0,
                valid ? cnxn.getSessionTimeout() : 0,
                valid ? cnxn.getSessionId() : 0, // send 0 if session is no
                // longer valid
                valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
            // Placeholder for the length prefix; overwritten below once the size is known.
            bos.writeInt(-1, "len");
            rsp.serialize(bos, "connect");
            // The readOnly flag is only written for clients not marked as old.
            if (!cnxn.isOldClient) {
                bos.writeBool(this instanceof ReadOnlyZooKeeperServer, "readOnly");
            }
            baos.close();
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            // Patch the 4-byte length prefix with the payload size, then rewind for sending.
            bb.putInt(bb.remaining() - 4).rewind();
            cnxn.sendBuffer(bb);

            if (valid) {
                LOG.debug(
                    "Established session 0x{} with negotiated timeout {} for client {}",
                    Long.toHexString(cnxn.getSessionId()),
                    cnxn.getSessionTimeout(),
                    cnxn.getRemoteSocketAddress());
                cnxn.enableRecv();
            } else {

                LOG.info(
                    "Invalid session 0x{} for client {}, probably expired",
                    Long.toHexString(cnxn.getSessionId()),
                    cnxn.getRemoteSocketAddress());
                // Follow the response with the close-connection marker buffer.
                cnxn.sendBuffer(ServerCnxnFactory.closeConn);
            }

        } catch (Exception e) {
            // Any failure while building or sending the response closes the connection.
            LOG.warn("Exception while establishing session, closing", e);
            cnxn.close(ServerCnxn.DisconnectReason.IO_EXCEPTION_IN_SESSION_INIT);
        }
    }

    /**
     * Closes the session bound to the given connection.
     *
     * @param cnxn the connection whose session id is closed
     * @param requestHeader not used by this implementation
     */
    public void closeSession(ServerCnxn cnxn, RequestHeader requestHeader) {
        final long sessionId = cnxn.getSessionId();
        closeSession(sessionId);
    }

    /**
     * @return the server id; this implementation always returns 0.
     */
    public long getServerId() {
        return 0L;
    }

    /**
     * If the underlying ZooKeeper server supports local sessions, this method
     * sets isLocalSession to true when the request is associated with
     * a local session.
     *
     * @param si
     */
    protected void setLocalSessionFlag(Request si) {
        // Intentionally empty in this class.
    }

    /**
     * Submits a request for processing by delegating to {@code enqueueRequest}.
     *
     * @param si the request to submit
     */
    public void submitRequest(Request si) {
        this.enqueueRequest(si);
    }

    /**
     * Hands a request to the request throttler. If the throttler is not yet
     * available, waits (in one-second slices) while the server is still in
     * the INITIAL state before checking again.
     *
     * @param si the request to enqueue
     * @throws RuntimeException with message "Not started" if the throttler is
     *         still missing after the wait
     */
    public void enqueueRequest(Request si) {
        if (requestThrottler == null) {
            synchronized (this) {
                try {
                    // Since all requests are passed to the request
                    // processor it should wait for setting up the request
                    // processor chain. The state will be updated to RUNNING
                    // after the setup.
                    while (state == State.INITIAL) {
                        wait(1000);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption", e);
                    // wait() cleared the interrupt status when it threw; restore it
                    // so the caller and later blocking calls can still observe it.
                    Thread.currentThread().interrupt();
                }
                if (requestThrottler == null) {
                    throw new RuntimeException("Not started");
                }
            }
        }
        requestThrottler.submitRequest(si);
    }

    /**
     * Passes a request straight to the first request processor. If the
     * processor chain is not yet available, waits (in one-second slices)
     * while the server is still in the INITIAL state before checking again.
     * <p>
     * Requests of an unknown type are answered through an
     * {@code UnimplementedRequestProcessor}. Whenever the request does not
     * enter the processor chain, {@code requestFinished} is called so the
     * large-request accounting is released.
     *
     * @param si the request to process
     * @throws RuntimeException with message "Not started" if, after the wait,
     *         there is still no first processor or the server is not RUNNING
     */
    public void submitRequestNow(Request si) {
        if (firstProcessor == null) {
            synchronized (this) {
                try {
                    // Since all requests are passed to the request
                    // processor it should wait for setting up the request
                    // processor chain. The state will be updated to RUNNING
                    // after the setup.
                    while (state == State.INITIAL) {
                        wait(1000);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption", e);
                    // wait() cleared the interrupt status when it threw; restore it
                    // so the caller and later blocking calls can still observe it.
                    Thread.currentThread().interrupt();
                }
                if (firstProcessor == null || state != State.RUNNING) {
                    throw new RuntimeException("Not started");
                }
            }
        }
        try {
            touch(si.cnxn);
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
                setLocalSessionFlag(si);
                firstProcessor.processRequest(si);
                // Only requests that arrived on a connection count as in process.
                if (si.cnxn != null) {
                    incInProcess();
                }
            } else {
                LOG.warn("Received packet at server of unknown type {}", si.type);
                // Update request accounting/throttling limits
                requestFinished(si);
                new UnimplementedRequestProcessor().processRequest(si);
            }
        } catch (MissingSessionException e) {
            LOG.debug("Dropping request.", e);
            // Update request accounting/throttling limits
            requestFinished(si);
        } catch (RequestProcessorException e) {
            LOG.error("Unable to process request", e);
            // Update request accounting/throttling limits
            requestFinished(si);
        }
    }

    /**
     * Reads the snap count from the {@code SNAP_COUNT} system property,
     * falling back to {@code DEFAULT_SNAP_COUNT}.
     *
     * @return the configured snap count, raised to 2 if it is smaller
     */
    public static int getSnapCount() {
        final int configured = Integer.getInteger(SNAP_COUNT, DEFAULT_SNAP_COUNT);
        // snapCount must be 2 or more. See org.apache.zookeeper.server.SyncRequestProcessor
        if (configured >= 2) {
            return configured;
        }
        LOG.warn("SnapCount should be 2 or more. Now, snapCount is reset to 2");
        return 2;
    }

    /**
     * @return the value of the {@code GLOBAL_OUTSTANDING_LIMIT} system
     *         property, or {@code DEFAULT_GLOBAL_OUTSTANDING_LIMIT} when unset.
     */
    public int getGlobalOutstandingLimit() {
        final int limit = Integer.getInteger(GLOBAL_OUTSTANDING_LIMIT, DEFAULT_GLOBAL_OUTSTANDING_LIMIT);
        return limit;
    }

    /**
     * Reads the {@code zookeeper.snapSizeLimitInKb} system property and
     * converts it to bytes.
     *
     * @return the limit in bytes; non-positive when the property is set to a
     *         non-positive value, which is logged as disabling the feature
     */
    public static long getSnapSizeInBytes() {
        final long limitInKb = Long.getLong("zookeeper.snapSizeLimitInKb", 4194304L); // 4GB by default
        if (limitInKb <= 0) {
            LOG.info("zookeeper.snapSizeLimitInKb set to a non-positive value {}; disabling feature", limitInKb);
        }
        // Convert to bytes
        return limitInKb * 1024;
    }

    /**
     * @param factory the connection factory stored in {@code serverCnxnFactory}.
     */
    public void setServerCnxnFactory(ServerCnxnFactory factory) {
        this.serverCnxnFactory = factory;
    }

    /**
     * @return the connection factory stored in {@code serverCnxnFactory}; may be null.
     */
    public ServerCnxnFactory getServerCnxnFactory() {
        return this.serverCnxnFactory;
    }

    /**
     * @return the connection factory stored in {@code secureServerCnxnFactory}; may be null.
     */
    public ServerCnxnFactory getSecureServerCnxnFactory() {
        return this.secureServerCnxnFactory;
    }

    /**
     * @param factory the connection factory stored in {@code secureServerCnxnFactory}.
     */
    public void setSecureServerCnxnFactory(ServerCnxnFactory factory) {
        this.secureServerCnxnFactory = factory;
    }

    /**
     * return the last processed id from the
     * datatree
     */
    public long getLastProcessedZxid() {
        // The value is read through the database wrapper.
        final long lastProcessed = zkDb.getDataTreeLastProcessedZxid();
        return lastProcessed;
    }

    /**
     * return the outstanding requests
     * in the queue, which haven't been
     * processed yet
     */
    public long getOutstandingRequests() {
        // Same value as the in-process counter, widened from int to long.
        final long outstanding = getInProcess();
        return outstanding;
    }

    /**
     * return the total number of client connections that are alive
     * to this server
     */
    public int getNumAliveConnections() {
        // A missing factory contributes zero connections.
        final int plain = (serverCnxnFactory != null)
            ? serverCnxnFactory.getNumAliveConnections()
            : 0;
        final int secure = (secureServerCnxnFactory != null)
            ? secureServerCnxnFactory.getNumAliveConnections()
            : 0;
        return plain + secure;
    }

    /**
     * truncate the log to get in sync with others
     * if in a quorum
     * @param zxid the zxid that it needs to get in sync
     * with others
     * @throws IOException
     */
    public void truncateLog(long zxid) throws IOException {
        // Truncation itself is carried out by the database.
        zkDb.truncateLog(zxid);
    }

    /**
     * @return the tick time configured on this server, in milliseconds.
     */
    public int getTickTime() {
        return this.tickTime;
    }

    /**
     * Logs and stores a new tick time.
     *
     * @param tickTime the tick time in milliseconds
     */
    public void setTickTime(int tickTime) {
        // The new value is logged before it takes effect.
        LOG.info("tickTime set to {} ms", tickTime);
        this.tickTime = tickTime;
    }

    /**
     * @return the throttled-op wait time, in milliseconds.
     */
    public static int getThrottledOpWaitTime() {
        final int waitTime = throttledOpWaitTime;
        return waitTime;
    }

    /**
     * Logs and stores a new throttled-op wait time.
     *
     * @param time the wait time in milliseconds
     */
    public static void setThrottledOpWaitTime(int time) {
        // The new value is logged before it takes effect.
        LOG.info("throttledOpWaitTime set to {} ms", time);
        throttledOpWaitTime = time;
    }

    /**
     * @return the minimum session timeout, in milliseconds.
     */
    public int getMinSessionTimeout() {
        return this.minSessionTimeout;
    }

    /**
     * Sets the minimum session timeout.
     *
     * @param min the timeout in milliseconds, or -1 to use twice the current
     *        tick time
     */
    public void setMinSessionTimeout(int min) {
        if (min == -1) {
            this.minSessionTimeout = tickTime * 2;
        } else {
            this.minSessionTimeout = min;
        }
        LOG.info("minSessionTimeout set to {} ms", this.minSessionTimeout);
    }

    /**
     * @return the maximum session timeout, in milliseconds.
     */
    public int getMaxSessionTimeout() {
        return this.maxSessionTimeout;
    }

    /**
     * Sets the maximum session timeout.
     *
     * @param max the timeout in milliseconds, or -1 to use twenty times the
     *        current tick time
     */
    public void setMaxSessionTimeout(int max) {
        if (max == -1) {
            this.maxSessionTimeout = tickTime * 20;
        } else {
            this.maxSessionTimeout = max;
        }
        LOG.info("maxSessionTimeout set to {} ms", this.maxSessionTimeout);
    }

    /**
     * @return the configured client port listen backlog.
     */
    public int getClientPortListenBacklog() {
        return this.listenBacklog;
    }

    /**
     * Stores and logs a new client port listen backlog.
     *
     * @param backlog the backlog value to store
     */
    public void setClientPortListenBacklog(int backlog) {
        // Stored first, then reported.
        this.listenBacklog = backlog;
        LOG.info("clientPortListenBacklog set to {}", backlog);
    }

    /**
     * @return the local port of the connection factory, or -1 when no
     *         factory is set.
     */
    public int getClientPort() {
        if (serverCnxnFactory == null) {
            return -1;
        }
        return serverCnxnFactory.getLocalPort();
    }

    /**
     * @return the local port of the secure connection factory, or -1 when no
     *         secure factory is set.
     */
    public int getSecureClientPort() {
        if (secureServerCnxnFactory == null) {
            return -1;
        }
        return secureServerCnxnFactory.getLocalPort();
    }

    /** Maximum number of connections allowed from particular host (ip) */
    public int getMaxClientCnxnsPerHost() {
        // The plain factory takes precedence; the secure one is only consulted without it.
        final ServerCnxnFactory factory =
            (serverCnxnFactory != null) ? serverCnxnFactory : secureServerCnxnFactory;
        // -1 signals that neither factory is available.
        return (factory != null) ? factory.getMaxClientCnxnsPerHost() : -1;
    }

    /**
     * @param txnLog the transaction/snapshot log stored in {@code txnLogFactory}.
     */
    public void setTxnLogFactory(FileTxnSnapLog txnLog) {
        txnLogFactory = txnLog;
    }

    /**
     * @return the transaction/snapshot log stored in {@code txnLogFactory}.
     */
    public FileTxnSnapLog getTxnLogFactory() {
        return txnLogFactory;
    }

    /**
     * Returns the elapsed sync time of the transaction log in milliseconds.
     */
    public long getTxnLogElapsedSyncTime() {
        // The measurement is kept by the transaction log factory.
        final long elapsed = txnLogFactory.getTxnLogElapsedSyncTime();
        return elapsed;
    }

    /**
     * @return the state label of this server; this implementation always
     *         returns {@code "standalone"}.
     */
    public String getState() {
        final String label = "standalone";
        return label;
    }

    /**
     * Writes the database's ephemeral-node dump to the given writer.
     *
     * @param pwriter the writer passed on to the database
     */
    public void dumpEphemerals(PrintWriter pwriter) {
        this.zkDb.dumpEphemerals(pwriter);
    }

    /**
     * @return the ephemerals map as reported by the database.
     */
    public Map<Long, Set<String>> getEphemerals() {
        return this.zkDb.getEphemerals();
    }

    /**
     * @return the drop chance reported by the connection throttle.
     */
    public double getConnectionDropChance() {
        final double dropChance = connThrottle.getDropChance();
        return dropChance;
    }

    /**
     * Handles the connect packet of a client connection. It deserializes the
     * connect request, applies connection throttling, refuses clients this
     * server cannot serve, clamps the requested session timeout into the
     * configured range, and then either creates a new session (session id 0)
     * or attempts to reopen the requested one.
     *
     * @param cnxn the connection the packet arrived on
     * @param incomingBuffer the buffer holding the serialized connect request
     * @throws IOException if the request cannot be deserialized; the
     *         CloseRequestException raised for refused sessions also
     *         propagates to the caller
     * @throws ClientCnxnLimitException if the connection throttle rejects the
     *         request
     */
    @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "the value won't change after startup")
    public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
        throws IOException, ClientCnxnLimitException {

        BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
        ConnectRequest connReq = new ConnectRequest();
        connReq.deserialize(bia, "connect");
        LOG.debug(
            "Session establishment request from client {} client's lastZxid is 0x{}",
            cnxn.getRemoteSocketAddress(),
            Long.toHexString(connReq.getLastZxidSeen()));

        long sessionId = connReq.getSessionId();
        // Default cost is one token. With connection weights enabled the cost depends on
        // whether this is a new local session, a new global session, or a renewal.
        int tokensNeeded = 1;
        if (connThrottle.isConnectionWeightEnabled()) {
            if (sessionId == 0) {
                if (localSessionEnabled) {
                    tokensNeeded = connThrottle.getRequiredTokensForLocal();
                } else {
                    tokensNeeded = connThrottle.getRequiredTokensForGlobal();
                }
            } else {
                tokensNeeded = connThrottle.getRequiredTokensForRenew();
            }
        }

        if (!connThrottle.checkLimit(tokensNeeded)) {
            throw new ClientCnxnLimitException();
        }
        ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());

        ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);

        // The readOnly flag is an optional trailing field; failing to read it marks an old client.
        boolean readOnly = false;
        try {
            readOnly = bia.readBool("readOnly");
            cnxn.isOldClient = false;
        } catch (IOException e) {
            // this is ok -- just a packet from an old client which
            // doesn't contain readOnly field
            LOG.warn(
                "Connection request from old client {}; will be dropped if server is in r-o mode",
                cnxn.getRemoteSocketAddress());
        }
        // A read-only server only accepts clients that asked for read-only mode.
        if (!readOnly && this instanceof ReadOnlyZooKeeperServer) {
            String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress();
            LOG.info(msg);
            throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
        }
        // Refuse a client that has already seen a zxid newer than this server's last processed one.
        if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
            String msg = "Refusing session request for client "
                         + cnxn.getRemoteSocketAddress()
                         + " as it has seen zxid 0x"
                         + Long.toHexString(connReq.getLastZxidSeen())
                         + " our last zxid is 0x"
                         + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
                         + " client must try another server";

            LOG.info(msg);
            throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
        }
        // Clamp the requested timeout into [minSessionTimeout, maxSessionTimeout].
        int sessionTimeout = connReq.getTimeOut();
        byte[] passwd = connReq.getPasswd();
        int minSessionTimeout = getMinSessionTimeout();
        if (sessionTimeout < minSessionTimeout) {
            sessionTimeout = minSessionTimeout;
        }
        int maxSessionTimeout = getMaxSessionTimeout();
        if (sessionTimeout > maxSessionTimeout) {
            sessionTimeout = maxSessionTimeout;
        }
        cnxn.setSessionTimeout(sessionTimeout);
        // We don't want to receive any packets until we are sure that the
        // session is setup
        cnxn.disableRecv();
        if (sessionId == 0) {
            // Session id 0 means the client is asking for a brand new session.
            long id = createSession(cnxn, passwd, sessionTimeout);
            LOG.debug(
                "Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
                Long.toHexString(id),
                Long.toHexString(connReq.getLastZxidSeen()),
                connReq.getTimeOut(),
                cnxn.getRemoteSocketAddress());
        } else {
            validateSession(cnxn, sessionId);
            LOG.debug(
                "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
                Long.toHexString(sessionId),
                Long.toHexString(connReq.getLastZxidSeen()),
                connReq.getTimeOut(),
                cnxn.getRemoteSocketAddress());
            // Ask both factories to close the session's existing connection state
            // before binding the session id to this connection.
            if (serverCnxnFactory != null) {
                serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
            }
            if (secureServerCnxnFactory != null) {
                secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
            }
            cnxn.setSessionId(sessionId);
            reopenSession(cnxn, sessionId, passwd, sessionTimeout);
            ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);

        }
    }

    /**
     * Validate if a particular session can be reestablished.
     *
     * @param cnxn
     * @param sessionId
     */
    protected void validateSession(ServerCnxn cnxn, long sessionId) throws IOException {
        // Intentionally empty in this class: no validation is performed here.
    }

    /**
     * Decides whether throttling applies for the given outstanding count.
     *
     * @param outStandingCount the outstanding request count to evaluate
     * @return true when the global outstanding limit is below either the
     *         in-flight or the in-process request count and
     *         {@code outStandingCount} is positive; false otherwise
     */
    public boolean shouldThrottle(long outStandingCount) {
        final int limit = getGlobalOutstandingLimit();
        final boolean limitExceeded = limit < getInflight() || limit < getInProcess();
        return limitExceeded && outStandingCount > 0;
    }

    /**
     * @return the configured flush delay, in milliseconds.
     */
    long getFlushDelay() {
        final long delay = flushDelay;
        return delay;
    }

    /**
     * Logs and stores a new flush delay.
     *
     * @param delay the flush delay in milliseconds
     */
    static void setFlushDelay(long delay) {
        // Logged under the FLUSH_DELAY property name before it takes effect.
        LOG.info("{} = {} ms", FLUSH_DELAY, delay);
        flushDelay = delay;
    }

    /**
     * @return the configured maximum write queue poll time, in milliseconds.
     */
    long getMaxWriteQueuePollTime() {
        final long maxTime = maxWriteQueuePollTime;
        return maxTime;
    }

    /**
     * Logs and stores a new maximum write queue poll time.
     *
     * @param maxTime the maximum poll time in milliseconds
     */
    static void setMaxWriteQueuePollTime(long maxTime) {
        // Logged under the MAX_WRITE_QUEUE_POLL_SIZE property name before it takes effect.
        LOG.info("{} = {} ms", MAX_WRITE_QUEUE_POLL_SIZE, maxTime);
        maxWriteQueuePollTime = maxTime;
    }

    /**
     * @return the configured maximum batch size.
     */
    int getMaxBatchSize() {
        final int size = maxBatchSize;
        return size;
    }

    /**
     * Logs and stores a new maximum batch size.
     *
     * @param size the maximum batch size
     */
    static void setMaxBatchSize(int size) {
        // Logged under the MAX_BATCH_SIZE property name before it takes effect.
        LOG.info("{}={}", MAX_BATCH_SIZE, size);
        maxBatchSize = size;
    }

    /**
     * Initializes the large-request settings from system properties:
     * {@code zookeeper.largeRequestMaxBytes} (defaulting to the current value)
     * and {@code zookeeper.largeRequestThreshold} (defaulting to -1).
     */
    private void initLargeRequestThrottlingSettings() {
        final int maxBytes = Integer.getInteger("zookeeper.largeRequestMaxBytes", largeRequestMaxBytes);
        setLargeRequestMaxBytes(maxBytes);

        final int threshold = Integer.getInteger("zookeeper.largeRequestThreshold", -1);
        setLargeRequestThreshold(threshold);
    }

    /**
     * @return the maximum number of bytes allowed for all large requests together.
     */
    public int getLargeRequestMaxBytes() {
        final int maxBytes = largeRequestMaxBytes;
        return maxBytes;
    }

    /**
     * Sets the maximum number of bytes allowed for all large requests together.
     *
     * @param bytes the new limit; a non-positive value is rejected with a
     *        warning and the current setting is kept
     */
    public void setLargeRequestMaxBytes(int bytes) {
        if (bytes > 0) {
            largeRequestMaxBytes = bytes;
            LOG.info("The max bytes for all large requests are set to {}", largeRequestMaxBytes);
            return;
        }
        LOG.warn("Invalid max bytes for all large requests {}. It should be a positive number.", bytes);
        LOG.warn("Will not change the setting. The max bytes stay at {}", largeRequestMaxBytes);
    }

    /**
     * @return the size threshold above which a request counts as large, or
     *         -1 when the large request limit is disabled.
     */
    public int getLargeRequestThreshold() {
        final int threshold = largeRequestThreshold;
        return threshold;
    }

    /**
     * Sets the size threshold above which a request counts as large.
     *
     * @param threshold -1 or a positive number; any other value is rejected
     *        with a warning and the threshold is set to -1
     */
    public void setLargeRequestThreshold(int threshold) {
        final boolean acceptable = threshold == -1 || threshold > 0;
        if (acceptable) {
            largeRequestThreshold = threshold;
            LOG.info("The large request threshold is set to {}", largeRequestThreshold);
            return;
        }
        LOG.warn("Invalid large request threshold {}. It should be -1 or positive. Setting to -1 ", threshold);
        largeRequestThreshold = -1;
    }

    /**
     * @return the number of bytes currently accounted to large requests.
     */
    public int getLargeRequestBytes() {
        final int bytes = currentLargeRequestBytes.get();
        return bytes;
    }

    /**
     * @param length the request length in bytes
     * @return true if the large request limit is enabled and {@code length}
     *         exceeds the threshold
     */
    private boolean isLargeRequest(int length) {
        // The large request limit is disabled when threshold is -1
        return largeRequestThreshold != -1 && length > largeRequestThreshold;
    }

    /**
     * Checks whether a message of the given length may be received without
     * pushing the bytes accounted to large requests past the configured
     * maximum. This is a read-only check; it does not reserve any bytes.
     *
     * @param length the length of the incoming message in bytes
     * @return true if the message is not a large request, or if it still fits
     *         under the large-request byte limit
     * @throws IOException if the message is a large request and would exceed
     *         the limit
     */
    public boolean checkRequestSizeWhenReceivingMessage(int length) throws IOException {
        if (!isLargeRequest(length)) {
            return true;
        }
        // Sum in long arithmetic: with int arithmetic a very large length could wrap
        // the sum negative and slip past the limit check.
        final long projectedBytes = (long) currentLargeRequestBytes.get() + length;
        if (projectedBytes <= largeRequestMaxBytes) {
            return true;
        }
        ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1);
        throw new IOException("Rejecting large request");
    }

    /**
     * Reserves {@code length} bytes in the large-request accounting for a
     * message that has been received. If the reservation pushes the total
     * past the configured maximum it is rolled back and the request rejected.
     *
     * @param length the length of the received message in bytes
     * @return true if the message is not a large request or the reservation succeeded
     * @throws IOException if the reservation would exceed the limit
     */
    private boolean checkRequestSizeWhenMessageReceived(int length) throws IOException {
        if (!isLargeRequest(length)) {
            return true;
        }

        final int reserved = currentLargeRequestBytes.addAndGet(length);
        if (reserved <= largeRequestMaxBytes) {
            return true;
        }

        // Over the limit: give the bytes back before rejecting.
        currentLargeRequestBytes.addAndGet(-length);
        ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1);
        throw new IOException("Rejecting large request");
    }

    /**
     * Releases the bytes a finished request held in the large-request
     * accounting. A large-request size of -1 on the request means there is
     * nothing to release.
     *
     * @param request the request that finished
     */
    public void requestFinished(Request request) {
        final int heldBytes = request.getLargeRequestSize();
        if (heldBytes == -1) {
            return;
        }
        currentLargeRequestBytes.addAndGet(-heldBytes);
    }

    /**
     * Processes one request packet from a client connection. The request
     * header is deserialized first and the packet is then handled by type:
     * auth packets are answered directly, SASL packets go to
     * {@code processSasl}, and everything else is wrapped in a
     * {@code Request} and submitted, provided authentication enforcement passes.
     *
     * @param cnxn the connection the packet arrived on
     * @param incomingBuffer the buffer holding the request header followed by the payload
     * @throws IOException if deserialization fails or a large request is rejected
     */
    public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
        // We have the request, now process and setup for next
        InputStream bais = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
        RequestHeader h = new RequestHeader();
        h.deserialize(bia, "header");

        // Need to increase the outstanding request count first, otherwise
        // there might be a race condition that it enabled recv after
        // processing request and then disabled when check throttling.
        //
        // Be aware that we're actually checking the global outstanding
        // request before this request.
        //
        // It's fine if the IOException thrown before we decrease the count
        // in cnxn, since it will close the cnxn anyway.
        cnxn.incrOutstandingAndCheckThrottle(h);

        // Through the magic of byte buffers, txn will now be
        // pointing
        // to the start of the txn
        incomingBuffer = incomingBuffer.slice();
        if (h.getType() == OpCode.auth) {
            LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
            AuthPacket authPacket = new AuthPacket();
            ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
            String scheme = authPacket.getScheme();
            ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
            // Stays AUTHFAILED when no provider is registered for the scheme.
            Code authReturn = KeeperException.Code.AUTHFAILED;
            if (ap != null) {
                try {
                    // handleAuthentication may close the connection, to allow the client to choose
                    // a different server to connect to.
                    authReturn = ap.handleAuthentication(
                        new ServerAuthenticationProvider.ServerObjs(this, cnxn),
                        authPacket.getAuth());
                } catch (RuntimeException e) {
                    // A provider that throws is treated the same as a failed authentication.
                    LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
                    authReturn = KeeperException.Code.AUTHFAILED;
                }
            }
            if (authReturn == KeeperException.Code.OK) {
                LOG.info("Session 0x{}: auth success for scheme {} and address {}",
                        Long.toHexString(cnxn.getSessionId()), scheme,
                        cnxn.getRemoteSocketAddress());
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
                cnxn.sendResponse(rh, null, null);
            } else {
                if (ap == null) {
                    LOG.warn(
                        "No authentication provider for scheme: {} has {}",
                        scheme,
                        ProviderRegistry.listProviders());
                } else {
                    LOG.warn("Authentication failed for scheme: {}", scheme);
                }
                // send a response...
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
                cnxn.sendResponse(rh, null, null);
                // ... and close connection
                cnxn.sendBuffer(ServerCnxnFactory.closeConn);
                cnxn.disableRecv();
            }
            return;
        } else if (h.getType() == OpCode.sasl) {
            processSasl(incomingBuffer, cnxn, h);
        } else {
            if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
                // Authentication enforcement is failed
                // Already sent response to user about failure and closed the session, lets return
                return;
            } else {
                Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
                // Size of the sliced payload, i.e. the packet without the request header.
                int length = incomingBuffer.limit();
                if (isLargeRequest(length)) {
                    // checkRequestSize will throw IOException if request is rejected
                    checkRequestSizeWhenMessageReceived(length);
                    // Remember the reserved size so requestFinished() can release it later.
                    si.setLargeRequestSize(length);
                }
                si.setOwner(ServerCnxn.me);
                submitRequest(si);
            }
        }
    }

    /**
     * Checks whether the given id is configured as a SASL super user. An id
     * matches when it equals the value of a system property whose name is
     * either exactly {@code SASL_SUPER_USER} or {@code SASL_SUPER_USER}
     * followed by a '.' and any suffix.
     *
     * @param id the authorization id to check; null or empty never matches
     * @return true if a matching system property holds this id
     */
    private static boolean isSaslSuperUser(String id) {
        if (id == null || id.isEmpty()) {
            return false;
        }

        final Properties systemProperties = System.getProperties();
        final int prefixLen = SASL_SUPER_USER.length();

        return systemProperties.stringPropertyNames().stream()
            .filter(key -> key.startsWith(SASL_SUPER_USER))
            .filter(key -> key.length() == prefixLen || key.charAt(prefixLen) == '.')
            // id is non-null here, so this also covers a null property value.
            .anyMatch(key -> id.equals(systemProperties.getProperty(key)));
    }

    /**
     * @return true if the {@code ALLOW_SASL_FAILED_CLIENTS} system property
     *         is set to "true" (case-insensitive).
     */
    private static boolean shouldAllowSaslFailedClientsConnect() {
        final boolean allowed = Boolean.getBoolean(ALLOW_SASL_FAILED_CLIENTS);
        return allowed;
    }

    /**
     * Handles one step of the SASL negotiation for a client connection.
     *
     * <p>The client token is deserialized from {@code incomingBuffer} and handed to the
     * connection's {@code zooKeeperSaslServer}. When the negotiation completes, a
     * {@code "sasl"} auth id is added to the connection, plus a {@code "super"} id if
     * {@link #isSaslSuperUser(String)} accepts the authorization id.
     *
     * <p>On a {@link SaslException} the connection is either kept (when failed clients
     * are allowed and SASL is not required) or an error reply is sent and the session
     * is closed. In every path that does not return early, an OK reply carrying the
     * server's response token (possibly {@code null}) is sent.
     *
     * @param incomingBuffer buffer holding the serialized {@code GetSASLRequest}
     * @param cnxn the connection being authenticated
     * @param requestHeader header of the SASL request; its xid is echoed in the reply
     * @throws IOException propagated from deserialization or from sending the response
     */
    private void processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException {
        LOG.debug("Responding to client SASL token.");
        GetSASLRequest clientTokenRecord = new GetSASLRequest();
        ByteBufferInputStream.byteBuffer2Record(incomingBuffer, clientTokenRecord);
        byte[] clientToken = clientTokenRecord.getToken();
        LOG.debug("Size of client SASL token: {}", clientToken.length);
        // Stays null unless evaluateResponse() below returns a token.
        byte[] responseToken = null;
        try {
            ZooKeeperSaslServer saslServer = cnxn.zooKeeperSaslServer;
            try {
                // note that clientToken might be empty (clientToken.length == 0):
                // if using the DIGEST-MD5 mechanism, clientToken will be empty at the beginning of the
                // SASL negotiation process.
                responseToken = saslServer.evaluateResponse(clientToken);
                if (saslServer.isComplete()) {
                    // Negotiation finished: record the authenticated identity on the connection.
                    String authorizationID = saslServer.getAuthorizationID();
                    LOG.info("Session 0x{}: adding SASL authorization for authorizationID: {}",
                            Long.toHexString(cnxn.getSessionId()), authorizationID);
                    cnxn.addAuthInfo(new Id("sasl", authorizationID));

                    // Configured super users additionally receive the "super" auth id.
                    if (isSaslSuperUser(authorizationID)) {
                        cnxn.addAuthInfo(new Id("super", ""));
                        LOG.info(
                            "Session 0x{}: Authenticated Id '{}' as super user",
                            Long.toHexString(cnxn.getSessionId()),
                            authorizationID);
                    }
                }
            } catch (SaslException e) {
                LOG.warn("Client {} failed to SASL authenticate: {}", cnxn.getRemoteSocketAddress(), e);
                if (shouldAllowSaslFailedClientsConnect() && !authHelper.isSaslAuthRequired()) {
                    // Tolerated failure: fall through and send the OK reply below.
                    LOG.warn("Maintaining client connection despite SASL authentication failure.");
                } else {
                    // Pick the error code depending on whether SASL is mandatory.
                    int error;
                    if (authHelper.isSaslAuthRequired()) {
                        LOG.warn(
                            "Closing client connection due to server requires client SASL authenticaiton,"
                                + "but client SASL authentication has failed, or client is not configured with SASL "
                                + "authentication.");
                        error = Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue();
                    } else {
                        LOG.warn("Closing client connection due to SASL authentication failure.");
                        error = Code.AUTHFAILED.intValue();
                    }

                    // Report the failure, close the session and stop reading from this client.
                    ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, error);
                    cnxn.sendResponse(replyHeader, new SetSASLResponse(null), "response");
                    cnxn.sendCloseSession();
                    cnxn.disableRecv();
                    return;
                }
            }
        } catch (NullPointerException e) {
            // NOTE(review): this catches any NullPointerException raised inside the try
            // block above, not only the case of a null saslServer; execution then
            // continues with the OK reply below.
            LOG.error("cnxn.saslServer is null: cnxn object did not initialize its saslServer properly.");
        }
        if (responseToken != null) {
            LOG.debug("Size of server SASL response: {}", responseToken.length);
        }

        // Reply OK with whatever token the SASL server produced (may be null).
        ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, Code.OK.intValue());
        Record record = new SetSASLResponse(responseToken);
        cnxn.sendResponse(replyHeader, record, "response");
    }

    /**
     * Applies a transaction that arrives without an accompanying {@link Request}
     * (entry point for quorum/Learner.java).
     *
     * @param hdr the transaction header
     * @param txn the transaction body
     * @return the result of applying the transaction to the database
     */
    public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
        // Session bookkeeping first, then the database update.
        processTxnForSessionEvents(null, hdr, txn);
        final ProcessTxnResult result = processTxnInDB(hdr, txn, null);
        return result;
    }

    /**
     * Applies the transaction carried by a request (entry point for
     * FinalRequestProcessor.java).
     *
     * <p>Session events are handled first. Requests that are neither writes (no
     * header) nor quorum requests return an empty result without taking the
     * {@code outstandingChanges} lock. Otherwise, under that lock, the transaction is
     * applied, outstanding change records up to the committed zxid are dropped, and
     * quorum requests are added to the committed proposals.
     *
     * @param request the request being finalized
     * @return the result of applying the transaction, or an empty result for reads
     */
    public ProcessTxnResult processTxn(Request request) {
        final TxnHeader header = request.getHdr();
        processTxnForSessionEvents(request, header, request.getTxn());

        final boolean isWrite = header != null;
        final boolean isQuorum = request.isQuorum();

        // Reads that did not go through quorum need no synchronization.
        if (!(isWrite || isQuorum)) {
            return new ProcessTxnResult();
        }

        synchronized (outstandingChanges) {
            final ProcessTxnResult result = processTxnInDB(header, request.getTxn(), request.getTxnDigest());

            // Only write requests (those with a header) ever add to outstandingChanges.
            if (isWrite) {
                final long committedZxid = header.getZxid();
                while (!outstandingChanges.isEmpty()
                        && outstandingChanges.peek().zxid <= committedZxid) {
                    final ChangeRecord retired = outstandingChanges.remove();
                    ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
                    if (retired.zxid < committedZxid) {
                        LOG.warn(
                            "Zxid outstanding 0x{} is less than current 0x{}",
                            Long.toHexString(retired.zxid),
                            Long.toHexString(committedZxid));
                    }
                    // Drop the per-path entry only if it still refers to this very record.
                    if (outstandingChangesForPath.get(retired.path) == retired) {
                        outstandingChangesForPath.remove(retired.path);
                    }
                }
            }

            // Non-quorum packets are never added to the committed queue.
            if (isQuorum) {
                getZKDatabase().addCommittedProposal(request);
            }
            return result;
        }
    }

    /**
     * Updates the session tracker for session-related transactions.
     *
     * <p>For {@code createSession} with a header and a {@link CreateSessionTxn} body
     * the session is committed with the transaction's timeout; for
     * {@code closeSession} the session is removed. Other op codes are ignored.
     *
     * @param request the originating request, or {@code null} when only a header and
     *                transaction are available (in which case {@code hdr} is dereferenced)
     * @param hdr the transaction header; may be {@code null} when {@code request} is given
     * @param txn the transaction body; may be {@code null}
     */
    private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) {
        int opCode = (request == null) ? hdr.getType() : request.type;
        long sessionId = (request == null) ? hdr.getClientId() : request.sessionId;

        if (opCode == OpCode.createSession) {
            if (hdr != null && txn instanceof CreateSessionTxn) {
                CreateSessionTxn cst = (CreateSessionTxn) txn;
                sessionTracker.commitSession(sessionId, cst.getTimeOut());
            } else if (request == null || !request.isLocalSession()) {
                // txn can be null on this path; guard the dereference so that a purely
                // diagnostic warning can never fail with a NullPointerException.
                LOG.warn("*****>>>>> Got {} {}", (txn == null) ? null : txn.getClass(), txn);
            }
        } else if (opCode == OpCode.closeSession) {
            sessionTracker.removeSession(sessionId);
        }
    }

    /**
     * Applies a transaction to the database when a header is present.
     *
     * @param hdr the transaction header, or {@code null} if there is nothing to apply
     * @param txn the transaction body
     * @param digest the transaction digest, may be {@code null}
     * @return the database result, or an empty result when {@code hdr} is {@code null}
     */
    private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest) {
        if (hdr != null) {
            return getZKDatabase().processTxn(hdr, txn, digest);
        }
        return new ProcessTxnResult();
    }

    /**
     * Returns the session expiry map as reported by the session tracker.
     *
     * @return the map obtained from {@code sessionTracker.getSessionExpiryMap()}
     */
    public Map<Long, Set<Long>> getSessionExpiryMap() {
        final Map<Long, Set<Long>> expiryMap = sessionTracker.getSessionExpiryMap();
        return expiryMap;
    }

    /**
     * Registers the {@link ZooKeeperServerShutdownHandler} that receives the
     * server's error and shutdown state change notifications.
     * {@link ZooKeeperServerShutdownHandler#handle(State)} is invoked on every
     * server state change made through {@link #setState(State)}.
     *
     * @param zkShutdownHandler the shutdown handler to store on this server
     */
    void registerServerShutdownHandler(ZooKeeperServerShutdownHandler zkShutdownHandler) {
        // Replaces any previously registered handler.
        this.zkShutdownHandler = zkShutdownHandler;
    }

    /**
     * Reports the current value of the response-caching flag.
     *
     * @return {@code true} if response caching is enabled
     */
    public boolean isResponseCachingEnabled() {
        return this.isResponseCachingEnabled;
    }

    /**
     * Sets the response-caching flag.
     *
     * @param isEnabled the new value of the flag
     */
    public void setResponseCachingEnabled(boolean isEnabled) {
        this.isResponseCachingEnabled = isEnabled;
    }

    /**
     * Returns the cache used for read responses.
     *
     * @return {@code readResponseCache} when response caching is enabled, otherwise {@code null}
     */
    public ResponseCache getReadResponseCache() {
        if (isResponseCachingEnabled) {
            return readResponseCache;
        }
        return null;
    }

    /**
     * Returns the cache used for getChildren responses.
     *
     * @return {@code getChildrenResponseCache} when response caching is enabled, otherwise {@code null}
     */
    public ResponseCache getGetChildrenResponseCache() {
        if (isResponseCachingEnabled) {
            return getChildrenResponseCache;
        }
        return null;
    }

    /**
     * Registers this server's gauges and quota gauge sets on the root metrics context.
     *
     * <p>Gauges backed by method references are bound to the {@code ServerStats},
     * {@code ZKDatabase} and data tree instances obtained when this method runs; the
     * quota gauge sets read the {@code zkDb} field each time they are evaluated.
     */
    protected void registerMetrics() {
        final MetricsContext metrics = ServerMetrics.getMetrics().getMetricsProvider().getRootContext();

        final ZKDatabase database = this.getZKDatabase();
        final ServerStats statistics = this.serverStats();

        // Request latency.
        metrics.registerGauge("avg_latency", statistics::getAvgLatency);

        metrics.registerGauge("max_latency", statistics::getMaxLatency);
        metrics.registerGauge("min_latency", statistics::getMinLatency);

        // Traffic and connections.
        metrics.registerGauge("packets_received", statistics::getPacketsReceived);
        metrics.registerGauge("packets_sent", statistics::getPacketsSent);
        metrics.registerGauge("num_alive_connections", statistics::getNumAliveClientConnections);

        metrics.registerGauge("outstanding_requests", statistics::getOutstandingRequests);
        metrics.registerGauge("uptime", statistics::getUptime);

        // Data tree contents.
        metrics.registerGauge("znode_count", database::getNodeCount);

        metrics.registerGauge("watch_count", database.getDataTree()::getWatchCount);
        metrics.registerGauge("ephemerals_count", database.getDataTree()::getEphemeralsCount);

        metrics.registerGauge("approximate_data_size", database.getDataTree()::cachedApproximateDataSize);

        // Sessions.
        metrics.registerGauge("global_sessions", database::getSessionCount);
        metrics.registerGauge("local_sessions", this.getSessionTracker()::getLocalSessionCount);

        // Operating-system level figures.
        final OSMXBean osBean = new OSMXBean();
        metrics.registerGauge("open_file_descriptor_count", osBean::getOpenFileDescriptorCount);
        metrics.registerGauge("max_file_descriptor_count", osBean::getMaxFileDescriptorCount);
        metrics.registerGauge("connection_drop_probability", this::getConnectionDropChance);

        // Client response buffer sizes.
        metrics.registerGauge("last_client_response_size", statistics.getClientResponseStats()::getLastBufferSize);
        metrics.registerGauge("max_client_response_size", statistics.getClientResponseStats()::getMaxBufferSize);
        metrics.registerGauge("min_client_response_size", statistics.getClientResponseStats()::getMinBufferSize);

        // TLS and authentication.
        metrics.registerGauge("outstanding_tls_handshake", this::getOutstandingHandshakeNum);
        metrics.registerGauge("auth_failed_count", statistics::getAuthFailedCount);
        metrics.registerGauge("non_mtls_remote_conn_count", statistics::getNonMTLSRemoteConnCount);
        metrics.registerGauge("non_mtls_local_conn_count", statistics::getNonMTLSLocalConnCount);

        // Per-namespace quota limits and usage.
        metrics.registerGaugeSet(
                QuotaMetricsUtils.QUOTA_COUNT_LIMIT_PER_NAMESPACE,
                () -> QuotaMetricsUtils.getQuotaCountLimit(zkDb.getDataTree()));
        metrics.registerGaugeSet(
                QuotaMetricsUtils.QUOTA_BYTES_LIMIT_PER_NAMESPACE,
                () -> QuotaMetricsUtils.getQuotaBytesLimit(zkDb.getDataTree()));
        metrics.registerGaugeSet(
                QuotaMetricsUtils.QUOTA_COUNT_USAGE_PER_NAMESPACE,
                () -> QuotaMetricsUtils.getQuotaCountUsage(zkDb.getDataTree()));
        metrics.registerGaugeSet(
                QuotaMetricsUtils.QUOTA_BYTES_USAGE_PER_NAMESPACE,
                () -> QuotaMetricsUtils.getQuotaBytesUsage(zkDb.getDataTree()));
    }

    /**
     * Removes from the root metrics context every gauge and gauge set that
     * {@link #registerMetrics()} installs, so that no gauge keeps referring to this
     * server once its metrics have been unregistered.
     */
    protected void unregisterMetrics() {

        MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext();

        rootContext.unregisterGauge("avg_latency");

        rootContext.unregisterGauge("max_latency");
        rootContext.unregisterGauge("min_latency");

        rootContext.unregisterGauge("packets_received");
        rootContext.unregisterGauge("packets_sent");
        rootContext.unregisterGauge("num_alive_connections");

        rootContext.unregisterGauge("outstanding_requests");
        rootContext.unregisterGauge("uptime");

        rootContext.unregisterGauge("znode_count");

        rootContext.unregisterGauge("watch_count");
        rootContext.unregisterGauge("ephemerals_count");
        rootContext.unregisterGauge("approximate_data_size");

        rootContext.unregisterGauge("global_sessions");
        rootContext.unregisterGauge("local_sessions");

        rootContext.unregisterGauge("open_file_descriptor_count");
        rootContext.unregisterGauge("max_file_descriptor_count");
        rootContext.unregisterGauge("connection_drop_probability");

        rootContext.unregisterGauge("last_client_response_size");
        rootContext.unregisterGauge("max_client_response_size");
        rootContext.unregisterGauge("min_client_response_size");

        // Registered by registerMetrics() but previously never removed here.
        rootContext.unregisterGauge("outstanding_tls_handshake");
        rootContext.unregisterGauge("auth_failed_count");
        rootContext.unregisterGauge("non_mtls_remote_conn_count");
        rootContext.unregisterGauge("non_mtls_local_conn_count");

        rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_LIMIT_PER_NAMESPACE);
        rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_LIMIT_PER_NAMESPACE);
        rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_USAGE_PER_NAMESPACE);
        rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_USAGE_PER_NAMESPACE);
    }

    /**
     * Hook for the admin server that exposes additional values which are not
     * metrics: the full version string and the server state.
     *
     * @param response a sink which collects the key/value pairs
     */
    public void dumpMonitorValues(BiConsumer<String, Object> response) {
        final ServerStats currentStats = serverStats();
        final String fullVersion = Version.getFullVersion();
        response.accept("version", fullVersion);
        response.accept("server_state", currentStats.getServerState());
    }

    /**
     * Grants or denies authorization for an operation on a node.
     *
     * <p>Access is granted (the method returns normally) when ACL checking is
     * skipped, when the node has no ACLs, when the client holds a {@code "super"}
     * credential, when an ACL entry with a requested permission is
     * {@code world:anyone}, or when the authentication provider for an ACL entry's
     * scheme matches one of the client's credentials of that scheme.
     *
     * @param cnxn    the server connection
     * @param acl     set of ACLs for the node
     * @param perm    the permission that the client is requesting
     * @param ids     the credentials supplied by the client
     * @param path    the ZNode path
     * @param setAcls for set ACL operations, the list of ACLs being set. Otherwise null.
     * @throws KeeperException.NoAuthException if no rule grants the requested permission
     */
    public void checkACL(ServerCnxn cnxn, List<ACL> acl, int perm, List<Id> ids, String path, List<ACL> setAcls) throws KeeperException.NoAuthException {
        if (skipACL) {
            return;
        }

        LOG.debug("Permission requested: {} ", perm);
        LOG.debug("ACLs for node: {}", acl);
        LOG.debug("Client credentials: {}", ids);

        // A node without ACLs is open to everybody.
        if (acl == null || acl.isEmpty()) {
            return;
        }

        // Super users bypass every ACL.
        for (Id credential : ids) {
            if (credential.getScheme().equals("super")) {
                return;
            }
        }

        for (ACL entry : acl) {
            final Id aclId = entry.getId();
            if ((entry.getPerms() & perm) == 0) {
                // This entry does not grant any of the requested permissions.
                continue;
            }
            if (aclId.getScheme().equals("world") && aclId.getId().equals("anyone")) {
                return;
            }
            final ServerAuthenticationProvider provider = ProviderRegistry.getServerProvider(aclId.getScheme());
            if (provider == null) {
                continue;
            }
            for (Id credential : ids) {
                if (!credential.getScheme().equals(aclId.getScheme())) {
                    continue;
                }
                final boolean matched = provider.matches(
                        new ServerAuthenticationProvider.ServerObjs(this, cnxn),
                        new ServerAuthenticationProvider.MatchValues(path, credential.getId(), aclId.getId(), perm, setAcls));
                if (matched) {
                    return;
                }
            }
        }
        throw new KeeperException.NoAuthException();
    }

    /**
     * Checks whether an operation on a path would exceed its quota.
     *
     * <p>Nothing is checked when quota enforcement is disabled or when no prefix of
     * the path carries a quota.
     *
     * @param path
     *            the path of the node, used for the quota prefix check
     * @param lastData
     *            the current node data, {@code null} for none
     * @param data
     *            the data to be set, or {@code null} for none
     * @param type
     *            the operation code; only create and setData are supported
     * @throws KeeperException.QuotaExceededException if a hard quota would be exceeded
     * @throws IllegalArgumentException if {@code type} is neither create nor setData
     */
    public void checkQuota(String path, byte[] lastData, byte[] data, int type) throws KeeperException.QuotaExceededException {
        if (!enforceQuota) {
            return;
        }

        final long incomingBytes = (data == null) ? 0 : data.length;
        final String quotaPrefix = getZKDatabase().getDataTree().getMaxPrefixWithQuota(path);
        if (StringUtils.isEmpty(quotaPrefix)) {
            return;
        }

        final String namespace = PathUtils.getTopNamespace(path);
        if (type == OpCode.create) {
            // A create adds one node and all of its bytes.
            checkQuota(quotaPrefix, incomingBytes, 1, namespace);
        } else if (type == OpCode.setData) {
            // A setData changes only the byte count, by the difference in length.
            final long existingBytes = (lastData == null) ? 0 : lastData.length;
            checkQuota(quotaPrefix, incomingBytes - existingBytes, 0, namespace);
        } else {
            throw new IllegalArgumentException("Unsupported OpCode for checkQuota: " + type);
        }
    }

    /**
     * Checks the count and byte quotas configured for a quota prefix.
     *
     * <p>The limits are read from the prefix's limit node and the current usage from
     * its stat node. Exceeding a limit is always logged through the rate limited
     * logger; only a hard limit additionally updates the quota metrics and raises an
     * exception. If either node is missing, an error is logged and nothing is checked.
     *
     * @param lastPrefix
     *            the path of the node which has a quota.
     * @param bytesDiff
     *            the diff to be added to number of bytes
     * @param countDiff
     *            the diff to be added to the count
     * @param namespace
     *            the namespace for collecting quota exceeded errors
     * @throws KeeperException.QuotaExceededException if a hard limit would be exceeded
     */
    private void checkQuota(String lastPrefix, long bytesDiff, long countDiff, String namespace)
            throws KeeperException.QuotaExceededException {
        LOG.debug("checkQuota: lastPrefix={}, bytesDiff={}, countDiff={}", lastPrefix, bytesDiff, countDiff);

        // Load the configured limits.
        final String limitNodePath = Quotas.limitPath(lastPrefix);
        final DataNode limitDataNode = getZKDatabase().getNode(limitNodePath);
        if (limitDataNode == null) {
            // should not happen
            LOG.error("Missing limit node for quota {}", limitNodePath);
            return;
        }
        final StatsTrack limits;
        synchronized (limitDataNode) {
            limits = new StatsTrack(limitDataNode.data);
        }

        // A dimension is checked only if it changes and has a soft or hard limit set.
        final boolean countQuotaApplies =
                countDiff != 0 && (limits.getCount() > -1 || limits.getCountHardLimit() > -1);
        final boolean byteQuotaApplies =
                bytesDiff != 0 && (limits.getBytes() > -1 || limits.getByteHardLimit() > -1);
        if (!(countQuotaApplies || byteQuotaApplies)) {
            return;
        }

        // Load the current usage.
        final String statNodePath = Quotas.statPath(lastPrefix);
        final DataNode usageDataNode = getZKDatabase().getNode(statNodePath);
        if (usageDataNode == null) {
            // should not happen
            LOG.error("Missing node for stat {}", statNodePath);
            return;
        }
        final StatsTrack usage;
        synchronized (usageDataNode) {
            usage = new StatsTrack(usageDataNode.data);
        }

        if (countQuotaApplies) {
            final long projectedCount = usage.getCount() + countDiff;
            final boolean hardCount = limits.getCountHardLimit() > -1;
            final long maxCount = hardCount ? limits.getCountHardLimit() : limits.getCount();

            if (projectedCount > maxCount) {
                RATE_LOGGER.rateLimitLog(
                        "Quota exceeded: " + lastPrefix + " [current count=" + projectedCount + ", "
                                + (hardCount ? "hard" : "soft") + "CountLimit=" + maxCount + "]");
                if (hardCount) {
                    updateQuotaExceededMetrics(namespace);
                    throw new KeeperException.QuotaExceededException(lastPrefix);
                }
            }
        }

        if (byteQuotaApplies) {
            final long projectedBytes = usage.getBytes() + bytesDiff;
            final boolean hardBytes = limits.getByteHardLimit() > -1;
            final long maxBytes = hardBytes ? limits.getByteHardLimit() : limits.getBytes();

            if (projectedBytes > maxBytes) {
                RATE_LOGGER.rateLimitLog(
                        "Quota exceeded: " + lastPrefix + " [current bytes=" + projectedBytes + ", "
                                + (hardBytes ? "hard" : "soft") + "ByteLimit=" + maxBytes + "]");
                if (hardBytes) {
                    updateQuotaExceededMetrics(namespace);
                    throw new KeeperException.QuotaExceededException(lastPrefix);
                }
            }
        }
    }

    /**
     * Reports the current value of the static digest flag.
     *
     * @return {@code true} if the digest feature is enabled
     */
    public static boolean isDigestEnabled() {
        return ZooKeeperServer.digestEnabled;
    }

    /**
     * Logs and then stores the new value of the static digest flag.
     *
     * @param digestEnabled the new value of the flag
     */
    public static void setDigestEnabled(boolean digestEnabled) {
        // The new value is logged before it is applied.
        LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled);
        ZooKeeperServer.digestEnabled = digestEnabled;
    }

    /**
     * Trims a path to obtain its immediate predecessor.
     *
     * @param path the path whose parent is wanted
     * @return {@code "/"} when the only slash is the leading one, otherwise the part
     *         of {@code path} before its last slash
     * @throws KeeperException.BadArgumentsException if the path contains no slash,
     *         contains a NUL character, or is a special path of the database
     */
    private String parentPath(String path) throws KeeperException.BadArgumentsException {
        final int slashIndex = path.lastIndexOf('/');
        if (slashIndex == -1) {
            throw new KeeperException.BadArgumentsException(path);
        }
        if (path.indexOf('\0') != -1) {
            throw new KeeperException.BadArgumentsException(path);
        }
        if (getZKDatabase().isSpecialPath(path)) {
            throw new KeeperException.BadArgumentsException(path);
        }
        if (slashIndex == 0) {
            return "/";
        }
        return path.substring(0, slashIndex);
    }

    /**
     * Determines the path whose ACL governs a write request.
     *
     * <p>For create and delete this is the parent of the requested path; for setData
     * and setACL it is the requested path itself. For create and setACL the ACL
     * carried by the request is additionally validated through
     * {@code PrepRequestProcessor.fixupACL}.
     *
     * @param request the request to inspect
     * @return the path to check, or {@code null} if the request type is not covered
     *         or its payload could not be deserialized
     * @throws KeeperException.BadArgumentsException if the parent path cannot be derived
     * @throws KeeperException.InvalidACLException if the ACL in the request is not well-formed
     */
    private String effectiveACLPath(Request request) throws KeeperException.BadArgumentsException, KeeperException.InvalidACLException {
        String aclPath = null;
        List<ACL> requestedAcl = null;
        boolean validateAcl = false;

        switch (request.type) {
        case OpCode.create:
        case OpCode.create2: {
            final CreateRequest create = new CreateRequest();
            if (buffer2Record(request.request, create)) {
                validateAcl = true;
                requestedAcl = create.getAcl();
                aclPath = parentPath(create.getPath());
            }
            break;
        }
        case OpCode.delete: {
            final DeleteRequest delete = new DeleteRequest();
            if (buffer2Record(request.request, delete)) {
                aclPath = parentPath(delete.getPath());
            }
            break;
        }
        case OpCode.setData: {
            final SetDataRequest setData = new SetDataRequest();
            if (buffer2Record(request.request, setData)) {
                aclPath = setData.getPath();
            }
            break;
        }
        case OpCode.setACL: {
            final SetACLRequest setAcl = new SetACLRequest();
            if (buffer2Record(request.request, setAcl)) {
                validateAcl = true;
                requestedAcl = setAcl.getAcl();
                aclPath = setAcl.getPath();
            }
            break;
        }
        }

        if (validateAcl) {
            // Only well-formedness matters here: the fixed-up ACL that fixupACL
            // returns is discarded, and a malformed ACL makes it throw.
            PrepRequestProcessor.fixupACL(aclPath, request.authInfo, requestedAcl);
        }

        return aclPath;
    }

    /**
     * Maps a request type to the permission it requires.
     *
     * @param request the request to inspect
     * @return CREATE for create/create2, DELETE for delete, WRITE for setData,
     *         ADMIN for setACL, and ALL for every other type
     */
    private int effectiveACLPerms(Request request) {
        final int type = request.type;
        if (type == OpCode.create || type == OpCode.create2) {
            return ZooDefs.Perms.CREATE;
        }
        if (type == OpCode.delete) {
            return ZooDefs.Perms.DELETE;
        }
        if (type == OpCode.setData) {
            return ZooDefs.Perms.WRITE;
        }
        if (type == OpCode.setACL) {
            return ZooDefs.Perms.ADMIN;
        }
        return ZooDefs.Perms.ALL;
    }

    /**
     * Checks write requests for potential access restrictions.
     *
     * <p>Before a request is proposed to the quorum it is checked against the local
     * ACLs. Requests for which no ACL path applies are passed along. A request that
     * fails the ACL check, or carries an invalid ACL, is answered immediately with
     * the corresponding error code and the in-process counter is decremented.
     * Checks against a non-existent node or an illegal path are only logged and the
     * request is let through.
     *
     * <p>When eager ACL checking is disabled, every request is permitted.
     *
     * @param request the request to check
     * @return true if request is permitted, false if not.
     */
    public boolean authWriteRequest(Request request) {
        if (!enableEagerACLCheck) {
            return true;
        }

        final int okCode = KeeperException.Code.OK.intValue();
        int errorCode = okCode;

        try {
            final String aclPath = effectiveACLPath(request);
            if (aclPath != null) {
                final List<ACL> nodeAcl = zkDb.getACL(aclPath, null);
                checkACL(request.cnxn, nodeAcl, effectiveACLPerms(request), request.authInfo, aclPath, null);
            }
        } catch (KeeperException.NoAuthException e) {
            LOG.debug("Request failed ACL check", e);
            errorCode = e.code().intValue();
        } catch (KeeperException.InvalidACLException e) {
            LOG.debug("Request has an invalid ACL check", e);
            errorCode = e.code().intValue();
        } catch (KeeperException.NoNodeException e) {
            // Not an authorization failure; the request continues.
            LOG.debug("ACL check against non-existent node: {}", e.getMessage());
        } catch (KeeperException.BadArgumentsException e) {
            // Not an authorization failure; the request continues.
            LOG.debug("ACL check against illegal node path: {}", e.getMessage());
        } catch (Throwable t) {
            LOG.error("Uncaught exception in authWriteRequest with: ", t);
            throw t;
        } finally {
            if (errorCode != okCode) {
                // The request has a bad ACL, so it is dismissed early with an error reply.
                decInProcess();
                final ReplyHeader errorReply = new ReplyHeader(request.cxid, 0, errorCode);
                try {
                    request.cnxn.sendResponse(errorReply, null, null);
                } catch (IOException e) {
                    LOG.error("IOException : {}", e);
                }
            }
        }

        return errorCode == okCode;
    }

    /**
     * Deserializes a request buffer into the given record without consuming the buffer.
     *
     * <p>The buffer is rewound whether or not deserialization succeeds, so later
     * readers of the same buffer start from the beginning instead of from a partially
     * consumed position. A deserialization failure is logged at debug level and
     * reported through the return value rather than thrown.
     *
     * @param request the buffer holding the serialized request
     * @param record the record to populate
     * @return {@code true} if the record was deserialized, {@code false} on an I/O error
     */
    private boolean buffer2Record(ByteBuffer request, Record record) {
        try {
            ByteBufferInputStream.byteBuffer2Record(request, record);
            return true;
        } catch (IOException ex) {
            // Best effort: the caller treats an unreadable payload as "nothing to check".
            LOG.debug("Failed to deserialize request into {}", record.getClass().getName(), ex);
            return false;
        } finally {
            // Restore the read position on both the success and the failure path.
            request.rewind();
        }
    }

    /**
     * Returns the number of outstanding handshakes reported by the connection factory.
     *
     * @return the factory's value when it is a {@code NettyServerCnxnFactory}, otherwise 0
     */
    public int getOutstandingHandshakeNum() {
        if (!(serverCnxnFactory instanceof NettyServerCnxnFactory)) {
            return 0;
        }
        final NettyServerCnxnFactory nettyFactory = (NettyServerCnxnFactory) serverCnxnFactory;
        return nettyFactory.getOutstandingHandshakeNum();
    }

    /**
     * Reports the value of the reconfig flag of this server.
     *
     * @return {@code true} if reconfig is enabled
     */
    public boolean isReconfigEnabled() {
        return reconfigEnabled;
    }

    /**
     * Returns the shutdown handler currently stored on this server.
     *
     * @return the stored handler, as last assigned to {@code zkShutdownHandler}
     */
    public ZooKeeperServerShutdownHandler getZkShutdownHandler() {
        return this.zkShutdownHandler;
    }

    /**
     * Adds one to the quota-exceeded error metric of a namespace.
     *
     * @param namespace the namespace to account the error to; ignored when {@code null}
     */
    static void updateQuotaExceededMetrics(final String namespace) {
        if (namespace != null) {
            ServerMetrics.getMetrics().QUOTA_EXCEEDED_ERROR_PER_NAMESPACE.add(namespace, 1);
        }
    }
}

org/apache/zookeeper/server/ZooKeeperServer.java

 

Or download all of them as a single archive file:

File name: zookeeper-server-3.8.0-fyi.zip
File size: 885581 bytes
Release date: 2022-02-25
Download 

 

Apache ZooKeeper Jute Source Code

Download and Install Apache ZooKeeper Source Package

Downloading and Reviewing zookeeper.jar

⇑⇑ FAQ for Apache ZooKeeper

2022-11-16, 13681👍, 0💬