1. Introduction to Flume

Apache Flume is a distributed, highly available data collection system. It collects data from different data sources, aggregates it, and delivers it to a storage system; it is most often used to collect log data. Flume exists in two generations: OG (the versions before 1.0) and NG. NG is a complete rewrite of OG and is the version in wide use today. Everything below describes NG.

2. Flume architecture and basic concepts

The following picture shows the basic architecture of Flume:


2.1 Basic Architecture

An external data source sends events to Flume in a specific format. When a Source receives events, it stores them in one or more Channels. A Channel holds the events until a Sink consumes them. The Sink reads events from the Channel, stores them in an external storage system or forwards them to the next Source, and removes them from the Channel once that succeeds.

2.2 Basic Concepts

1. Event

An Event is the basic unit of data transfer in Flume NG, similar to a message in JMS and other messaging systems. An Event consists of headers and a body: the headers are a key/value map, and the body is an arbitrary byte array.

2. Source

A data collection component that collects data from external data sources and stores them in a Channel.

3. Channel

A Channel is the pipeline between a Source and a Sink and stores data temporarily. It can be backed by memory or by a persistent file system:

  • Memory Channel: fast, but data may be lost (for example, on a sudden crash);
  • File Channel: backed by a persistent file system, so data is not lost, but slower.
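
As a rough illustration of the trade-off above, the fragment below sketches how the two channel types might be declared. The agent name a1, the channel name c1, and both directory paths are placeholders chosen for this example; check the property names against the User Guide for your Flume version.

```properties
# Hypothetical agent "a1" with one channel "c1"; the directory paths are placeholders
a1.channels = c1

# File Channel: events are persisted to disk and survive an agent restart
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /var/flume/checkpoint
a1.channels.c1.dataDirs = /var/flume/data

# Memory Channel alternative: faster, but events are lost if the process dies
# a1.channels.c1.type = memory
# a1.channels.c1.capacity = 1000
# a1.channels.c1.transactionCapacity = 100
```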

4. Sink

A Sink reads Events from a Channel, stores them in an external storage system or forwards them to the next Source, and removes them from the Channel once that succeeds.

5. Agent

An Agent is an independent (JVM) process that contains components such as Sources, Channels, and Sinks.

2.3 Component Type

Every component in Flume comes in several types for different scenarios:

  • Source types: dozens are built in, such as Avro Source, Thrift Source, Kafka Source, JMS Source;
  • Sink types: HDFS Sink, Hive Sink, HBase Sinks, Avro Sink, etc.;
  • Channel types: Memory Channel, JDBC Channel, Kafka Channel, File Channel, and so on.

Unless you have special needs, combining the built-in Sources, Sinks, and Channels meets most requirements. The official Flume website documents the configuration parameters of every component type in table form, together with sample configurations. Parameters may differ slightly between versions, so it is recommended to use the User Guide for the version you are running as the primary reference.

3. Flume architecture modes

Flume supports several architecture modes, described below.

3.1 Multi-agent flow


Flume supports passing data across multiple Agents. This requires that the Sink of the upstream Agent and the Source of the downstream Agent both be of type Avro, with the Sink pointing to the host name (or IP address) and port of the Source (see Case 3 below for a detailed configuration).

3.2 Consolidation


Log collection often involves a large number of clients (such as distributed web services). Flume supports having multiple Agents collect logs separately, then aggregating them through one or more Agents before storing them in the file system.
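
A minimal sketch of this pattern, under stated assumptions: every web server runs a collecting agent a1 whose Avro Sink points at one aggregating agent a2, which writes to HDFS. The host name collector01, port 4545, and the HDFS path are made up for illustration, and the fragment shows only the properties that wire the two tiers together (component lists, the collecting agents' own sources, and channel definitions are omitted).

```properties
# On every web server: collecting agent "a1" (host name and port are assumptions)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = collector01
a1.sinks.k1.port = 4545
a1.sinks.k1.channel = c1

# On collector01: aggregating agent "a2" receives events from all collecting agents
a2.sources.s2.type = avro
a2.sources.s2.bind = 0.0.0.0
a2.sources.s2.port = 4545
a2.sources.s2.channels = c2

# The aggregating agent stores the merged stream in HDFS (path is a placeholder)
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = /flume/events/%y-%m-%d/
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.channel = c2
```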

3.3 Multiplexing the flow


Flume supports passing events from one Source to multiple Channels, and therefore to multiple Sinks. This is called fan-out. By default, fan-out replicates every Event to all Channels, so each Channel receives the same data. Flume also supports custom routing rules on the Source through a multiplexing selector.
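
The two fan-out behaviors can be sketched as the configuration fragment below. The names a1, s1, c1, and c2, the header name logType, and its values are hypothetical, and the source type and the sinks are left out; treat this as an outline of the selector properties rather than a complete agent definition.

```properties
# Hypothetical agent "a1": one source s1 fanning out to channels c1 and c2
a1.sources = s1
a1.channels = c1 c2
a1.sources.s1.channels = c1 c2

# Default behavior, written out explicitly: copy every Event to c1 and c2
a1.sources.s1.selector.type = replicating

# Alternative: route by the value of an Event header (here a made-up header "logType")
# a1.sources.s1.selector.type = multiplexing
# a1.sources.s1.selector.header = logType
# a1.sources.s1.selector.mapping.error = c1
# a1.sources.s1.selector.mapping.access = c2
# a1.sources.s1.selector.default = c2
```

With the multiplexing variant, an Event whose logType header matches no mapping goes to the default channel.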

4. Flume configuration format

Flume configuration usually requires the following two steps:

  1. Define the Sources, Sinks, and Channels of the Agent, then bind the Sources and Sinks to Channels. Note that a Source can be bound to multiple Channels, but a Sink can be bound to only one Channel. The basic format is as follows:
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>
  2. Define the specific properties of each Source, Sink, and Channel. The basic format is as follows:
# properties for sources
<Agent>.sources.<Source>.<someProperty> = <someValue>

# properties for channels
<Agent>.channels.<Channel>.<someProperty> = <someValue>

# properties for sinks
<Agent>.sinks.<Sink>.<someProperty> = <someValue>

5. Flume installation and deployment

To make later reference easier, the installation guides for all software in this repository are kept as separate documents. For the installation of Flume, see:

Installation and deployment of Flume in Linux environment

6. Flume use cases

This section introduces several use cases of Flume:

  • Case 1: Use Flume to listen to file content changes and output the newly added content to the console.
  • Case 2: Use Flume to listen to the specified directory and store the newly added files in the directory to HDFS.
  • Case 3: Use Avro to send the log data collected by this server to another server.

6.1 Case 1

Requirement: Listen for file content changes and output the newly added content to the console.

Implementation: use Exec Source with the tail command.

1. Configuration

Create a new configuration file exec-memory-logger.properties with the following contents:

#assign agent sources,sinks,channels
a1.sources = s1  
a1.sinks = k1  
a1.channels = c1  
#config sources
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c

#binding sources and channels
a1.sources.s1.channels = c1
#config sink 
a1.sinks.k1.type = logger

#binding sinks and channels  
a1.sinks.k1.channel = c1  
#config channel type
a1.channels.c1.type = memory

2. Start

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/exec-memory-logger.properties \
--name a1 \
-Dflume.root.logger=INFO,console

3. Test

Append data to the file:


Console output:


6.2 Case 2

Requirement: Listen to the specified directory and store the newly added files in the directory to HDFS.

Implementation: use Spooling Directory Source and HDFS Sink.

1. Configuration

# assign agent sources, sinks, channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# config source: watch /tmp/logs and put each file's base name in the "fileName" header
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /tmp/logs
a1.sources.s1.basenameHeader = true
a1.sources.s1.basenameHeaderKey = fileName

# binding source and channel
a1.sources.s1.channels = c1

# config sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/
a1.sinks.k1.hdfs.filePrefix = %{fileName}
# file type: SequenceFile (default), DataStream, or CompressedStream
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# binding sink and channel
a1.sinks.k1.channel = c1

# config channel type
a1.channels.c1.type = memory

2. Start

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/spooling-memory-hdfs.properties \
--name a1 -Dflume.root.logger=INFO,console

3. Test

Copy any file into the monitored directory; the log shows the HDFS path the file was uploaded to:

# cp log.txt logs/


Check that the content of the file uploaded to HDFS matches the local file:

# hdfs dfs -cat /flume/events/19-04-09/13/log.txt.1554788567801


6.3 Case 3

Requirement: Send the data collected by this server to another server.

Implementation: use Avro Source and Avro Sink.

1. Configure log collection Flume

Create a new configuration file netcat-memory-avro.properties that monitors changes to a file's content and sends the newly added content through an Avro Sink to port 8888 on the hadoop001 server:

#assign agent sources,sinks,channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1

#config sources
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c
a1.sources.s1.channels = c1

#config sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop001
a1.sinks.k1.port = 8888
a1.sinks.k1.batch-size = 1
a1.sinks.k1.channel = c1

#config channel type
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

2. Configure log aggregation Flume

Create a new configuration file avro-memory-logger.properties that uses an Avro Source to listen on port 8888 of the hadoop001 server and outputs the received content to the console:

#config agent sources,sinks,channels
a2.sources = s2
a2.sinks = k2
a2.channels = c2

#config sources
a2.sources.s2.type = avro
a2.sources.s2.bind = hadoop001
a2.sources.s2.port = 8888

#binding sources and channels
a2.sources.s2.channels = c2

#config sink
a2.sinks.k2.type = logger

#binding sinks and channels
a2.sinks.k2.channel = c2

#config channel type
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

3. Start

Start the log aggregation Flume:

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/avro-memory-logger.properties \
--name a2 -Dflume.root.logger=INFO,console

Start the log collection Flume:

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/netcat-memory-avro.properties \
--name a1 -Dflume.root.logger=INFO,console

It is recommended to start the agents in the order above, so that the Avro Source binds the port first and the Avro Sink does not report a connection failure when it tries to connect. Starting them in the other order does no harm, however: the Sink keeps retrying until the connection is established.

4. Test

Append content to the file /tmp/log.txt:


You can see that the content has been received on port 8888 and successfully output to the console:


More articles in the big data series can be found in the GitHub open source project:
Big Data Getting Started Guide

Original link: https://www.cnblogs.com/heibaiying/p/11439250.html