百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

Spring Batch(1)——数据批处理概念

toyiye 2024-09-12 20:49 4 浏览 0 评论

批处理的核心场景

  • 从某个位置读取大量的记录,位置可以是数据库、文件或者外部推送队列(MQ)。
  • 根据业务需要实时处理读取的数据。
  • 将处理后的数据写入某个位置,可以是数据库、文件或者推送到队列。

Spring Batch能解决的批处理场景

Spring Batch为批处理提供了一个轻量化的解决方案,它根据批处理的需要迭代处理各种记录,提供事物功能。但是Spring Batch仅仅适用于"脱机"场景,在处理的过程中不能和外部进行任何交互,也不允许有任何输入。

Spring Batch的目标

  • 开发人员仅关注业务逻辑,底层框架的交互交由Spring Batch去处理。
  • 能够清晰分离业务与框架,框架已经限定了批处理的业务切入点,业务开发只需关注这些切入点(Read、Process、Write)。
  • 提供开箱即用的通用接口。
  • 快速轻松的融入Spring 框架,基于Spring Framework能够快速扩展各种功能。
  • 所有现有核心服务都应易于更换或扩展,而不会对基础架构层产生任何影响。

Spring Batch结构

如上图,通常情况下一个独立的JVM程序就是仅仅用于处理批处理,而不要和其他功能重叠。 在最后一层基础设置(Infrastructure)部分主要分为3个部分。JobLauncher、Job以及Step。每一个Step又细分为ItemReader、ItemProcessor、ItemWirte。使用Spring Batch主要就是知道每一个基础设置负责的内容,然后在对应的设施中实现对应的业务。

Spring Batch 批处理原则与建议

当我们构建一个批处理的过程时,必须注意以下原则:

  1. 通常情况下,批处理的过程对系统和架构的设计要够要求比较高,因此尽可能的使用通用架构来处理批量数据处理,降低问题发生的可能性。Spring Batch是一个是一个轻量级的框架,适用于处理一些灵活并没有到海量的数据。
  2. 批处理应该尽可能的简单,尽量避免在单个批处理中去执行过于复杂的任务。我们可以将任务分成多个批处理或者多个步骤去实现。
  3. 保证数据处理和物理数据紧密相连。笼统的说就是我们在处理数据的过程中有很多步骤,在某些步骤执行完时应该就写入数据,而不是等所有都处理完。
  4. 尽可能减少系统资源的使用、尤其是耗费大量资源的IO以及跨服务器引用,尽量分配好数据处理的批次。
  5. 定期分析系统的IO使用情况、SQL语句的执行情况等,尽可能的减少不必要的IO操作。优化的原则有:
  6. 尽量在一次事物中对同一数据进行读取或写缓存。
  7. 一次事物中,尽可能在开始就读取所有需要使用的数据。
  8. 优化索引,观察SQL的执行情况,尽量使用主键索引,尽量避免全表扫描或过多的索引扫描。
  9. SQL中的where尽可能通过主键查询。
  10. 不要在批处理中对相同的数据执行2次相同的操作。
  11. 对于批处理程序而言应该在批处理启动之前就分配足够的内存,以免处理的过程中去重新申请新的内存页。
  12. 对数据的完整性应该从最差的角度来考虑,每一步的处理都应该建立完备的数据校验。
  13. 对于数据的总量我们应该有一个和数据记录在数据结构的某个字段 上。
  14. 所有的批处理系统都需要进行压力测试。
  15. 如果整个批处理的过程是基于文件系统,在处理的过程中请切记完成文件的备份以及文件内容的校验。

批处理的通用策略

和软件开发的设计模式一样,批处理也有各种各样的现成模式可供参考。当一个开发(设计)人员开始执行批处理任务时,应该将业务逻辑拆分为一下的步骤或者板块分批执行:

  1. 数据转换:某个(某些)批处理的外部数据可能来自不同的外部系统或者外部提供者,这些数据的结构千差万别。在统一进行批量数据处理之前需要对这些数据进行转换,合并为一个统一的结构。因此在数据开始真正的执行业务处理之前,先要使用其他的方法或者一些批处理任务将这些数据转换为统一的格式。
  2. 数据校验:批处理是对大量数据进行处理,并且数据的来源千差万别,所以批处理的输入数据需要对数据的完整性性进行校验(比如校验字段数据是否缺失)。另外批处理输出的数据也需要进行合适的校验(例如处理了100条数据,校验100条数据是否校验成功)
  3. 提取数据:批处理的工作是逐条从数据库或目标文件读取记录(records),提取时可以通过一些规则从数据源中进行数据筛选。
  4. 数据实时更新处理:根据业务要求,对实时数据进行处理。某些时候一行数据记录的处理需要绑定在一个事物之下。
  5. 输出记录到标准的文档格式:数据处理完成之后需要根据格式写入到对应的外部数据系统中。

