Source Code for Apache Log4j Flume Appender

Apache Log4j Flume Appender allows applications to send events to Flume Agents.

Bytecode (Java 8) for Apache Log4j Flume Appender is provided in a separate JAR file like log4j-flume-ng-2.14.1.jar.

Source Code files for Apache Log4j IOStreams are provided in both binary packge like and source package like You can download them at Apache Log4j Website.

You can also browse Source Code files for Apache Log4j Flume Appender 2.14.1 below.



 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache license, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the license for the specific language governing permissions and
 * limitations under the license.
package org.apache.logging.log4j.flume.appender;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.agent.embedded.EmbeddedAgent;
import org.apache.logging.log4j.LoggingException;
import org.apache.logging.log4j.core.appender.ManagerFactory;
import org.apache.logging.log4j.core.config.ConfigurationException;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.util.NameUtil;
import org.apache.logging.log4j.util.PropertiesUtil;
import org.apache.logging.log4j.util.Strings;

public class FlumeEmbeddedManager extends AbstractFlumeManager {

    private static final String FILE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator");

    private static final String IN_MEMORY = "InMemory";

    private static FlumeManagerFactory factory = new FlumeManagerFactory();

    private final EmbeddedAgent agent;

    private final String shortName;

     * Constructor
     * @param name The unique name of this manager.
     * @param shortName The short version of the agent name.
     * @param agent The embedded agent.
    protected FlumeEmbeddedManager(final String name, final String shortName, final EmbeddedAgent agent) {
        this.agent = agent;
        this.shortName = shortName;

     * Returns a FlumeEmbeddedManager.
     * @param name The name of the manager.
     * @param agents The agents to use.
     * @param properties Properties for the embedded manager.
     * @param batchSize The number of events to include in a batch.
     * @param dataDir The directory where the Flume FileChannel should write to.
     * @return A FlumeAvroManager.
    public static FlumeEmbeddedManager getManager(final String name, final Agent[] agents, final Property[] properties,
                                                  int batchSize, final String dataDir) {

        if (batchSize <= 0) {
            batchSize = 1;

        if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
            throw new IllegalArgumentException("Either an Agent or properties are required");
        } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
            throw new IllegalArgumentException("Cannot configure both Agents and Properties.");

        final StringBuilder sb = new StringBuilder();
        boolean first = true;

        if (agents != null && agents.length > 0) {
            for (final Agent agent : agents) {
                if (!first) {
                first = false;
        } else {
            String sep = Strings.EMPTY;
            final StringBuilder props = new StringBuilder();
            for (final Property prop : properties) {
                sep = "_";
        return getManager(sb.toString(), factory,
                new FactoryData(name, agents, properties, batchSize, dataDir));

    public void send(final Event event) {
        try {
        } catch (final EventDeliveryException ex) {
            throw new LoggingException("Unable to deliver event to Flume Appender " + shortName, ex);

    protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
        return true;

     * Factory data.
    private static class FactoryData {
        private final Agent[] agents;
        private final Property[] properties;
        private final int batchSize;
        private final String dataDir;
        private final String name;

         * Constructor.
         * @param name The name of the Appender.
         * @param agents The agents.
         * @param properties The Flume configuration properties.
         * @param batchSize The number of events to include in a batch.
         * @param dataDir The directory where Flume should write to.
        public FactoryData(final String name, final Agent[] agents, final Property[] properties, final int batchSize,
                           final String dataDir) {
   = name;
            this.agents = agents;
            this.batchSize = batchSize;
   = properties;
            this.dataDir = dataDir;

     * Avro Manager Factory.
    private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {

         * Create the FlumeAvroManager.
         * @param name The name of the entity to manage.
         * @param data The data required to create the entity.
         * @return The FlumeAvroManager.
        public FlumeEmbeddedManager createManager(final String name, final FactoryData data) {
            try {
                final Map<String, String> props = createProperties(, data.agents,,
                    data.batchSize, data.dataDir);
                final EmbeddedAgent agent = new EmbeddedAgent(name);
                LOGGER.debug("Created Agent " + name);
                return new FlumeEmbeddedManager(name,, agent);
            } catch (final Exception ex) {
                LOGGER.error("Could not create FlumeEmbeddedManager", ex);
            return null;

        private Map<String, String> createProperties(final String name, final Agent[] agents,
                                                     final Property[] properties, final int batchSize, String dataDir) {
            final Map<String, String> props = new HashMap<>();

            if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
                LOGGER.error("No Flume configuration provided");
                throw new ConfigurationException("No Flume configuration provided");

            if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
                LOGGER.error("Agents and Flume configuration cannot both be specified");
                throw new ConfigurationException("Agents and Flume configuration cannot both be specified");

            if (agents != null && agents.length > 0) {

                if (Strings.isNotEmpty(dataDir)) {
                    if (dataDir.equals(IN_MEMORY)) {
                        props.put("channel.type", "memory");
                    } else {
                        props.put("channel.type", "file");

                        if (!dataDir.endsWith(FILE_SEP)) {
                            dataDir = dataDir + FILE_SEP;

                        props.put("channel.checkpointDir", dataDir + "checkpoint");
                        props.put("channel.dataDirs", dataDir + "data");

                } else {
                    props.put("channel.type", "file");

                final StringBuilder sb = new StringBuilder();
                String leading = Strings.EMPTY;
                final int priority = agents.length;
                for (int i = 0; i < priority; ++i) {
                    leading = " ";
                    final String prefix = "agent" + i;
                    props.put(prefix + ".type", "avro");
                    props.put(prefix + ".hostname", agents[i].getHost());
                    props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
                    props.put(prefix + ".batch-size", Integer.toString(batchSize));
                    props.put("processor.priority." + prefix, Integer.toString(agents.length - i));
                props.put("sinks", sb.toString());
                props.put("processor.type", "failover");
            } else {
                String[] sinks = null;

                for (final Property property : properties) {
                    final String key = property.getName();

                    if (Strings.isEmpty(key)) {
                        final String msg = "A property name must be provided";
                        throw new ConfigurationException(msg);

                    final String upperKey = key.toUpperCase(Locale.ENGLISH);

                    if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) {
                        final String msg =
                            "Specification of the agent name is not allowed in Flume Appender configuration: " + key;
                        throw new ConfigurationException(msg);

                    final String value = property.getValue();
                    if (Strings.isEmpty(value)) {
                        final String msg = "A value for property " + key + " must be provided";
                        throw new ConfigurationException(msg);

                    if (upperKey.equals("SINKS")) {
                        sinks = value.trim().split(" ");

                    props.put(key, value);

                if (sinks == null || sinks.length == 0) {
                    final String msg = "At least one Sink must be specified";
                    throw new ConfigurationException(msg);
            return props;





Or download all of them as a single archive file:

File name: log4j-flume-ng-2.14.1-sources.jar
File size: 37127 bytes
Release date: 2021-03-06


Source Code for Apache Log4j 1.2 Bridge

Source Code for Apache Log4j Tag Library

Downloading and Reviewing Apache Log4j Packages

⇑⇑ FAQ for Apache Log4j

2015-11-04, 6859👍, 0💬