Audio (13)
Biotech (29)
Bytecode (36)
Database (77)
Framework (7)
Game (7)
General (507)
Graphics (53)
I/O (35)
IDE (2)
JAR Tools (101)
JavaBeans (21)
JDBC (121)
JDK (426)
JSP (20)
Logging (108)
Mail (58)
Messaging (8)
Network (84)
PDF (97)
Report (7)
Scripting (84)
Security (32)
Server (121)
Servlet (26)
SOAP (24)
Testing (54)
Web (15)
XML (309)
Other Resources:
Apache ZooKeeper 3.7.0 Server Source Code
Apache ZooKeeper is an open-source server which enables highly
reliable distributed coordination.
The Apache ZooKeeper Server source code files are provided in the source package (apache-zookeeper-3.7.0.tar.gz). You can download it from the Apache ZooKeeper website.
You can also browse Apache ZooKeeper Server Source Code below:
⏎ org/apache/zookeeper/ClientCnxnSocketNIO.java
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.zookeeper; import; import; import; import; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.channels.UnresolvedAddressException; import java.nio.channels.UnsupportedAddressTypeException; import java.util.Iterator; import java.util.Queue; import java.util.Set; import java.util.concurrent.LinkedBlockingDeque; import org.apache.zookeeper.ClientCnxn.EndOfStreamException; import org.apache.zookeeper.ClientCnxn.Packet; import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.client.ZKClientConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ClientCnxnSocketNIO extends ClientCnxnSocket { private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocketNIO.class); private final Selector selector =; private SelectionKey sockKey; private SocketAddress localSocketAddress; private SocketAddress remoteSocketAddress; ClientCnxnSocketNIO(ZKClientConfig clientConfig) throws IOException { this.clientConfig = clientConfig; initProperties(); } @Override boolean isConnected() { return sockKey != null; } /** * @throws InterruptedException * @throws IOException */ void 
doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException { SocketChannel sock = (SocketChannel); if (sock == null) { throw new IOException("Socket is null!"); } if (sockKey.isReadable()) { int rc =; if (rc < 0) { throw new EndOfStreamException("Unable to read additional data from server sessionid 0x" + Long.toHexString(sessionId) + ", likely server has closed socket"); } if (!incomingBuffer.hasRemaining()) { incomingBuffer.flip(); if (incomingBuffer == lenBuffer) { recvCount.getAndIncrement(); readLength(); } else if (!initialized) { readConnectResult(); enableRead(); if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) { // Since SASL authentication has completed (if client is configured to do so), // outgoing packets waiting in the outgoingQueue can now be sent. enableWrite(); } lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); initialized = true; } else { sendThread.readResponse(incomingBuffer); lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); } } } if (sockKey.isWritable()) { Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()); if (p != null) { updateLastSend(); // If we already started writing p, will already exist if ( == null) { if ((p.requestHeader != null) && (p.requestHeader.getType() != && (p.requestHeader.getType() != OpCode.auth)) { p.requestHeader.setXid(cnxn.getXid()); } p.createBB(); } sock.write(; if (! { sentCount.getAndIncrement(); outgoingQueue.removeFirstOccurrence(p); if (p.requestHeader != null && p.requestHeader.getType() != && p.requestHeader.getType() != OpCode.auth) { synchronized (pendingQueue) { pendingQueue.add(p); } } } } if (outgoingQueue.isEmpty()) { // No more packets to send: turn off write interest flag. // Will be turned on later by a later call to enableWrite(), // from within ZooKeeperSaslClient (if client is configured // to attempt SASL authentication), or in either doIO() or // in doTransport() if not. 
disableWrite(); } else if (!initialized && p != null && ! { // On initial connection, write the complete connect request // packet, but then disable further writes until after // receiving a successful connection response. If the // session is expired, then the server sends the expiration // response and immediately closes its end of the socket. If // the client is simultaneously writing on its end, then the // TCP stack may choose to abort with RST, in which case the // client would never receive the session expired event. See // disableWrite(); } else { // Just in case enableWrite(); } } } private Packet findSendablePacket(LinkedBlockingDeque<Packet> outgoingQueue, boolean tunneledAuthInProgres) { if (outgoingQueue.isEmpty()) { return null; } // If we've already starting sending the first packet, we better finish if (outgoingQueue.getFirst().bb != null || !tunneledAuthInProgres) { return outgoingQueue.getFirst(); } // Since client's authentication with server is in progress, // send only the null-header packet queued by primeConnection(). // This packet must be sent so that the SASL authentication process // can proceed, but all other packets should wait until // SASL authentication completes. Iterator<Packet> iter = outgoingQueue.iterator(); while (iter.hasNext()) { Packet p =; if (p.requestHeader == null) { // We've found the priming-packet. Move it to the beginning of the queue. iter.remove(); outgoingQueue.addFirst(p); return p; } else { // Non-priming packet: defer it until later, leaving it in the queue // until authentication completes. 
LOG.debug("Deferring non-priming packet {} until SASL authentication completes.", p); } } return null; } @Override void cleanup() { if (sockKey != null) { SocketChannel sock = (SocketChannel); sockKey.cancel(); try { sock.socket().shutdownInput(); } catch (IOException e) { LOG.debug("Ignoring exception during shutdown input", e); } try { sock.socket().shutdownOutput(); } catch (IOException e) { LOG.debug("Ignoring exception during shutdown output", e); } try { sock.socket().close(); } catch (IOException e) { LOG.debug("Ignoring exception during socket close", e); } try { sock.close(); } catch (IOException e) { LOG.debug("Ignoring exception during channel close", e); } } try { Thread.sleep(100); } catch (InterruptedException e) { LOG.debug("SendThread interrupted during sleep, ignoring"); } sockKey = null; } @Override void close() { try { if (LOG.isTraceEnabled()) { LOG.trace("Doing client selector close"); } selector.close(); if (LOG.isTraceEnabled()) { LOG.trace("Closed client selector"); } } catch (IOException e) { LOG.warn("Ignoring exception during selector close", e); } } /** * create a socket channel. 
* @return the created socket channel * @throws IOException */ SocketChannel createSock() throws IOException { SocketChannel sock; sock =; sock.configureBlocking(false); sock.socket().setSoLinger(false, -1); sock.socket().setTcpNoDelay(true); return sock; } /** * register with the selection and connect * @param sock the {@link SocketChannel} * @param addr the address of remote host * @throws IOException */ void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException { sockKey = sock.register(selector, SelectionKey.OP_CONNECT); boolean immediateConnect = sock.connect(addr); if (immediateConnect) { sendThread.primeConnection(); } } @Override void connect(InetSocketAddress addr) throws IOException { SocketChannel sock = createSock(); try { registerAndConnect(sock, addr); } catch (UnresolvedAddressException | UnsupportedAddressTypeException | SecurityException | IOException e) { LOG.error("Unable to open socket to {}", addr); sock.close(); throw e; } initialized = false; /* * Reset incomingBuffer */ lenBuffer.clear(); incomingBuffer = lenBuffer; } /** * Returns the address to which the socket is connected. * * @return ip address of the remote side of the connection or null if not * connected */ @Override SocketAddress getRemoteSocketAddress() { return remoteSocketAddress; } /** * Returns the local address to which the socket is bound. 
* * @return ip address of the remote side of the connection or null if not * connected */ @Override SocketAddress getLocalSocketAddress() { return localSocketAddress; } private void updateSocketAddresses() { Socket socket = ((SocketChannel); localSocketAddress = socket.getLocalSocketAddress(); remoteSocketAddress = socket.getRemoteSocketAddress(); } @Override void packetAdded() { wakeupCnxn(); } @Override void onClosing() { wakeupCnxn(); } private synchronized void wakeupCnxn() { selector.wakeup(); } @Override void doTransport( int waitTimeOut, Queue<Packet> pendingQueue, ClientCnxn cnxn) throws IOException, InterruptedException {; Set<SelectionKey> selected; synchronized (this) { selected = selector.selectedKeys(); } // Everything below and until we get back to the select is // non blocking, so time is effectively a constant. That is // Why we just have to do this once, here updateNow(); for (SelectionKey k : selected) { SocketChannel sc = ((SocketChannel); if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) { if (sc.finishConnect()) { updateLastSendAndHeard(); updateSocketAddresses(); sendThread.primeConnection(); } } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { doIO(pendingQueue, cnxn); } } if (sendThread.getZkState().isConnected()) { if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) { enableWrite(); } } selected.clear(); } //TODO should this be synchronized? @Override void testableCloseSocket() throws IOException {"testableCloseSocket() called"); // sockKey may be concurrently accessed by multiple // threads. 
We use tmp here to avoid a race condition SelectionKey tmp = sockKey; if (tmp != null) { ((SocketChannel); } } @Override void saslCompleted() { enableWrite(); } synchronized void enableWrite() { int i = sockKey.interestOps(); if ((i & SelectionKey.OP_WRITE) == 0) { sockKey.interestOps(i | SelectionKey.OP_WRITE); } } private synchronized void disableWrite() { int i = sockKey.interestOps(); if ((i & SelectionKey.OP_WRITE) != 0) { sockKey.interestOps(i & (~SelectionKey.OP_WRITE)); } } private synchronized void enableRead() { int i = sockKey.interestOps(); if ((i & SelectionKey.OP_READ) == 0) { sockKey.interestOps(i | SelectionKey.OP_READ); } } @Override void connectionPrimed() { sockKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } Selector getSelector() { return selector; } @Override void sendPacket(Packet p) throws IOException { SocketChannel sock = (SocketChannel); if (sock == null) { throw new IOException("Socket is null!"); } p.createBB(); ByteBuffer pbb =; sock.write(pbb); } }
⏎ org/apache/zookeeper/ClientCnxnSocketNIO.java
Or download all of them as a single archive file:
File name: File size: 871011 bytes Release date: 2021-05-17 Download
⇒ Apache ZooKeeper 3.7.0 Jute Source Code
⇐ Download Apache ZooKeeper 3.7.0 Source Package
2022-11-16, 24847👍, 0💬
Popular Posts:
JDK 17 jdk.hotspot.agent.jmod is the JMOD file for JDK 17 Hotspot Agent module. JDK 17 Hotspot Agent...
JDK 11 jdk.xml.dom.jmod is the JMOD file for JDK 11 XML DOM module. JDK 11 XML DOM module compiled c...
How to display XML element type information with the jaxp\ provided in the Apache...
JRE 8 rt.jar is the JAR file for JRE 8 RT (Runtime) libraries. JRE (Java Runtime) 8 is the runtime e...
The JSR 105 XML Digital Signature 1.0.1 FCS implementation provides an API and implementation that a...