Spring Integration 对R2DBC 支持
发布时间:2022-12-16 21:19:53 364 相关标签: # webkit# spring# 数据库# 数据

Spring 集成提供了通道适配器,用于通过 R2DBC 驱动程序对数据库进行反应式访问来接收和发送消息。
您需要将此依赖项包含在项目中:
马文
格拉德尔
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-r2dbc</artifactId>
<version>6.0.0</version>
</dependency>
R2DBC 入站通道适配器
这是一个基于 的可轮询实现,并根据选项生成带有 or 作为从数据库获取的数据的有效负载的消息。 要的查询可以静态提供,也可以基于每次调用时计算的 SpEL 表达式。 作为评估上下文的根对象存在,以允许使用流畅的 API。 默认情况下,此通道适配器将选择中的记录映射到实例中。 它可以定制,提供一个选项,该选项由基于. 这是可选的,用于标记数据库中的读取记录,以便从后续轮询中跳过。 该操作可以与 一起提供,以将值绑定到基于结果中的记录的 中。R2dbcMessageSource
MessageSource
R2dbcEntityOperations
Flux
Mono
expectSingleResult
SELECT
receive()
R2dbcMessageSource.SelectCreator
StatementMapper.SelectSpec
LinkedCaseInsensitiveMap
payloadType
EntityRowMapper
this.r2dbcEntityOperations.getConverter()
updateSql
UPDATE
BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec>
UPDATE
SELECT
此通道适配器的典型配置可能如下所示:
@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
"SELECT * FROM person WHERE name='Name'");
r2dbcMessageSource.setPayloadType(Person.class);
r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
r2dbcMessageSource.setBindFunction(
(DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));}
return r2dbcMessageSource;
}
对于 Java DSL,此通道适配器的配置如下所示:
@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
return IntegrationFlow
.from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
(selectCreator) ->
selectCreator.createSelect("person")
.withProjection("*")
.withCriteria(Criteria.where("id").is(1)))
.expectSingleResult(true)
.payloadType(Person.class)
.updateSql("UPDATE Person SET id='2' where id = :id")
.bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
bindSpec.bind("id", o.getId())),
e -> e.poller(p -> p.fixedDelay(100)))
.<Mono<?>>handle((p, h) -> p, e -> e.async(true))
.channel(MessageChannels.flux())
.get();
}
R2DBC 出站通道适配器
这是使用提供的 在数据库中执行(默认)或查询的实现。 可以静态配置,也可以通过针对请求消息的 SpEL 表达式进行配置。 要执行的查询可以基于 、 和表达式选项,或者(如果未提供)将整个消息有效负载视为要对其执行 SQL 的实体。 该包被注册为 SpEL 评估上下文的导入,以便直接访问用于和查询的流畅 API。 在 和 中使用,并且必须计算为 for 列值对,才能针对请求消息在目标表中执行更改。R2dbcMessageHandler
ReactiveMessageHandler
INSERT
UPDATE
DELETE
R2dbcEntityOperations
R2dbcMessageHandler.Type
tableName
values
criteria
tableName
org.springframework.data.relational.core.mapping.Table
org.springframework.data.relational.core.query
Criteria
UPDATE
DELETE
valuesExpression
INSERT
UPDATE
Map
此通道适配器的典型配置可能如下所示:
@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate)
messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
messageHandler.setCriteriaExpression(
EXPRESSION_PARSER.parseExpression("T(Criteria).where('id).is(headers.personId)));
return messageHandler;
}
对于 Java DSL,此通道适配器的配置如下所示:
.handle(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
.queryType(R2dbcMessageHandler.Type.UPDATE)
.tableNameExpression("payload.class.simpleName")
.criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
.values("{age:36}"))
文章来源: https://blog.51cto.com/u_15326439/5933691
特别声明:以上内容(图片及文字)均为互联网收集或者用户上传发布,本站仅提供信息存储服务!如有侵权或有涉及法律问题请联系我们。
举报