以上五个步骤是一个标准的数据批处理过程,Spring batch框架为业务实现提供了以上几个功能入口。

数据额外处理

某些情况需要实现对数据进行额外处理,在进入批处理之前通过其他方式将数据进行处理。主要内容有:

  1. 排序:由于批处理是以独立的行数据(record)进行处理的,在处理的时候并不知道记录前后关系。因此如果需要对整体数据进行排序,最好事先使用其他方式完成。
  2. 分割:数据拆分也建议使用独立的任务来完成。理由类似排序,因为批处理的过程都是以行记录为基本处理单位的,无法再对分割之后的数据进行扩展处理。
  3. 合并:理由如上。

常规数据源

批处理的数据源通常包括:

  1. 数据库驱动链接(链接到数据库)对数据进行逐条提取。
  2. 文件驱动链接,对文件数据进行提取
  3. 消息驱动链接,从MQ、kafka等消息系统提取数据。

典型的处理过程

  1. 在业务停止的窗口期进行批数据处理,例如银行对账、清结算都是在12点日切到黎明之间。简称为离线处理。
  2. 在线或并发批处理,但是需要对实际业务或用户的响应进行考量。
  3. 并行处理多种不同的批处理作业。
  4. 分区处理:将相同的数据分为不同的区块,然后按照相同的步骤分为许多独立的批处理任务对不同的区块进行处理。
  5. 以上处理过程进行组合。

在执行2,3点批处理时需要注意事物隔离等级。

Spring Batch批处理的核心概念

下图是批处理的核心流程图。

Spring Batch同样按照批处理的标准实现了各个层级的组件。并且在框架级别保证数据的完整性和事物性。

如图所示,在一个标准的批处理任务中涵盖的核心概念有JobLauncher、Job、Step,一个Job可以涵盖多个Step,一个Job对应一个启动的JobLauncher。一个Step中分为ItemReader、ItemProcessor、ItemWriter,根据字面意思它们分别对应数据提取、数据处理和数据写入。此外JobLauncher、Job、Step会产生元数据(Metadata),它们会被存储到JobRepository中。

Job

简单的说Job是封装一个批处理过程的实体,与其他的Spring项目类似,Job可以通过XML或Java类配置,称为“Job Configuration”。如下图Job是单个批处理的最顶层。

为了便于理解,可以简单的将Job理解为是每一步(Step)实例的容器。他结合了多个Step,为它们提供统一的服务同时也为Step提供个性化的服务,比如步骤重启。通常情况下Job的配置包含以下内容:

  • Job的名称
  • 定义和排序Step执行实例。
  • 标记每个Step是否可以重启。

Spring Batch为Job接口提供了默认的实现——SimpleJob,其中实现了一些标准的批处理方法。下面的代码展示了如可注入一个Job。

@Bean
public Job footballJob() {
 return this.jobBuilderFactory.get("footballJob") //get中命名了Job的名称
 .start(playerLoad()) //playerLoad、gameLoad、playerSummarization都是Step
 .next(gameLoad())
 .next(playerSummarization())
 .end()
 .build();
}

JobInstance

JobInstance是指批处理作业运行的实例。例如一个批处理必须在每天执行一次,系统在2019年5月1日执行了一次我们称之为2019-05-01的实例,类似的还会有2019-05-02、2019-05-03实例。通常情况下,一个JobInstance对应一个JobParameters,对应多个JobExecution。(JobParameters、JobExecution见后文)。同一个JobInstance具有相同的上下文(ExecutionContext内容见后文)。

JobParameters

前面讨论了JobInstance与Job的区别,但是具体的区别内容都是通过JobParameters体现的。一个JobParameters对象中包含了一系列Job运行相关的参数,这些参数可以用于参考或者用于实际的业务使用。对应的关系如下图:

当我们执行2个不同的JobInstance时JobParameters中的属性都会有差异。可以简单的认为一个JobInstance的标识就是Job+JobParameters。

JobExecution

