Categories:
Audio (13)
Biotech (29)
Bytecode (36)
Database (77)
Framework (7)
Game (7)
General (507)
Graphics (53)
I/O (35)
IDE (2)
JAR Tools (102)
JavaBeans (21)
JDBC (121)
JDK (426)
JSP (20)
Logging (108)
Mail (58)
Messaging (8)
Network (84)
PDF (97)
Report (7)
Scripting (84)
Security (32)
Server (121)
Servlet (26)
SOAP (24)
Testing (54)
Web (15)
XML (322)
Collections:
Other Resources:
Apache ZooKeeper 3.7.0 Server Source Code
Apache ZooKeeper is an open-source server which enables highly
reliable distributed coordination.
Apache ZooKeeper Server Source Code files are provided in the source package (apache-zookeeper-3.7.0.tar.gz). You can download it from the Apache ZooKeeper website.
You can also browse Apache ZooKeeper Server Source Code below:
✍: FYIcenter.com
⏎ org/apache/zookeeper/ClientCnxnSocketNIO.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.nio.channels.UnsupportedAddressTypeException;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
import org.apache.zookeeper.ClientCnxn.Packet;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.client.ZKClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
// SLF4J logger shared by all instances of this class.
private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocketNIO.class);
// Selector opened once per instance; registerAndConnect() registers the socket
// channel with it and doTransport() blocks on it.
private final Selector selector = Selector.open();
// Selection key of the current socket channel. null is treated as
// "no socket" by isConnected(); cleanup() resets it to null.
private SelectionKey sockKey;
// Addresses captured by updateSocketAddresses(); nothing in this class
// clears them again.
private SocketAddress localSocketAddress;
private SocketAddress remoteSocketAddress;
/**
 * Creates the NIO client socket wrapper.
 *
 * <p>Stores the given configuration and then calls {@code initProperties()}
 * (defined outside this file).
 *
 * @param clientConfig client configuration stored in {@code this.clientConfig}
 * @throws IOException declared because the {@code selector} field initializer
 * ({@code Selector.open()}) can throw it; {@code initProperties()} may also
 * contribute - NOTE(review): confirm against the superclass
 */
ClientCnxnSocketNIO(ZKClientConfig clientConfig) throws IOException {
this.clientConfig = clientConfig;
initProperties();
}
/**
 * Reports whether a selection key is currently held.
 *
 * <p>This only checks that {@code sockKey} is non-null; it does not query the
 * underlying channel. The key is assigned in {@code registerAndConnect()}
 * (before the connect attempt finishes) and nulled in {@code cleanup()}.
 *
 * @return {@code true} if {@code sockKey} is not {@code null}
 */
@Override
boolean isConnected() {
return sockKey != null;
}
/**
 * Performs the read and/or write work that {@code sockKey} reports as ready.
 *
 * <p>Read side: bytes are read into {@code incomingBuffer}. Once that buffer
 * is full it is flipped and then either treated as the length prefix
 * ({@code readLength()}), as the connect response (while {@code initialized}
 * is still false), or passed to {@code sendThread.readResponse}. After a
 * connect response or a regular response, {@code incomingBuffer} is pointed
 * back at the cleared {@code lenBuffer}.
 *
 * <p>Write side: the packet selected by {@code findSendablePacket} is written.
 * When its buffer has been written completely it is removed from
 * {@code outgoingQueue} and, if it has a request header whose type is neither
 * ping nor auth, appended to {@code pendingQueue}. Write interest is then
 * switched off or on depending on the queue and connection state.
 *
 * @param pendingQueue queue that receives fully written packets; it is
 * locked with {@code synchronized} while the packet is added
 * @param cnxn supplies the xid ({@code cnxn.getXid()}) set on a request
 * header before the packet is serialized for the first time
 * @throws InterruptedException declared by the signature; no statement
 * visible in this method throws it directly
 * @throws IOException if the key has no channel, or on channel read/write
 * failure; an {@code EndOfStreamException} is thrown when the read returns a
 * negative count
 */
