java 分布式日志收集收集系统:Flume

Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提供 对数据进行简单处理,并写到各种数据接受方(可定制)的能力。Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera。但随着 Flume 功能的扩展,Flume OG 代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重。为了解决这些问 题,2011 年 10 月 22 号,cloudera 完成了 Flume-728,对 Flume 进行了里程碑式的改动:重构核心组件、核心配置 以及代码架构,重构后的版本统称为 Flume NG(next generation);改动的另一原因是将 Flume 纳入 apache 旗下,cloudera Flume 改名为 Apache Flume。IBM 的这篇文章: Flume NG:Flume 发展史上的第一次革命,从基本组件以及用户体验的角度阐述 Flume OG 到 Flume NG 发生的革命性变化。

一、Flume OG

Flume OG的设计目标:

  1. 可靠性:当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end- to-end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。),Store on failure(这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送),Best effort(数据发送到接收方后,不会进行确认)。
  2. 可扩展性:Flume采用了三层架构,分别为agent,collector和storage,每一层均可以水平扩展。其中,所有agent 和collector由master统一管理,这使得系统容易监控和维护,且master允许有多个(使用ZooKeeper进行管理和负载均衡),这就 避免了单点故障问题。
  3. 可管理性:所有agent和Collector由master统一管理,这使得系统便于维护。多master情况,Flume利用 ZooKeeper和gossip,保证动态配置数据的一致性。用户可以在master上查看各个数据源或者数据流执行情况,且可以对各个数据源配置和动 态加载。Flume提供了web 和shell script command两种形式对数据流进行管理。
  4. 功能可扩展性:用户可以根据需要添加自己的agent,collector或者storage。此外,Flume自带了很多组件,包括各种agent(file,syslog等),collector和storage(file,HDFS等)。

Flume OG的架构:

flume-og

在Flume中,最重要的抽象是data flow(数据流),data flow描述了数据从产生,传输、处理并最终写入目标的一条路径。

flume-1

  • 对于agent数据流配置就是从哪得到数据,把数据发送到哪个collector。
  • 对于collector是接收agent发过来的数据,把数据发送到指定的目标机器上。

Flume框架对hadoop和zookeeper的依赖只是在jar包上,并不要求flume启动时必须将hadoop和zookeeper服务也启动。

如前面提到的,Flume采用了分层架构:分别为Agent,Collector和Storage。Agent用于采集数据,Agent是 Flume中产生数据流的地方。同时,Agent会将产生的数据流传输到Collector。Collector用于对数据进行聚合,往往会产生一个更大 的流,然后传输到Storage。其中,Agent和Collector均由两部分组成:source和sink,source是数据来源,sink是数 据去向。Flume使用两个组件:Master和Node,Node根据在Master shell或web中动态配置,决定其是作为Agent还是Collector。

1、Agent

Agent的作用是将数据源的数据发送给collector。Flume自带了很多直接可用的数据源(source),如:

  • text(“filename”):将文件filename作为数据源,按行发送
  • tail(“filename”):探测filename新产生的数据,按行发送出去
  • fsyslogTcp(5140):监听TCP的5140端口,并且接收到的数据发送出去
  • tailDir(“dirname”[, fileregex=”.*”[, startFromEnd=false[, recurseDepth=0]]]):监听目录中的文件末尾,使用正则去选定需要监听的文件(不包含目录),recurseDepth为递归监听其下子 目录的深度

更多可参见这位朋友的整理: http://www.cnblogs.com/zhangmiao-chp/archive/2011/05/18/2050465.html

同时提供了很多sink,如:

  • console[(“format”)] :直接将将数据显示在consolr上
  • text(“txtfile”):将数据写到文件txtfile中
  • dfs(“dfsfile”):将数据写到HDFS上的dfsfile文件中
  • syslogTcp(“host”,port):将数据通过TCP传递给host节点
  • agentSink[(“machine”[,port])]:等价于agentE2ESink,如果省略,machine参数,默认使用 flume.collector.event.host与flume.collector.event.port作为默认collecotr
  • agentDFOSink[(“machine” [,port])]:本地热备agent,agent发现collector节点故障后,不断检查collector的存活状态以便重新发送event,在此间产生的数据将缓存到本地磁盘中
  • agentBESink[(“machine”[,port])]:不负责的agent,如果collector故障,将不做任何处理,它发送的数据也将被直接丢弃
  • agentE2EChain:指定多个collector提高可用性。 当向主collector发送event失效后,转向第二个collector发送,当所有的collector失败后,它会非常执着的再来一遍

