返回

使用 Apache Kafka 进行流处理

发布时间:2023-07-22 21:18:55 309
# java# json# git# github# 信息

使用 Apache Kafka 进行流处理_Cloud

在本指南中,我们开发了三个使用 Spring Cloud Stream 的 Spring Boot 应用程序,并将它们部署到 Cloud Foundry、Kubernetes 和本地机器。 在另一个指南中,我们将使用数据流部署这些应用程序。 通过手动部署应用程序,可以更好地了解数据流自动执行的步骤。

以下各节介绍如何从头开始构建这些应用程序。

如果您愿意,可以下载包含这些应用程序源代码的 zip 文件,解压缩它,构建它,然后继续部署部分。

您可以从浏览器中下载包含所有三个应用程序的项目。您还可以使用命令行,如以下示例所示:

复制

wget 'https://github.com/spring-cloud/spring-cloud-dataflow-samples/blob/main/dataflow-website/stream-developer-guides/streams/standalone-stream-sample/dist/usage-cost-stream-sample.zip?raw=true' -O usage-cost-stream-sample.zip

生成下载的示例

流应用程序可以配置为使用通用代码库与 Kafka 代理或 RabbitMQ 一起运行。唯一的区别在于可执行的jar文件。为了使它们与 Kafka 代理一起工作,它们需要一个 Kafka 绑定器依赖项(默认情况下启用)。对于 RabbitMQ,他们需要 Rabbit 粘合剂。

若要从根项目目录生成 Kafka 的示例流应用,请执行以下操作:

$./mvnw clean package -Pkafka

发展

我们创建了三个 Spring Cloud Stream 应用程序,这些应用程序使用配置的绑定器进行通信。

场景是一家手机公司为其客户创建账单。 用户发出的每个调用在调用期间都有一个 和 使用量。 作为生成帐单过程的一部分,需要将原始呼叫数据转换为呼叫持续时间的成本和使用的数据量的成本。durationdata通过使用包含调用和调用期间使用的量的类对调用进行建模。 帐单是使用包含调用成本 () 和数据成本 () 的类建模的。每个类都包含一个 ID () 来标识进行呼叫的人员。UsageDetaildurationdataUsageCostDetailcostCallcostDatauserId

三个流式处理应用程序如下所示:

  • 应用程序(命名)生成用户的调用和每个调用的调用量,并发送包含对象作为 JSON 的消息。

SourceUsageDetailSenderdurationdatauserIdUsageDetail

  • 应用程序(命名 )使用 并计算调用的成本和每个 .它将对象作为 JSON 发送。

ProcessorUsageCostProcessorUsageDetailuserIdUsageCostDetail

  • 应用程序(名为 )使用对象并记录调用和数据的成本。

SinkUsageCostLoggerUsageCostDetail

在此步骤中,我们将创建源。​​UsageDetailSender​

您可以直接下载Spring Initialzr生成的项目。

或访问Spring Initialzr网站并按照以下说明进行操作:

  1. 创建一个新的 Maven 项目,其组名为 ,工件名称为 ,和包。

io.spring.dataflow.sampleusage-detail-sender-kafkaio.spring.dataflow.sample.usagedetailsender

  1. “依赖项”文本框中,键入以选择 Kafka 绑定程序依赖项。

Kafka

  1. 在“依赖项”文本框中,键入以选择“春云流”依赖项

Cloud Stream

  1. 在“依赖关系”文本框中,键入以选择 Spring 引导执行器依存关系。

Actuator

  1. 单击生成项目按钮。

现在,您应该将文件并将项目导入到您喜欢的 IDE 中。unzipusage-detail-sender-kafka.zip

在使用 Kafka 作为消息代理时,您可以选择扩展或覆盖许多配置选项,以实现所需的运行时行为。在使用 Kafka 作为消息代理时,您可以选择扩展或覆盖许多配置选项,以实现所需的运行时行为。Kafka 绑定程序文档列出了 Kafka 绑定程序配置属性。

业务逻辑

现在我们可以创建此应用程序所需的代码。为此:

  1. 在包中创建一个类似于 UsageDetail.java 中内容的类。 该类包含 、 和属性。

