Categories:
Audio (13)
Biotech (29)
Bytecode (36)
Database (77)
Framework (7)
Game (7)
General (507)
Graphics (53)
I/O (35)
IDE (2)
JAR Tools (101)
JavaBeans (21)
JDBC (121)
JDK (426)
JSP (20)
Logging (108)
Mail (58)
Messaging (8)
Network (84)
PDF (97)
Report (7)
Scripting (84)
Security (32)
Server (121)
Servlet (26)
SOAP (24)
Testing (54)
Web (15)
XML (309)
Collections:
Other Resources:
Apache ZooKeeper 3.7.0 Server Source Code
Apache ZooKeeper is an open-source server which enables highly
reliable distributed coordination.
Apache ZooKeeper Server Source Code files are provided in the source packge (apache-zookeeper-3.7.0.tar.gz). You can download it at Apache ZooKeeper Website.
You can also browse Apache ZooKeeper Server Source Code below:
✍: FYIcenter.com
⏎ org/apache/zookeeper/server/quorum/ObserverMaster.java
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.zookeeper.server.quorum; import static java.nio.charset.StandardCharsets.UTF_8; import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Used by Followers to host Observers. This reduces the network load on the Leader process by pushing * the responsibility for keeping Observers in sync off the leading peer. * * It is expected that Observers will continue to perform the initial vetting of clients and requests. * Observers send the request to the follower where it is received by an ObserverMaster. * * The ObserverMaster forwards a copy of the request to the ensemble Leader and inserts it into its own * request processor pipeline where it can be matched with the response comes back. All commits received * from the Leader will be forwarded along to every Learner connected to the ObserverMaster. * * New Learners connecting to a Follower will receive a LearnerHandler object and be party to its syncing logic * to be brought up to date. * * The logic is quite a bit simpler than the corresponding logic in Leader because it only hosts observers. */ public class ObserverMaster extends LearnerMaster implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ObserverMaster.class); //Follower counter private final AtomicLong followerCounter = new AtomicLong(-1); private QuorumPeer self; private FollowerZooKeeperServer zks; private int port; private Set<LearnerHandler> activeObservers = Collections.newSetFromMap(new ConcurrentHashMap<LearnerHandler, Boolean>()); private final ConcurrentHashMap<LearnerHandler, LearnerHandlerBean> connectionBeans = new ConcurrentHashMap<>(); /** * we want to keep a log of past txns so that observers can sync up with us when we connect, * but we can't keep everything in memory, so this limits how much memory will be dedicated * to keeping recent txns. */ private static final int PKTS_SIZE_LIMIT = 32 * 1024 * 1024; private static volatile int pktsSizeLimit = Integer.getInteger("zookeeper.observerMaster.sizeLimit", PKTS_SIZE_LIMIT); private ConcurrentLinkedQueue<QuorumPacket> proposedPkts = new ConcurrentLinkedQueue<>(); private ConcurrentLinkedQueue<QuorumPacket> committedPkts = new ConcurrentLinkedQueue<>(); private int pktsSize = 0; private long lastProposedZxid; // ensure ordering of revalidations returned to this learner private final Object revalidateSessionLock = new Object(); private final ConcurrentLinkedQueue<Revalidation> pendingRevalidations = new ConcurrentLinkedQueue<>(); static class Revalidation { public final long sessionId; public final int timeout; public final LearnerHandler handler; Revalidation(final Long sessionId, final int timeout, final LearnerHandler handler) { this.sessionId = sessionId; this.timeout = timeout; this.handler = handler; } @Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } final Revalidation that = (Revalidation) o; return sessionId == that.sessionId && timeout == that.timeout && handler.equals(that.handler); } @Override public int hashCode() { int result = (int) (sessionId ^ (sessionId >>> 32)); result = 31 * result + timeout; result = 31 * result + handler.hashCode(); return result; } } private Thread thread; private ServerSocket ss; private boolean listenerRunning; private ScheduledExecutorService pinger; Runnable ping = new Runnable() { @Override public void run() { for (LearnerHandler lh : activeObservers) { lh.ping(); } } }; ObserverMaster(QuorumPeer self, FollowerZooKeeperServer zks, int port) { this.self = self; this.zks = zks; this.port = port; } @Override public void addLearnerHandler(LearnerHandler learnerHandler) { if (!listenerRunning) { throw new RuntimeException(("ObserverMaster is not running")); } } @Override public void removeLearnerHandler(LearnerHandler learnerHandler) { activeObservers.remove(learnerHandler); } @Override public int syncTimeout() { return self.getSyncLimit() * self.getTickTime(); } @Override public int getTickOfNextAckDeadline() { return self.tick.get() + self.syncLimit; } @Override public int getTickOfInitialAckDeadline() { return self.tick.get() + self.initLimit + self.syncLimit; } @Override public long getAndDecrementFollowerCounter() { return followerCounter.getAndDecrement(); } @Override public void waitForEpochAck(long sid, StateSummary ss) throws IOException, InterruptedException { // since this is done by an active follower, we don't need to wait for anything } @Override public void waitForStartup() throws InterruptedException { // since this is done by an active follower, we don't need to wait for anything } @Override public synchronized long getLastProposed() { return lastProposedZxid; } @Override public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException { return self.getCurrentEpoch(); } @Override public ZKDatabase getZKDatabase() { return zks.getZKDatabase(); } @Override public void waitForNewLeaderAck(long sid, long zxid) throws InterruptedException { // no need to wait since we are a follower } @Override public int getCurrentTick() { return self.tick.get(); } @Override public void processAck(long sid, long zxid, SocketAddress localSocketAddress) { if ((zxid & 0xffffffffL) == 0) { /* * We no longer process NEWLEADER ack by this method. However, * the learner sends ack back to the leader after it gets UPTODATE * so we just ignore the message. */ return; } throw new RuntimeException("Observers shouldn't send ACKS ack = " + Long.toHexString(zxid)); } @Override public void touch(long sess, int to) { zks.getSessionTracker().touchSession(sess, to); } boolean revalidateLearnerSession(QuorumPacket qp) throws IOException { ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData()); DataInputStream dis = new DataInputStream(bis); long id = dis.readLong(); boolean valid = dis.readBoolean(); Iterator<Revalidation> itr = pendingRevalidations.iterator(); if (!itr.hasNext()) { // not a learner session, handle locally return false; } Revalidation revalidation = itr.next(); if (revalidation.sessionId != id) { // not a learner session, handle locally return false; } itr.remove(); LearnerHandler learnerHandler = revalidation.handler; // create a copy here as the qp object is reused by the Follower and may be mutated QuorumPacket deepCopy = new QuorumPacket( qp.getType(), qp.getZxid(), Arrays.copyOf(qp.getData(), qp.getData().length), qp.getAuthinfo() == null ? null : new ArrayList<>(qp.getAuthinfo())); learnerHandler.queuePacket(deepCopy); // To keep consistent as leader, touch the session when it's // revalidating the session, only update if it's a valid session. if (valid) { touch(revalidation.sessionId, revalidation.timeout); } return true; } @Override public void revalidateSession(QuorumPacket qp, LearnerHandler learnerHandler) throws IOException { ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData()); DataInputStream dis = new DataInputStream(bis); long id = dis.readLong(); int to = dis.readInt(); synchronized (revalidateSessionLock) { pendingRevalidations.add(new Revalidation(id, to, learnerHandler)); Learner learner = zks.getLearner(); if (learner != null) { learner.writePacket(qp, true); } } } @Override public void submitLearnerRequest(Request si) { zks.processObserverRequest(si); } @Override public synchronized long startForwarding(LearnerHandler learnerHandler, long lastSeenZxid) { Iterator<QuorumPacket> itr = committedPkts.iterator(); if (itr.hasNext()) { QuorumPacket packet = itr.next(); if (packet.getZxid() > lastSeenZxid + 1) { LOG.error( "LearnerHandler is too far behind (0x{} < 0x{}), disconnecting {} at {}", Long.toHexString(lastSeenZxid + 1), Long.toHexString(packet.getZxid()), learnerHandler.getSid(), learnerHandler.getRemoteAddress()); learnerHandler.shutdown(); return -1; } else if (packet.getZxid() == lastSeenZxid + 1) { learnerHandler.queuePacket(packet); } long queueHeadZxid = packet.getZxid(); long queueBytesUsed = LearnerHandler.packetSize(packet); while (itr.hasNext()) { packet = itr.next(); if (packet.getZxid() <= lastSeenZxid) { continue; } learnerHandler.queuePacket(packet); queueBytesUsed += LearnerHandler.packetSize(packet); } LOG.info( "finished syncing observer from retained commit queue: sid {}, " + "queue head 0x{}, queue tail 0x{}, sync position 0x{}, num packets used {}, " + "num bytes used {}", learnerHandler.getSid(), Long.toHexString(queueHeadZxid), Long.toHexString(packet.getZxid()), Long.toHexString(lastSeenZxid), packet.getZxid() - lastSeenZxid, queueBytesUsed); } activeObservers.add(learnerHandler); return lastProposedZxid; } @Override public long getQuorumVerifierVersion() { return self.getQuorumVerifier().getVersion(); } @Override public String getPeerInfo(long sid) { QuorumPeer.QuorumServer server = self.getView().get(sid); return server == null ? "" : server.toString(); } @Override public byte[] getQuorumVerifierBytes() { return self.getLastSeenQuorumVerifier().toString().getBytes(UTF_8); } @Override public QuorumAuthServer getQuorumAuthServer() { return (self == null) ? null : self.authServer; } void proposalReceived(QuorumPacket qp) { proposedPkts.add(new QuorumPacket(Leader.INFORM, qp.getZxid(), qp.getData(), null)); } private synchronized QuorumPacket removeProposedPacket(long zxid) { QuorumPacket pkt = proposedPkts.peek(); if (pkt == null || pkt.getZxid() > zxid) { LOG.debug("ignore missing proposal packet for {}", Long.toHexString(zxid)); return null; } if (pkt.getZxid() != zxid) { final String m = String.format( "Unexpected proposal packet on commit ack, expected zxid 0x%d got zxid 0x%d", zxid, pkt.getZxid()); LOG.error(m); throw new RuntimeException(m); } proposedPkts.remove(); return pkt; } private synchronized void cacheCommittedPacket(final QuorumPacket pkt) { committedPkts.add(pkt); pktsSize += LearnerHandler.packetSize(pkt); // remove 5 packets for every one added as we near the size limit for (int i = 0; pktsSize > pktsSizeLimit * 0.8 && i < 5; i++) { QuorumPacket oldPkt = committedPkts.poll(); if (oldPkt == null) { pktsSize = 0; break; } pktsSize -= LearnerHandler.packetSize(oldPkt); } // enforce the size limit as a hard cap while (pktsSize > pktsSizeLimit) { QuorumPacket oldPkt = committedPkts.poll(); if (oldPkt == null) { pktsSize = 0; break; } pktsSize -= LearnerHandler.packetSize(oldPkt); } } private synchronized void sendPacket(final QuorumPacket pkt) { for (LearnerHandler lh : activeObservers) { lh.queuePacket(pkt); } lastProposedZxid = pkt.getZxid(); } synchronized void proposalCommitted(long zxid) { QuorumPacket pkt = removeProposedPacket(zxid); if (pkt == null) { return; } cacheCommittedPacket(pkt); sendPacket(pkt); } synchronized void informAndActivate(long zxid, long suggestedLeaderId) { QuorumPacket pkt = removeProposedPacket(zxid); if (pkt == null) { return; } // Build the INFORMANDACTIVATE packet QuorumPacket informAndActivateQP = Leader.buildInformAndActivePacket(zxid, suggestedLeaderId, pkt.getData()); cacheCommittedPacket(informAndActivateQP); sendPacket(informAndActivateQP); } public synchronized void start() throws IOException { if (thread != null && thread.isAlive()) { return; } listenerRunning = true; int backlog = 10; // dog science InetAddress address = self.getQuorumAddress().getReachableOrOne().getAddress(); if (self.shouldUsePortUnification() || self.isSslQuorum()) { boolean allowInsecureConnection = self.shouldUsePortUnification(); if (self.getQuorumListenOnAllIPs()) { ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection, port, backlog); } else { ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection, port, backlog, address); } } else { if (self.getQuorumListenOnAllIPs()) { ss = new ServerSocket(port, backlog); } else { ss = new ServerSocket(port, backlog, address); } } thread = new Thread(this, "ObserverMaster"); thread.start(); pinger = Executors.newSingleThreadScheduledExecutor(); pinger.scheduleAtFixedRate(ping, self.tickTime / 2, self.tickTime / 2, TimeUnit.MILLISECONDS); } public void run() { ServerSocket ss; synchronized (this) { ss = this.ss; } while (listenerRunning) { try { Socket s = ss.accept(); // start with the initLimit, once the ack is processed // in LearnerHandler switch to the syncLimit s.setSoTimeout(self.tickTime * self.initLimit); BufferedInputStream is = new BufferedInputStream(s.getInputStream()); LearnerHandler lh = new LearnerHandler(s, is, this); lh.start(); } catch (Exception e) { if (listenerRunning) { LOG.debug("Ignoring accept exception (maybe shutting down)", e); } else { LOG.debug("Ignoring accept exception (maybe client closed)", e); } } } /* * we don't need to close ss because we only got here because listenerRunning is * false and that is set and then ss is closed() in stop() */ } public synchronized void stop() { listenerRunning = false; if (pinger != null) { pinger.shutdownNow(); } if (ss != null) { try { ss.close(); } catch (IOException e) { e.printStackTrace(); } } for (LearnerHandler lh : activeObservers) { lh.shutdown(); } } int getNumActiveObservers() { return activeObservers.size(); } public Iterable<Map<String, Object>> getActiveObservers() { Set<Map<String, Object>> info = new HashSet<>(); for (LearnerHandler lh : activeObservers) { info.add(lh.getLearnerHandlerInfo()); } return info; } public void resetObserverConnectionStats() { for (LearnerHandler lh : activeObservers) { lh.resetObserverConnectionStats(); } } int getPktsSizeLimit() { return pktsSizeLimit; } static void setPktsSizeLimit(final int sizeLimit) { pktsSizeLimit = sizeLimit; } @Override public void registerLearnerHandlerBean(final LearnerHandler learnerHandler, Socket socket) { LearnerHandlerBean bean = new LearnerHandlerBean(learnerHandler, socket); if (zks.registerJMX(bean)) { connectionBeans.put(learnerHandler, bean); } } @Override public void unregisterLearnerHandlerBean(final LearnerHandler learnerHandler) { LearnerHandlerBean bean = connectionBeans.remove(learnerHandler); if (bean != null) { MBeanRegistry.getInstance().unregister(bean); } } }
⏎ org/apache/zookeeper/server/quorum/ObserverMaster.java
Or download all of them as a single archive file:
File name: zookeeper-server-3.7.0-fyi.zip File size: 871011 bytes Release date: 2021-05-17 Download
⇒ Apache ZooKeeper 3.7.0 Jute Source Code
⇐ Download Apache ZooKeeper 3.7.0 Source Package
2022-11-16, 24867👍, 0💬
Popular Posts:
HttpComponents Core Source Code Files are provided in the source package file, httpcomponents-core-5...
commons-io-2.6-sources.j aris the source JAR file for Apache Commons IO 2.6, which is a library of u...
Apache Neethi provides general framework for the programmers to use WS Policy. It is compliant with ...
What Is commons-lang3-3.1.jar? commons-lang3-3.1.jar is the JAR file for Apache Commons Lang 3.1, wh...
Apache Log4j IOStreams is a Log4j API extension that provides numerous classes from java.io that can...