返回

Spring Batch -常见批处理模式 和准则。

发布时间:2022-12-20 06:14:13 316
# webkit# spring# ios# 数据库# 数据

Spring Batch -常见批处理模式 和准则。_XML

一些批处理作业可以完全由Spring Batch中的现成组件组装。 例如,可以将 和 实现配置为 涵盖广泛的场景。但是,在大多数情况下,自定义代码必须 写。应用程序开发人员的主要 API 入口点是 、 、 和各种侦听器接口。最简单的批次 作业可以使用来自 Spring 批处理的现成输入,但它通常是 处理和编写中存在需要开发人员的自定义关注点的情况 实现 或 .​​ItemReader​​​​ItemWriter​​​​Tasklet​​​​ItemReader​​​​ItemWriter​​​​ItemReader​​​​ItemWriter​​​​ItemProcessor​

在本章中,我们提供了自定义业务逻辑中常见模式的几个示例。 这些示例主要具有侦听器接口。应该注意的是,如果合适,或也可以实现侦听器接口。​​ItemReader​​​​ItemWriter​

记录项目处理和失败

一个常见的用例是需要逐项对步骤中的错误进行特殊处理, 也许记录到特殊通道或将记录插入数据库。一个 面向块(从步骤工厂 Bean 创建)允许用户实现此用途 具有简单 for 错误和 for for 的情况 上的错误。以下代码片段演示了一个侦听器,该侦听器记录了读取 和写入失败:​​Step​​​​ItemReadListener​​​​read​​​​ItemWriteListener​​​​write​

public class ItemFailureLoggerListener extends ItemListenerSupport {

private static Log logger = LogFactory.getLog("item.error");

public void onReadError(Exception ex) {
logger.error("Encountered error on read", e);
}

public void onWriteError(Exception ex, List<? extends Object> items) {
logger.error("Encountered error on write", ex);
}
}

实现此侦听器后,必须使用步骤注册它。

下面的示例演示如何使用 XML 中的步骤注册侦听器:

XML 配置

<step id="simpleStep">
...
<listeners>
<listener>
<bean class="org.example...ItemFailureLoggerListener"/>
</listener>
</listeners>
</step>

以下示例演示如何使用步骤 Java 注册侦听器:

爪哇配置

@Bean
public Step simpleStep(JobRepository jobRepository) {
return new StepBuilder("simpleStep", jobRepository)
...
.listener(new ItemFailureLoggerListener())
.build();
}

如果您的侦听器在方法中执行任何操作,它必须在 将要回滚的事务。如果需要使用事务性 资源(如数据库)在方法内,考虑添加声明性 事务到该方法(有关详细信息,请参阅 Spring 核心参考指南),并给出其 传播属性的值为 。​​onError()​​​​onError()​​​​REQUIRES_NEW​

出于业务原因手动停止作业

Spring Batch 通过接口提供了一种方法,但这是 真正供操作员而不是应用程序员使用。有时,它是 从业务内部停止作业执行更方便或更有意义 逻辑。​​stop()​​​​JobOperator​

最简单的方法是抛出一个(一个既不重试的 无限期也不跳过)。例如,可以使用自定义异常类型,如下所示 在以下示例中:​​RuntimeException​

public class PoisonPillItemProcessor implements ItemProcessor<T, T> {

@Override
public T process(T item) throws Exception {
if (isPoisonPill(item)) {
throw new PoisonPillException("Poison pill detected: " + item);
}
return item;
}
}

阻止步骤执行的另一种简单方法是从 返回,如以下示例所示:​​null​​​​ItemReader​

public class EarlyCompletionItemReader implements ItemReader {

private ItemReader delegate;

public void setDelegate(ItemReader delegate) { ... }

public T read() throws Exception {
T item = delegate.read();
if (isEndItem(item)) {
return null; // end the step here
}
return item;
}

}

前面的示例实际上依赖于存在默认实现的事实 当项目要成为时发出完整批次信号的策略 处理是 。可以执行更复杂的完成工作政策, 注入到通过 .​​CompletionPolicy​​​​null​​​​Step​​​​SimpleStepFactoryBean​