UsageDetailio.spring.dataflow.sample.usagedetailsenderUsageDetailuserIddataduration

  1. 在包中创建类。它应类似于以下列表:

UsageDetailSender​io.spring.dataflow.sample.usagedetailsender​

package io.spring.dataflow.sample.usagedetailsender;

import java.util.Random;
import java.util.function.Supplier;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class UsageDetailSender {

private String[] users = {"user1", "user2", "user3", "user4", "user5"};

@Bean
public Supplier<UsageDetail> sendEvents() {
return () -> {
UsageDetail usageDetail = new UsageDetail();
usageDetail.setUserId(this.users[new Random().nextInt(5)]);
usageDetail.setDuration(new Random().nextInt(300));
usageDetail.setData(new Random().nextInt(700));
return usageDetail;
};
}
}

供应商提供了一个填充有随机值的对象。春云流自动绑定此功能,将数据发送到配置的输出站。默认情况下,Spring Cloud Stream还为每秒调用该函数的任何供应商配置了一个默认轮询器。sendEventsUsageDetail

配置

在配置应用程序时,我们需要设置绑定目标(RabbitMQ 交易所或 Kafka 主题的名称),生产者将在其中发布数据。sourceoutput为方便起见,我们将函数输出绑定名称别名,指示输出对应于函数的第一个输出参数,将逻辑名称命名为 。或者,我们可以直接绑定输出绑定名称:。有关更详细的说明,请参阅​功能绑定名称。sendEvents-out-0sendEventsoutputspring.cloud.stream.bindings.sendEvents-out-0.destination=usage-detail在 中添加以下属性:​​src/main/resources/application.properties​

spring.cloud.stream.function.bindings.sendEvents-out-0=output
spring.cloud.stream.bindings.output.destination=usage-detail
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0

建筑

现在,我们可以构建使用情况详细信息发送程序应用程序。

在根目录中,使用以下命令使用 maven 生成项目:​​usage-detail-sender​

./mvnw clean package

测试

Spring Cloud Stream 提供了一个测试 jar 来测试 Spring Cloud Stream 应用程序:

<dependency>
<groupId>org.springframework.cloudgroupId>
<artifactId>spring-cloud-streamartifactId>
<type>test-jartype>
<classifier>test-binderclassifier>
<scope>testscope>
dependency>

而不是消息代理绑定程序实现,提供内存中绑定程序实现,用于跟踪和测试应用程序的出站和入站消息。 测试配置包括用于发送和接收消息的 bean。 若要对应用程序进行单元测试,请在类中添加以下代码:TestChannelBinderConfigurationInputDestinationOutputDestinationUsageDetailSender​UsageDetailSenderApplicationTests​

package io.spring.dataflow.sample.usagedetailsender;

import org.junit.jupiter.api.Test;

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;

import static org.assertj.core.api.Assertions.assertThat;

public class UsageDetailSenderApplicationTests {

@Test
public void contextLoads() {
}

@Test
public void testUsageDetailSender() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration
.getCompleteConfiguration(UsageDetailSenderApplication.class))
.web(WebApplicationType.NONE)
.run()) {

OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> sourceMessage = target.receive(10000, "usage-detail");

MessageConverter converter = context.getBean(CompositeMessageConverter.class);
UsageDetail usageDetail = (UsageDetail) converter
.fromMessage(sourceMessage, UsageDetail.class);

assertThat(usageDetail.getUserId()).isBetween("user1", "user5");
assertThat(usageDetail.getData()).isBetween(0L, 700L);
assertThat(usageDetail.getDuration()).isBetween(0L, 300L);
}
}
}
  • 测试用例验证应用程序是否成功启动。

contextLoads

  • 测试用例使用 来接收和验证 发送的消息。

testUsageDetailSenderOutputDestinationUsageDetailSender

内存中测试绑定程序的行为与任何消息代理绑定程序实现完全相同。值得注意的是,在 Spring Cloud Stream 应用程序中,消息有效负载始终是一个字节数组,默认情况下编码为 JSON。在这种情况下,使用应用程序在其输入通道上接收字节,并根据内容类型自动委托给相应的 ,以转换字节以匹配使用函数的参数类型。对于测试,我们需要显式执行此步骤。或者,我们可以直接调用 JSON 解析器,而不是使用 ,而不是使用 。​MessageConverter​​​​UsageDetail​​​​MessageConverter​