void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
// A negative count from read() is reported as end of stream.
if (rc < 0) {
throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
// Only act once the current buffer has been filled completely.
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
// The length prefix is complete; readLength() is defined outside this file.
recvCount.getAndIncrement();
readLength();
} else if (!initialized) {
// First full payload on a new connection: the connect response.
readConnectResult();
enableRead();
if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();
}
// Go back to reading a length prefix.
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {
sendThread.readResponse(incomingBuffer);
// Go back to reading a length prefix.
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
if (sockKey.isWritable()) {
Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
// Ping and auth requests do not get an xid from cnxn here.
if ((p.requestHeader != null)
&& (p.requestHeader.getType() != OpCode.ping)
&& (p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
p.createBB();
}
sock.write(p.bb);
// The write may be partial; the packet stays queued until its buffer is drained.
if (!p.bb.hasRemaining()) {
sentCount.getAndIncrement();
outgoingQueue.removeFirstOccurrence(p);
// Only packets with a non-ping, non-auth request header are added to pendingQueue.
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}
if (outgoingQueue.isEmpty()) {
// No more packets to send: turn off write interest flag.
// Will be turned on later by a later call to enableWrite(),
// from within ZooKeeperSaslClient (if client is configured
// to attempt SASL authentication), or in either doIO() or
// in doTransport() if not.
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
// On initial connection, write the complete connect request
// packet, but then disable further writes until after
// receiving a successful connection response. If the
// session is expired, then the server sends the expiration
// response and immediately closes its end of the socket. If
// the client is simultaneously writing on its end, then the
// TCP stack may choose to abort with RST, in which case the
// client would never receive the session expired event. See
// http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
disableWrite();
} else {
// Just in case
enableWrite();
}
}
}
/**
 * Chooses the packet that should be written next.
 *
 * <p>The head of the queue is returned when it has already been serialized
 * ({@code bb != null}, i.e. a write may be in progress) or when no tunnelled
 * authentication is in progress. Otherwise only a packet without a request
 * header (the priming packet) may be sent; if one is found it is moved to the
 * head of the queue and returned.
 *
 * @param outgoingQueue queue of packets waiting to be written
 * @param tunneledAuthInProgres whether tunnelled (SASL) authentication is
 * still in progress
 * @return the packet to write next, or {@code null} if the queue is empty or
 * every queued packet has to wait for authentication to finish
 */
private Packet findSendablePacket(LinkedBlockingDeque<Packet> outgoingQueue, boolean tunneledAuthInProgres) {
    if (outgoingQueue.isEmpty()) {
        return null;
    }
    // A packet we have already started sending must be finished first; without
    // tunnelled authentication the head packet is always the next one to send.
    if (outgoingQueue.getFirst().bb != null || !tunneledAuthInProgres) {
        return outgoingQueue.getFirst();
    }
    // Authentication with the server is in progress: only the null-header
    // packet queued by primeConnection() may go out, so that the SASL
    // exchange can proceed. Everything else waits.
    for (Iterator<Packet> it = outgoingQueue.iterator(); it.hasNext();) {
        Packet candidate = it.next();
        if (candidate.requestHeader != null) {
            // Non-priming packet: leave it queued until authentication completes.
            LOG.debug("Deferring non-priming packet {} until SASL authentication completes.", candidate);
            continue;
        }
        // Found the priming packet: move it to the front of the queue.
        it.remove();
        outgoingQueue.addFirst(candidate);
        return candidate;
    }
    return null;
}
/**
 * Tears down the current socket, if any, and forgets its selection key.
 *
 * <p>The key is cancelled, then the socket's input and output are shut down
 * and the socket and channel are closed. Each step is attempted independently
 * and any {@link IOException} is only logged at debug level. The method then
 * sleeps for 100 ms (an interruption is logged and otherwise ignored) before
 * setting {@code sockKey} to {@code null}.
 */
@Override
void cleanup() {
    if (sockKey != null) {
        final SocketChannel channel = (SocketChannel) sockKey.channel();
        sockKey.cancel();
        final Socket rawSocket = channel.socket();
        try {
            rawSocket.shutdownInput();
        } catch (IOException e) {
            LOG.debug("Ignoring exception during shutdown input", e);
        }
        try {
            rawSocket.shutdownOutput();
        } catch (IOException e) {
            LOG.debug("Ignoring exception during shutdown output", e);
        }
        try {
            rawSocket.close();
        } catch (IOException e) {
            LOG.debug("Ignoring exception during socket close", e);
        }
        try {
            channel.close();
        } catch (IOException e) {
            LOG.debug("Ignoring exception during channel close", e);
        }
    }
    // The pause happens whether or not a socket existed.
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        LOG.debug("SendThread interrupted during sleep, ignoring");
    }
    sockKey = null;
}
/**
 * Closes the selector owned by this instance.
 *
 * <p>A failure to close is logged as a warning and not propagated.
 */