下面的示例演示如何将完成策略注入到 XML 中的步骤中:

XML 配置


<step id="simpleStep">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="10"
chunk-completion-policy="completionPolicy"/>
</tasklet>
</step>

<bean id="completionPolicy" class="org.example...SpecialCompletionPolicy"/>

以下示例演示如何将完成策略注入到 Java 中的步骤中:

爪哇配置

@Bean
public Step simpleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("simpleStep", jobRepository)
.<String, String>chunk(new SpecialCompletionPolicy(), transactionManager)
.reader(reader())
.writer(writer())
.build();
}

另一种方法是在 中设置一个标志,该标志由框架中的实现在项目处理之间进行检查。要实现这一点 或者,我们需要访问 电流 ,这可以通过 实现 并将其注册到 .以下示例 显示设置标志的侦听器:​​StepExecution​​​​Step​​​​StepExecution​​​​StepListener​​​​Step​

public class CustomItemWriter extends ItemListenerSupport implements StepListener {

private StepExecution stepExecution;

public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}

public void afterRead(Object item) {
if (isPoisonPill(item)) {
stepExecution.setTerminateOnly();
}
}

}

设置标志后,默认行为是步骤引发 .可以通过 来控制此行为。但是,唯一的选择是抛出或不抛出异常, 所以这总是一个不正常的结局。​​JobInterruptedException​​​​StepInterruptionPolicy​

添加页脚记录

通常,在写入平面文件时,必须将“页脚”记录追加到平面文件的末尾 文件,在所有处理完成后。这可以使用Spring Batch提供的接口来实现。(及其对应项 ) 是 的可选属性,可以添加到项目编写器中。​​FlatFileFooterCallback​​​​FlatFileFooterCallback​​​​FlatFileHeaderCallback​​​​FlatFileItemWriter​

下面的示例演示如何在 XML 中使用 和:​​FlatFileHeaderCallback​​​​FlatFileFooterCallback​

XML 配置

<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator" ref="lineAggregator"/>
<property name="headerCallback" ref="headerCallback" />
<property name="footerCallback" ref="footerCallback" />
</bean>

以下示例显示了如何在 Java 中使用 和:​​FlatFileHeaderCallback​​​​FlatFileFooterCallback​

爪哇配置

@Bean
public FlatFileItemWriter itemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.headerCallback(headerCallback())
.footerCallback(footerCallback())
.build();
}

页脚回调接口只有一个方法,当页脚必须 写入,如以下接口定义所示:

public interface FlatFileFooterCallback {

void writeFooter(Writer writer) throws IOException;

}

编写摘要页脚

涉及页脚记录的常见要求是在 输出过程并将此信息附加到文件末尾。此页脚经常 用作文件的摘要或提供校验和。

例如,如果批处理作业正在将记录写入平面文件,并且存在 要求将所有金额中的总金额放在页脚中,然后 可以使用以下实现:​​Trade​​​​Trades​​​​ItemWriter​

public class TradeItemWriter implements ItemWriter,
FlatFileFooterCallback {

private ItemWriter delegate;

private BigDecimal totalAmount = BigDecimal.ZERO;

public void write(Chunk<? extends Trade> items) throws Exception {
BigDecimal chunkTotal = BigDecimal.ZERO;
for (Trade trade : items) {
chunkTotal = chunkTotal.add(trade.getAmount());
}

delegate.write(items);

// After successfully writing all items
totalAmount = totalAmount.add(chunkTotal);
}

public void writeFooter(Writer writer) throws IOException {
writer.write("Total Amount Processed: " + totalAmount);
}

public void setDelegate(ItemWriter delegate) {...}
}

这将存储一个值,该值随写入的每个项目而增加。处理完最后一个后,框架将调用 ,这会将 放入文件中。请注意,该方法 使用临时变量 ,该变量存储块中金额的总和。这样做是为了确保,如果方法中发生跳过,则 保持不变。只有在方法结束时,一旦我们保证不会引发异常,我们才会更新.​​TradeItemWriter​​​​totalAmount​​​​amount​​​​Trade​​​​Trade​​​​writeFooter​​​​totalAmount​​​​write​​​​chunkTotal​​​​Trade​​​​write​​​​totalAmount​​​​write​​​​totalAmount​

