Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Spring Batch(5)——文件读写

Spring Batch(5)——文件读写

作者头像
随风溜达的向日葵
发布于 2019-07-10 06:42:40
发布于 2019-07-10 06:42:40
2.1K00
代码可运行
举报
文章被收录于专栏:技术墨客技术墨客
运行总次数:0
代码可运行

Spring batch由上至下的结构JobStep都是属于框架级别的的功能,大部分时候都是提供一些配置选项给开发人员使用,而Item中的ReaderProcessorWriter是属于业务级别的,它开放了一些业务切入的接口。 但是文件的读写过程中有很多通用一致的功能Spring Batch为这些相同的功能提供了一致性实现类。

扁平结构文件

扁平结构文件(也称为矩阵结构文件,后文简称为文件)是最常见的一种文件类型。他通常以一行表示一条记录,字段数据之间用某种方式分割。与标准的格式数据(xml、json等)主要差别在于他没有结构性描述方案(SXD、JSON-SCHEME),进而没有结构性分割规范。因此在读写此类文件之前需要先设定好字段的分割方法。

文件的字段数据分割方式通常有两种:使用分隔符固定字段长度。前者通常使用逗号()之类的符号对字段数据进行划分,后者的每一列字段数据长度是固定的。 框架为文件的读取提供了FieldSet用于将文件结构中的信息映射到一个对象。FieldSet的作用是将文件的数据与类的field进行绑定(field是Java中常见的概念,不清楚的可以了解Java反射)。

数据读取

Spring Batch为文件读取提供了FlatFileItemReader类,它为文件中的数据的读取和转换提供了基本功能。在FlatFileItemReader中有2个主要的功能接口,一是Resource、二是LineMapperResource用于外部文件获取,详情请查看Spring核心——资源管理部分的内容,下面是一个例子:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Resource resource = new FileSystemResource("resources/trades.csv"); 

在复杂的生产环境中,文件通常由中心化、或者流程式的基础框架来管理(比如EAI)。因此文件往往需要使用FTP等方式从其他位置获取。如何迁移文件已经超出了Spring Batch框架的范围,在Spring的体系中可以参考Spring Integration项目。

下面是FlatFileItemReader的属性,每一个属性都提供了Setter方法。

属性名

参数类型

说明

comments

String[]

指定文件中的注释前缀,用于过滤注释内容行

encoding

String

指定文件的编码方式,默认为Charset.defaultCharset()

lineMapper

LineMapper

利用LineMapper接口将一行字符串转换为对象

linesToSkip

int

跳过文件开始位置的行数,用于跳过一些字段的描述行

recordSeparatorPolicy

RecordSeparatorPolicy

用于判断数据是否结束

resource

Resource

指定外部资源文件位置

skippedLinesCallback

LineCallbackHandler

当配置linesToSkip,每执行一次跳过都会被回调一次,会传入跳过的行数据内容

每个属性都为文件的解析提供了某方面的功能,下面是结构的说明。

LineMapper

这个接口的作用是将字符串转换为对象:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interface LineMapper { T mapLine(String line, int lineNumber) throws Exception; }

接口的基本处理逻辑是聚合类(FlatFileItemReader)传递一行字符串以及行号给LineMapper::mapLine,方法处理后返回一个映射的对象。

LineTokenizer

这个接口的作用是将一行数据转换为一个FieldSet结构。对于Spring Batch而言,扁平结构文件的到Java实体的映射都通过FieldSet来控制,因此读写文件的过程需要完成字符串到FieldSet的转换:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interface LineTokenizer { FieldSet tokenize(String line); }

这个接口的含义是:传递一行字符串数据,然后获取一个FieldSet

框架为LineTokenizer提供三个实现类:

  • DelimitedLineTokenizer:利用分隔符将数据转换为FieldSet。最常见的分隔符是逗号,,类提供了分隔符的配置和解析方法。
  • FixedLengthTokenizer:根据字段的长度来解析出FieldSet结构。必须为记录定义字段宽度。
  • PatternMatchingCompositeLineTokenizer:使用一个匹配机制来动态决定使用哪个LineTokenizer

FieldSetMapper

该接口是将FieldSet转换为对象:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interface FieldSetMapper { T mapFieldSet(FieldSet fieldSet) throws BindException; }

FieldSetMapper通常和LineTokenizer联合在一起使用:String->FieldSet->Object

DefaultLineMapper

