百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

失眠架构师,使用ApacheBeam实现实时监控统计,不来后悔一辈子

toyiye 2024-06-21 12:18 8 浏览 0 评论

前言

网站使用Apache服务器,对于请求网站资源的事件被记录到日志中,我们需要基于该日志文件的数据进行实时监控统计,通过读取IP库数据可以得到每个访问IP的所属地域(国内按城市,国外按国家,IP库中没有的按未知处理)。其实,整个流程我们可以通过Flume收集聚合多个子站日志文件数据,并写入到下游的Kafka消息中间件集群中,然后可以直接从Kafka中进行消费,实现实时监控统计,最后结果更新到Redis中去。为了简单,我们这里只是通过输入的日志文件作为数据源,下游直接通过Apache Beam来进行实时分析处理,结果输出到多个按时间分组的文件中。我们实现的实时监控功能目标,如下所示:

  • 输入事件日志文件,以及IP库文件;
  • 基于日志文件中的事件时间,每间隔5分钟输出一个统计文件,结果文件中包含“地域”和“访问次数”。

下面是文件格式示例。事件日志文件的格式,示例如下所示:

113.246.155.26 - - [10/Dec/2017:01:03:28 +0800] "GET /wp-content/themes/media-maven/library/images/bg.jpg HTTP/1.1" 200 8113 "http://shiyanjun.cn/archives/1097.html" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.13; rv:57.0) Gecko/20100101 Firefox/57.0" qxu1780320105.my3w.com image/jpeg "/usr/home/qxu1780320105/htdocs/wp-content/themes/media-maven/library/images/bg.jpg" 1365
119.4.252.140 - - [10/Dec/2017:01:03:29 +0800] "GET /wp-content/themes/media-maven/library/css/default.css HTTP/1.1" 200 4543 "http://shiyanjun.cn/archives/325.html" "Mozilla/5.0 (Linux; Android 5.0.2; HTC X9u Build/LRX22G) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.98 Mobile Safari/537.36" qxu1780320105.my3w.com text/css "/usr/home/qxu1780320105/htdocs/wp-content/themes/media-maven/library/css/default.css" 1759
42.197.51.105 - - [10/Dec/2017:01:08:06 +0800] "GET /wp-content/themes/media-maven/library/css/default.css HTTP/1.1" 200 4543 "http://shiyanjun.cn/archives/1075.html" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36" qxu1780320105.my3w.com text/css "/usr/home/qxu1780320105/htdocs/wp-content/themes/media-maven/library/css/default.css" 2452

IP库文件的格式,示例如下所示:

  1. 1.119.132.165 北京
  2. 1.14.146.248 上海
  3. 1.192.241.74 郑州
  4. 14.116.140.15 广州
  5. 14.116.140.17 广州
  6. 94.180.155.98 俄罗斯
  7. 95.46.246.36 捷克
  8. 97.107.132.213 美国

设计思路

这里,我们基于Apache Beam 2.1.0来实现。现在,我们来分析一下设计的要点:

  • 记录按时间分组

每隔5分钟输出一个统计文件,这里需要用到Apache Beam中的固定时间窗口(Fixed Time Window)。

  • IP库数据文件加载共享

输入指定了一个静态的IP库文件,所以需要给ParDo设置一个Side Input,将IP库数据加载到Pipeline中,为处理每个读取到的日志文件中的事件记录数据,提供IP到地域的关系映射,从而汇总得到每个地域对应的访问次数。

  • 输出统计结果分组

每隔5分钟输出一个统计文件,那么需要在输出的时候控制文件的生成,这里需要实现一个PTransform,通过读取到每个Window的开始时间(InternalWindow.start())和结束时间(InternalWindow.end()),并通过拼接开始时间和结束时间字符串来命名输出文件。

编程实现

创建PiplineOptions配置

首先需要创建一个PipelineOptions,设置我们需要的输入文件、Window相关参数等配置,实现代码如下所示:

interface WindowingOptions extends PipelineOptions {
 
    @Description("Path of the IP library file to read from.")
    String getIpFile();
    void setIpFile(String ipFile);
 
    @Description("Path of the event log file to read from.")
    String getEventFile();
    void setEventFile(String eventFile);
 
    @Description("Fixed window duration, in seconds.")
    @Default.Integer(5)
    Integer getWindowSizeSecs();
    void setWindowSizeSecs(Integer value);
 
    @Description("Fixed number of shards to produce per window.")
    @Default.Integer(1)
    Integer getNumShards();
    void setNumShards(Integer numShards);
 