为了调用该方法,(其中 实现 ) 必须连接到 中作为 。​​writeFooter​​​​TradeItemWriter​​​​FlatFileFooterCallback​​​​FlatFileItemWriter​​​​footerCallback​

下面的示例演示如何在 XML 中连接​​TradeItemWriter​

XML 配置


<bean id="tradeItemWriter" class="..TradeItemWriter">
<property name="delegate" ref="flatFileItemWriter" />
</bean>

<bean id="flatFileItemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator" ref="lineAggregator"/>
<property name="footerCallback" ref="tradeItemWriter" />
</bean>

以下示例显示了如何在 Java 中连接:​​TradeItemWriter​

爪哇配置

@Bean
public TradeItemWriter tradeItemWriter() {
TradeItemWriter itemWriter = new TradeItemWriter();

itemWriter.setDelegate(flatFileItemWriter(null));

return itemWriter;
}

@Bean
public FlatFileItemWriter flatFileItemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.footerCallback(tradeItemWriter())
.build();
}

到目前为止编写的方式仅在以下情况下才能正常工作 不可重新启动。这是因为该类是有状态的(因为它存储 ),但 不会持久保存到数据库中。因此,它 在重新启动时无法检索。为了使此类可重新启动, 该接口应与方法 和 一起实现,如以下示例所示:​​TradeItemWriter​​​​Step​​​​totalAmount​​​​totalAmount​​​​ItemStream​​​​open​​​​update​