JobExecution可以理解为单次运行Job的容器。一次JobInstance执行的结果可能是成功、也可能是失败。但是对于Spring Batch框架而言,只有返回运行成功才会视为完成一次批处理。例如2019-05-01执行了一次JobInstance,但是执行的过程失败,因此第二次还会有一个“相同的”的JobInstance被执行。

Job用于定义批处理如何执行,JobInstance纯粹的就是一个处理对象,把所有的运行内容和信息组织在一起,主要是为了当面临问题时定义正确的重启参数。而JobExecution是运行时的“容器”,记录动态运行时的各种属性和上线文。他包括的信息有:

以上这些内容Spring Batch都会通过JobRepository进行持久化(这些信息官方文成称之为MetaData),因此在对应的数据源中可以看到下列信息:

BATCH_JOB_INSTANCE:

BATCH_JOB_EXECUTION_PARAMS:

BATCH_JOB_EXECUTION:

当某个Job批处理任务失败之后会在对应的数据库表中路对应的状态。假设1月1号执行的任务失败,技术团队花费了大量的时间解决这个问题到了第二天21才继续执行这个任务。

BATCH_JOB_INSTANCE:

BATCH_JOB_EXECUTION_PARAMS:

BATCH_JOB_EXECUTION:

从数据上看好似JobInstance是一个接一个顺序执行的,但是对于Spring Batch并没有进行任何控制。不同的JobInstance很有可能是同时在运行(相同的JobInstance同时运行会抛出JobExecutionAlreadyRunningException异常)。

Step

Step是批处理重复运行的最小单元,它按照顺序定义了一次执行的必要过程。因此每个Job可以视作由一个或多个多个Step组成。一个Step包含了所有所有进行批处理的必要信息,这些信息的内容是由开发人员决定的并没有统一的标准。一个Step可以很简单,也可以很复杂。他可以是复杂业务的组合,也有可能仅仅用于迁移数据。与JobExecution的概念类似,Step也有特定的StepExecution,关系结构如下:

StepExecution

StepExecution表示单次执行Step的容器,每次Step执行时都会有一个新的StepExecution被创建。与JobExecution不同的是,当某个Step执行失败后默认并不会重新执行。StepExecution包含以下属性:

ExecutionContext

前文已经多次提到ExecutionContext。可以简单的认为ExecutionContext提供了一个Key/Value机制,在StepExecution和JobExecution对象的任何位置都可以获取到ExecutionContext中的任何数据。最有价值的作用是记录数据的执行位置,以便发生重启时候从对应的位置继续执行:

executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition())

比如在任务中有一个名为“loadData”的Step,他的作用是从文件中读取数据写入到数据库,当第一次执行失败后,数据库中有如下数据:

BATCH_JOB_INSTANCE:

BATCH_JOB_EXECUTION_PARAMS:

BATCH_JOB_EXECUTION:

BATCH_STEP_EXECUTION:

BATCH_STEP_EXECUTION_CONTEXT: |STEP_EXEC_ID|SHORT_CONTEXT| |---|---| |1|{piece.count=40321}|

在上面的例子中,Step运行30分钟处理了40321个“pieces”,我们姑且认为“pieces”表示行间的行数(实际就是每个Step完成循环处理的个数)。这个值会在每个commit之前被更新记录在ExecutionContext中(更新需要用到StepListener后文会详细说明)。当我们再次重启这个Job时并记录在BATCH_STEP_EXECUTION_CONTEXT中的数据会加载到ExecutionContext中,这样当我们继续执行批处理任务时可以从上一次中断的位置继续处理。例如下面的代码在ItemReader中检查上次执行的结果,并从中断的位置继续执行:

if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
 log.debug("Initializing for restart. Restart data is: " + executionContext);
 long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));
 LineReader reader = getReader();
 Object record = "";
 while (reader.getPosition() < lineCount && record != null) {
 record = readLine();
 }
}

ExecutionContext是根据JobInstance进行管理的,因此只要是相同的实例都会具备相同的ExecutionContext(无论是否停止)。此外通过以下方法都可以获得一个ExecutionContext:

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();

但是这2个ExecutionContext并不相同,前者是在一个Step中每次Commit数据之间共享,后者是在Step与Step之间共享。

JobRepository

JobRepository是所有前面介绍的对象实例的持久化机制。他为JobLauncher、Job、Step的实现提供了CRUD操作。当一个Job第一次被启动时,一个JobExecution会从数据源中获取到,同时在执行的过程中StepExecution、JobExecution的实现都会记录到数据源中。使用@EnableBatchProcessing注解后JobRepository会进行自动化配置。

