Categories:
Audio (13)
Biotech (29)
Bytecode (36)
Database (77)
Framework (7)
Game (7)
General (507)
Graphics (53)
I/O (35)
IDE (2)
JAR Tools (101)
JavaBeans (21)
JDBC (121)
JDK (426)
JSP (20)
Logging (108)
Mail (58)
Messaging (8)
Network (84)
PDF (97)
Report (7)
Scripting (84)
Security (32)
Server (121)
Servlet (26)
SOAP (24)
Testing (54)
Web (15)
XML (309)
Collections:
Other Resources:
Source Code for Apache Log4j Flume Appender
Apache Log4j Flume Appender allows applications to send events to Flume Agents.
Bytecode (Java 8) for Apache Log4j Flume Appender is provided in a separate JAR file like log4j-flume-ng-2.14.1.jar.
Source Code files for Apache Log4j Flume Appender are provided in both binary package like apache-log4j-2.14.1-bin.zip and source package like apache-log4j-2.14.1-src.zip. You can download them at Apache Log4j Website.
You can also browse Source Code files for Apache Log4j Flume Appender 2.14.1 below.
✍: FYIcenter.com
⏎ org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache license, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the license for the specific language governing permissions and
 * limitations under the license.
 */
package org.apache.logging.log4j.flume.appender;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.logging.log4j.core.appender.AppenderLoggingException;
import org.apache.logging.log4j.core.appender.ManagerFactory;

/**
 * Manager for FlumeAvroAppenders.
 *
 * <p>Events are delivered through a Flume {@link RpcClient} configured with the
 * {@code default_failover} client type. When the batch size is 1 every event is sent
 * immediately; otherwise events are accumulated and sent once the batch is full or the
 * configured delay has elapsed since the first event of the batch was added.</p>
 */
public class FlumeAvroManager extends AbstractFlumeManager {

    private static final int MAX_RECONNECTS = 3;
    private static final int MINIMUM_TIMEOUT = 1000;

    private static final AvroManagerFactory factory = new AvroManagerFactory();

    private final Agent[] agents;

    private final int batchSize;

    private final long delayNanos;
    private final int delayMillis;

    private final int retries;

    private final int connectTimeoutMillis;

    private final int requestTimeoutMillis;

    // Index into agents used when reporting failures; it is never changed in this class.
    private final int current = 0;

    private volatile RpcClient rpcClient;

    // Guards batchEvent and nextSend. A dedicated lock is required because batchEvent is
    // replaced whenever a batch is handed off, so it cannot serve as its own monitor.
    private final Object batchLock = new Object();

    private BatchEvent batchEvent = new BatchEvent();
    private long nextSend = 0;

