Categories:
Audio (13)
Biotech (29)
Bytecode (36)
Database (77)
Framework (7)
Game (7)
General (507)
Graphics (53)
I/O (35)
IDE (2)
JAR Tools (101)
JavaBeans (21)
JDBC (121)
JDK (426)
JSP (20)
Logging (108)
Mail (58)
Messaging (8)
Network (84)
PDF (97)
Report (7)
Scripting (84)
Security (32)
Server (121)
Servlet (26)
SOAP (24)
Testing (54)
Web (15)
XML (309)
Collections:
Other Resources:
Apache ZooKeeper 3.7.0 Server Source Code
Apache ZooKeeper is an open-source server which enables highly
reliable distributed coordination.
Apache ZooKeeper Server Source Code files are provided in the source package (apache-zookeeper-3.7.0.tar.gz). You can download it from the Apache ZooKeeper website.
You can also browse Apache ZooKeeper Server Source Code below:
✍: FYIcenter.com
⏎ org/apache/zookeeper/server/persistence/FileTxnLog.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server.persistence;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.ServerStats;
import org.apache.zookeeper.server.TxnLogEntry;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.txn.TxnDigest;
import org.apache.zookeeper.txn.TxnHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class implements the TxnLog interface. It provides api's
 * to access the txnlogs and add entries to it.
 * <p>
 * The format of a Transactional log is as follows:
 * <blockquote><pre>
 * LogFile:
 *     FileHeader TxnList ZeroPad
 *
 * FileHeader: {
 *     magic 4bytes (ZKLG)
 *     version 4bytes
 *     dbid 8bytes
 *   }
 *
 * TxnList:
 *     Txn || Txn TxnList
 *
 * Txn:
 *     checksum Txnlen TxnHeader Record 0x42
 *
 * checksum: 8bytes Adler32 is currently used
 *   calculated across payload -- Txnlen, TxnHeader, Record and 0x42
 *
 * Txnlen:
 *     len 4bytes
 *
 * TxnHeader: {
 *     sessionid 8bytes
 *     cxid 4bytes
 *     zxid 8bytes
 *     time 8bytes
 *     type 4bytes
 *   }
 *
 * Record:
 *     See Jute definition file for details on the various record types
 *
 * ZeroPad:
 *     0 padded to EOF (filled during preallocation stage)
 * </pre></blockquote>
 */
public class FileTxnLog implements TxnLog, Closeable {

    private static final Logger LOG;

    /**
     * Magic number written at the start of every log file: the four ASCII
     * bytes "ZKLG" read as a big-endian int. The charset is pinned so the
     * value cannot vary with the platform default encoding.
     */
    public static final int TXNLOG_MAGIC =
        ByteBuffer.wrap("ZKLG".getBytes(StandardCharsets.US_ASCII)).getInt();

    public static final int VERSION = 2;

    public static final String LOG_FILE_PREFIX = "log";

    static final String FSYNC_WARNING_THRESHOLD_MS_PROPERTY = "fsync.warningthresholdms";
    static final String ZOOKEEPER_FSYNC_WARNING_THRESHOLD_MS_PROPERTY =
        "zookeeper." + FSYNC_WARNING_THRESHOLD_MS_PROPERTY;

    /** Maximum time we allow for elapsed fsync before WARNing */
    private static final long fsyncWarningThresholdMS;

    /**
     * This parameter limit the size of each txnlog to a given limit (KB).
     * It does not affect how often the system will take a snapshot [zookeeper.snapCount]
     * We roll the txnlog when either of the two limits are reached.
     * Also since we only roll the logs at transaction boundaries, actual file size can exceed
     * this limit by the maximum size of a serialized transaction.
     * The feature is disabled by default (-1)
     */
    private static final String txnLogSizeLimitSetting = "zookeeper.txnLogSizeLimitInKb";

    /**
     * The actual txnlog size limit in bytes.
     */
    private static long txnLogSizeLimit = -1;