DefaultLineMapperLineMapper的实现,他实现了从文件到Java实体的映射:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class DefaultLineMapper implements LineMapper<>, InitializingBean {
	private LineTokenizer tokenizer;
	private FieldSetMapper fieldSetMapper;
	public T mapLine(String line, int lineNumber) throws Exception {
		return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
	}
	public void setLineTokenizer(LineTokenizer tokenizer) {
		this.tokenizer = tokenizer;
	}
	public void setFieldSetMapper(FieldSetMapper fieldSetMapper) {
		this.fieldSetMapper = fieldSetMapper;
	}
}

在解析文件时数据是按行解析的:

  1. 传入一行字符串。
  2. LineTokenizer将字符串解析为FieldSet结构。
  3. FieldSetMapper继续解析为一个Java实体对象返回给调用者。

DefaultLineMapper是框架提供的默认实现类,看似非常简单,但是利用组合模式可以扩展出很多功能。

数据自动映射

在转换过程中如果将FieldSetnames属性与目标类的field绑定在一起,那么可以直接使用反射实现数据转换,为此框架提供了BeanWrapperFieldSetMapper来实现。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
DefaultLineMapper<WeatherEntity> lineMapper = new DefaultLineMapper<>(); //创建LineMapper

DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); //创建LineTokenizer
tokenizer.setNames(new String[] { "siteId", "month", "type", "value", "ext" }); //设置Field名称

BeanWrapperFieldSetMapper<WeatherEntity> wrapperMapper 
	= new BeanWrapperFieldSetMapper<>(); //创建FieldSetMapper
wrapperMapper.setTargetType(WeatherEntity.class); //设置实体,实体的field名称必须和tokenizer.names一致。

// 组合lineMapper
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(wrapperMapper);

文件读取总结

上面提到了各种接口和实现,实际上都是围绕着FlatFileItemReader的属性在介绍,虽然内容很多但是实际上就以下几点:

  • 首先要定位文件,Spring Batch提供了Resource相关的定位方法。
  • 其次是将文件中的行字符串数据转换为对象,LineMapper的功能就是完成这个功能。
  • 框架为LineMapper提供了DefaultLineMapper作为默认实现方法,在DefaultLineMapper中需要组合使用LineTokenizerFieldSetMapper。前者将字符串转为为一个Field,后者将Field转换为目标对象。
  • LineTokenizer有3个实现类可供使用、FieldSetMapper有一个默认实现类BeanWrapperFieldSetMapper

文件读取可执行源码

可执行的源码在下列地址的items子工程中:

运行之前需要配置数据库链接,参看源码库中的README.md。

文件读取的主要逻辑在org.chenkui.spring.batch.sample.items.FlatFileReader类:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class FlatFileReader {
    // FeildSet的字段名,设置字段名之后可以直接使用名字作为索引获取数据。也可以使用索引位置来获取数据
    public final static String[] Tokenizer = new String[] { "siteId", "month", "type", "value", "ext" };
    private boolean userWrapper = false;

    @Bean
    //定义FieldSetMapper用于FieldSet->WeatherEntity
    public FieldSetMapper<WeatherEntity> fieldSetMapper() {
        return new FieldSetMapper<WeatherEntity>() {
            @Override
            public WeatherEntity mapFieldSet(FieldSet fieldSet) throws BindException {
                if (null == fieldSet) {
                    return null; // fieldSet不存在则跳过该行处理
                } else {
                    WeatherEntity observe = new WeatherEntity();
                    observe.setSiteId(fieldSet.readRawString("siteId"));
                    //Setter
                    return observe;
                }
            }
        };
    }

    @Bean
    // 配置 Reader
    public ItemReader<WeatherEntity> flatFileReader(
                           @Qualifier("fieldSetMapper") FieldSetMapper<WeatherEntity> fieldSetMapper) {
        FlatFileItemReader<WeatherEntity> reader = new FlatFileItemReader<>();
        reader.setResource(new FileSystemResource("src/main/resources/data.csv")); // 读取资源文件
        DefaultLineMapper<WeatherEntity> lineMapper = new DefaultLineMapper<>(); // 初始化 LineMapper实现类
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); // 创建LineTokenizer接口实现

        tokenizer.setNames(Tokenizer); // 设定每个字段的名称,如果不设置需要使用索引获取值
        lineMapper.setLineTokenizer(tokenizer); // 设置tokenizer工具

        if (userWrapper) { //使用 BeanWrapperFieldSetMapper 使用反射直接转换
            BeanWrapperFieldSetMapper<WeatherEntity> wrapperMapper = new BeanWrapperFieldSetMapper<>();
            wrapperMapper.setTargetType(WeatherEntity.class);
            fieldSetMapper = wrapperMapper;
        }

        lineMapper.setFieldSetMapper(fieldSetMapper);
        reader.setLineMapper(lineMapper);
        reader.setLinesToSkip(1); // 跳过的初始行,用于过滤字段行
        reader.open(new ExecutionContext());
        return reader;
    }
}

