Categories:
Audio (13)
Biotech (29)
Bytecode (36)
Database (77)
Framework (7)
Game (7)
General (507)
Graphics (53)
I/O (35)
IDE (2)
JAR Tools (101)
JavaBeans (21)
JDBC (121)
JDK (426)
JSP (20)
Logging (108)
Mail (58)
Messaging (8)
Network (84)
PDF (97)
Report (7)
Scripting (84)
Security (32)
Server (121)
Servlet (26)
SOAP (24)
Testing (54)
Web (15)
XML (309)
Collections:
Other Resources:
Apache ZooKeeper 3.7.0 Server Source Code
Apache ZooKeeper is an open-source server which enables highly reliable distributed coordination.
Apache ZooKeeper Server Source Code files are provided in the source package (apache-zookeeper-3.7.0.tar.gz). You can download it at Apache ZooKeeper Website.
You can also browse Apache ZooKeeper Server Source Code below:
✍: FYIcenter.com
⏎ org/apache/zookeeper/ClientCnxnSocketNIO.java
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.zookeeper; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.channels.UnresolvedAddressException; import java.nio.channels.UnsupportedAddressTypeException; import java.util.Iterator; import java.util.Queue; import java.util.Set; import java.util.concurrent.LinkedBlockingDeque; import org.apache.zookeeper.ClientCnxn.EndOfStreamException; import org.apache.zookeeper.ClientCnxn.Packet; import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.client.ZKClientConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ClientCnxnSocketNIO extends ClientCnxnSocket { private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocketNIO.class); private final Selector selector = Selector.open(); private SelectionKey sockKey; private SocketAddress localSocketAddress; private SocketAddress remoteSocketAddress; ClientCnxnSocketNIO(ZKClientConfig clientConfig) throws IOException { this.clientConfig = clientConfig; initProperties(); 
} @Override boolean isConnected() { return sockKey != null; } /** * @throws InterruptedException * @throws IOException */ void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException { SocketChannel sock = (SocketChannel) sockKey.channel(); if (sock == null) { throw new IOException("Socket is null!"); } if (sockKey.isReadable()) { int rc = sock.read(incomingBuffer); if (rc < 0) { throw new EndOfStreamException("Unable to read additional data from server sessionid 0x" + Long.toHexString(sessionId) + ", likely server has closed socket"); } if (!incomingBuffer.hasRemaining()) { incomingBuffer.flip(); if (incomingBuffer == lenBuffer) { recvCount.getAndIncrement(); readLength(); } else if (!initialized) { readConnectResult(); enableRead(); if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) { // Since SASL authentication has completed (if client is configured to do so), // outgoing packets waiting in the outgoingQueue can now be sent. 
enableWrite(); } lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); initialized = true; } else { sendThread.readResponse(incomingBuffer); lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); } } } if (sockKey.isWritable()) { Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()); if (p != null) { updateLastSend(); // If we already started writing p, p.bb will already exist if (p.bb == null) { if ((p.requestHeader != null) && (p.requestHeader.getType() != OpCode.ping) && (p.requestHeader.getType() != OpCode.auth)) { p.requestHeader.setXid(cnxn.getXid()); } p.createBB(); } sock.write(p.bb); if (!p.bb.hasRemaining()) { sentCount.getAndIncrement(); outgoingQueue.removeFirstOccurrence(p); if (p.requestHeader != null && p.requestHeader.getType() != OpCode.ping && p.requestHeader.getType() != OpCode.auth) { synchronized (pendingQueue) { pendingQueue.add(p); } } } } if (outgoingQueue.isEmpty()) { // No more packets to send: turn off write interest flag. // Will be turned on later by a later call to enableWrite(), // from within ZooKeeperSaslClient (if client is configured // to attempt SASL authentication), or in either doIO() or // in doTransport() if not. disableWrite(); } else if (!initialized && p != null && !p.bb.hasRemaining()) { // On initial connection, write the complete connect request // packet, but then disable further writes until after // receiving a successful connection response. If the // session is expired, then the server sends the expiration // response and immediately closes its end of the socket. If // the client is simultaneously writing on its end, then the // TCP stack may choose to abort with RST, in which case the // client would never receive the session expired event. 
See // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html disableWrite(); } else { // Just in case enableWrite(); } } } private Packet findSendablePacket(LinkedBlockingDeque<Packet> outgoingQueue, boolean tunneledAuthInProgres) { if (outgoingQueue.isEmpty()) { return null; } // If we've already starting sending the first packet, we better finish if (outgoingQueue.getFirst().bb != null || !tunneledAuthInProgres) { return outgoingQueue.getFirst(); } // Since client's authentication with server is in progress, // send only the null-header packet queued by primeConnection(). // This packet must be sent so that the SASL authentication process // can proceed, but all other packets should wait until // SASL authentication completes. Iterator<Packet> iter = outgoingQueue.iterator(); while (iter.hasNext()) { Packet p = iter.next(); if (p.requestHeader == null) { // We've found the priming-packet. Move it to the beginning of the queue. iter.remove(); outgoingQueue.addFirst(p); return p; } else { // Non-priming packet: defer it until later, leaving it in the queue // until authentication completes. 
LOG.debug("Deferring non-priming packet {} until SASL authentication completes.", p); } } return null; } @Override void cleanup() { if (sockKey != null) { SocketChannel sock = (SocketChannel) sockKey.channel(); sockKey.cancel(); try { sock.socket().shutdownInput(); } catch (IOException e) { LOG.debug("Ignoring exception during shutdown input", e); } try { sock.socket().shutdownOutput(); } catch (IOException e) { LOG.debug("Ignoring exception during shutdown output", e); } try { sock.socket().close(); } catch (IOException e) { LOG.debug("Ignoring exception during socket close", e); } try { sock.close(); } catch (IOException e) { LOG.debug("Ignoring exception during channel close", e); } } try { Thread.sleep(100); } catch (InterruptedException e) { LOG.debug("SendThread interrupted during sleep, ignoring"); } sockKey = null; } @Override void close() { try { if (LOG.isTraceEnabled()) { LOG.trace("Doing client selector close"); } selector.close(); if (LOG.isTraceEnabled()) { LOG.trace("Closed client selector"); } } catch (IOException e) { LOG.warn("Ignoring exception during selector close", e); } } /** * create a socket channel. 
* @return the created socket channel * @throws IOException */ SocketChannel createSock() throws IOException { SocketChannel sock; sock = SocketChannel.open(); sock.configureBlocking(false); sock.socket().setSoLinger(false, -1); sock.socket().setTcpNoDelay(true); return sock; } /** * register with the selection and connect * @param sock the {@link SocketChannel} * @param addr the address of remote host * @throws IOException */ void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException { sockKey = sock.register(selector, SelectionKey.OP_CONNECT); boolean immediateConnect = sock.connect(addr); if (immediateConnect) { sendThread.primeConnection(); } } @Override void connect(InetSocketAddress addr) throws IOException { SocketChannel sock = createSock(); try { registerAndConnect(sock, addr); } catch (UnresolvedAddressException | UnsupportedAddressTypeException | SecurityException | IOException e) { LOG.error("Unable to open socket to {}", addr); sock.close(); throw e; } initialized = false; /* * Reset incomingBuffer */ lenBuffer.clear(); incomingBuffer = lenBuffer; } /** * Returns the address to which the socket is connected. * * @return ip address of the remote side of the connection or null if not * connected */ @Override SocketAddress getRemoteSocketAddress() { return remoteSocketAddress; } /** * Returns the local address to which the socket is bound. 
* * @return ip address of the remote side of the connection or null if not * connected */ @Override SocketAddress getLocalSocketAddress() { return localSocketAddress; } private void updateSocketAddresses() { Socket socket = ((SocketChannel) sockKey.channel()).socket(); localSocketAddress = socket.getLocalSocketAddress(); remoteSocketAddress = socket.getRemoteSocketAddress(); } @Override void packetAdded() { wakeupCnxn(); } @Override void onClosing() { wakeupCnxn(); } private synchronized void wakeupCnxn() { selector.wakeup(); } @Override void doTransport( int waitTimeOut, Queue<Packet> pendingQueue, ClientCnxn cnxn) throws IOException, InterruptedException { selector.select(waitTimeOut); Set<SelectionKey> selected; synchronized (this) { selected = selector.selectedKeys(); } // Everything below and until we get back to the select is // non blocking, so time is effectively a constant. That is // Why we just have to do this once, here updateNow(); for (SelectionKey k : selected) { SocketChannel sc = ((SocketChannel) k.channel()); if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) { if (sc.finishConnect()) { updateLastSendAndHeard(); updateSocketAddresses(); sendThread.primeConnection(); } } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { doIO(pendingQueue, cnxn); } } if (sendThread.getZkState().isConnected()) { if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) { enableWrite(); } } selected.clear(); } //TODO should this be synchronized? @Override void testableCloseSocket() throws IOException { LOG.info("testableCloseSocket() called"); // sockKey may be concurrently accessed by multiple // threads. 
We use tmp here to avoid a race condition SelectionKey tmp = sockKey; if (tmp != null) { ((SocketChannel) tmp.channel()).socket().close(); } } @Override void saslCompleted() { enableWrite(); } synchronized void enableWrite() { int i = sockKey.interestOps(); if ((i & SelectionKey.OP_WRITE) == 0) { sockKey.interestOps(i | SelectionKey.OP_WRITE); } } private synchronized void disableWrite() { int i = sockKey.interestOps(); if ((i & SelectionKey.OP_WRITE) != 0) { sockKey.interestOps(i & (~SelectionKey.OP_WRITE)); } } private synchronized void enableRead() { int i = sockKey.interestOps(); if ((i & SelectionKey.OP_READ) == 0) { sockKey.interestOps(i | SelectionKey.OP_READ); } } @Override void connectionPrimed() { sockKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } Selector getSelector() { return selector; } @Override void sendPacket(Packet p) throws IOException { SocketChannel sock = (SocketChannel) sockKey.channel(); if (sock == null) { throw new IOException("Socket is null!"); } p.createBB(); ByteBuffer pbb = p.bb; sock.write(pbb); } }
⏎ org/apache/zookeeper/ClientCnxnSocketNIO.java
Or download all of them as a single archive file:
File name: zookeeper-server-3.7.0-fyi.zip File size: 871011 bytes Release date: 2021-05-17 Download
⇒ Apache ZooKeeper 3.7.0 Jute Source Code
⇐ Download Apache ZooKeeper 3.7.0 Source Package
2022-11-16, 13216👍, 0💬
Popular Posts:
What is the jaxp\SourceValidator.java provided in the Apache Xerces package? I have Apache Xerces 2....
Where to find answers to frequently asked questions on Downloading and Installing Connector/J - JDBC...
JDK 11 java.xml.jmod is the JMOD file for JDK 11 XML (eXtensible Markup Language) module. JDK 11 XML...
What Is activation.jar? I heard it's related to JAF (JavaBeans Activation Framework) 1.1? The if you...
How to download and install ojdbc11.jar for Oracle 21c? ojdbc11.jar for Oracle 21c is a Java JDBC Dr...