public void open(ExecutionContext executionContext) {
if (executionContext.containsKey("total.amount") {
totalAmount = (BigDecimal) executionContext.get("total.amount");
}
}

public void update(ExecutionContext executionContext) {
executionContext.put("total.amount", totalAmount);
}

update 方法将该对象保存到数据库之前将最新版本存储到 。开放方法 从中检索任何现有内容并将其用作 处理的起点,允许在重新启动时拾取,其中 它在上次运行时离开了。​​totalAmount​​​​ExecutionContext​​​​totalAmount​​​​ExecutionContext​​​​TradeItemWriter​​​​Step​

驱动基于查询的项目读取器

在​​关于读取器和写入器的章节​​中,数据库输入使用 讨论了分页。许多数据库供应商(如 DB2)都非常悲观 如果正在读取的表也需要由 在线申请的其他部分。此外,在极其 大型数据集可能会导致某些供应商的数据库出现问题。因此,许多 项目更喜欢使用“驱动查询”方法来读取数据。这种方法有效 通过迭代键,而不是需要返回的整个对象,如 下图说明:

Spring Batch -常见批处理模式 和准则。_批处理_02

图1.驱动查询作业

如您所见,上图中显示的示例使用与以前相同的“FOO”表 在基于游标的示例中使用。但是,不是选择整行,而只是 在 SQL 语句中选择了 ID。因此,而不是返回对象 从 返回 。然后,可以使用此数字来查询 “details”,这是一个完整的对象,如下图所示:​​FOO​​​​read​​​​Integer​​​​Foo​

Spring Batch -常见批处理模式 和准则。_XML_03

图2.驱动查询示例

应该使用 an 来转换从驱动查询中获取的键 变成一个完整的对象。现有的DAO可用于查询基于完整对象 在键上。​​ItemProcessor​​​​Foo​

多行记录

虽然平面文件通常属于每个记录限制为单个记录的情况 行,通常文件可能具有跨多行且具有多个行的记录 格式。以下文件摘录显示了此类安排的示例:

HEA;0013100345;2007-02-15
NCU;Smith;Peter;;T;20014539;F
BAD;;Oak Street 31/A;;Small Town;00235;IL;US
FOT;2;2;267.34

以“HEA”开头的行和以“FOT”开头的行之间的所有内容都是 被视为一条记录。为了 正确处理这种情况:

  • 不是一次读取一条记录,而是必须读取 多行记录作为一个组,这样就可以原封不动地传递给。ItemReaderItemWriter
  • 每种线类型可能需要以不同的方式进行标记。

因为一条记录跨越多行,因为我们可能不知道有多少行 有,必须小心始终阅读整条记录。为了 这样做,自定义应该实现为 .​​ItemReader​​​​ItemReader​​​​FlatFileItemReader​

下面的示例演示如何在 XML 中实现自定义:​​ItemReader​

XML 配置


<bean id="itemReader" class="org.spr...MultiLineTradeItemReader">
<property name="delegate">
<bean class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="data/iosample/input/multiLine.txt" />
<property name="lineMapper">
<bean class="org.spr...DefaultLineMapper">
<property name="lineTokenizer" ref="orderFileTokenizer"/>
<property name="fieldSetMapper" ref="orderFieldSetMapper"/>
</bean>
</property>
</bean>
</property>
</bean>

The following example shows how to implement a custom in Java:​​ItemReader​

Java Configuration

@Bean
public MultiLineTradeItemReader itemReader() {
MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader();

itemReader.setDelegate(flatFileItemReader());

return itemReader;
}

@Bean
public FlatFileItemReader flatFileItemReader() {
FlatFileItemReader reader = new FlatFileItemReaderBuilder<>()
.name("flatFileItemReader")
.resource(new ClassPathResource("data/iosample/input/multiLine.txt"))
.lineTokenizer(orderFileTokenizer())
.fieldSetMapper(orderFieldSetMapper())
.build();
return reader;
}

确保每行都正确标记,这对于 固定长度输入,可用于 委托。请参阅 ​​FlatFileItemReader 中的 Reader,以及 作家章节​​了解更多详情。然后,委托读取器使用 a 将每行的 a 传递回包装。​​PatternMatchingCompositeLineTokenizer​​​​FlatFileItemReader​​​​PassThroughFieldSetMapper​​​​FieldSet​​​​ItemReader​

下面的示例演示如何确保在 XML 中正确标记每一行:

XML内容

<bean id="orderFileTokenizer" class="org.spr...PatternMatchingCompositeLineTokenizer">
<property name="tokenizers">
<map>
<entry key="HEA*" value-ref="headerRecordTokenizer" />
<entry key="FOT*" value-ref="footerRecordTokenizer" />
<entry key="NCU*" value-ref="customerLineTokenizer" />
<entry key="BAD*" value-ref="billingAddressLineTokenizer" />
</map>
</property>
</bean>

以下示例演示如何确保在 Java 中正确标记每一行:

爪哇内容

@Bean
public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
PatternMatchingCompositeLineTokenizer tokenizer =
new PatternMatchingCompositeLineTokenizer();

Map<String, LineTokenizer> tokenizers = new HashMap<>(4);

tokenizers.put("HEA*", headerRecordTokenizer());
tokenizers.put("FOT*", footerRecordTokenizer());
tokenizers.put("NCU*", customerLineTokenizer());
tokenizers.put("BAD*", billingAddressLineTokenizer());

tokenizer.setTokenizers(tokenizers);

return tokenizer;
}

此包装器必须能够识别记录的末尾,以便它可以连续 呼叫其代表,直到到达终点。对于读取的每一行, 包装器应构建要返回的项目。到达页脚后,该项目可以 返回以交付给 和 ,如 以下示例:​​read()​​​​ItemProcessor​​​​ItemWriter​

private FlatFileItemReader
delegate;

public Trade read() throws Exception {
Trade t = null;

for (FieldSet line = null; (line = this.delegate.read()) != null;) {
String prefix = line.readString(0);
if (prefix.equals("HEA")) {
t = new Trade(); // Record must start with header
}
else if (prefix.equals("NCU")) {
Assert.notNull(t, "No header was found.");
t.setLast(line.readString(1));
t.setFirst(line.readString(2));
...
}
else if (prefix.equals("BAD")) {
Assert.notNull(t, "No header was found.");
t.setCity(line.readString(4));
t.setState(line.readString(6));
...
}
else if (prefix.equals("FOT")) {
return t; // Record must end with footer
}
}
Assert.isNull(t, "No 'END' was found.");
return null;
}