处理器

在此步骤中,我们将创建处理器。​​UsageCostProcessor​

您可以直接下载Spring Initialzr生成的项目。

或访问Spring Initialzr网站并按照以下说明进行操作:

  1. 创建一个新的 Maven 项目,其组名为 ,工件名称为 ,和包。​​io.spring.dataflow.sample​​​​usage-cost-processor-kafka​​​​io.spring.dataflow.sample.usagecostprocessor​
  2. “依赖项”文本框中,键入以选择 Kafka 绑定程序依赖项。​​Kafka​
  3. 在“依赖项”文本框中,键入以选择“春云流”依赖项。​​Cloud Stream​
  4. 在“依赖关系”文本框中,键入以选择 Spring 引导执行器依存关系。​​Actuator​
  5. 单击生成项目按钮。

现在,您应该将文件并将项目导入到您喜欢的 IDE 中。​​unzip​​​​usage-cost-processor-kafka.zip​

业务逻辑

现在我们可以创建此应用程序所需的代码。为此:

  1. 在 中创建类。其内容类似于 UsageDetails.java 的内容。 该类包含 、 和 属性​​UsageDetail​​​​io.spring.dataflow.sample.usagecostprocessor​​​​UsageDetail​​​​userId​​​​data​​​​duration​
  2. 在包中创建类。其内容类似于 UsageCostDetail.java 的内容。 该类包含 、 和属性。​​UsageCostDetail​​​​io.spring.dataflow.sample.usagecostprocessor​​​​UsageCostDetail​​​​userId​​​​callCost​​​​dataCost​
  3. 在包中创建类,该类接收消息,计算调用和数据开销,并发送消息。以下清单显示了源代码:​​UsageCostProcessor​​​​io.spring.dataflow.sample.usagecostprocessor​​​​UsageDetail​​​​UsageCostDetail​
package io.spring.dataflow.sample.usagecostprocessor;

import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class UsageCostProcessor {

private double ratePerSecond = 0.1;

private double ratePerMB = 0.05;

@Bean
public Function<UsageDetail, UsageCostDetail> processUsageCost() {
return usageDetail -> {
UsageCostDetail usageCostDetail = new UsageCostDetail();
usageCostDetail.setUserId(usageDetail.getUserId());
usageCostDetail.setCallCost(usageDetail.getDuration() * this.ratePerSecond);
usageCostDetail.setDataCost(usageDetail.getData() * this.ratePerMB);
return usageCostDetail;
};
}
}

在前面的应用程序中,我们声明了一个接受 a 并返回 .Spring Cloud Stream 将发现此功能,并将其输入和输出绑定到为消息传递中间件配置的目标。如上一节所述,Spring Cloud Stream 使用适当的 来执行必要的类型转换。​​Function​​​​UsageDetail​​​​UsageCostDetail​​​​MessageConverter​

配置

配置应用程序时,我们需要设置以下属性:​​processor​

  • 订阅此应用程序的绑定目标(Kafka 主题或 RabbitMQ 交换)。​​input​
  • 生成者将在其中发布数据的绑定目标。​​output​

对于生产应用程序,最好设置为指定此使用者应用程序所属的使用者组。这可确保其他使用者应用程序(每个应用程序都使用自己的组 ID 标识)将收到每条消息。每个使用者组可以扩展到多个实例以分配工作负载。Spring Cloud Stream 抽象了 Kafka 原生的这个特性,以便将其扩展到 RabbitMQ 和其他绑定器实现。​​spring.cloud.stream.bindings.input.group​

在 中添加以下属性:​​src/main/resources/application.properties​

spring.cloud.stream.function.bindings.processUsageCost-in-0=input
spring.cloud.stream.function.bindings.processUsageCost-out-0=output
spring.cloud.stream.bindings.input.destination=usage-detail
spring.cloud.stream.bindings.output.destination=usage-cost
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0

