Apache ZooKeeper Server Source Code

Apache ZooKeeper is an open-source server which enables highly reliable distributed coordination.

Apache ZooKeeper Server Source Code files are provided in the source packge (apache-zookeeper-3.7.0.tar.gz). You can download it at Apache ZooKeeper Website.

You can also browse Apache ZooKeeper Server Source Code below:

✍: FYIcenter.com

org/apache/zookeeper/server/controller/ZooKeeperServerController.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server.controller;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import org.apache.zookeeper.server.ExitCode;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.util.ServiceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class which accepts commands to modify ZooKeeperServer state or Connection state at runtime for the purpose of
 * single machine integration testing. Not meant to be used in production. It is recommended to use this in conjunction
 * with the CommandListener HttpServer and CommandClient.
 *
 */
@SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "quorum peer is internally synchronized.")
public class ZooKeeperServerController {

    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperServerController.class);
    private static final long DEFAULT_DELAY_MS = 1000;

    private QuorumPeer quorumPeer;
    private ControllableConnectionFactory cnxnFactory;

    public ZooKeeperServerController(QuorumPeerConfig config) throws IOException {
        if (config == null) {
            throw new IllegalArgumentException("ZooKeeperServerController requires a valid config!");
        }

        cnxnFactory = new ControllableConnectionFactory();
        cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog());
        quorumPeer = QuorumPeer.createFromConfig(config);
        quorumPeer.setCnxnFactory(cnxnFactory);
    }

    public void run() {
        try {
            quorumPeer.start();
            quorumPeer.join();
        } catch (Exception ex) {
            LOG.error("Fatal error starting quorum peer", ex);
            ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
        }
    }

    protected ServerCnxnFactory getCnxnFactory() {
        return cnxnFactory;
    }

    public synchronized void shutdown() {
        if (this.cnxnFactory != null) {
            this.cnxnFactory.shutdown();
            this.cnxnFactory = null;
        }

        if (this.quorumPeer != null && this.quorumPeer.isRunning()) {
            this.quorumPeer.shutdown();
            this.quorumPeer = null;
        }
    }

    public synchronized boolean isReady() {
        return this.cnxnFactory != null
                && this.quorumPeer != null
                && this.quorumPeer.isRunning()
                && this.quorumPeer.getActiveServer() != null;
    }

    /**
     * Process the command. An exception indicates errors. No exception indicates success.
     */
    public void processCommand(ControlCommand command) {
        if (command == null) {
            throw new IllegalArgumentException("Invalid command parameter!");
        }

        LOG.info("processing command {}{}", command.getAction(),
                command.getParameter() == null ? "" : "[" + command.getParameter() + "]");

        // Don't process command if we are shutting down or still initializing.
        if (!isReady()) {
            throw new IllegalStateException("Service is not ready. It has already been shutdown or is still initializing.");
        }

        switch (command.getAction()) {
            case PING:
                // NO-OP
                break;
            case SHUTDOWN:
                shutdown();
                break;
            case CLOSECONNECTION:
                if (command.getParameter() == null) {
                    cnxnFactory.closeAll(ServerCnxn.DisconnectReason.CLOSE_ALL_CONNECTIONS_FORCED);
                } else {
                    // A single parameter should be a session id as long.
                    // Parse failure exceptions will be sent to the caller.
                    cnxnFactory.closeSession(Long.decode(command.getParameter()),
                            ServerCnxn.DisconnectReason.CONNECTION_CLOSE_FORCED);
                }
                break;
            case EXPIRESESSION:
                // TODO: (hanm) implement once dependent feature is ready.
                if (command.getParameter() == null) {
                    // expireAllSessions();
                } else {
                    // A single parameter should be a session id as long.
                    // Parse failure exceptions will be sent to the caller
                    // expireSession(Long.decode(command.getParameter()));
                }
                break;
            case REJECTCONNECTIONS:
                // TODO: (hanm) implement once dependent feature is ready.
                //cnxnFactory.rejectNewConnections();
                break;
            case ADDDELAY:
                cnxnFactory.delayResponses(command.getParameter() == null
                        ? DEFAULT_DELAY_MS : Long.decode(command.getParameter()));
                break;
            case NORESPONSE:
                if (command.getParameter() == null) {
                    cnxnFactory.holdAllFutureResponses();
                } else {
                    cnxnFactory.holdFutureResponses(Long.decode(command.getParameter()));
                }
                break;
            case FAILREQUESTS:
                if (command.getParameter() == null) {
                    cnxnFactory.failAllFutureRequests();
                } else {
                    cnxnFactory.failFutureRequests(Long.decode(command.getParameter()));
                }
                break;
            case RESET:
                cnxnFactory.resetBadBehavior();
                break;
            case ELECTNEWLEADER:
                quorumPeer.startLeaderElection();
                break;
            default:
                throw new IllegalArgumentException("Unknown command: " + command);
        }
    }

}

org/apache/zookeeper/server/controller/ZooKeeperServerController.java

 

⇒ Apache ZooKeeper Jute Source Code

⇐ What Is Apache ZooKeeper

⇑ Downloading and Reviewing zookeeper.jar

⇑⇑ FAQ for Apache ZooKeeper

2018-10-18, 28915👍, 1💬