    @Description("Directory of the output to write to.")
    String getOutputDir();
    void setOutputDir(String outputDir);
 
    @Description("Prefix of the output file prefix.")
    @Default.String("result")
    String getOutputFilePrefix();
    void setOutputFilePrefix(String outputFilePrefix);
}

上面代码中,主要指定了如下配置选项:

  • IP库文件路径
  • 事件日志文件路径
  • 固定时间窗口(Fixed Time Window)的时间长度(Duration),默认是5秒
  • 每个Window输出的分区文件个数,默认是1个
  • 输出目录字符串
  • 输出文件名称前缀字符串,默认前缀是result

在Apache Beam中,通过解析输入参数,获取到配置参数值,代码如下所示:

WindowingOptions options = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .as(WindowingOptions.class);
 
String ipFile = options.getIpFile();
String eventFile = options.getEventFile();
String output = new File(options.getOutputDir(),
        options.getOutputFilePrefix()).getAbsolutePath();

PipelineOptionsFactory通过反射机制,创建一个WindowingOptions对象,然后就可以获取到输入的参数值,方便在Pipeline配置过程中使用。

实现IP库的Side Input处理

将IP库文件读取出来,通过Map结构来保存对应的IP到地域的关系映射,这样能够在处理每个日志事件的过程中,直接将从事件记录中解析出来IP转换为地域名称,为后续统计做准备。在Apache Beam中,首先需要创建一个PCollectionView,作为ParDo的Side Input传入到Pipeline中,代码如下所示:

final PCollectionView<Map<String, String>> ipToAreaMapView =
        pipeline.apply(TextIO.read().from(ipFile))
                .apply(ParDo.of(new DoFn<String, KV<String, String>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                String[] ipAreaPair = c.element().split("\t");
                if(ipAreaPair.length == 2) {
                    c.output(KV.of(ipAreaPair[0], ipAreaPair[1]));
                }
            }
        })).apply(View.<String, String>asMap());

然后,就可以在处理日志事件的时候读取到ipToAreaMapView中对应的Map结构的数据,代码如下所示:

// read input event source
PCollection<String> events = pipeline.apply(TextIO.read().from(eventFile))
        .apply(ParDo.of(new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    String event = c.element();
                    try {
                        String[] a = event.split("\\s+");
                        String ip = a[0];
                        String time = a[3].substring(1);
 
                        long ts = parseToTimestamp(time);
                        Instant instant = new Instant(ts);
                        String area = c.sideInput(ipToAreaMapView).get(ip);
                        area = (area == null ? "未知地域" : area);
                        c.outputWithTimestamp(area, instant);
                    } catch (ParseException e) {
                        // ignore it
                    }
                }
            }).withSideInputs(ipToAreaMapView)
        );

上面代码,我们解析出日志行中的时间戳,在调用ProcessContext.outputWithTimestamp()方法时指定为事件时间,供后续配置Window函数使用。上述代码通过ProcessContext输出每一个处理过的记录,包含地域、时间戳信息,后面可以根据这两个字段信息对进行分组统计。

配置Window函数

现在,我们需要为上面准备好的事件记录PCollection对象events,配置Window函数,控制输入数据元素的分组逻辑,使用Apache Beam内建实现的固定时间窗口函数即可,代码如下所示:

// configure windowing settings
PCollection<String> windowedEvents =
        events.apply(
                Window.<String>into(FixedWindows.of(
                        Duration.standardSeconds(options.getWindowSizeSecs()))));

在名称为events的PCollection对象上进行配置,通过Window.into()方法,设置一个WindowFn函数,返回一个作用于该PCollection对象的PTransform,对该数据集中数据元素进行分组处理。只有配置的Window函数,在上面代码中得到的windowedEvents之上添加PTransform,才会对每个Window中的数据进行操作。配置好了Window函数的PCollection对象windowedEvents,对其执行按地域分组汇总操作,代码如下所示:

// count by (window, area)
PCollection<KV<String, Long>> areaCounts =
        windowedEvents.apply(Count.<String>perElement());

这里,会根据我们设置的固定时间窗口的时间长度,分割为多个Window,这些信息是不暴露给Bean应用开发人员的。我们只需要知道前面输出的是每条访问网站事件记录信息的地域名称即可,这里在实际执行中,会以(Window, 地域)作为Key进行分组汇总,Window可能是[12:00, 12:05)这种形式。汇总得到的结果,是每个地域对应一个访问次数,结果类似于一个三元组:([12:00, 12:05), 地域, 次数),也就是汇总每个Window中具有相同的地域记录的数量。