按字段长度格读取文件

除了按照分隔符,有些文件可以字段数据的占位长度来提取数据。按照前面介绍的过程,实际上只要修改LineTokenizer接口即可,框架提供了FixedLengthTokenizer类:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Bean
public FixedLengthTokenizer fixedLengthTokenizer() {
    FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();

    tokenizer.setNames("ISIN", "Quantity", "Price", "Customer");
    //Range用于设定数据的长度。
    tokenizer.setColumns(new Range(1-12),
                        new Range(13-15),
                        new Range(16-20),
                        new Range(21-29));
	return tokenizer;
}

写入扁平结构文件

将数据写入到文件与读取的过程正好相反:将对象转换为字符串。

LineAggregator

LineMapper相对应的是LineAggregator,他的功能是将实体转换为字符串:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interface LineAggregator<T> {
    public String aggregate(T item);
}

PassThroughLineAggregator

框架为LineAggregator接口提供了一个非常简单的实现类——PassThroughLineAggregator,其唯一实现就是使用对象的toString方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class PassThroughLineAggregator<T> implements LineAggregator<T> {
    public String aggregate(T item) {
        return item.toString();
    }
}

DelimitedLineAggregator

LineAggregator的另外一个实现类是DelimitedLineAggregator。与PassThroughLineAggregator简单直接使用toString方法不同的是,DelimitedLineAggregator需要一个转换接口FieldExtractor

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
DelimitedLineAggregator<CustomerCredit> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(",");
lineAggregator.setFieldExtractor(fieldExtractor);

FieldExtractor

FieldExtractor用于实体类到collection结构的转换。它可以和LineTokenizer进行类比,前者是将实体类转换为扁平结构的数据,后者是将String转换为一个FieldSet结构。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interface FieldExtractor<T> {
    Object[] extract(T item);
}

框架为FieldExtractor接口提供了一个基于反射的实现类BeanWrapperFieldExtractor,其过程就是将实体对象转换为列表:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[] {"field1", "field2"});

setName方法用于指定要转换的field列表。

输出文件处理

文件读取的逻辑非常简单:文件存在打开文件并写入数据,当文件不存在抛出异常。但是写入文件明显不能这么简单粗暴。新建一个JobInstance时最直观的操作是:存在同名文件就抛出异常,不存在则创建文件并写入数据。但是这样做显然有很大的问题,当批处理过程中出现问题需要restart,此时并不会从头开始处理所有的数据,而是要求文件存在并接着继续写入。为了确保这个过程FlatFileItemWriter默认会在新JobInstance运行时删除已有文件,而运行重启时继续在文件末尾写入。FlatFileItemWriter可以使用shouldDeleteIfExistsappendAllowedshouldDeleteIfEmpty来有针对性的控制文件。

文件写入可执源码

文件写入主要代码在org.chenkui.spring.batch.sample.items.FlatFileWriter

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class FlatFileWriter {

    private boolean useBuilder = true;

    @Bean
    public ItemWriter<MaxTemperatureEntiry> flatFileWriter() {
        BeanWrapperFieldExtractor<MaxTemperatureEntiry> fieldExtractor = new BeanWrapperFieldExtractor<>();
        fieldExtractor.setNames(new String[] { "siteId", "date", "temperature" }); //设置映射field
        fieldExtractor.afterPropertiesSet(); //参数检查

        DelimitedLineAggregator<MaxTemperatureEntiry> lineAggregator = new DelimitedLineAggregator<>();
        lineAggregator.setDelimiter(","); //设置输出分隔符
        lineAggregator.setFieldExtractor(fieldExtractor); //设置FieldExtractor处理器

        FlatFileItemWriter<MaxTemperatureEntiry> fileWriter = new FlatFileItemWriter<>();
        fileWriter.setLineAggregator(lineAggregator);
        fileWriter.setResource(new FileSystemResource("src/main/resources/out-data.csv")); //设置输出文件位置
        fileWriter.setName("outpufData");

        if (useBuilder) {//使用builder方式创建
            fileWriter = new FlatFileItemWriterBuilder<MaxTemperatureEntiry>().name("outpufData")
                .resource(new FileSystemResource("src/main/resources/out-data.csv")).lineAggregator(lineAggregator)
                .build();
        }
        return fileWriter;
    }
}