为方便起见,我们分别为函数绑定名称以及 to 和 提供别名。​​processUsageCost-in-0​​​​processUsageCost-out-0​​​​input​​​​output​

  • 该属性将对象绑定到目标。​​spring.cloud.stream.bindings.input.destination​​​​UsageCostProcessor​​​​input​​​​usage-detail​
  • 该属性将对象的输出绑定到目标。​​spring.cloud.stream.bindings.output.destination​​​​UsageCostProcessor​​​​usage-cost​

输入目标必须与源应用程序的输出目标相同。同样,输出目标必须与下面的接收器的输入目标相同。

建筑

现在,我们可以构建使用成本处理器应用程序。 在 riit 目录中,使用以下命令使用 maven 构建项目:​​usage-cost-processor​

./mvnw clean package

测试

如上所述,Spring Cloud Stream 提供了一个测试 jar 来测试 Spring Cloud Stream 应用程序:

<dependency>
<groupId>org.springframework.cloudgroupId>
<artifactId>spring-cloud-streamartifactId>
<type>test-jartype>
<classifier>test-binderclassifier>
<scope>testscope>
dependency>

​TestChannelBinderConfiguration​​提供内存中绑定程序实现,用于跟踪和测试应用程序的出站和入站消息。 测试配置包括用于发送和接收消息的 bean。 若要对应用程序进行单元测试,请在类中添加以下代码:​​InputDestination​​​​OutputDestination​​​​UsageCostProcessor​​​​UsageCostProcessorApplicationTests​

package io.spring.dataflow.sample.usagecostprocessor;

import java.util.HashMap;
import java.util.Map;

import org.junit.jupiter.api.Test;

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;

import static org.assertj.core.api.Assertions.assertThat;

public class UsageCostProcessorApplicationTests {

@Test
public void contextLoads() {
}

@Test
public void testUsageCostProcessor() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
UsageCostProcessorApplication.class)).web(WebApplicationType.NONE)
.run()) {

InputDestination source = context.getBean(InputDestination.class);

UsageDetail usageDetail = new UsageDetail();
usageDetail.setUserId("user1");
usageDetail.setDuration(30L);
usageDetail.setData(100L);

MessageConverter converter = context.getBean(CompositeMessageConverter.class);
Map<String, Object> headers = new HashMap<>();
headers.put("contentType", "application/json");
MessageHeaders messageHeaders = new MessageHeaders(headers);
Message message = converter.toMessage(usageDetail, messageHeaders);

source.send(message);

OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> sourceMessage = target.receive(10000, "usage-cost");

UsageCostDetail usageCostDetail = (UsageCostDetail) converter
.fromMessage(sourceMessage, UsageCostDetail.class);

assertThat(usageCostDetail.getCallCost()).isEqualTo(3.0);
assertThat(usageCostDetail.getDataCost()).isEqualTo(5.0);
}
}
}
  • 测试用例验证应用程序是否成功启动。​​contextLoads​
  • 测试用例使用 发送消息以及接收和验证消息。​​testUsageCostProcessor​​​​InputDestination​​​​OutputDestination​

在此步骤中,我们将创建接收器。​​UsageCostLogger​

您可以直接下载由Spring Initialzr生成的项目,然后单击“生成项目”。

或访问Spring Initialzr网站并按照以下说明进行操作:

  1. 创建一个新的 Maven 项目,其组名为 ,工件名称为 ,和包。​​io.spring.dataflow.sample​​​​usage-cost-logger-kafka​​​​io.spring.dataflow.sample.usagecostlogger​
  2. 在“依赖项”文本框中,键入以选择 Kafka 绑定程序依赖项。​​Kafka​
  3. 在“依赖项”文本框中,键入以选择“春云流”依赖项。​​Cloud Stream​
  4. 在“依赖关系”文本框中,键入以选择 Spring 引导执行器依存关系。​​Actuator​
  5. 单击“生成项目”。

现在,您应该将文件并将项目导入到您喜欢的 IDE 中。​​unzip​​​​usage-cost-logger-kafka.zip​

业务逻辑

