Apache ZooKeeper 3.7.0 Server Source Code
Apache ZooKeeper is an open-source server which enables highly
reliable distributed coordination.
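For readers new to ZooKeeper, the sketch below shows what that coordination looks like from the client side: connect to a server, then create, read, and delete a znode. This example is not part of the server source listed on this page; it assumes the standard org.apache.zookeeper client library and a test server reachable at localhost:2181 (both are assumptions for illustration).

import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ZooKeeperHello {
    public static void main(String[] args) throws Exception {
        CountDownLatch connected = new CountDownLatch(1);
        // localhost:2181 is an assumed local test server, not something this article specifies.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 15000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        connected.await();

        // Create, read, and delete a persistent znode -- the basic coordination primitive.
        String path = zk.create("/demo-node", "hello".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("Created " + path + " -> " + new String(zk.getData(path, false, null)));
        zk.delete(path, -1);
        zk.close();
    }
}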
Apache ZooKeeper Server Source Code files are provided in the source package (apache-zookeeper-3.7.0.tar.gz), which you can download from the Apache ZooKeeper website.
You can also browse Apache ZooKeeper Server Source Code below:
✍: FYIcenter.com
⏎ org/apache/zookeeper/server/util/RequestPathMetricsCollector.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server.util;

import static org.apache.zookeeper.ZooDefs.OpCode.checkWatches;
import static org.apache.zookeeper.ZooDefs.OpCode.create;
import static org.apache.zookeeper.ZooDefs.OpCode.create2;
import static org.apache.zookeeper.ZooDefs.OpCode.createContainer;
import static org.apache.zookeeper.ZooDefs.OpCode.delete;
import static org.apache.zookeeper.ZooDefs.OpCode.deleteContainer;
import static org.apache.zookeeper.ZooDefs.OpCode.exists;
import static org.apache.zookeeper.ZooDefs.OpCode.getACL;
import static org.apache.zookeeper.ZooDefs.OpCode.getChildren;
import static org.apache.zookeeper.ZooDefs.OpCode.getChildren2;
import static org.apache.zookeeper.ZooDefs.OpCode.getData;
import static org.apache.zookeeper.ZooDefs.OpCode.removeWatches;
import static org.apache.zookeeper.ZooDefs.OpCode.setACL;
import static org.apache.zookeeper.ZooDefs.OpCode.setData;
import static org.apache.zookeeper.ZooDefs.OpCode.setWatches2;
import static org.apache.zookeeper.ZooDefs.OpCode.sync;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.server.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class holds the requests path ( up till a certain depth) stats per request type
 */
public class RequestPathMetricsCollector {

    private static final Logger LOG = LoggerFactory.getLogger(RequestPathMetricsCollector.class);

    // How many seconds does each slot represent, default is 15 seconds.
    private final int REQUEST_STATS_SLOT_DURATION;
    // How many slots we keep, default is 60 so it's 15 minutes total history.
    private final int REQUEST_STATS_SLOT_CAPACITY;
    // How far down the path we keep, default is 6.
    private final int REQUEST_PREPROCESS_PATH_DEPTH;
    // Sample rate, default is 0.1 (10%).
    private final float REQUEST_PREPROCESS_SAMPLE_RATE;
    private final long COLLECTOR_INITIAL_DELAY;
    private final long COLLECTOR_DELAY;
    private final int REQUEST_PREPROCESS_TOPPATH_MAX;
    private final boolean enabled;

    public static final String PATH_STATS_SLOT_CAPACITY = "zookeeper.pathStats.slotCapacity";
    public static final String PATH_STATS_SLOT_DURATION = "zookeeper.pathStats.slotDuration";
    public static final String PATH_STATS_MAX_DEPTH = "zookeeper.pathStats.maxDepth";
    public static final String PATH_STATS_SAMPLE_RATE = "zookeeper.pathStats.sampleRate";
    public static final String PATH_STATS_COLLECTOR_INITIAL_DELAY = "zookeeper.pathStats.initialDelay";
    public static final String PATH_STATS_COLLECTOR_DELAY = "zookeeper.pathStats.delay";
    public static final String PATH_STATS_TOP_PATH_MAX = "zookeeper.pathStats.topPathMax";
    public static final String PATH_STATS_ENABLED = "zookeeper.pathStats.enabled";
    private static final String PATH_SEPERATOR = "/";

    private final Map<String, PathStatsQueue> immutableRequestsMap;
    private final ScheduledThreadPoolExecutor scheduledExecutor;
    private final boolean accurateMode;

    public RequestPathMetricsCollector() {
        this(false);
    }

    public RequestPathMetricsCollector(boolean accurateMode) {
        final Map<String, PathStatsQueue> requestsMap = new HashMap<>();
        this.accurateMode = accurateMode;

        REQUEST_PREPROCESS_TOPPATH_MAX = Integer.getInteger(PATH_STATS_TOP_PATH_MAX, 20);
        REQUEST_STATS_SLOT_DURATION = Integer.getInteger(PATH_STATS_SLOT_DURATION, 15);
        REQUEST_STATS_SLOT_CAPACITY = Integer.getInteger(PATH_STATS_SLOT_CAPACITY, 60);
        REQUEST_PREPROCESS_PATH_DEPTH = Integer.getInteger(PATH_STATS_MAX_DEPTH, 6);
        REQUEST_PREPROCESS_SAMPLE_RATE = Float.parseFloat(System.getProperty(PATH_STATS_SAMPLE_RATE, "0.1"));
        COLLECTOR_INITIAL_DELAY = Long.getLong(PATH_STATS_COLLECTOR_INITIAL_DELAY, 5);
        COLLECTOR_DELAY = Long.getLong(PATH_STATS_COLLECTOR_DELAY, 5);
        enabled = Boolean.getBoolean(PATH_STATS_ENABLED);

        LOG.info("{} = {}", PATH_STATS_SLOT_CAPACITY, REQUEST_STATS_SLOT_CAPACITY);
        LOG.info("{} = {}", PATH_STATS_SLOT_DURATION, REQUEST_STATS_SLOT_DURATION);
        LOG.info("{} = {}", PATH_STATS_MAX_DEPTH, REQUEST_PREPROCESS_PATH_DEPTH);
        LOG.info("{} = {}", PATH_STATS_COLLECTOR_INITIAL_DELAY, COLLECTOR_INITIAL_DELAY);
        LOG.info("{} = {}", PATH_STATS_COLLECTOR_DELAY, COLLECTOR_DELAY);
        LOG.info("{} = {}", PATH_STATS_ENABLED, enabled);

        this.scheduledExecutor =
            (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
        scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);

        requestsMap.put(Request.op2String(create), new PathStatsQueue(create));
        requestsMap.put(Request.op2String(create2), new PathStatsQueue(create2));
        requestsMap.put(Request.op2String(createContainer), new PathStatsQueue(createContainer));
        requestsMap.put(Request.op2String(deleteContainer), new PathStatsQueue(deleteContainer));
        requestsMap.put(Request.op2String(delete), new PathStatsQueue(delete));
        requestsMap.put(Request.op2String(exists), new PathStatsQueue(exists));
        requestsMap.put(Request.op2String(setData), new PathStatsQueue(setData));
        requestsMap.put(Request.op2String(getData), new PathStatsQueue(getData));
        requestsMap.put(Request.op2String(getACL), new PathStatsQueue(getACL));
        requestsMap.put(Request.op2String(setACL), new PathStatsQueue(setACL));
        requestsMap.put(Request.op2String(getChildren), new PathStatsQueue(getChildren));
        requestsMap.put(Request.op2String(getChildren2), new PathStatsQueue(getChildren2));
        requestsMap.put(Request.op2String(checkWatches), new PathStatsQueue(checkWatches));
        requestsMap.put(Request.op2String(removeWatches), new PathStatsQueue(removeWatches));
        requestsMap.put(Request.op2String(setWatches2), new PathStatsQueue(setWatches2));
        requestsMap.put(Request.op2String(sync), new PathStatsQueue(sync));
        this.immutableRequestsMap = java.util.Collections.unmodifiableMap(requestsMap);
    }

    static boolean isWriteOp(int requestType) {
        switch (requestType) {
        case ZooDefs.OpCode.sync:
        case ZooDefs.OpCode.create:
        case ZooDefs.OpCode.create2:
        case ZooDefs.OpCode.createContainer:
        case ZooDefs.OpCode.delete:
        case ZooDefs.OpCode.deleteContainer:
        case ZooDefs.OpCode.setData:
        case ZooDefs.OpCode.reconfig:
        case ZooDefs.OpCode.setACL:
        case ZooDefs.OpCode.multi:
        case ZooDefs.OpCode.check:
            return true;
        }
        return false;
    }

    static String trimPathDepth(String path, int maxDepth) {
        int count = 0;
        StringBuilder sb = new StringBuilder();
        StringTokenizer pathTokenizer = new StringTokenizer(path, PATH_SEPERATOR);
        while (pathTokenizer.hasMoreElements() && count++ < maxDepth) {
            sb.append(PATH_SEPERATOR);
            sb.append(pathTokenizer.nextToken());
        }
        path = sb.toString();
        return path;
    }

    public void shutdown() {
        if (!enabled) {
            return;
        }

        LOG.info("shutdown scheduledExecutor");
        scheduledExecutor.shutdownNow();
    }

    public void start() {
        if (!enabled) {
            return;
        }

        LOG.info("Start the RequestPath collector");
        immutableRequestsMap.forEach((opType, pathStatsQueue) -> pathStatsQueue.start());

        // Schedule to log the top used read/write paths every 5 mins
        scheduledExecutor.scheduleWithFixedDelay(() -> {
            LOG.info("%nHere are the top Read paths:");
            logTopPaths(aggregatePaths(4, queue -> !queue.isWriteOperation()),
                entry -> LOG.info("{} : {}", entry.getKey(), entry.getValue()));
            LOG.info("%nHere are the top Write paths:");
            logTopPaths(aggregatePaths(4, queue -> queue.isWriteOperation()),
                entry -> LOG.info("{} : {}", entry.getKey(), entry.getValue()));
        }, COLLECTOR_INITIAL_DELAY, COLLECTOR_DELAY, TimeUnit.MINUTES);
    }

    /**
     * The public interface of the buffer. FinalRequestHandler will call into this for
     * each request that has a path and this needs to be fast. we sample the path so that
     * we don't have to store too many paths in memory
     */
    public void registerRequest(int type, String path) {
        if (!enabled) {
            return;
        }
        if (ThreadLocalRandom.current().nextFloat() <= REQUEST_PREPROCESS_SAMPLE_RATE) {
            PathStatsQueue pathStatsQueue = immutableRequestsMap.get(Request.op2String(type));
            if (pathStatsQueue != null) {
                pathStatsQueue.registerRequest(path);
            } else {
                LOG.error("We should not handle {}", type);
            }
        }
    }

    public void dumpTopRequestPath(PrintWriter pwriter, String requestTypeName, int queryMaxDepth) {
        if (queryMaxDepth < 1) {
            return;
        }
        PathStatsQueue pathStatsQueue = immutableRequestsMap.get(requestTypeName);
        if (pathStatsQueue == null) {
            pwriter.println("Can not find path stats for type: " + requestTypeName);
            return;
        } else {
            pwriter.println("The top requests of type: " + requestTypeName);
        }
        Map<String, Integer> combinedMap;
        final int maxDepth = Math.min(queryMaxDepth, REQUEST_PREPROCESS_PATH_DEPTH);
        combinedMap = pathStatsQueue.collectStats(maxDepth);
        logTopPaths(combinedMap, entry -> pwriter.println(entry.getKey() + " : " + entry.getValue()));
    }

    public void dumpTopReadPaths(PrintWriter pwriter, int queryMaxDepth) {
        pwriter.println("The top read requests are");
        dumpTopAggregatedPaths(pwriter, queryMaxDepth, queue -> !queue.isWriteOperation);
    }

    public void dumpTopWritePaths(PrintWriter pwriter, int queryMaxDepth) {
        pwriter.println("The top write requests are");
        dumpTopAggregatedPaths(pwriter, queryMaxDepth, queue -> queue.isWriteOperation);
    }

    public void dumpTopPaths(PrintWriter pwriter, int queryMaxDepth) {
        pwriter.println("The top requests are");
        dumpTopAggregatedPaths(pwriter, queryMaxDepth, queue -> true);
    }

    /**
     * Combine all the path Stats Queue that matches the predicate together
     * and then write to the pwriter
     */
    private void dumpTopAggregatedPaths(PrintWriter pwriter, int queryMaxDepth, final Predicate<PathStatsQueue> predicate) {
        if (!enabled) {
            return;
        }
        final Map<String, Integer> combinedMap = aggregatePaths(queryMaxDepth, predicate);
        logTopPaths(combinedMap, entry -> pwriter.println(entry.getKey() + " : " + entry.getValue()));
    }

    Map<String, Integer> aggregatePaths(int queryMaxDepth, Predicate<PathStatsQueue> predicate) {
        final Map<String, Integer> combinedMap = new HashMap<>(REQUEST_PREPROCESS_TOPPATH_MAX);
        final int maxDepth = Math.min(queryMaxDepth, REQUEST_PREPROCESS_PATH_DEPTH);
        immutableRequestsMap.values()
            .stream()
            .filter(predicate)
            .forEach(pathStatsQueue -> pathStatsQueue.collectStats(maxDepth).forEach(
                (path, count) -> combinedMap.put(path, combinedMap.getOrDefault(path, 0) + count)));
        return combinedMap;
    }

    void logTopPaths(Map<String, Integer> combinedMap, final Consumer<Map.Entry<String, Integer>> output) {
        combinedMap.entrySet()
            .stream()
            // sort by path count
            .sorted(Comparator.comparing(Map.Entry<String, Integer>::getValue).reversed())
            .limit(REQUEST_PREPROCESS_TOPPATH_MAX).forEach(output);
    }

    class PathStatsQueue {

        private final String requestTypeName;
        private final AtomicReference<ConcurrentLinkedQueue<String>> currentSlot;
        private final LinkedBlockingQueue<Map<String, Integer>> requestPathStats;
        private final boolean isWriteOperation;

        public PathStatsQueue(int requestType) {
            this.requestTypeName = Request.op2String(requestType);
            this.isWriteOperation = isWriteOp(requestType);
            requestPathStats = new LinkedBlockingQueue<>(REQUEST_STATS_SLOT_CAPACITY);
            currentSlot = new AtomicReference<>(new ConcurrentLinkedQueue<>());
        }

        /*
         * The only write entry into this class, need to be fast.
         * Just queue up the path to the current slot queue locking free.
         */
        public void registerRequest(String path) {
            if (!enabled) {
                return;
            }
            currentSlot.get().offer(path);
        }

        ConcurrentLinkedQueue<String> getCurrentSlot() {
            return currentSlot.get();
        }

        /**
         * Helper function to MR the paths in the queue to map with count
         * 1. cut each path up to max depth
         * 2. aggregate the paths based on its count
         *
         * @param tobeProcessedSlot queue of paths called
         * @return a map containing aggregated path in the queue
         */
        Map<String, Integer> mapReducePaths(int maxDepth, Collection<String> tobeProcessedSlot) {
            Map<String, Integer> newSlot = new ConcurrentHashMap<>();
            tobeProcessedSlot.stream().filter(path -> path != null).forEach((path) -> {
                path = trimPathDepth(path, maxDepth);
                newSlot.put(path, newSlot.getOrDefault(path, 0) + 1);
            });
            return newSlot;
        }

        /**
         * The only read point of this class
         *
         * @return the aggregated path to count map
         */
        public Map<String, Integer> collectStats(int maxDepth) {
            Map<String, Integer> combinedMap;
            // Take a snapshot of the current slot and convert it to map.
            // Set the initial size as 0 since we don't want it to padding nulls in the end.
            Map<String, Integer> snapShot = mapReducePaths(
                maxDepth,
                Arrays.asList(currentSlot.get().toArray(new String[0])));
            // Starting from the snapshot and go through the queue to reduce them into one map
            // the iterator can run concurrently with write but we want to use a real lock in the test
            synchronized (accurateMode ? requestPathStats : new Object()) {
                combinedMap = requestPathStats.stream().reduce(snapShot, (firstMap, secondMap) -> {
                    secondMap.forEach((key, value) -> {
                        String trimmedPath = trimPathDepth(key, maxDepth);
                        firstMap.put(trimmedPath, firstMap.getOrDefault(trimmedPath, 0) + value);
                    });
                    return firstMap;
                });
            }
            return combinedMap;
        }

        /**
         * Start to schedule the pre-processing of the current slot
         */
        public void start() {
            if (!enabled) {
                return;
            }
            // Staggered start and then run every 15 seconds no matter what
            int delay = ThreadLocalRandom.current().nextInt(REQUEST_STATS_SLOT_DURATION);
            // We need to use fixed Delay as the fixed rate will start the next one right
            // after the previous one finishes if it runs overtime instead of overlapping it.
            scheduledExecutor.scheduleWithFixedDelay(() -> {
                // Generate new slot so new requests will go here.
                ConcurrentLinkedQueue<String> tobeProcessedSlot = currentSlot.getAndSet(new ConcurrentLinkedQueue<>());
                try {
                    // pre process the last slot and queue it up, only one thread scheduled modified
                    // this but we can mess up the collect part so we put a lock in the test.
                    Map<String, Integer> latestSlot = mapReducePaths(REQUEST_PREPROCESS_PATH_DEPTH, tobeProcessedSlot);
                    synchronized (accurateMode ? requestPathStats : new Object()) {
                        if (requestPathStats.remainingCapacity() <= 0) {
                            requestPathStats.poll();
                        }
                        if (!requestPathStats.offer(latestSlot)) {
                            LOG.error("Failed to insert the new request path stats for {}", requestTypeName);
                        }
                    }
                } catch (Exception e) {
                    LOG.error("Failed to insert the new request path stats for {} with exception {}", requestTypeName, e);
                }
            }, delay, REQUEST_STATS_SLOT_DURATION, TimeUnit.SECONDS);
        }

        boolean isWriteOperation() {
            return isWriteOperation;
        }

    }

}
⏎ org/apache/zookeeper/server/util/RequestPathMetricsCollector.java
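To make the zookeeper.pathStats.* properties and the public entry points of the class above easier to follow, here is a small standalone sketch that drives the collector directly. Inside a real server, the collector is owned by the server and registerRequest is called for each request that has a path (see the Javadoc above); the driver class, property values, and sample paths below are illustrative assumptions only, not part of the ZooKeeper source.

import java.io.PrintWriter;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.server.util.RequestPathMetricsCollector;

public class PathStatsDemo {
    public static void main(String[] args) {
        // The collector is off unless zookeeper.pathStats.enabled=true; the properties
        // must be set before the constructor runs, because that is where they are read.
        System.setProperty("zookeeper.pathStats.enabled", "true");
        // Sample every request instead of the default 10% so this small demo is deterministic.
        System.setProperty("zookeeper.pathStats.sampleRate", "1.0");
        // Keep only the first 3 path segments (the default depth is 6).
        System.setProperty("zookeeper.pathStats.maxDepth", "3");

        RequestPathMetricsCollector collector = new RequestPathMetricsCollector();
        collector.start();

        // Register a few read requests by hand (the server normally does this per request).
        collector.registerRequest(ZooDefs.OpCode.getData, "/app/config/service-a/node1");
        collector.registerRequest(ZooDefs.OpCode.getData, "/app/config/service-a/node2");
        collector.registerRequest(ZooDefs.OpCode.getChildren, "/app/locks");

        // Paths are trimmed to maxDepth, so both getData calls aggregate under /app/config/service-a.
        PrintWriter out = new PrintWriter(System.out, true);
        collector.dumpTopReadPaths(out, 3);

        collector.shutdown();
    }
}

Because collectStats() also snapshots the current slot, the just-registered paths appear in the dump immediately, trimmed to the configured depth; over time, the scheduled slot rollover keeps roughly 15 minutes of sampled history per request type.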
Or download all of them as a single archive file:
File name: zookeeper-server-3.7.0-fyi.zip
File size: 871011 bytes
Release date: 2021-05-17
⇒ Apache ZooKeeper 3.7.0 Jute Source Code
⇐ Download Apache ZooKeeper 3.7.0 Source Package