更多可参见这位朋友的整理: http://www.cnblogs.com/zhangmiao-chp/archive/2011/05/18/2050472.html

2、Collector

Collector的作用是将多个Agent的数据汇总后,加载到Storage中。它的source和sink与agent类似。

数据源(source),如:

  • collectorSource[(port)]:Collector source,监听端口汇聚数据
  •  autoCollectorSource:通过master协调物理节点自动汇聚数据
  • logicalSource:逻辑source,由master分配端口并监听rpcSink

sink,如:

  • collectorSink( “fsdir”,”fsfileprefix”,rollmillis):collectorSink,数据通过collector汇聚之后发送到hdfs, fsdir 是hdfs目录,fsfileprefix为文件前缀码
  • customdfs(“hdfspath”[, “format”]):自定义格式dfs

3 、Storage

storage是存储系统,可以是一个普通file,也可以是HDFS,HIVE,HBase,分布式存储等。

4 、Master

Master是管理协调Agent和Collector的配置等信息,是flume集群的控制器。

二、Flume NG

对于Flume OG ,可以说他是一个分布式日志收集系统,有Mater概念,依赖于Zookeeper,Agent用于采集数据,Agent是Flume中产生数据流的地 方,同时,Agent会将产生的数据流传输到Collector。对应的,collector用于对数据进行聚合,往往会产生一个更大的流。而对于 Flume NG,它摒弃了Master和zookeeper,collector也没有了,web配置台也没有了,只剩下source,sink和channel, 此时一个Agent的概念包括source、channel和sink,完全由一个分布式系统变成了传输工具。不同机器之间的数据传输不再是OG那样由 agent->collector,而是由一个Agent端的sink流向另一个agent的source。

Flume NG中的核心概念:

  • Client:生产数据,运行在一个独立的线程。
  • Source:从Client收集数据,传递给Channel。可以接收外部源发送过来的数据。不同的 source,可以接受不同的数据格 式。比如有目录池(spooling directory)数据源,可以监控指定文件夹中的新文件变化,如果目录中有文件产生,就会立刻读取其内容。
  • Channel:是一个存储地,接收source的输出,直到有sink消费掉channel中的数据。Channel中的数据直到进入到下一个channel中或者进入终端才会被删除。当sink写入失败后,可以自动重启,不会造成数据丢失,因此很可靠。
  • Sink:会消费channel中的数据,然后送给外部源或者其他source。如数据可以写入到HDFS或者HBase中。
  • Agent:使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。
  • Events:Flume NG传输的数据的基本单位是event,如果是文本文件,通常是一行记录,这也是事务的基本单位。

Flume NG相对于Flume OG的主要变化:

  • sources和sinks 使用channels 进行链接
  • 两个主要channel:in-memory channel,非持久性支持,速度快; JDBC-based channel 持久性支持。
  • 不再区分逻辑和物理node,所有物理节点统称为agents,每个agents 都能运行0个或多个sources 和sinks
  • 不再需要master节点和对zookeeper的依赖,配置文件简单化。
  • 插件化,一部分面对用户,工具或系统开发人员。
  • 使用Thrift、Avro Flume sources 可以从flume0.9.4 发送 events 到flume 1.x

Flume OG节点组成图:

flume-og-1

Flume NG节点组成图:

flume-ng-1

对应于 OG 的特点,FLUM NG 的特点是:

  • NG 只有一种角色的节点:代理节点(agent)。
  • 没有 collector、master 节点。这是核心组件最核心的变化。
  • 去除了 physical nodes、logical nodes 的概念和相关内容。
  • agent 节点的组成也发生了变化。

Flume NG 以agent为最小的独立运行单位。一个agent就是一个JVM。单agent由Source、Sink和Channel三大组件构成。

Flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些 Event由Agent外部的Source,比如上图中的Web Server生成。当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。可以把Channel看 作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。值得注意的是,Flume提供了大量 内置的Source、Channel和Sink类型。不同类型的Source、Channel和Sink可以自由组合。组合方式基于用户设置的配置文件, 非常灵活。比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,也就是说,多个agent可以协同工作,并且支持Fan-in、Fan- out、Contextual Routing、Backup Routes。如下图:

flume-ng-2