输出统计结果

输出最终的统计结果,实现代码如下所示:

// control to output final result
final PTransform<PCollection<String>, PDone> writer =
        new PerWindowOneFileWriter(output, options.getNumShards());
 
// format & output windowed result
areaCounts.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
            @Override
            public String apply(KV<String, Long> input) {
                return input.getKey() + "\t" + input.getValue();
            }
        })).apply(writer);

这里,先是创建了一个PerWindowOneFileWriter,它是一个PTransform,在最后输出时使用它,同时对输出数据进行格式化,使用TAB见分隔“地域”和“访问次数”,形成一行记录字符串,输出到文件中,其实是一个Window中数据对应一个输出文件(如果我们设置了WindowingOptions配置对象中的numShards值为1)。下面,看下实现输出文件的PTransform PerWindowOneFileWriter的实现,代码如下所示:

package org.shirdrn.beam.monitoring;
 
import com.google.common.base.Verify;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
 
public class PerWindowOneFileWriter extends PTransform<PCollection<String>, PDone> {
 
  private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinute();
  private String filenamePrefix;
  private Integer numShards;
 
  public PerWindowOneFileWriter(String filenamePrefix, Integer numShards) {
    this.filenamePrefix = filenamePrefix;
    this.numShards = numShards;
  }
 
  @Override
  public PDone expand(PCollection<String> input) {
    String prefix = "";
    ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
    if (!resource.isDirectory()) {
      prefix = Verify.verifyNotNull(resource.getFilename(),
          "A non-directory resource should have a non-null filename: %s",
          resource);
    }
 
    TextIO.Write write = TextIO.write()
        .to(resource.getCurrentDirectory())
        .withFilenamePolicy(new PerWindowFiles(prefix))
        .withWindowedWrites();
    write = write.withNumShards(numShards == null ? 1 : numShards);
    return input.apply(write);
  }
 
  public static class PerWindowFiles extends FilenamePolicy {
 
    private final String prefix;
 
    public PerWindowFiles(String prefix) {
      this.prefix = prefix;
    }
 
    private String generateFilenamePrefix(IntervalWindow window) {
      return String.format("%s_%s-%s", prefix,
              FORMATTER.print(window.start()),
              FORMATTER.print(window.end()));
    }
 
    @Override
    public ResourceId windowedFilename(
            ResourceId outputDirectory, WindowedContext context, String extension) {
      IntervalWindow window = (IntervalWindow) context.getWindow();
      int numShards = context.getNumShards();
      String filename;
      String prefix = generateFilenamePrefix(window);
      if(numShards == 1) {
        filename = String.format("%s", prefix);
      } else {
        filename = String.format("%s_%s_%s%s",
                prefix, context.getShardNumber(), context.getNumShards(), extension);
      }
      return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
    }
 
    @Override
    public ResourceId unwindowedFilename(
            ResourceId outputDirectory, Context context, String extension) {
      throw new UnsupportedOperationException("Unsupported.");
    }
  }
}

上面代码实现,是参考Apache Beam发行包中自带的例子修改而来,最核心的就是控制文件生成的FilenamePolicy的实现,它通过实现windowedFilename()方法,基于WindowedContext获取到Window对应的信息(Window开始时间和结束时间),从而生成带有窗口时间范围的字符串,作为最终输出的文件名称的一部分。

运行程序

运行上述程序,需要指定输入的文件路径以及Apache Beam相关的参数,示例如下所示:如果我们通过命令行的方式,启动程序运行,可以参考如下的配置:

args = new String[] {
        "—-ipFile=/Users/yanjun/Data/beam/events/ips.txt",
        "—-eventFile=/Users/yanjun/Data/beam/events/apache_event_20171210.log",
        "—-outputDir=/Users/yanjun/Data/beam/events/output/",
        "—-outputFilePrefix=result",
        "—-windowSizeSecs=300",
        "—-numShards=1"
};

这样,就可以获取到对应的参数值,在Pipeline中可以读取并使用。运行配置好的Pipeline,代码如下所示:

// execute beam pipeline
PipelineResult result = pipeline.run();
try {
    result.waitUntilFinish();
} catch (Exception exception) {
    result.cancel();
}

运行程序后,可以看到生成的结果文件,文件名称类似如下的文件列表:

result_00:00-00:05
result_00:05-00:10
result_00:10-00:15
result_00:15-00:20
result_00:20-00:25
result_00:25-00:30
result_00:30-00:35
result_00:35-00:40
result_00:40-00:45
result_00:45-00:50
result_00:50-00:55
result_00:55-01:00
result_01:00-01:05
result_01:05-01:10
result_01:10-01:15