文件的写入过程与读取过程完全对称相反:先用FieldExtractor将对象转换为一个collection结构(列表),然后用lineAggregatorcollection转化为带分隔符的字符串。

代码说明

  • 代码中的测试数据来自数据分析交流项目bi-process-example,是NOAA的2015年全球天气监控数据。为了便于源码存储进行了大量的删减,原始数据有百万条,如有需要使用下列方式下载: curl -O ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/2015.csv.gz #数据文件 curl -O ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt # 文件结构及类型说明
  • 代码实现了读取文件、处理文件、写入文件的整个过程。处理文件的过程是只获取监控的最高温度信息(Type=TMAX),其他都过滤。
  • 本案例的代码使用org.chenkui.spring.batch.sample.flatfile.FlatFileItemApplication::main方法运行,使用的是Command Runner的方式执行(运行方式的说明见Item概念及使用代码命令行方式运行Java内嵌运行)。

(adsbygoogle = window.adsbygoogle || []).push({});

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Spring Batch 批处理(4) - ItemReader
2.在这个接口中只有一个方法read(),它读取一个数据并且移动到下一个数据上去,在读取结束时必须返回一个null,否则表明数据没有读取完毕;
chenchenchen
2020/05/26
1.1K0
Spring Batch 批处理(5) - ItemWriter
对于read读取数据时是一个item为单位的循环读取,而对于writer写入数据则是以chunk为单位,一块一块的进行写入
chenchenchen
2020/05/26
1.1K0
Spring Batch 配置方式XML
在使用XML配置时,我们需要创建一个XML文件,并在其中定义Job、Step和其他组件的配置信息。下面是一个使用XML配置的示例:
堕落飞鸟
2023/04/16
1.1K0
Spring batch教程 之 配置Step「建议收藏」
正如在Batch Domain Language中叙述的,Step是一个独立封装域对象,包含了所有定义和控制实际处理信息批任务的序列。这是一个比较抽象的描述,因为任意一个Step的内容都是开发者自己编写的Job。一个Step的简单或复杂取决于开发者的意愿。一个简单的Step也许是从本地文件读取数据存入数据库,写很少或基本无需写代码。一个复杂的Step也许有复杂的业务规则(取决于所实现的方式),并作为整个个流程的一部分。
全栈程序员站长
2022/09/02
4.3K0
Spring Batch实战(三)
今天这篇文章,我们来了解一下SpringBatch的ItemReaders、ItemWriters、ItemStream以及怎么注册一个Step。前一篇文章我分析了一下怎么去从database中load数据使用ItemReader的一个子类JdbcPageQueryProvider,今天就进一步分析一下读取数据库数据源时的两个关键类ItemReader和ItemStream,以及写入数据库时的ItemWriter。
xdd
2022/07/12
1.5K0
Spring Batch实战(三)
Spring Batch 批处理(7) - 异常处理及容错机制
1.对于chunk类型的Step,spring batch为我们提供了用于管理它的状态
chenchenchen
2020/05/26
1.7K0
Spring Batch 核心概念ItemReader
Spring Batch是一个用于大规模批处理的开源框架,它提供了一套完整的工具来帮助开发人员实现高效的批处理任务。其中一个核心概念就是ItemReader,它用于读取数据并将其转换成Java对象,以便在批处理任务中进行处理。
堕落飞鸟
2023/04/16
1.1K0
SpringBoot:使用Spring Batch实现批处理任务
在企业级应用中,批处理任务是不可或缺的一部分。它们通常用于处理大量数据,如数据迁移、数据清洗、生成报告等。Spring Batch是Spring框架的一部分,专为批处理任务设计,提供了简化的配置和强大的功能。本文将介绍如何使用Spring Batch与SpringBoot结合,构建和管理批处理任务。
E绵绵
2024/07/03
1.1K0
Spring Batch:文件的批量读写Flatfile(XML,CSV,TXT)
继杨小强童鞋的《Spring Batch入门篇》之后,继续为大家分享第二篇关于Spring Batch的系列教程。 更多内容请持续关注:spring4all.com,更多spring技术干货与交流学习期待您的参与! Spring Batch:文件的批量读写Flatfile(XML,CSV,TXT) ⏩ 该系列课程中的示例代码使用springBatch 版本为3.0.7;讲解可能会讲一些4.0.X的特性 示例代码地址:https://git.oschina.net/huicode/sp
程序猿DD
2018/02/01
4.1K0
Spring Batch:文件的批量读写Flatfile(XML,CSV,TXT)
SpringBatch文档
Spring Batch 是一个轻量级的、完善的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。
全栈程序员站长
2022/09/01
5.5K0
批处理框架 Spring Batch 这么强,你真的会用吗?
概念词就不多说了,我简单地介绍下 , spring batch 是一个 方便使用的 较健全的 批处理 框架。
用户1220090
2025/05/08
3290
批处理框架 Spring Batch 这么强,你真的会用吗?
Spring Batch入门篇
Spring Batch,一个很多人还觉得陌生的框架,它是Spring Cloud Task的基础,主要用来实现批量任务的处理。该框架在国内的使用非常少,所以一直以来对于该框架在中文资料也一直都非常欠缺。 因此,在这里向大家推荐一位愿意将与我们分享Spring Batch技术细节的开源爱好者,也是我们spring4all.com社区的Spring Batch专题版主:杨小强童鞋! 下面我们就跟着他的系列文章一步步的了解Spring Batch的技术细节。 简介 SpringBatch 是一个大数据量的并行处
程序猿DD
2018/02/01
1.2K0
Spring Batch入门篇
Spring Batch 核心概念Step示例
首先,我们需要创建一个用来存储数据的表,这里我们创建一个名为“person”的表,包含id、name和age三个字段:
堕落飞鸟
2023/04/16
3210
Spring Batch 教程简单教程
在企业应用中,批处理很常见。但随着数据在互联网上变得越来越普遍,我们如何处理这些数据也变得很重要。有多种解决方案可用。Apache Storm或Apache Spark有助于以所需格式处理和转换数据。在这篇文章中,我们将更仔细地研究 Spring Batch。
用户4235284
2023/10/14
1.1K0
Spring Batch 教程简单教程
Spring Batch 配置方式-Java配置
除了XML配置外,Spring Batch还支持使用Java代码进行配置。Java配置可以使配置文件更加简洁和易于维护。下面是一个使用Java配置的示例:
堕落飞鸟
2023/04/16
5350
SpringBoot~SpringBatch 使用
Spring Batch 提供了大量可重用的组件,包括了日志、追踪、事务、任务作业统计、任务重启、跳过、重复、资源管理。对于大数据量和高性能的批处理任务,Spring Batch 同样提供了高级功能和特性来支持,比如分区功能、远程功能。总之,通过 Spring Batch 能够支持简单的、复杂的和大数据量的批处理作业。
用户2196435
2018/08/10
1K0
batch spring 重复执行_Spring Batch批处理
批处理顾名思义是批量处理大量数据,但是这个大量数据又不是特别大的大数据,比Hadoop等要轻量得多,适合企业单位人数薪资计算,财务系统月底一次性结算等常规数据批量处理。
全栈程序员站长
2022/09/02
1.9K0
Spring Cloud Task 集成Spring Cloud Task Batch(一)
Spring Cloud Task和Spring Batch都是Spring生态系统中强大的工具。Spring Batch提供了一个框架,用于编写和执行大规模批处理作业,而Spring Cloud Task提供了一种机制,可以将短期的任务作为单独的执行单元来运行。这两个工具在不同的场景下都非常有用,因此将它们结合起来可以提供更广泛的应用程序开发和部署选择。
堕落飞鸟
2023/04/17
1.1K0
【Spring云原生】Spring Batch:海量数据高并发任务处理!数据处理纵享新丝滑!事务管理机制+并行处理+实例应用讲解
Spring Batch是一个基于Java的开源批处理框架,用于处理大规模、重复性和高可靠性的任务。它提供了一种简单而强大的方式来处理批处理作业,如数据导入/导出、报表生成、批量处理等。
苏泽
2024/03/10
2.1K0
【译】Spring 官方教程:创建批处理服务
原文:Creating a Batch Service 译者:Mr.lzc 校对:lexburner 本指南将引导你完成创建基本的批处理驱动解决方案的过程。 你将构建什么 你将构建一个从CSV电子表格导入数据的服务,并使用自定义代码进行转换,并将最终结果存储在数据库中。 你需要准备什么 大约15分钟 一个自己喜欢的文本编辑器或者IDE JDK 1.8 或以上版本 Gradle 2.3+ 或者 Maven 3.0+ 你也可以直接将代码导入到本地的IDE中: Spring Tool Suite (STS) In
程序猿DD
2018/03/26
2.9K0
相关推荐
Spring Batch 批处理(4) - ItemReader
更多 >
交个朋友
加入腾讯云技术交流站
洞悉AI新动向 Get大咖技术交流群
加入HAI高性能应用服务器交流群
探索HAI应用新境界 共享实践心得
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验