Flume 允许多个 agent 连在一起,形成前后相连的多级跳:

flume-ng-3

1、 source

Flume 支持 Avro,log4j,syslog 和 http post(body为json格式)。可以让应用程序同已有的Source直接打交道,如AvroSource,SyslogTcpSource。也可以 写一个 Source,以 IPC 或 RPC 的方式接入自己的应用,Avro和 Thrift 都可以(分别 有 NettyAvroRpcClient 和 ThriftRpcClient 实现了 RpcClient接口),其中 Avro 是默认 的 RPC 协议。具体代码级别的 Client 端数据接入,可以参考官方手册。对现有程序改动最小的使用方式是使用是直接读取程序原来记录的日志文 件,基本可以实现无缝接入,不需要对现有程序进行任何改动。 对于直接读取文件 Source,有两种方式:

  • ExecSource: 以运行 Linux 命令的方式,持续的输出最新的数据,如 tail -F 文件名 指令,在这种方式下,取的文件名必须是指定的。 ExecSource 可以实现对日志的实时收集,但是存在Flume不运行或者指令执行 出错时,将无法收集到日志数据,无法保证日志数据的完整性。
  •  SpoolSource: 监测配置的目录下新增的文件,并将文件中的数据读取出来。需要注意两点:拷贝到 spool 目录下的文件不可 以再打开编辑;spool 目录下不可包含相应的子目录。SpoolSource 虽然无法实现实时的收集数据,但是可以使用以分钟的方式分割文件,趋近 于实时。如果应用无法实现以分钟切割日志文件的话, 可以两种收集方式结合使用。在实际使用的过程中,可以结合 log4j 使用,使用 log4j的时候,将 log4j 的文件分割机制设为1分钟一次, 将文件拷贝到spool的监控目录。log4j 有一个 TimeRolling 的插件,可以把 log4j 分割文件到 spool 目录。基本实现 了实时的监控。Flume 在传完文件之后,将会修改文件的后缀,变为 .COMPLETED(后缀也可以在配置文件中灵活指定)

2、Channel

当前有几个 channel 可供选择,分别是 Memory Channel, JDBC Channel , File Channel,Psuedo Transaction Channel。比较常见的是前三种 channel。

  • MemoryChannel 可以实现高速的吞吐,但是无法保证数据的完整性。
  • MemoryRecoverChannel 在官方文档的建议上已经建义使用FileChannel来替换。
  • FileChannel保证数据的完整性与一致性。在具体配置FileChannel时,建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。

File Channel 是一个持久化的隧道(channel),它持久化所有的事件,并将其存储到磁盘中。因此,即使 Java 虚拟机当掉,或者操作系统崩溃 或重启,再或者事件没有在管道中成功地传递到下一个代理(agent),这一切都不会造成数据丢失。Memory Channel 是一个不稳定的隧道,其原因是由于它在内存中存储所有事件。如果 java 进程死掉,任何存储在内存的事件将会丢失。另外,内存的空间 收到 RAM大小的限制,而 File Channel 这方面是它的优势,只要磁盘空间足够,它就可以将所有事件数据存储到磁盘上。

3、sink

Sink在设置存储数据时,可以向文件系统、数据库、hadoop存数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔 保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。更多sink的内容可以参考 官方手册

从整体上讲,NG 在核心组件上进行了大规模的调整,核心组件的数目由 7 删减到 4。由于 Flume 的使用涉及到众多因素, 如 avro、thrift、hdfs、jdbc、zookeeper 等,而这些组件和 Flume 的整合都需要关联到所有组件。所以核心组件的改革 对整个 Flume 的使用影响深远:

  • 大大降低了对用户的要求,如不再依赖 zookeeper,用户无需去搭建 zookeeper 集群
  • 用户也不再纠结于 OG 中的模糊概念(尤其是 physical nodes、logical nodes,agent、collector)
  • 有利于 Flume 和其他技术、hadoop 周边组件的整合,比如在 NG 版本中,Flume 轻松实现了和 jdbc、hbase 的集成
  • 将 OG 版本中复杂、大规模、不稳定的标签移除,Flume 实现了向灵活、轻便的转变,而且在功能上更加强大、可扩展性更高

参考链接:

Related posts:

  1. dfopen():discuz封装的fsockopen()
  2. 防采集系统的设计
  3. 磁盘类软件

Tagged: ,

Comments are closed.