JobLauncher

JobLauncher为Job的启动运行提供了一个边界的入口,在启动Job的同时还可以定制JobParameters:

public interface JobLauncher {
	public JobExecution run(Job job, JobParameters jobParameters)
				throws JobExecutionAlreadyRunningException, JobRestartException,
					 JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

竟然都看到最后了,给小编点个关注吧,小编还会持续更新的,只收藏不点关注的都是在耍流氓!

相关推荐

如何用 coco 数据集训练 Detectron2 模型?

随着最新的Pythorc1.3版本的发布,下一代完全重写了它以前的目标检测框架,新的目标检测框架被称为Detectron2。本教程将通过使用自定义coco数据集训练实例分割模型,帮助你开始使...

CICD联动阿里云容器服务Kubernetes实践之Bamboo篇

本文档以构建一个Java软件项目并部署到阿里云容器服务的Kubernetes集群为例说明如何使用Bamboo在阿里云Kubernetes服务上运行RemoteAgents并在agents上...

Open3D-ML点云语义分割实验【RandLA-Net】

作为点云Open3D-ML实验的一部分,我撰写了文章解释如何使用Tensorflow和PyTorch支持安装此库。为了测试安装,我解释了如何运行一个简单的Python脚本来可视化名为...

清理系统不用第三方工具(系统自带清理软件效果好不?)

清理优化系统一定要借助于优化工具吗?其实,手动优化系统也没有那么神秘,掌握了方法和技巧,系统清理也是一件简单和随心的事。一方面要为每一个可能产生累赘的文件找到清理的方法,另一方面要寻找能够提高工作效率...

【信创】联想开先终端开机不显示grub界面的修改方法

原文链接:【信创】联想开先终端开机不显示grub界面的修改方法...

如意玲珑成熟度再提升,三大发行版支持教程来啦!

前期,我们已分别发布如意玲珑在deepinV23与UOSV20、openEuler24.03发行版的操作指南,本文,我们将为大家详细介绍Ubuntu24.04、Debian12、op...

118种常见的多媒体文件格式(英文简写)

MP4[?mpi?f??]-MPEG-4Part14(MPEG-4第14部分)AVI[e?vi??a?]-AudioVideoInterleave(音视频交错)MOV[m...

密码丢了急上火?码住7种console密码紧急恢复方式!

身为攻城狮的你,...

CSGO丨CS2的cfg指令代码分享(csgo自己的cfg在哪里?config文件位置在哪?)

?...

使用open SSL生成局域网IP地址证书

某些特殊情况下,用户内网访问多可文档管理系统时需要启用SSL传输加密功能,但只有IP,没有域名和证书。这种情况下多可提供了一种免费可行的方式,通过openSSL生成免费证书。此方法生成证书浏览器会提示...

Python中加载配置文件(python怎么加载程序包)

我们在做开发的时候经常要使用配置文件,那么配置文件的加载就需要我们提前考虑,再不使用任何框架的情况下,我们通常会有两种解决办法:完整加载将所有配置信息一次性写入单一配置文件.部分加载将常用配置信息写...

python开发项目,不得不了解的.cfg配置文件

安装软件时,经常会见到后缀为.cfg、.ini的文件,一般我们不用管,只要不删就行。因为这些是程序安装、运行时需要用到的配置文件。但对开发者来说,这种文件是怎么回事就必须搞清了。本文从.cfg文件的创...

瑞芯微RK3568鸿蒙开发板OpenHarmony系统修改cfg文件权限方法

本文适用OpenHarmony开源鸿蒙系统,本次使用的是开源鸿蒙主板,搭载瑞芯微RK3568芯片。深圳触觉智能专注研发生产OpenHarmony开源鸿蒙硬件,包括核心板、开发板、嵌入式主板,工控整机等...

Python9:图像风格迁移-使用阿里的接口

先不多说,直接上结果图。#!/usr/bin/envpython#coding=utf-8importosfromaliyunsdkcore.clientimportAcsClient...

Python带你打造个性化的图片文字识别

我们的目标:从CSV文件读取用户的文件信息,并将文件名称修改为姓名格式的中文名称,进行规范资料整理,从而实现快速对多个文件进行重命名。最终效果:将原来无规律的文件名重命名为以姓名为名称的文件。技术点:...

取消回复欢迎 发表评论:

请填写验证码