Categories:
Audio (13)
Biotech (29)
Bytecode (36)
Database (77)
Framework (7)
Game (7)
General (507)
Graphics (53)
I/O (35)
IDE (2)
JAR Tools (101)
JavaBeans (21)
JDBC (121)
JDK (426)
JSP (20)
Logging (108)
Mail (58)
Messaging (8)
Network (84)
PDF (97)
Report (7)
Scripting (84)
Security (32)
Server (121)
Servlet (26)
SOAP (24)
Testing (54)
Web (15)
XML (309)
Collections:
Other Resources:
Apache ZooKeeper Server Source Code
Apache ZooKeeper is an open-source server which enables highly reliable distributed coordination.
Apache ZooKeeper Server Source Code files are provided in the source package file, apache-zookeeper-3.8.0.tar.gz.
You can download apache-zookeeper-3.8.0.tar.gz as described in the previous tutorial and go to the "zookeeper-server" sub-folder to view Apache ZooKeeper Server Source Code files.
You can also browse Apache ZooKeeper Server Source Code below:
✍: FYIcenter.com
⏎ org/apache/zookeeper/server/RequestThrottler.java
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.zookeeper.server; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.concurrent.LinkedBlockingQueue; import org.apache.zookeeper.common.Time; import org.apache.zookeeper.util.ServiceUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * When enabled, the RequestThrottler limits the number of outstanding requests * currently submitted to the request processor pipeline. The throttler augments * the limit imposed by the <code>globalOutstandingLimit</code> that is enforced * by the connection layer ({@link NIOServerCnxn}, {@link NettyServerCnxn}). * * The connection layer limit applies backpressure against the TCP connection by * disabling selection on connections once the request limit is reached. However, * the connection layer always allows a connection to send at least one request * before disabling selection on that connection. Thus, in a scenario with 40000 * client connections, the total number of requests inflight may be as high as * 40000 even if the <code>globalOustandingLimit</code> was set lower. * * The RequestThrottler addresses this issue by adding additional queueing. When * enabled, client connections no longer submit requests directly to the request * processor pipeline but instead to the RequestThrottler. The RequestThrottler * is then responsible for issuing requests to the request processors, and * enforces a separate <code>maxRequests</code> limit. If the total number of * outstanding requests is higher than <code>maxRequests</code>, the throttler * will continually stall for <code>stallTime</code> milliseconds until * underlimit. * * The RequestThrottler can also optionally drop stale requests rather than * submit them to the processor pipeline. A stale request is a request sent * by a connection that is already closed, and/or a request whose latency * will end up being higher than its associated session timeout. The notion * of staleness is configurable, @see Request for more details. * * To ensure ordering guarantees, if a request is ever dropped from a connection * that connection is closed and flagged as invalid. All subsequent requests * inflight from that connection are then dropped as well. */ public class RequestThrottler extends ZooKeeperCriticalThread { private static final Logger LOG = LoggerFactory.getLogger(RequestThrottler.class); private final LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>(); private final ZooKeeperServer zks; private volatile boolean stopping; private volatile boolean killed; private static final String SHUTDOWN_TIMEOUT = "zookeeper.request_throttler.shutdownTimeout"; private static int shutdownTimeout; static { shutdownTimeout = Integer.getInteger(SHUTDOWN_TIMEOUT, 10000); LOG.info("{} = {} ms", SHUTDOWN_TIMEOUT, shutdownTimeout); } /** * The total number of outstanding requests allowed before the throttler * starts stalling. * * When maxRequests = 0, throttling is disabled. */ private static volatile int maxRequests = Integer.getInteger("zookeeper.request_throttle_max_requests", 0); /** * The time (in milliseconds) this is the maximum time for which throttler * thread may wait to be notified that it may proceed processing a request. */ private static volatile int stallTime = Integer.getInteger("zookeeper.request_throttle_stall_time", 100); /** * When true, the throttler will drop stale requests rather than issue * them to the request pipeline. A stale request is a request sent by * a connection that is now closed, and/or a request that will have a * request latency higher than the sessionTimeout. The staleness of * a request is tunable property, @see Request for details. */ private static volatile boolean dropStaleRequests = Boolean.parseBoolean(System.getProperty("zookeeper.request_throttle_drop_stale", "true")); protected boolean shouldThrottleOp(Request request, long elapsedTime) { return request.isThrottlable() && ZooKeeperServer.getThrottledOpWaitTime() > 0 && elapsedTime > ZooKeeperServer.getThrottledOpWaitTime(); } public RequestThrottler(ZooKeeperServer zks) { super("RequestThrottler", zks.getZooKeeperServerListener()); this.zks = zks; this.stopping = false; this.killed = false; } public static int getMaxRequests() { return maxRequests; } public static void setMaxRequests(int requests) { maxRequests = requests; } public static int getStallTime() { return stallTime; } public static void setStallTime(int time) { stallTime = time; } public static boolean getDropStaleRequests() { return dropStaleRequests; } public static void setDropStaleRequests(boolean drop) { dropStaleRequests = drop; } @Override public void run() { try { while (true) { if (killed) { break; } Request request = submittedRequests.take(); if (Request.requestOfDeath == request) { break; } if (request.mustDrop()) { continue; } // Throttling is disabled when maxRequests = 0 if (maxRequests > 0) { while (!killed) { if (dropStaleRequests && request.isStale()) { // Note: this will close the connection dropRequest(request); ServerMetrics.getMetrics().STALE_REQUESTS_DROPPED.add(1); request = null; break; } if (zks.getInProcess() < maxRequests) { break; } throttleSleep(stallTime); } } if (killed) { break; } // A dropped stale request will be null if (request != null) { if (request.isStale()) { ServerMetrics.getMetrics().STALE_REQUESTS.add(1); } final long elapsedTime = Time.currentElapsedTime() - request.requestThrottleQueueTime; ServerMetrics.getMetrics().REQUEST_THROTTLE_QUEUE_TIME.add(elapsedTime); if (shouldThrottleOp(request, elapsedTime)) { request.setIsThrottled(true); ServerMetrics.getMetrics().THROTTLED_OPS.add(1); } zks.submitRequestNow(request); } } } catch (InterruptedException e) { LOG.error("Unexpected interruption", e); } int dropped = drainQueue(); LOG.info("RequestThrottler shutdown. Dropped {} requests", dropped); } private synchronized void throttleSleep(int stallTime) { try { ServerMetrics.getMetrics().REQUEST_THROTTLE_WAIT_COUNT.add(1); this.wait(stallTime); } catch (InterruptedException ie) { return; } } @SuppressFBWarnings(value = "NN_NAKED_NOTIFY", justification = "state change is in ZooKeeperServer.decInProgress() ") public synchronized void throttleWake() { this.notify(); } private int drainQueue() { // If the throttler shutdown gracefully, the queue will be empty. // However, if the shutdown time limit was reached and the throttler // was killed, we have no other option than to drop all remaining // requests on the floor. int dropped = 0; Request request; LOG.info("Draining request throttler queue"); while ((request = submittedRequests.poll()) != null) { dropped += 1; dropRequest(request); } return dropped; } private void dropRequest(Request request) { // Since we're dropping a request on the floor, we must mark the // connection as invalid to ensure any future requests from this // connection are also dropped in order to ensure ordering // semantics. ServerCnxn conn = request.getConnection(); if (conn != null) { // Note: this will close the connection conn.setInvalid(); } // Notify ZooKeeperServer that the request has finished so that it can // update any request accounting/throttling limits. zks.requestFinished(request); } public void submitRequest(Request request) { if (stopping) { LOG.debug("Shutdown in progress. Request cannot be processed"); dropRequest(request); } else { request.requestThrottleQueueTime = Time.currentElapsedTime(); submittedRequests.add(request); } } public int getInflight() { return submittedRequests.size(); } public void shutdown() { // Try to shutdown gracefully LOG.info("Shutting down"); stopping = true; submittedRequests.add(Request.requestOfDeath); try { this.join(shutdownTimeout); } catch (InterruptedException e) { LOG.warn("Interrupted while waiting for {} to finish", this); } // Forcibly shutdown if necessary in order to ensure request // queue is drained. killed = true; try { this.join(); } catch (InterruptedException e) { LOG.warn("Interrupted while waiting for {} to finish", this); //TODO apply ZOOKEEPER-575 and remove this line. ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue()); } } }
⏎ org/apache/zookeeper/server/RequestThrottler.java
Or download all of them as a single archive file:
File name: zookeeper-server-3.8.0-fyi.zip File size: 885581 bytes Release date: 2022-02-25 Download
⇒ Apache ZooKeeper Jute Source Code
⇐ Download and Install Apache ZooKeeper Source Package
2022-11-16, 13669👍, 0💬
Popular Posts:
JDK 8 jconsole.jar is the JAR file for JDK 8 JConsole, which is a graphical monitoring tool to monit...
JDOM provides a solution for using XML from Java that is as simple as Java itself. There is no compe...
What Is in Xerces-J-bin.2.12.2.zip? Xerces-J-bin.2.12.2.zip file is the distribution package ZIP fil...
JDK 11 java.naming.jmod is the JMOD file for JDK 11 Naming module. JDK 11 Naming module compiled cla...
Java Architecture for XML Binding (JAXB) is a Java API that allows Java developers to map Java class...