要创建业务逻辑,请执行以下操作:

  1. 在包中创建一个类。其内容应类似于 UsageCostDetail.java 的内容。 该类包含 、 和属性。​​UsageCostDetail​​​​io.spring.dataflow.sample.usagecostlogger​​​​UsageCostDetail​​​​userId​​​​callCost​​​​dataCost​
  2. 在包中创建类,该类接收消息并记录它。以下清单显示了源代码:​​UsageCostLogger​​​​io.spring.dataflow.sample.usagecostlogger​​​​UsageCostDetail​
package io.spring.dataflow.sample.usagecostlogger;

import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class UsageCostLogger {

private static final Logger logger = LoggerFactory.getLogger(UsageCostLoggerApplication.class);

@Bean
public Consumer<UsageCostDetail> process() {
return usageCostDetail -> {
logger.info(usageCostDetail.toString());
};
}
}

在前面的应用程序中,我们声明了一个接受 .Spring Cloud Stream 将发现此功能并将其输入绑定到为消息传递中间件配置的输入目标。如上一节所述,Spring Cloud Stream 在调用此消费者之前使用适当的 来执行必要的类型转换。​​Consumer​​​​UsageCostDetail​​​​MessageConverter​

配置

配置应用程序时,我们需要设置:​​sink​

  • 订阅此应用程序的绑定目标(Kafka 主题或 RabbitMQ 交换)。​​input​
  • (可选)指定此使用者应用程序所属的使用者组。​​group​

为方便起见,我们将函数绑定名称别名为 .​​process-in-0​​​​input​

在 中添加以下属性:​​src/main/resources/application.properties​

spring.cloud.stream.function.bindings.process-in-0=input
spring.cloud.stream.bindings.input.destination=usage-cost
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0

建筑

现在我们可以构建使用成本记录器应用程序。 在 的根目录中,使用以下命令使用 Maven 生成项目:​​usage-cost-logger​

./mvnw clean package

测试

若要对 进行单元测试,请在类中添加以下代码:​​UsageCostLogger​​​​UsageCostLoggerApplicationTests​

package io.spring.dataflow.sample.usagecostlogger;

import java.util.HashMap;
import java.util.Map;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;

@ExtendWith(OutputCaptureExtension.class)
public class UsageCostLoggerApplicationTests {

@Test
public void contextLoads() {
}

@Test
public void testUsageCostLogger(CapturedOutput output) {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration
.getCompleteConfiguration(UsageCostLoggerApplication.class))
.web(WebApplicationType.NONE)
.run()) {

InputDestination source = context.getBean(InputDestination.class);

UsageCostDetail usageCostDetail = new UsageCostDetail();
usageCostDetail.setUserId("user1");
usageCostDetail.setCallCost(3.0);
usageCostDetail.setDataCost(5.0);

MessageConverter converter = context.getBean(CompositeMessageConverter.class);
Map<String, Object> headers = new HashMap<>();
headers.put("contentType", "application/json");
MessageHeaders messageHeaders = new MessageHeaders(headers);
Message message = converter.toMessage(usageCostDetail, messageHeaders);

source.send(message);

Awaitility.await().until(output::getOut, value -> value.contains("{\"userId\": \"user1\", \"callCost\": \"3.0\", \"dataCost\": \"5.0\" }"));
}
}
}

在 中添加依赖项:​​awaitility​​​​pom.xml​

<dependency>
<groupId>org.awaitilitygroupId>
<artifactId>awaitilityartifactId>
<scope>testscope>
dependency>
  • 测试用例验证应用程序是否成功启动。​​contextLoads​
  • 测试用例验证是否使用Spring Boot的测试框架调用了该方法。​​testUsageCostLogger​​​​process​​​​UsageCostLogger​​​​OutputCaptureExtension​

部署

下一步是使用您为这些应用程序配置的消息代理将这些应用程序部署到受支持的平台之一。

特别声明:以上内容(图片及文字)均为互联网收集或者用户上传发布,本站仅提供信息存储服务!如有侵权或有涉及法律问题请联系我们。
举报
评论区(0)
按点赞数排序
用户头像
精选文章
thumb 中国研究员首次曝光美国国安局顶级后门—“方程式组织”
thumb 俄乌线上战争,网络攻击弥漫着数字硝烟
thumb 从网络安全角度了解俄罗斯入侵乌克兰的相关事件时间线
下一篇
Docker网络上篇-网络介绍 2023-07-22 17:54:25