    static {
        LOG = LoggerFactory.getLogger(FileTxnLog.class);

        // The "zookeeper."-prefixed property wins; otherwise fall back to the
        // unprefixed property, and finally to 1000 ms.
        Long fsyncWarningThreshold = Long.getLong(ZOOKEEPER_FSYNC_WARNING_THRESHOLD_MS_PROPERTY);
        if (fsyncWarningThreshold == null) {
            fsyncWarningThreshold = Long.getLong(FSYNC_WARNING_THRESHOLD_MS_PROPERTY, 1000);
        }
        fsyncWarningThresholdMS = fsyncWarningThreshold;

        long logSizeKb = Long.getLong(txnLogSizeLimitSetting, -1);
        if (logSizeKb > 0) {
            LOG.info("{} = {}", txnLogSizeLimitSetting, logSizeKb);
            // Convert to bytes
            txnLogSizeLimit = logSizeKb * 1024;
        }
    }

    long lastZxidSeen;
    volatile BufferedOutputStream logStream = null;
    volatile OutputArchive oa;
    volatile FileOutputStream fos = null;

    File logDir;
    private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");
    long dbId;
    private final Queue<FileOutputStream> streamsToFlush = new ArrayDeque<>();
    File logFileWrite = null;
    private FilePadding filePadding = new FilePadding();

    private ServerStats serverStats;

    private volatile long syncElapsedMS = -1L;

    /**
     * A running total of all complete log files
     * This does not include the current file being written to
     */
    private long prevLogsRunningTotal;

    /**
     * constructor for FileTxnLog. Take the directory
     * where the txnlogs are stored
     * @param logDir the directory where the txnlogs are stored
     */
    public FileTxnLog(File logDir) {
        this.logDir = logDir;
    }

    /**
     * method to allow setting preallocate size
     * of log file to pad the file.
     * @param size the size to set to in bytes
     */
    public static void setPreallocSize(long size) {
        FilePadding.setPreallocSize(size);
    }

    /**
     * Setter for ServerStats to monitor fsync threshold exceed
     * @param serverStats used to update fsyncThresholdExceedCount
     */
    @Override
    public synchronized void setServerStats(ServerStats serverStats) {
        this.serverStats = serverStats;
    }

    /**
     * Set log size limit
     * @param size the limit in bytes; {@link #commit()} only rolls the log
     *             when this value is positive
     */
    public static void setTxnLogSizeLimit(long size) {
        txnLogSizeLimit = size;
    }

    /**
     * Return the current on-disk size of log size. This will be accurate only
     * after commit() is called. Otherwise, unflushed txns may not be included.
     * @return length of the file currently being written, or 0 if none
     */
    public synchronized long getCurrentLogSize() {
        if (logFileWrite != null) {
            return logFileWrite.length();
        }
        return 0;
    }

    /**
     * Overwrite the running total of all completed log files.
     * @param size the new running total in bytes
     */
    public synchronized void setTotalLogSize(long size) {
        prevLogsRunningTotal = size;
    }

    /**
     * @return running total of completed log files plus the size of the
     *         file currently being written
     */
    public synchronized long getTotalLogSize() {
        return prevLogsRunningTotal + getCurrentLogSize();
    }

    /**
     * creates a checksum algorithm to be used
     * @return the checksum used for this txnlog
     */
    protected Checksum makeChecksumAlgorithm() {
        return new Adler32();
    }

    /**
     * rollover the current log file to a new one.
     * The next {@link #append} creates the new file; the old stream stays in
     * the flush queue until a later {@link #commit()} closes it.
     * @throws IOException if flushing the current stream fails
     */
    public synchronized void rollLog() throws IOException {
        if (logStream != null) {
            this.logStream.flush();
            // Roll over the current log file into the running total
            prevLogsRunningTotal += getCurrentLogSize();
            this.logStream = null;
            oa = null;
        }
    }

    /**
     * close all the open file handles
     * @throws IOException if closing any stream fails
     */
    @Override
    public synchronized void close() throws IOException {
        if (logStream != null) {
            logStream.close();
        }
        for (FileOutputStream log : streamsToFlush) {
            log.close();
        }
    }

    /**
     * append an entry to the transaction log
     * @param hdr the header of the transaction
     * @param txn the transaction part of the entry
     * returns true iff something appended, otw false
     */
    public synchronized boolean append(TxnHeader hdr, Record txn) throws IOException {
        return append(hdr, txn, null);
    }

