Spring Integration对Redis的支持
Spring Integration 2.1引入了对Redis的支持:“一个开源的高级键值存储”。 这种支持以基于 Redis 以及发布-订阅消息传递适配器的形式出现,Redis 通过其 PUBLISH、SUBSCRIBE和 UNSUBSCRIBE命令支持这些适配器。MessageStore
您需要将此依赖项包含在项目中:
您还需要包含 Redis 客户端依赖项,例如 Lettuce。
要下载、安装和运行 Redis,请参阅 Redis 文档。
连接到 Redis
要开始与 Redis 交互,您首先需要连接到它。 Spring Integration 使用另一个 Spring 项目 Spring Data Redis 提供的支持,该项目提供了典型的 Spring 构造:和 。 这些抽象简化了与多个 Redis 客户端 Java API 的集成。 目前,Spring Data Redis 支持 Jedis 和 Lettuce。ConnectionFactory
Template
用RedisConnectionFactory
要连接到 Redis,您可以使用接口的实现之一。 以下清单显示了接口定义:RedisConnectionFactory
以下示例显示了如何在 Java 中创建:LettuceConnectionFactory
下面的示例展示了如何在 Spring 的 XML 配置中创建:LettuceConnectionFactory
的实现提供了一组属性,例如端口和主机,您可以根据需要设置这些属性。 拥有 的实例后,可以创建 的实例并将其注入 .RedisConnectionFactory
RedisConnectionFactory
RedisTemplate
RedisConnectionFactory
用RedisTemplate
与 Spring 中的其他模板类(例如 和 )一样,它是一个简化 Redis 数据访问代码的辅助类。 有关及其变体的更多信息(例如),请参阅 Spring Data Redis 文档。JdbcTemplate
JmsTemplate
RedisTemplate
RedisTemplate
StringRedisTemplate
以下示例演示如何在 Java 中创建 的实例:RedisTemplate
下面的示例展示了如何在 Spring 的 XML 配置中创建 的实例:RedisTemplate
使用 Redis 发送消息
如简介中所述,Redis 通过其 、 和命令提供对发布-订阅消息传递的支持。 与JMS和AMQP一样,Spring Integration提供了消息通道和适配器,用于通过Redis发送和接收消息。PUBLISH
SUBSCRIBE
UNSUBSCRIBE
瑞迪斯发布/订阅频道
与 JMS 类似,在某些情况下,生产者和使用者都打算成为同一应用程序的一部分,在同一进程中运行。 可以使用一对入站和出站通道适配器来实现此目的。 但是,与Spring Integration的JMS支持一样,有一种更简单的方法来解决这个用例。 您可以创建发布-订阅通道,如以下示例所示:
A 的行为很像 Spring 集成主命名空间中的普通元素。 它可以由任何终结点的 和属性引用。 不同之处在于,此通道由 Redis 主题名称支持:属性指定的值。 但是,与 JMS 不同的是,本主题不必提前创建,甚至不必由 Redis 自动创建。 在 Redis 中,主题是扮演地址角色的简单值。 生成者和使用者可以使用与其主题名称相同的值进行通信。 对此通道的简单订阅意味着可以在生产和消费终结点之间实现异步发布-订阅消息传递。 但是,与通过在简单的 Spring 集成元素中添加元素创建的异步消息通道不同,消息不会存储在内存中队列中。 相反,这些消息通过 Redis 传递,这使您可以依赖它对持久性和集群的支持以及与其他非 Java 平台的互操作性。publish-subscribe-channel
input-channel
output-channel
String
topic-name
String
String
Redis 入站通道适配器
Redis 入站通道适配器 () 以与其他入站适配器相同的方式将传入的 Redis 消息调整为 Spring 消息。 它接收特定于平台的消息(在本例中为 Redis),并使用策略将它们转换为 Spring 消息。 以下示例演示如何配置 Redis 入站通道适配器:RedisInboundChannelAdapter
MessageConverter
前面的示例显示了 Redis 入站通道适配器的简单但完整的配置。 请注意,前面的配置依赖于熟悉的 Spring 范式,即自动发现某些 bean。 在这种情况下,将隐式注入适配器。 可以改用属性显式指定它。redisConnectionFactory
connection-factory
另请注意,上述配置为适配器注入了自定义 . 该方法类似于 JMS,其中实例用于在 Redis 消息和 Spring 集成消息有效负载之间进行转换。 默认值为 .MessageConverter
MessageConverter
SimpleMessageConverter
入站适配器可以订阅多个主题名称,因此属性中的值集以逗号分隔。topics
从 V3.0 开始,除了现有属性之外,入站适配器现在还具有该属性。 此属性包含一组以逗号分隔的 Redis 主题模式。 有关 Redis 发布-订阅的更多信息,请参阅 Redis 发布/订阅。topics
topic-patterns
入站适配器可以使用 对 Redis 消息正文进行反序列化。 的属性可以设置为空字符串,这将生成属性的值。 在这种情况下,Redis 消息的原始正文作为消息有效负载提供。RedisSerializer
serializer
null
RedisSerializer
byte[]
从 V5.0 开始,您可以使用 的属性向入站适配器提供实例。 此外,收到的 Spring 集成消息现在具有指示已发布消息来源的标头:主题或模式。 您可以将此下游用于路由逻辑。Executor
task-executor
RedisHeaders.MESSAGE_SOURCE
Redis 出站通道适配器
Redis 出站通道适配器将传出的 Spring 集成消息调整为 Redis 消息,方式与其他出站适配器相同。 它接收 Spring 集成消息,并使用策略将它们转换为特定于平台的消息(在本例中为 Redis)。 以下示例演示如何配置 Redis 出站通道适配器:MessageConverter
该配置与 Redis 入站通道适配器并行。 适配器隐式注入 ,该 定义为其 Bean 名称。 此示例还包括可选(和自定义)(bean)。RedisConnectionFactory
redisConnectionFactory
MessageConverter
testConverter
从 Spring Integration 3.0 开始,提供了该属性的替代方法:您可以使用该属性在运行时确定消息的 Redis 主题。 这些属性是互斥的。
topic
topic-expression
Redis 队列入站通道适配器
Spring Integration 3.0引入了一个队列入站通道适配器,用于从Redis列表中“弹出”消息。 默认情况下,它使用“右弹出”,但您可以将其配置为使用“左弹出”。 适配器是消息驱动的。 它使用内部侦听器线程,不使用轮询器。
以下清单显示了 的所有可用属性:queue-inbound-channel-adapter
组件 Bean 名称。 如果未提供该属性,则会在应用程序上下文中创建并注册 ,并将此属性作为 Bean 名称。 在这种情况下,端点本身使用 Bean 名称加 . (如果 Bean 名称为 ,则端点注册为 。 |
要从此终端节点向其发送实例的实例。 |
一个属性,用于指定此终结点是否应在应用程序上下文启动后自动启动。 默认为 . |
一个属性,用于指定启动此终结点的阶段。 默认为 . |
对 Bean 的引用。 默认为 . |
执行基于队列的“pop”操作以获取 Redis 消息的 Redis 列表的名称。 |
从终端节点的侦听任务收到异常时向其发送实例的实例。 默认情况下,基础使用应用程序上下文中的默认值。 |
豆引用。 它可以是一个空字符串,这意味着“没有序列化程序”。 在这种情况下,来自入站 Redis 消息的原始消息将作为有效负载发送到 。 默认情况下,它是一个 . |
“pop”操作等待队列中的 Redis 消息的超时(以毫秒为单位)。 默认值为 1 秒。 |
侦听器任务在重新启动侦听器任务之前,在“pop”操作出现异常后应休眠的时间(以毫秒为单位)。 |
指定此终端节点是否期望 Redis 队列中的数据包含整个实例。 如果此属性设置为 ,则不能为空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。 其默认值为 。 |
对 Spring (或标准 JDK 1.5+) bean 的引用。 它用于基础侦听任务。 它默认为 . |
指定此端点应使用“右弹出”(当)还是“左弹出”(当)从 Redis 列表中读取消息。 如果 ,则 Redis 列表在与默认 Redis 队列出站通道适配器一起使用时充当队列。 将其设置为与通过“右推”写入列表的软件一起使用,或实现类似堆栈的消息顺序。 其默认值为 。 从版本 4.3 开始。 |
必须配置多个线程进行处理;否则,当 在出现错误后尝试重新启动侦听器任务时,可能会出现死锁。 可用于处理这些错误,以避免重新启动,但最好不要将应用程序暴露在可能的死锁情况下。 有关可能的实现,请参阅 Spring 框架参考手册。 |
Redis 队列出站通道适配器
Spring Integration 3.0引入了一个队列出站通道适配器,用于从Spring Integration消息“推送”到Redis列表。 默认情况下,它使用“左推”,但您可以将其配置为使用“右推”。 以下清单显示了 Redis 的所有可用属性:queue-outbound-channel-adapter
|
组件 Bean 名称。 如果未提供该属性,则会在应用程序上下文中创建并注册 ,并将此属性作为 Bean 名称。 在这种情况下,端点注册的 Bean 名称为 加 。 (如果 Bean 名称为 ,则端点注册为 。 |
|
此终结点从中接收实例的 。 |
|
对 Bean 的引用。 默认为 . |
|
执行基于队列的“推送”操作以发送 Redis 消息的 Redis 列表的名称。 此属性与 互斥。 |
|
用于确定 Redis 列表名称的 SpEL。 它使用运行时的传入作为变量。 此属性与 互斥。 |
|
豆类引用。 它默认为 . 但是,对于有效负载,如果未提供引用,则使用 a。 |
|
指定此终端节点是应仅将有效负载发送还是将整个负载发送到 Redis 队列。 默认为 . |
|
指定此端点应使用“左推”(当)还是“右推”(当)将消息写入 Redis 列表。 如果为 ,则 Redis 列表在与默认 Redis 队列入站通道适配器一起使用时充当队列。 将其设置为与以“左弹出”从列表中读取的软件一起使用,或实现类似堆栈的消息顺序。 默认为 . 从版本 4.3 开始。 |
瑞迪斯应用程序事件
从 Spring Integration 3.0 开始,Redis 模块提供了 的实现,而 又是一个 . 封装了 Redis 操作的异常(端点是事件的“源”)。 例如,在捕获操作中的异常后发出这些事件。 例外可以是任何泛型或 . 使用 处理这些事件对于确定后台 Redis 任务的问题和采取管理操作非常有用。IntegrationEvent
org.springframework.context.ApplicationEvent
RedisExceptionEvent
BoundListOperations.rightPop
org.springframework.data.redis.RedisSystemException
org.springframework.data.redis.RedisConnectionFailureException
红与红消息存储
如企业集成模式 (EIP) 一书中所述,邮件存储允许您保留消息。 当考虑可靠性时,这在处理具有缓冲消息功能的组件(聚合器、重新排序器等)时非常有用。 在 Spring 集成中,该策略还为声明检查模式提供了基础,EIP 中也有描述。MessageStore
Spring 集成的 Redis 模块提供了 . 以下示例演示如何将其与聚合器一起使用:RedisMessageStore
前面的示例是一个 Bean 配置,它需要 a 作为构造函数参数。RedisConnectionFactory
缺省情况下,使用 Java 序列化来序列化消息。 但是,如果要使用其他序列化技术(如 JSON),可以通过设置 .RedisMessageStore
valueSerializer
RedisMessageStore
从版本 4.3.10 开始,框架分别为实例和实例提供了 Jackson 序列化程序和解序列化程序实现 — 和 。 必须使用的选项配置它们。 此外,还应设置为每个序列化的复杂对象添加类型信息(如果您信任源)。 然后在反序列化期间使用该类型信息。 该框架提供了一个名为 的实用程序方法,该方法已随前面提到的所有属性和序列化程序一起提供。 此实用程序方法附带一个参数,用于限制 Java 包进行反序列化以避免安全漏洞。 默认的受信任软件包:、、。 若要在 中管理 JSON 序列化,必须以类似于以下示例的方式对其进行配置:Message
MessageHeaders
MessageJacksonDeserializer
MessageHeadersJacksonSerializer
SimpleModule
ObjectMapper
enableDefaultTyping
ObjectMapper
JacksonJsonUtils.messagingAwareMapper()
trustedPackages
java.util
java.lang
org.springframework.messaging.support
org.springframework.integration.support
org.springframework.integration.message
org.springframework.integration.store
RedisMessageStore