查看其中某个结果文件内容,类似如下所示:

厦门    11
福州    101
武汉    12
成都    210
南京    13
杭州    219
广州    35
美国    16
上海    107
北京    617
未知地域    108

可以看到,文件输出的结果,正是我们希望的。如果不使用文件输出,那就需要修改最后控制输出的PTransform(PerWindowOneFileWriter)的处理逻辑,将结果数据写入到任何需要的存储系统中,保存以供其他系统使用(如实时动态地显示监控图表)。

(原创:时延军(包含链接:http://shiyanjun.cn)

多多转发关注小编不迷路!!感谢大家支持,每天更新技术好文~~

相关推荐

为何越来越多的编程语言使用JSON(为什么编程)

JSON是JavascriptObjectNotation的缩写,意思是Javascript对象表示法,是一种易于人类阅读和对编程友好的文本数据传递方法,是JavaScript语言规范定义的一个子...

何时在数据库中使用 JSON(数据库用json格式存储)

在本文中,您将了解何时应考虑将JSON数据类型添加到表中以及何时应避免使用它们。每天?分享?最新?软件?开发?,Devops,敏捷?,测试?以及?项目?管理?最新?,最热门?的?文章?,每天?花?...

MySQL 从零开始:05 数据类型(mysql数据类型有哪些,并举例)

前面的讲解中已经接触到了表的创建,表的创建是对字段的声明,比如:上述语句声明了字段的名称、类型、所占空间、默认值和是否可以为空等信息。其中的int、varchar、char和decimal都...

JSON对象花样进阶(json格式对象)

一、引言在现代Web开发中,JSON(JavaScriptObjectNotation)已经成为数据交换的标准格式。无论是从前端向后端发送数据,还是从后端接收数据,JSON都是不可或缺的一部分。...

深入理解 JSON 和 Form-data(json和formdata提交区别)

在讨论现代网络开发与API设计的语境下,理解客户端和服务器间如何有效且可靠地交换数据变得尤为关键。这里,特别值得关注的是两种主流数据格式:...

JSON 语法(json 语法 priority)

JSON语法是JavaScript语法的子集。JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔花括号保存对象方括号保存数组JS...

JSON语法详解(json的语法规则)

JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔大括号保存对象中括号保存数组注意:json的key是字符串,且必须是双引号,不能是单引号...

MySQL JSON数据类型操作(mysql的json)

概述mysql自5.7.8版本开始,就支持了json结构的数据存储和查询,这表明了mysql也在不断的学习和增加nosql数据库的有点。但mysql毕竟是关系型数据库,在处理json这种非结构化的数据...

JSON的数据模式(json数据格式示例)

像XML模式一样,JSON数据格式也有Schema,这是一个基于JSON格式的规范。JSON模式也以JSON格式编写。它用于验证JSON数据。JSON模式示例以下代码显示了基本的JSON模式。{"...

前端学习——JSON格式详解(后端json格式)

JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式。易于人阅读和编写。同时也易于机器解析和生成。它基于JavaScriptProgrammingLa...

什么是 JSON:详解 JSON 及其优势(什么叫json)

现在程序员还有谁不知道JSON吗?无论对于前端还是后端,JSON都是一种常见的数据格式。那么JSON到底是什么呢?JSON的定义...

PostgreSQL JSON 类型:处理结构化数据

PostgreSQL提供JSON类型,以存储结构化数据。JSON是一种开放的数据格式,可用于存储各种类型的值。什么是JSON类型?JSON类型表示JSON(JavaScriptO...

JavaScript:JSON、三种包装类(javascript 包)

JOSN:我们希望可以将一个对象在不同的语言中进行传递,以达到通信的目的,最佳方式就是将一个对象转换为字符串的形式JSON(JavaScriptObjectNotation)-JS的对象表示法...

Python数据分析 只要1分钟 教你玩转JSON 全程干货

Json简介:Json,全名JavaScriptObjectNotation,JSON(JavaScriptObjectNotation(记号、标记))是一种轻量级的数据交换格式。它基于J...

比较一下JSON与XML两种数据格式?(json和xml哪个好)

JSON(JavaScriptObjectNotation)和XML(eXtensibleMarkupLanguage)是在日常开发中比较常用的两种数据格式,它们主要的作用就是用来进行数据的传...

取消回复欢迎 发表评论:

请填写验证码