Categories:
Audio (13)
Biotech (29)
Bytecode (36)
Database (77)
Framework (7)
Game (7)
General (507)
Graphics (53)
I/O (35)
IDE (2)
JAR Tools (101)
JavaBeans (21)
JDBC (121)
JDK (426)
JSP (20)
Logging (108)
Mail (58)
Messaging (8)
Network (84)
PDF (97)
Report (7)
Scripting (84)
Security (32)
Server (121)
Servlet (26)
SOAP (24)
Testing (54)
Web (15)
XML (309)
Collections:
Other Resources:
Apache ZooKeeper Server Source Code
Apache ZooKeeper is an open-source server which enables highly reliable distributed coordination.
Apache ZooKeeper Server Source Code files are provided in the source package file, apache-zookeeper-3.8.0.tar.gz.
You can download apache-zookeeper-3.8.0.tar.gz as described in the previous tutorial and go to the "zookeeper-server" sub-folder to view Apache ZooKeeper Server Source Code files.
You can also browse Apache ZooKeeper Server Source Code below:
✍: FYIcenter.com
⏎ org/apache/zookeeper/server/watch/WatchManagerOptimized.java
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.zookeeper.server.watch; import java.io.PrintWriter; import java.util.BitSet; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.server.ServerCnxn; import org.apache.zookeeper.server.ServerMetrics; import org.apache.zookeeper.server.util.BitHashSet; import org.apache.zookeeper.server.util.BitMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Optimized in memory and time complexity, compared to WatchManager, both the * memory consumption and time complexity improved a lot, but it cannot * efficiently remove the watcher when the session or socket is closed, for * majority use case this is not a problem. * * Changed made compared to WatchManager: * * - Use HashSet and BitSet to store the watchers to find a balance between * memory usage and time complexity * - Use ReadWriteLock instead of synchronized to reduce lock retention * - Lazily clean up the closed watchers */ public class WatchManagerOptimized implements IWatchManager, IDeadWatcherListener { private static final Logger LOG = LoggerFactory.getLogger(WatchManagerOptimized.class); private final ConcurrentHashMap<String, BitHashSet> pathWatches = new ConcurrentHashMap<String, BitHashSet>(); // watcher to bit id mapping private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>(); // used to lazily remove the dead watchers private final WatcherCleaner watcherCleaner; private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); public WatchManagerOptimized() { watcherCleaner = new WatcherCleaner(this); watcherCleaner.start(); } @Override public boolean addWatch(String path, Watcher watcher) { boolean result = false; // Need readLock to exclusively lock with removeWatcher, otherwise we // may add a dead watch whose connection was just closed. // // Creating new watcher bit and adding it to the BitHashSet has it's // own lock to minimize the write lock scope addRemovePathRWLock.readLock().lock(); try { // avoid race condition of adding a on flying dead watcher if (isDeadWatcher(watcher)) { LOG.debug("Ignoring addWatch with closed cnxn"); } else { Integer bit = watcherBitIdMap.add(watcher); BitHashSet watchers = pathWatches.get(path); if (watchers == null) { watchers = new BitHashSet(); BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers); // it's possible multiple thread might add to pathWatches // while we're holding read lock, so we need this check // here if (existingWatchers != null) { watchers = existingWatchers; } } result = watchers.add(bit); } } finally { addRemovePathRWLock.readLock().unlock(); } return result; } /** * Used in the OpCode.checkWatches, which is a read operation, since read * and write requests are exclusively processed, we don't need to hold * lock here. * * Different from addWatch this method doesn't mutate any state, so we don't * need to hold read lock to avoid dead watcher (cnxn closed) being added * to the watcher manager. * * It's possible that before we lazily clean up the dead watcher, this will * return true, but since the cnxn is closed, the response will dropped as * well, so it doesn't matter. */ @Override public boolean containsWatcher(String path, Watcher watcher) { BitHashSet watchers = pathWatches.get(path); return watchers != null && watchers.contains(watcherBitIdMap.getBit(watcher)); } @Override public boolean removeWatcher(String path, Watcher watcher) { // Hold write lock directly because removeWatcher request is more // likely to be invoked when the watcher is actually exist and // haven't fired yet, so instead of having read lock to check existence // before switching to write one, it's actually cheaper to hold write // lock directly here. addRemovePathRWLock.writeLock().lock(); try { BitHashSet list = pathWatches.get(path); if (list == null || !list.remove(watcherBitIdMap.getBit(watcher))) { return false; } if (list.isEmpty()) { pathWatches.remove(path); } return true; } finally { addRemovePathRWLock.writeLock().unlock(); } } @Override public void removeWatcher(Watcher watcher) { Integer watcherBit; // Use exclusive lock with addWatcher to guarantee that we won't add // watch for a cnxn which is already closed. addRemovePathRWLock.writeLock().lock(); try { // do nothing if the watcher is not tracked watcherBit = watcherBitIdMap.getBit(watcher); if (watcherBit == null) { return; } } finally { addRemovePathRWLock.writeLock().unlock(); } // We can guarantee that when this line is executed, the cnxn of this // watcher has already been marked as stale (this method is only called // from ServerCnxn.close after we set stale), which means no watches // will be added to the watcher manager with this watcher, so that we // can safely clean up this dead watcher. // // So it's not necessary to have this line in the addRemovePathRWLock. // And moving the addDeadWatcher out of the locking block to avoid // holding the write lock while we're blocked on adding dead watchers // into the watcherCleaner. watcherCleaner.addDeadWatcher(watcherBit); } /** * Entry for WatcherCleaner to remove dead watchers * * @param deadWatchers the watchers need to be removed */ @Override public void processDeadWatchers(Set<Integer> deadWatchers) { // All the watchers being processed here are guaranteed to be dead, // no watches will be added for those dead watchers, that's why I // don't need to have addRemovePathRWLock here. BitSet bits = new BitSet(); for (int dw : deadWatchers) { bits.set(dw); } // The value iterator will reflect the state when it was // created, don't need to synchronize. for (BitHashSet watchers : pathWatches.values()) { watchers.remove(deadWatchers, bits); } // Better to remove the empty path from pathWatches, but it will add // lot of lock contention and affect the throughput of addWatch, // let's rely on the triggerWatch to delete it. for (Integer wbit : deadWatchers) { watcherBitIdMap.remove(wbit); } } @Override public WatcherOrBitSet triggerWatch(String path, EventType type) { return triggerWatch(path, type, null); } @Override public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress) { WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); BitHashSet watchers = remove(path); if (watchers == null) { return null; } int triggeredWatches = 0; // Avoid race condition between dead watcher cleaner in // WatcherCleaner and iterating here synchronized (watchers) { for (Integer wBit : watchers) { if (suppress != null && suppress.contains(wBit)) { continue; } Watcher w = watcherBitIdMap.get(wBit); // skip dead watcher if (w == null || isDeadWatcher(w)) { continue; } w.process(e); triggeredWatches++; } } updateMetrics(type, triggeredWatches); return new WatcherOrBitSet(watchers); } @Override public int size() { int size = 0; for (BitHashSet watches : pathWatches.values()) { size += watches.size(); } return size; } @Override public void shutdown() { if (watcherCleaner != null) { watcherCleaner.shutdown(); } } private BitHashSet remove(String path) { addRemovePathRWLock.writeLock().lock(); try { return pathWatches.remove(path); } finally { addRemovePathRWLock.writeLock().unlock(); } } void updateMetrics(final EventType type, int size) { switch (type) { case NodeCreated: ServerMetrics.getMetrics().NODE_CREATED_WATCHER.add(size); break; case NodeDeleted: ServerMetrics.getMetrics().NODE_DELETED_WATCHER.add(size); break; case NodeDataChanged: ServerMetrics.getMetrics().NODE_CHANGED_WATCHER.add(size); break; case NodeChildrenChanged: ServerMetrics.getMetrics().NODE_CHILDREN_WATCHER.add(size); break; default: // Other types not logged. break; } } boolean isDeadWatcher(Watcher watcher) { return watcher instanceof ServerCnxn && ((ServerCnxn) watcher).isStale(); } int pathSize() { return pathWatches.size(); } @Override public WatchesSummary getWatchesSummary() { return new WatchesSummary(watcherBitIdMap.size(), pathSize(), size()); } @Override public WatchesReport getWatches() { Map<Long, Set<String>> id2paths = new HashMap<Long, Set<String>>(); for (Entry<Watcher, Set<String>> e : getWatcher2PathesMap().entrySet()) { Long id = ((ServerCnxn) e.getKey()).getSessionId(); Set<String> paths = new HashSet<String>(e.getValue()); id2paths.put(id, paths); } return new WatchesReport(id2paths); } /** * Iterate through ConcurrentHashMap is 'safe', it will reflect the state * of the map at the time iteration began, may miss update while iterating, * given this is used in the commands to get a general idea of the watches * state, we don't care about missing some update. */ @Override public WatchesPathReport getWatchesByPath() { Map<String, Set<Long>> path2ids = new HashMap<String, Set<Long>>(); for (Entry<String, BitHashSet> e : pathWatches.entrySet()) { BitHashSet watchers = e.getValue(); synchronized (watchers) { Set<Long> ids = new HashSet<Long>(watchers.size()); path2ids.put(e.getKey(), ids); for (Integer wbit : watchers) { Watcher watcher = watcherBitIdMap.get(wbit); if (watcher instanceof ServerCnxn) { ids.add(((ServerCnxn) watcher).getSessionId()); } } } } return new WatchesPathReport(path2ids); } /** * May cause OOM if there are lots of watches, might better to forbid * it in this class. */ public Map<Watcher, Set<String>> getWatcher2PathesMap() { Map<Watcher, Set<String>> watcher2paths = new HashMap<Watcher, Set<String>>(); for (Entry<String, BitHashSet> e : pathWatches.entrySet()) { String path = e.getKey(); BitHashSet watchers = e.getValue(); // avoid race condition with add/remove synchronized (watchers) { for (Integer wbit : watchers) { Watcher w = watcherBitIdMap.get(wbit); if (w == null) { continue; } if (!watcher2paths.containsKey(w)) { watcher2paths.put(w, new HashSet<String>()); } watcher2paths.get(w).add(path); } } } return watcher2paths; } @Override public void dumpWatches(PrintWriter pwriter, boolean byPath) { if (byPath) { for (Entry<String, BitHashSet> e : pathWatches.entrySet()) { pwriter.println(e.getKey()); BitHashSet watchers = e.getValue(); synchronized (watchers) { for (Integer wbit : watchers) { Watcher w = watcherBitIdMap.get(wbit); if (!(w instanceof ServerCnxn)) { continue; } pwriter.print("\t0x"); pwriter.print(Long.toHexString(((ServerCnxn) w).getSessionId())); pwriter.print("\n"); } } } } else { for (Entry<Watcher, Set<String>> e : getWatcher2PathesMap().entrySet()) { pwriter.print("0x"); pwriter.println(Long.toHexString(((ServerCnxn) e.getKey()).getSessionId())); for (String path : e.getValue()) { pwriter.print("\t"); pwriter.println(path); } } } } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append(watcherBitIdMap.size()).append(" connections watching ").append(pathSize()).append(" paths\n"); sb.append("Total watches:").append(size()); return sb.toString(); } }
⏎ org/apache/zookeeper/server/watch/WatchManagerOptimized.java
Or download all of them as a single archive file:
File name: zookeeper-server-3.8.0-fyi.zip File size: 885581 bytes Release date: 2022-02-25 Download
⇒ Apache ZooKeeper Jute Source Code
⇐ Download and Install Apache ZooKeeper Source Package
2022-11-16, 13019👍, 0💬
Popular Posts:
JRE 8 rt.jar is the JAR file for JRE 8 RT (Runtime) libraries. JRE (Java Runtime) 8 is the runtime e...
iText is an ideal library for developers looking to enhance web- and other applications with dynamic...
How to download and install mysql-connector-j-8.0.31 .zip?Connector/J Java library is a JDBC Driver ...
What Is poi-contrib-3.5.jar? poi-contrib-3.5.jar is one of the JAR files for Apache POI 3.5, which p...
The Java Naming and Directory Interface (JNDI) is part of the Java platform, providing applications ...