执行系统命令

许多批处理作业要求从批处理作业中调用外部命令。 这样的过程可以由调度程序单独启动,但优点是 有关运行的公共元数据将丢失。此外,多步骤工作也将 还需要拆分为多个工作。

由于需求如此普遍,Spring Batch 提供了一个实现 调用系统命令。​​Tasklet​

下面的示例演示如何在 XML 中调用外部命令:

XML 配置

<bean class="org.springframework.batch.core.step.tasklet.SystemCommandTasklet">
<property name="command" value="echo hello" />
<!-- 5 second timeout for the command to complete -->
<property name="timeout" value="5000" />
</bean>

以下示例演示如何在 Java 中调用外部命令:

爪哇配置

@Bean
public SystemCommandTasklet tasklet() {
SystemCommandTasklet tasklet = new SystemCommandTasklet();

tasklet.setCommand("echo hello");
tasklet.setTimeout(5000);

return tasklet;
}

未找到输入时处理步骤完成

在许多批处理方案中,在数据库或文件中找不到要处理的行不是 特殊。简单地认为没有找到工作并以 0 完成 已读项目。Spring 中开箱即用的所有实现 批处理默认使用此方法。如果什么都没有写出来,这可能会导致一些混乱 即使存在输入(如果文件命名错误或类似情况,通常会发生这种情况 问题出现)。因此,应检查元数据本身以确定如何 框架发现要处理的大量工作。但是,如果找不到输入怎么办 被认为是例外?在这种情况下,以编程方式检查元数据中没有项 处理并导致故障是最好的解决方案。因为这是一个常见的用例, Spring Batch 为侦听器提供了正是此功能,如 的类定义:​​Step​​​​ItemReader​​​​NoWorkFoundStepExecutionListener​

public class NoWorkFoundStepExecutionListener extends StepExecutionListenerSupport {

public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getReadCount() == 0) {
return ExitStatus.FAILED;
}
return null;
}

}

前面在“afterStep”阶段检查 的属性,以确定是否未读取任何项目。如果那样的话 在这种情况下,返回退出代码,指示应该失败。 否则,返回,这不会影响 的状态。​​StepExecutionListener​​​​readCount​​​​StepExecution​​​​FAILED​​​​Step​​​​null​​​​Step​

将数据传递到将来的步骤

将信息从一个步骤传递到另一个步骤通常很有用。这可以通过 这。问题是有两个:一个在级别,一个在级别。遗骸仅作为 长如步,而酝酿贯穿全.上 另一方面,每次提交 块,而 仅在每个 .​​ExecutionContext​​​​ExecutionContexts​​​​Step​​​​Job​​​​Step​​​​ExecutionContext​​​​Job​​​​ExecutionContext​​​​Job​​​​Step​​​​ExecutionContext​​​​Step​​​​Job​​​​ExecutionContext​​​​Step​

这种分离的结果是,所有数据都必须放在 正在执行时。这样做可确保数据 在运行时正确存储。如果数据存储到 , 那么它在执行期间不会持久化。如果失败,该数据将丢失。​​Step​​​​ExecutionContext​​​​Step​​​​Step​​​​Job​​​​ExecutionContext​​​​Step​​​​Step​

public classpublic class SavingItemWriter implements ItemWriter<Object> {
private StepExecution stepExecution;

public void write(Chunk<? extends Object> items) throws Exception {
// ...

ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("someKey", someObject);
}

@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}
特别声明:以上内容(图片及文字)均为互联网收集或者用户上传发布,本站仅提供信息存储服务!如有侵权或有涉及法律问题请联系我们。
举报
评论区(0)
按点赞数排序
用户头像
精选文章
thumb 中国研究员首次曝光美国国安局顶级后门—“方程式组织”
thumb 俄乌线上战争,网络攻击弥漫着数字硝烟
thumb 从网络安全角度了解俄罗斯入侵乌克兰的相关事件时间线
下一篇
Spring Batch -单元测试 2022-12-20 05:32:58