@Override
void close() {
    try {
        // Constant-message trace calls do nothing when trace logging is off.
        LOG.trace("Doing client selector close");
        selector.close();
        LOG.trace("Closed client selector");
    } catch (IOException e) {
        LOG.warn("Ignoring exception during selector close", e);
    }
}
/**
 * Creates a socket channel configured for this client.
 *
 * <p>The channel is non-blocking, has SO_LINGER disabled and TCP_NODELAY
 * enabled. If any configuration step fails, the newly opened channel is
 * closed before the failure is rethrown, so that a half-configured channel
 * is not leaked.
 *
 * @return the created socket channel
 * @throws IOException if the channel cannot be opened or configured
 */
SocketChannel createSock() throws IOException {
    SocketChannel sock = SocketChannel.open();
    try {
        sock.configureBlocking(false);
        sock.socket().setSoLinger(false, -1);
        sock.socket().setTcpNoDelay(true);
    } catch (IOException | RuntimeException e) {
        // Release the descriptor; keep the original failure as the primary one.
        try {
            sock.close();
        } catch (IOException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
    return sock;
}
/**
 * Registers the channel with the selector and starts connecting.
 *
 * <p>The resulting key (registered for {@code OP_CONNECT}) is stored in
 * {@code sockKey} before the connect attempt. If the connect completes
 * immediately, {@code sendThread.primeConnection()} is called right away.
 * Otherwise completion is handled in {@code doTransport()}.
 *
 * @param sock the {@link SocketChannel} to register and connect
 * @param addr the address of the remote host
 * @throws IOException if registration, the connect attempt or priming fails
 */
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {
    sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
    if (sock.connect(addr)) {
        // Connected without waiting for OP_CONNECT.
        sendThread.primeConnection();
    }
}
/**
 * Opens a new socket channel and starts connecting it to {@code addr}.
 *
 * <p>On failure the channel is closed and the original exception is rethrown.
 * If closing the channel fails as well, that secondary failure is attached
 * as a suppressed exception instead of replacing the original one. On
 * success the read state is reset: {@code initialized} becomes false and
 * {@code incomingBuffer} points at the cleared {@code lenBuffer}.
 *
 * @param addr the address of the remote host
 * @throws IOException if the channel cannot be created, registered or
 * connected
 */
@Override
void connect(InetSocketAddress addr) throws IOException {
    SocketChannel sock = createSock();
    try {
        registerAndConnect(sock, addr);
    } catch (UnresolvedAddressException | UnsupportedAddressTypeException | SecurityException | IOException e) {
        LOG.error("Unable to open socket to {}", addr);
        try {
            sock.close();
        } catch (IOException closeFailure) {
            // Do not let a close failure mask the reason the connect failed.
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
    initialized = false;
    /*
     * Reset incomingBuffer
     */
    lenBuffer.clear();
    incomingBuffer = lenBuffer;
}
/**
 * Returns the remote address recorded for the socket.
 *
 * <p>The value is the one captured by {@code updateSocketAddresses()}; this
 * getter does not query the socket itself, and nothing in this class resets
 * the field when the socket is cleaned up.
 *
 * @return the recorded address of the remote side of the connection, or
 * {@code null} if no address has been recorded yet
 */
@Override
SocketAddress getRemoteSocketAddress() {
return remoteSocketAddress;
}
/**
 * Returns the local address recorded for the socket.
 *
 * <p>The value is the one captured by {@code updateSocketAddresses()}; this
 * getter does not query the socket itself, and nothing in this class resets
 * the field when the socket is cleaned up.
 *
 * @return the recorded address of the local side of the connection, or
 * {@code null} if no address has been recorded yet
 */
@Override
SocketAddress getLocalSocketAddress() {
return localSocketAddress;
}
/**
 * Records the local and remote addresses of the socket behind
 * {@code sockKey} in {@code localSocketAddress} and
 * {@code remoteSocketAddress}.
 */
private void updateSocketAddresses() {
    final SocketChannel channel = (SocketChannel) sockKey.channel();
    final Socket connected = channel.socket();
    localSocketAddress = connected.getLocalSocketAddress();
    remoteSocketAddress = connected.getRemoteSocketAddress();
}
/**
 * Wakes up the selector via {@code wakeupCnxn()}, so that a thread blocked
 * in {@code selector.select(...)} returns.
 */
@Override
void packetAdded() {
wakeupCnxn();
}
/**
 * Wakes up the selector via {@code wakeupCnxn()}, so that a thread blocked
 * in {@code selector.select(...)} returns.
 */
@Override
void onClosing() {
wakeupCnxn();
}
/**
 * Calls {@code selector.wakeup()} while holding this object's monitor.
 * {@code doTransport()} takes the same monitor when it fetches the
 * selected-key set.
 */
private synchronized void wakeupCnxn() {
selector.wakeup();
}
/**
 * Waits for socket readiness and processes whatever became ready.
 *
 * <p>Blocks in {@code selector.select(waitTimeOut)}, then for every selected
 * key either completes a pending connect (updating timestamps and recorded
 * addresses and priming the connection) or delegates to {@code doIO}.
 * Afterwards, if the client state is connected and a sendable packet exists,
 * write interest is enabled. The selected-key set is cleared at the end.
 *
 * @param waitTimeOut timeout passed to {@code Selector.select(long)}
 * @param pendingQueue passed through to {@code doIO}
 * @param cnxn passed through to {@code doIO}
 * @throws IOException if selecting, finishing the connect, priming or
 * {@code doIO} fails
 * @throws InterruptedException propagated from {@code doIO}
 */
@Override
void doTransport(
    int waitTimeOut,
    Queue<Packet> pendingQueue,
    ClientCnxn cnxn) throws IOException, InterruptedException {
    selector.select(waitTimeOut);
    final Set<SelectionKey> readyKeys;
    synchronized (this) {
        readyKeys = selector.selectedKeys();
    }
    // Nothing from here until the next select blocks, so the current time is
    // effectively constant and only has to be sampled once.
    updateNow();
    for (SelectionKey key : readyKeys) {
        final SocketChannel channel = (SocketChannel) key.channel();
        if (key.isConnectable()) {
            if (channel.finishConnect()) {
                updateLastSendAndHeard();
                updateSocketAddresses();
                sendThread.primeConnection();
            }
        } else if (key.isReadable() || key.isWritable()) {
            doIO(pendingQueue, cnxn);
        }
    }
    if (sendThread.getZkState().isConnected()
        && findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
        enableWrite();
    }
    readyKeys.clear();
}
//TODO should this be synchronized?
/**
 * Closes the socket behind the current selection key, if there is one.
 *
 * @throws IOException if closing the socket fails
 */
@Override
void testableCloseSocket() throws IOException {
    LOG.info("testableCloseSocket() called");
    // sockKey may be accessed concurrently by several threads, so read it
    // once into a local to avoid a race between the null check and the use.
    final SelectionKey snapshot = sockKey;
    if (snapshot == null) {
        return;
    }
    final SocketChannel channel = (SocketChannel) snapshot.channel();
    channel.socket().close();
}
/**
 * Turns write interest on via {@code enableWrite()}.
 * NOTE(review): presumably invoked once SASL authentication has finished so
 * that deferred packets can be sent - confirm against the caller.
 */
@Override
void saslCompleted() {
enableWrite();
}
/**
 * Adds {@code OP_WRITE} to the interest set of {@code sockKey} unless it is
 * already present. Runs while holding this object's monitor.
 */
synchronized void enableWrite() {
    final int ops = sockKey.interestOps();
    if ((ops & SelectionKey.OP_WRITE) != 0) {
        return;
    }
    sockKey.interestOps(ops | SelectionKey.OP_WRITE);
}
/**
 * Removes {@code OP_WRITE} from the interest set of {@code sockKey} if it is
 * currently present. Runs while holding this object's monitor.
 */
private synchronized void disableWrite() {
    final int ops = sockKey.interestOps();
    if ((ops & SelectionKey.OP_WRITE) == 0) {
        return;
    }
    sockKey.interestOps(ops & ~SelectionKey.OP_WRITE);
}
/**
 * Adds {@code OP_READ} to the interest set of {@code sockKey} unless it is
 * already present. Runs while holding this object's monitor.
 */
private synchronized void enableRead() {
    final int ops = sockKey.interestOps();
    if ((ops & SelectionKey.OP_READ) != 0) {
        return;
    }
    sockKey.interestOps(ops | SelectionKey.OP_READ);
}
/**
 * Sets the interest set of {@code sockKey} to exactly
 * {@code OP_READ | OP_WRITE}, replacing whatever was set before (for example
 * the {@code OP_CONNECT} interest used at registration).
 * NOTE(review): unlike enableWrite()/disableWrite()/enableRead(), this method
 * is not synchronized - confirm that this is intentional.
 */
@Override
void connectionPrimed() {
sockKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
/**
 * Returns the selector owned by this instance.
 *
 * @return the selector opened when this object was constructed
 */
Selector getSelector() {
return selector;
}
/**
 * Serializes the packet and writes it straight to the socket channel,
 * bypassing {@code outgoingQueue}.
 *
 * <p>NOTE(review): only a single {@code write} call is made. If the channel
 * is the non-blocking one produced by {@code createSock()}, a single call may
 * write only part of the buffer - confirm that callers only send packets
 * small enough for this not to matter.
 *
 * @param p the packet to serialize and send
 * @throws IOException if the key has no channel or the write fails
 */
@Override
void sendPacket(Packet p) throws IOException {
    final SocketChannel channel = (SocketChannel) sockKey.channel();
    if (channel == null) {
        throw new IOException("Socket is null!");
    }
    p.createBB();
    channel.write(p.bb);
}
}
⏎ org/apache/zookeeper/ClientCnxnSocketNIO.java
Or download all of them as a single archive file:
File name: zookeeper-server-3.7.0-fyi.zip File size: 871011 bytes Release date: 2021-05-17 Download
⇒ Apache ZooKeeper 3.7.0 Jute Source Code
⇐ Download Apache ZooKeeper 3.7.0 Source Package
2022-11-16, ≈53🔥, 0💬
Popular Posts:
Saxon-HE (home edition) is an open source product available under the Mozilla Public License. It pro...
XStream is a simple library to serialize objects to XML and back again. JAR File Size and Download L...
JDK 11 java.security.jgss.jmod is the JMOD file for JDK 11 Security JGSS (Java Generic Security Serv...
Java Architecture for XML Binding (JAXB) is a Java API that allows Java developers to map Java class...
Where to find answers to frequently asked questions on Downloading and Using JDK (Java Development K...