    /**
     * append an entry, with an optional digest, to the transaction log.
     * Creates a new log file (named after the entry's zxid) when no log
     * stream is open.
     * @param hdr the header of the transaction; a null header is ignored
     * @param txn the transaction part of the entry
     * @param digest the digest passed to the entry serializer, may be null
     * @return true iff something was appended, false if hdr was null
     * @throws IOException on write failure or if serialization yields no bytes
     */
    @Override
    public synchronized boolean append(TxnHeader hdr, Record txn, TxnDigest digest) throws IOException {
        if (hdr == null) {
            return false;
        }
        if (hdr.getZxid() <= lastZxidSeen) {
            LOG.warn(
                "Current zxid {} is <= {} for {}",
                hdr.getZxid(),
                lastZxidSeen,
                hdr.getType());
        } else {
            lastZxidSeen = hdr.getZxid();
        }
        if (logStream == null) {
            LOG.info("Creating new log file: {}", Util.makeLogName(hdr.getZxid()));

            logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
            fos = new FileOutputStream(logFileWrite);
            logStream = new BufferedOutputStream(fos);
            oa = BinaryOutputArchive.getArchive(logStream);
            FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId);
            fhdr.serialize(oa, "fileheader");
            // Make sure that the magic number is written before padding.
            logStream.flush();
            filePadding.setCurrentSize(fos.getChannel().position());
            streamsToFlush.add(fos);
        }
        filePadding.padFile(fos.getChannel());
        byte[] buf = Util.marshallTxnEntry(hdr, txn, digest);
        if (buf == null || buf.length == 0) {
            throw new IOException("Faulty serialization for header " + "and txn");
        }
        Checksum crc = makeChecksumAlgorithm();
        crc.update(buf, 0, buf.length);
        oa.writeLong(crc.getValue(), "txnEntryCRC");
        Util.writeTxnBytes(oa, buf);