    /**
     * Constructor
     * @param name The unique name of this manager.
     * @param shortName The name of the appender; not used by this constructor.
     * @param agents An array of Agents.
     * @param batchSize The number of events to include in a batch.
     * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
     * @param retries The number of times to retry connecting before giving up.
     * @param connectTimeout The connection timeout in ms.
     * @param requestTimeout The request timeout in ms.
     */
    protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents,
                               final int batchSize, final int delayMillis, final int retries,
                               final int connectTimeout, final int requestTimeout) {
        super(name);
        this.agents = agents;
        this.batchSize = batchSize;
        this.delayMillis = delayMillis;
        this.delayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis);
        this.retries = retries;
        this.connectTimeoutMillis = connectTimeout;
        this.requestTimeoutMillis = requestTimeout;
        // connect() reads this.batchSize, so it must run after the assignments above.
        this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
    }

    /**
     * Returns a FlumeAvroManager.
     * @param name The name of the manager.
     * @param agents The agents to use.
     * @param batchSize The number of events to include in a batch. Values less than 1 are treated as 1.
     * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
     * @param retries The number of times to retry connecting before giving up.
     * @param connectTimeoutMillis The connection timeout in ms.
     * @param requestTimeoutMillis The request timeout in ms.
     * @return A FlumeAvroManager.
     * @throws IllegalArgumentException if {@code agents} is null or empty.
     */
    public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize,
                                              final int delayMillis, final int retries,
                                              final int connectTimeoutMillis, final int requestTimeoutMillis) {
        if (agents == null || agents.length == 0) {
            throw new IllegalArgumentException("At least one agent is required");
        }

        if (batchSize <= 0) {
            batchSize = 1;
        }

        // The manager name lists every agent as host:port, e.g. "name FlumeAvro[h1:1,h2:2]".
        final StringBuilder sb = new StringBuilder(name);
        sb.append(" FlumeAvro[");
        String separator = "";
        for (final Agent agent : agents) {
            sb.append(separator).append(agent.getHost()).append(':').append(agent.getPort());
            separator = ",";
        }
        sb.append(']');
        return getManager(sb.toString(), factory,
                new FactoryData(name, agents, batchSize, delayMillis, retries, connectTimeoutMillis,
                        requestTimeoutMillis));
    }

    /**
     * Returns the agents.
     * @return The agent array.
     */
    public Agent[] getAgents() {
        return agents;
    }

    /**
     * Returns the index of the current agent.
     * @return The index for the current agent.
     */
    public int getCurrent() {
        return current;
    }

    /**
     * Returns the configured number of connection retries.
     * @return The retry count.
     */
    public int getRetries() {
        return retries;
    }

    /**
     * Returns the connection timeout.
     * @return The connection timeout in ms.
     */
    public int getConnectTimeoutMillis() {
        return connectTimeoutMillis;
    }

    /**
     * Returns the request timeout.
     * @return The request timeout in ms.
     */
    public int getRequestTimeoutMillis() {
        return requestTimeoutMillis;
    }

    /**
     * Returns the batch size.
     * @return The number of events to include in a batch.
     */
    public int getBatchSize() {
        return batchSize;
    }

    /**
     * Returns the batch delay.
     * @return The number of milliseconds to wait before sending an incomplete batch.
     */
    public int getDelayMillis() {
        return delayMillis;
    }

    /**
     * Sends a batch of events, connecting first if no client is currently available.
     * @param events The batch to send.
     * @throws AppenderLoggingException if no client could be created or the write failed.
     */
    public void send(final BatchEvent events) {
        final RpcClient client = ensureConnected();
        if (client == null) {
            throw agentsUnavailable(null);
        }
        try {
            LOGGER.trace("Sending batch of {} events", events.getEvents().size());
            client.appendBatch(events.getEvents());
        } catch (final Exception ex) {
            discardClient(client);
            throw agentsUnavailable(ex);
        }
    }

    /**
     * Sends a single event. When the batch size is 1 the event is written immediately;
     * otherwise it is added to the pending batch, which is sent once it holds
     * {@code batchSize} events or the batch delay has elapsed.
     * @param event The event to send.
     * @throws AppenderLoggingException if no client could be created or the write failed.
     */
    @Override
    public void send(final Event event) {
        if (batchSize == 1) {
            final RpcClient client = ensureConnected();
            if (client == null) {
                throw agentsUnavailable(null);
            }
            try {
                client.append(event);
            } catch (final Exception ex) {
                discardClient(client);
                throw agentsUnavailable(ex);
            }
            return;
        }

        BatchEvent batch = null;
        synchronized (batchLock) {
            batchEvent.addEvent(event);
            final int eventCount = batchEvent.size();
            final long now = System.nanoTime();
            if (eventCount == 1) {
                // The delay is measured from the first event placed in the batch.
                nextSend = now + delayNanos;
            }
            // nanoTime values must be compared by difference to stay correct across overflow.
            if (eventCount >= batchSize || now - nextSend >= 0) {
                batch = batchEvent;
                batchEvent = new BatchEvent();
            }
        }
        // The network write happens outside the lock so other threads can keep queueing events.
        if (batch != null) {
            send(batch);
        }
    }

    /**
     * Returns the current client, creating one if none exists. Creation is synchronized on
     * this manager so that concurrent senders do not each create a client.
     * @return The client, or null if one could not be created.
     */
    private RpcClient ensureConnected() {
        RpcClient client = rpcClient;
        if (client == null) {
            synchronized (this) {
                client = rpcClient;
                if (client == null) {
                    client = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
                    rpcClient = client;
                }
            }
        }
        return client;
    }

    /**
     * Detaches a client that failed so the next send reconnects, then closes it.
     * @param failed The client on which the write failed.
     */
    private void discardClient(final RpcClient failed) {
        synchronized (this) {
            // Only clear the field if another thread has not already replaced the client.
            if (rpcClient == failed) {
                rpcClient = null;
            }
        }
        try {
            failed.close();
        } catch (final Exception closeEx) {
            // The write failure is what gets reported to the caller; a close failure must not mask it.
            LOGGER.debug("Error closing failed Flume RPC client", closeEx);
        }
    }

    /**
     * Logs a warning naming the current agent and builds the exception to throw.
     * @param cause The underlying failure, or null if no client could be created.
     * @return The exception for the caller to throw.
     */
    private AppenderLoggingException agentsUnavailable(final Exception cause) {
        final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':'
                + agents[current].getPort();
        if (cause == null) {
            LOGGER.warn(msg);
            return new AppenderLoggingException("No Flume agents are available");
        }
        LOGGER.warn(msg, cause);
        return new AppenderLoggingException("No Flume agents are available", cause);
    }

    /**
     * Creates a {@code default_failover} RPC client for the given agents.
     * @param agents The list of agents to choose from.
     * @param retries The requested number of retries; values above {@code MAX_RECONNECTS} are capped,
     *        and values of 1 or less leave {@code max-attempts} unset.
     * @param connectTimeoutMillis The connection timeout in ms; ignored when below {@code MINIMUM_TIMEOUT}.
     * @param requestTimeoutMillis The request timeout in ms; ignored when below {@code MINIMUM_TIMEOUT}.
     * @return The client, or null if it could not be created.
     */
    private RpcClient connect(final Agent[] agents, int retries, final int connectTimeoutMillis,
                              final int requestTimeoutMillis) {
        try {
            final Properties props = new Properties();

            props.put("client.type", "default_failover");

            // Agents are registered as host1, host2, ... and listed space-separated under "hosts".
            int agentCount = 1;
            final StringBuilder sb = new StringBuilder();
            for (final Agent agent : agents) {
                if (sb.length() > 0) {
                    sb.append(' ');
                }
                final String hostName = "host" + agentCount++;
                props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort());
                sb.append(hostName);
            }
            props.put("hosts", sb.toString());
            if (batchSize > 0) {
                props.put("batch-size", Integer.toString(batchSize));
            }
            if (retries > 1) {
                retries = Math.min(retries, MAX_RECONNECTS);
                props.put("max-attempts", Integer.toString(retries * agents.length));
            }
            if (requestTimeoutMillis >= MINIMUM_TIMEOUT) {
                props.put("request-timeout", Integer.toString(requestTimeoutMillis));
            }
            if (connectTimeoutMillis >= MINIMUM_TIMEOUT) {
                props.put("connect-timeout", Integer.toString(connectTimeoutMillis));
            }
            return RpcClientFactory.getInstance(props);
        } catch (final Exception ex) {
            // The trailing throwable keeps the stack trace in the status log.
            LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage(), ex);
            return null;
        }
    }

    /**
     * Sends any pending batch and closes the RPC client.
     * @param timeout Not used by this implementation.
     * @param timeUnit Not used by this implementation.
     * @return true if the final batch (if any) was sent and the client closed without error.
     */
    @Override
    protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
        boolean closed = true;
        if (rpcClient != null) {
            // Take the pending batch under the same lock send(Event) uses to fill it.
            BatchEvent pending = null;
            synchronized (batchLock) {
                if (batchSize > 1 && batchEvent.getEvents().size() > 0) {
                    pending = batchEvent;
                    batchEvent = new BatchEvent();
                }
            }
            if (pending != null) {
                try {
                    send(pending);
                } catch (final Exception ex) {
                    LOGGER.error("Error sending final batch: {}", ex.getMessage());
                    closed = false;
                }
            }
        }

        // A failed final send has already detached and closed the client, so re-read the field.
        final RpcClient client;
        synchronized (this) {
            client = rpcClient;
            rpcClient = null;
        }
        if (client != null) {
            try {
                client.close();
            } catch (final Exception ex) {
                LOGGER.error("Attempt to close RPC client failed", ex);
                closed = false;
            }
        }
        return closed;
    }

    /**
     * Factory data.
     */
    private static class FactoryData {
        private final String name;
        private final Agent[] agents;
        private final int batchSize;
        private final int delayMillis;
        private final int retries;
        private final int connectTimeoutMillis;
        private final int requestTimeoutMillis;

        /**
         * Constructor.
         * @param name The name of the Appender.
         * @param agents The agents.
         * @param batchSize The number of events to include in a batch.
         * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
         * @param retries The number of times to retry connecting before giving up.
         * @param connectTimeoutMillis The connection timeout in ms.
         * @param requestTimeoutMillis The request timeout in ms.
         */
        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int delayMillis,
                           final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
            this.name = name;
            this.agents = agents;
            this.batchSize = batchSize;
            this.delayMillis = delayMillis;
            this.retries = retries;
            this.connectTimeoutMillis = connectTimeoutMillis;
            this.requestTimeoutMillis = requestTimeoutMillis;
        }
    }

    /**
     * Avro Manager Factory.
     */
    private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {

        /**
         * Create the FlumeAvroManager.
         * @param name The name of the entity to manage.
         * @param data The data required to create the entity.
         * @return The FlumeAvroManager, or null if it could not be created.
         */
        @Override
        public FlumeAvroManager createManager(final String name, final FactoryData data) {
            try {
                return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.delayMillis,
                        data.retries, data.connectTimeoutMillis, data.requestTimeoutMillis);
            } catch (final Exception ex) {
                LOGGER.error("Could not create FlumeAvroManager", ex);
            }
            return null;
        }
    }
}
⏎ org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
Or download all of them as a single archive file:
File name: log4j-flume-ng-2.14.1-sources.jar File size: 37127 bytes Release date: 2021-03-06 Download
⇒ Source Code for Apache Log4j 1.2 Bridge
⇐ Source Code for Apache Log4j Tag Library
2015-11-04, 6855👍, 0💬
Popular Posts:
How to download and install JDK (Java Development Kit) 6? If you want to write Java applications, yo...
JDK 11 jdk.jlink.jmod is the JMOD file for JDK 11 JLink tool, which can be invoked by the "jlink" co...
commons-lang-2.6.jar is the JAR file for Apache Commons Lang 2.6, which provides a host of helper ut...
How to download and install Apache XMLBeans-2.6.0.zip? If you want to try the XMLBeans Java library,...
JDK 17 java.xml.jmod is the JMOD file for JDK 17 XML (eXtensible Markup Language) module. JDK 17 XML...