        return true;
    }

    /**
     * Find the log file that starts at, or just before, the snapshot. Return
     * this and all subsequent logs. Results are ordered by zxid of file,
     * ascending order.
     * @param logDirList array of files
     * @param snapshotZxid return files at, or before this zxid
     * @return log files that starts at, or just before, the snapshot and subsequent ones
     */
    public static File[] getLogFiles(File[] logDirList, long snapshotZxid) {
        List<File> files = Util.sortDataDir(logDirList, LOG_FILE_PREFIX, true);
        long logZxid = 0;
        // Find the log file that starts before or at the same time as the
        // zxid of the snapshot
        for (File f : files) {
            long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
            if (fzxid > snapshotZxid) {
                break;
            }
            // the files are sorted with zxid's
            if (fzxid > logZxid) {
                logZxid = fzxid;
            }
        }
        List<File> v = new ArrayList<>(5);
        for (File f : files) {
            long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
            if (fzxid < logZxid) {
                continue;
            }
            v.add(f);
        }
        return v.toArray(new File[0]);
    }

    /**
     * get the last zxid that was logged in the transaction logs
     * @return the last zxid logged in the transaction logs
     */
    public long getLastLoggedZxid() {
        File[] files = getLogFiles(logDir.listFiles(), 0);
        long maxLog = files.length > 0
            ? Util.getZxidFromName(files[files.length - 1].getName(), LOG_FILE_PREFIX)
            : -1;

        // if a log file is more recent we must scan it to find
        // the highest zxid
        long zxid = maxLog;
        try (FileTxnLog txn = new FileTxnLog(logDir); TxnIterator itr = txn.read(maxLog)) {
            while (itr.next()) {
                zxid = itr.getHeader().getZxid();
            }
        } catch (IOException e) {
            LOG.warn("Unexpected exception", e);
        }
        return zxid;
    }

    /**
     * commit the logs. make sure that everything hits the
     * disk
     */
    public synchronized void commit() throws IOException {
        if (logStream != null) {
            logStream.flush();
        }
        for (FileOutputStream log : streamsToFlush) {
            log.flush();
            if (forceSync) {
                long startSyncNS = System.nanoTime();

                FileChannel channel = log.getChannel();
                channel.force(false);

                syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
                if (syncElapsedMS > fsyncWarningThresholdMS) {
                    if (serverStats != null) {
                        serverStats.incrementFsyncThresholdExceedCount();
                    }

                    LOG.warn(
                        "fsync-ing the write ahead log in {} took {}ms which will adversely effect operation latency. "
                            + "File size is {} bytes. See the ZooKeeper troubleshooting guide",
                        Thread.currentThread().getName(),
                        syncElapsedMS,
                        channel.size());
                }

                ServerMetrics.getMetrics().FSYNC_TIME.add(syncElapsedMS);
            }
        }
        // Every stream except the most recently added one has been rolled
        // away from and is now fully flushed, so it can be closed.
        while (streamsToFlush.size() > 1) {
            streamsToFlush.poll().close();
        }

        // Roll the log file if we exceed the size limit
        if (txnLogSizeLimit > 0) {
            long logSize = getCurrentLogSize();

            if (logSize > txnLogSizeLimit) {
                LOG.debug("Log size limit reached: {}", logSize);
                rollLog();
            }
        }
    }

    /**
     *
     * @return elapsed sync time of transaction log in milliseconds
     */
    public long getTxnLogSyncElapsedTime() {
        return syncElapsedMS;
    }

    /**
     * start reading all the transactions from the given zxid
     * @param zxid the zxid to start reading transactions from
     * @return returns an iterator to iterate through the transaction
     * logs
     */
    public TxnIterator read(long zxid) throws IOException {
        return read(zxid, true);
    }

    /**
     * start reading all the transactions from the given zxid.
     *
     * @param zxid the zxid to start reading transactions from
     * @param fastForward true if the iterator should be fast forwarded to point
     *        to the txn of a given zxid, else the iterator will point to the
     *        starting txn of a txnlog that may contain txn of a given zxid
     * @return returns an iterator to iterate through the transaction logs
     */
    public TxnIterator read(long zxid, boolean fastForward) throws IOException {
        return new FileTxnIterator(logDir, zxid, fastForward);
    }

    /**
     * truncate the current transaction logs
     * @param zxid the zxid to truncate the logs to
     * @return true if successful false if not
     */
    public boolean truncate(long zxid) throws IOException {
        try (FileTxnIterator itr = new FileTxnIterator(this.logDir, zxid)) {
            PositionInputStream input = itr.inputStream;
            if (input == null) {
                throw new IOException("No log files found to truncate! This could "
                                      + "happen if you still have snapshots from an old setup or "
                                      + "log files were deleted accidentally or dataLogDir was changed in zoo.cfg.");
            }
            long pos = input.getPosition();
            // now, truncate at the current position; try-with-resources makes
            // sure the file handle is released even if setLength fails
            try (RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw")) {
                raf.setLength(pos);
            }
            // every later log file only holds txns past the truncation point
            while (itr.goToNextLog()) {
                if (!itr.logFile.delete()) {
                    LOG.warn("Unable to truncate {}", itr.logFile);
                }
            }
        }
        return true;
    }

    /**
     * read the header of the transaction file
     * @param file the transaction file to read
     * @return header that was read from the file
     * @throws IOException if the file cannot be opened or deserialized
     */
    private static FileHeader readHeader(File file) throws IOException {
        InputStream is = null;
        try {
            is = new BufferedInputStream(new FileInputStream(file));
            InputArchive ia = BinaryInputArchive.getArchive(is);
            FileHeader hdr = new FileHeader();
            hdr.deserialize(ia, "fileheader");
            return hdr;
        } finally {
            try {
                if (is != null) {
                    is.close();
                }
            } catch (IOException e) {
                // the header has been read (or the read already failed);
                // a close failure must not mask that outcome
                LOG.warn("Ignoring exception during close", e);
            }
        }
    }

    /**
     * the dbid of this transaction database
     * @return the dbid of this database
     * @throws IOException if no log file exists or its header cannot be read
     */
    public long getDbId() throws IOException {
        // try-with-resources closes the iterator even when reading the header fails
        try (FileTxnIterator itr = new FileTxnIterator(logDir, 0)) {
            if (itr.logFile == null) {
                // no log file was opened; fail with a clear message instead of
                // an NPE from opening a null file
                throw new IOException("No transaction log files found in " + logDir);
            }
            return readHeader(itr.logFile).getDbid();
        }
    }

    /**
     * the forceSync value. true if forceSync is enabled, false otherwise.
     * @return the forceSync value
     */
    public boolean isForceSync() {
        return forceSync;
    }

    /**
     * a class that keeps track of the position
     * in the input stream. The position points to offset
     * that has been consumed by the applications. It can
     * wrap buffered input streams to provide the right offset
     * for the application.
     */
    static class PositionInputStream extends FilterInputStream {

        long position;

        protected PositionInputStream(InputStream in) {
            super(in);
            position = 0;
        }

        @Override
        public int read() throws IOException {
            int rc = super.read();
            if (rc > -1) {
                position++;
            }
            return rc;
        }

        /**
         * Reads into the whole array. Delegates to
         * {@link #read(byte[], int, int)}, which already advances the
         * position. Going through {@code super.read(b)} and counting here as
         * well would add the bytes twice, because FilterInputStream forwards
         * that call to the three-argument overload.
         */
        @Override
        public int read(byte[] b) throws IOException {
            return read(b, 0, b.length);
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            int rc = super.read(b, off, len);
            if (rc > 0) {
                position += rc;
            }
            return rc;
        }

        @Override
        public long skip(long n) throws IOException {
            long rc = super.skip(n);
            if (rc > 0) {
                position += rc;
            }
            return rc;
        }

        /**
         * @return number of bytes read or skipped through this stream so far
         */
        public long getPosition() {
            return position;
        }

        @Override
        public boolean markSupported() {
            return false;
        }

        @Override
        public void mark(int readLimit) {
            throw new UnsupportedOperationException("mark");
        }

        @Override
        public void reset() {
            throw new UnsupportedOperationException("reset");
        }

    }

    /**
     * this class implements the txnlog iterator interface
     * which is used for reading the transaction logs
     */
    public static class FileTxnIterator implements TxnLog.TxnIterator {

        File logDir;
        long zxid;
        TxnHeader hdr;
        Record record;
        TxnDigest digest;
        File logFile;
        InputArchive ia;
        static final String CRC_ERROR = "CRC check failed";

        PositionInputStream inputStream = null;
        // Files still to be read, newest first; the next file to open is
        // taken from the end of the list.
        private List<File> storedFiles;

        /**
         * create an iterator over a transaction database directory
         * @param logDir the transaction database directory
         * @param zxid the zxid to start reading from
         * @param fastForward true if the iterator should be fast forwarded to
         *        point to the txn of a given zxid, else the iterator will
         *        point to the starting txn of a txnlog that may contain txn of
         *        a given zxid
         * @throws IOException
         */
        public FileTxnIterator(File logDir, long zxid, boolean fastForward) throws IOException {
            this.logDir = logDir;
            this.zxid = zxid;
            init();

            if (fastForward && hdr != null) {
                while (hdr.getZxid() < zxid) {
                    if (!next()) {
                        break;
                    }
                }
            }
        }

        /**
         * create an iterator over a transaction database directory
         * @param logDir the transaction database directory
         * @param zxid the zxid to start reading from
         * @throws IOException
         */
        public FileTxnIterator(File logDir, long zxid) throws IOException {
            this(logDir, zxid, true);
        }

        /**
         * initialize to the zxid specified
         * this is inclusive of the zxid
         * @throws IOException
         */
        void init() throws IOException {
            storedFiles = new ArrayList<>();
            List<File> files = Util.sortDataDir(
                FileTxnLog.getLogFiles(logDir.listFiles(), 0),
                LOG_FILE_PREFIX,
                false);
            for (File f : files) {
                storedFiles.add(f);
                if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) < zxid) {
                    // this is the last logfile that starts below the zxid;
                    // anything older is not needed
                    break;
                }
            }
            goToNextLog();
            next();
        }

        /**
         * Return total storage size of txnlog that will return by this iterator.
         */
        public long getStorageSize() {
            long sum = 0;
            for (File f : storedFiles) {
                sum += f.length();
            }
            return sum;
        }

        /**
         * go to the next logfile
         * @return true if there is one and false if there is no
         * new file to be read
         * @throws IOException
         */
        private boolean goToNextLog() throws IOException {
            if (storedFiles.size() > 0) {
                this.logFile = storedFiles.remove(storedFiles.size() - 1);
                ia = createInputArchive(this.logFile);
                return true;
            }
            return false;
        }

        /**
         * Invoked once the input stream has been created: reads the file
         * header from the archive and verifies its magic number.
         * @param ia the inputarchive to be read from
         * @param is the inputstream
         * @throws IOException if the header cannot be read or the magic number is wrong
         */
        protected void inStreamCreated(InputArchive ia, InputStream is) throws IOException {
            FileHeader header = new FileHeader();
            header.deserialize(ia, "fileheader");
            if (header.getMagic() != FileTxnLog.TXNLOG_MAGIC) {
                throw new IOException("Transaction log: " + this.logFile
                                      + " has invalid magic number "
                                      + header.getMagic() + " != " + FileTxnLog.TXNLOG_MAGIC);
            }
        }

        /**
         * Opens the given log file and wraps it in an input archive, unless an
         * input stream is already open, in which case the current archive is
         * returned unchanged.
         * @param logFile the file to read.
         * @return the input archive to read transactions from
         * @throws IOException
         **/
        protected InputArchive createInputArchive(File logFile) throws IOException {
            if (inputStream == null) {
                inputStream = new PositionInputStream(new BufferedInputStream(new FileInputStream(logFile)));
                LOG.debug("Created new input stream: {}", logFile);
                ia = BinaryInputArchive.getArchive(inputStream);
                inStreamCreated(ia, inputStream);
                LOG.debug("Created new input archive: {}", logFile);
            }
            return ia;
        }

        /**
         * create a checksum algorithm
         * @return the checksum algorithm
         */
        protected Checksum makeChecksumAlgorithm() {
            return new Adler32();
        }

        /**
         * the iterator that moves to the next transaction
         * @return true if there is more transactions to be read
         * false if not.
         */
        public boolean next() throws IOException {
            if (ia == null) {
                return false;
            }
            try {
                long crcValue = ia.readLong("crcvalue");
                byte[] bytes = Util.readTxnBytes(ia);
                // Since we preallocate, we define EOF to be an empty
                // transaction (EOF or corrupted record)
                if (bytes == null || bytes.length == 0) {
                    throw new EOFException("Failed to read " + logFile);
                }
                // validate CRC
                Checksum crc = makeChecksumAlgorithm();
                crc.update(bytes, 0, bytes.length);
                if (crcValue != crc.getValue()) {
                    throw new IOException(CRC_ERROR);
                }
                TxnLogEntry logEntry = SerializeUtils.deserializeTxn(bytes);
                hdr = logEntry.getHeader();
                record = logEntry.getTxn();
                digest = logEntry.getDigest();
            } catch (EOFException e) {
                LOG.debug("EOF exception", e);
                inputStream.close();
                inputStream = null;
                ia = null;
                hdr = null;
                // this means that the file has ended
                // we should go to the next file
                if (!goToNextLog()) {
                    return false;
                }
                // if we went to the next log file, we should call next() again
                return next();
            } catch (IOException e) {
                inputStream.close();
                throw e;
            }
            return true;
        }

        /**
         * return the current header
         * @return the current header that
         * is read
         */
        public TxnHeader getHeader() {
            return hdr;
        }

        /**
         * return the current transaction
         * @return the current transaction
         * that is read
         */
        public Record getTxn() {
            return record;
        }

        /**
         * return the digest of the current transaction
         * @return the digest of the entry that was last read
         */
        public TxnDigest getDigest() {
            return digest;
        }

        /**
         * close the iterator
         * and release the resources.
         */
        @Override
        public void close() throws IOException {
            if (inputStream != null) {
                inputStream.close();
            }
        }

    }

}
⏎ org/apache/zookeeper/server/persistence/FileTxnLog.java
Or download all of them as a single archive file:
File name: zookeeper-server-3.7.0-fyi.zip File size: 871011 bytes Release date: 2021-05-17 Download
⇒ Apache ZooKeeper 3.7.0 Jute Source Code
⇐ Download Apache ZooKeeper 3.7.0 Source Package
2022-11-16, 24726👍, 0💬
Popular Posts:
How to download and install iText7-Core-7.1.4.zip? iText7-Core-7.1.4.zip is the binary package of iT...
Jackson is "the Java JSON library" or "the best JSON parser for Java". Or simply as "JSON for Java"....
JDK 11 jdk.dynalink.jmod is the JMOD file for JDK 11 Dynamic Linking module. JDK 11 Dynamic Linking ...
What is the sax\Writer.java provided in the Apache Xerces package? I have Apache Xerces 2.11.0 insta...
maven-core-3.8.6.jar is the JAR file for Apache Maven 3.8.6 Core module. Apache Maven is a software ...