# Kafka_深入探秘者(8):kafka 高级应用

Kafka_深入探秘者(8):kafka 高级应用

一、kafka 消费组管理

1、kafka 命令行工具

参考官网: http://kafka.apache.org/22/documentation.html

2、kafka 消费组管理:查看消费组


# 切换到 kafka 安装目录 
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 或者 
cd /usr/local/kafka/kafka_2.12-2.2.1/

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):
bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110:9092 --list

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --list

# 查看消费组命令
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

3、kafka 消费组管理:查看消费组详情


# 切换到 kafka 安装目录 
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 或者 
cd /usr/local/kafka/kafka_2.12-2.2.1/

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):
bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110:9092 --describe --group group.demo

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --describe --group group.demo

# 查看消费组详情命令(group.demo 是你自己创建的需要查看的组名)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group.demo

4、kafka 消费组管理:查看消费组当前状态


# 切换到 kafka 安装目录 
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 或者 
cd /usr/local/kafka/kafka_2.12-2.2.1/

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):
bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110:9092 --describe --group group.demo --state

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --describe --group group.demo --state

# 查看消费组当前状态
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group.demo --state

在这里插入图片描述

5、kafka 消费组管理:查看消费组内成员信息


# 切换到 kafka 安装目录 
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 或者 
cd /usr/local/kafka/kafka_2.12-2.2.1/

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):
bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110:9092 --describe --group group.demo --members

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --describe --group group.demo --members

# 查看消费组内成员信息
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group.demo --members

6、kafka 消费组管理:删除消费组,如果有消费者在使用,则会删除失败。


# 切换到 kafka 安装目录 
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 或者 
cd /usr/local/kafka/kafka_2.12-2.2.1/

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):
bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110 --delete --group group.demo

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --delete --group group.demo

# 删除消费组,如果有消费者在使用,则会删除失败
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group group.demo

7、kafka 消费位移管理:重置消费位移,如果有消费者在使用,则会重置失败。

1)kafka 重置消费位移命令:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group group.demo --all-topics --reset-offsets --to-earliest --execute

2)命令参数说明:
–all-topics :指定了所有主题。
–reset-offsets :重置消费。
–to-earliest :移到最后
–execute :开始执行命令


# 切换到 kafka 安装目录 
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 或者 
cd /usr/local/kafka/kafka_2.12-2.2.1/

# 重置消费位移,如果有消费者在使用,则会重置失败
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group group.demo --all-topics --reset-offsets --to-earliest --execute

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):
bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110:9092 --group group.demo --all-topics --reset-offsets --to-earliest --execute

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --group group.demo --all-topics --reset-offsets --to-earliest --execute

在这里插入图片描述

二、kafka 数据管道 Connect 文件系统

1、kafka 数据管道 Connect 文件系统 概述

kafka数据管道Connect文件系统概述.png

1)Kafka 是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析)。为何集成其他系统和解耦应用,经常使用 Producer 来发送消息到 Broker,并使用 Consumer 来消费 Broker 中的消息。Kafka Connect 是到 0.9 版本才提供的并极大的简化了其他系统与 Kafka 的集成。

2)Kafka Connect 运用用户快速定义并实现各种 Connector(File,dbc,Hdfs等),这些功能让大批量数据导入/导出 Kafka 很方便。

3)在 Kafka Connect 中还有两个重要的概念: Task 和 Worker。

4)Connect 中一些概念:

  • 连接器: 实现了 ConnectAPI,决定需要运行多少个任务,按照任务来进行数据复制,从 work 进程获取任务配置并将其传递下去。

  • 任务: 负责将数据移入或移出 Kafka。

  • work 进程: 相当与 connector 和任务的容器,用于负责管理连接器的配置、启动连接器和连接器任务,提供 RESTAPI。

  • 转换器: kafka connect 和其他存储系统直接发送或者接受数据之间转换数据。

2、kafka 独立模式-文件系统

场景
以下示例使用到了两个 Connector,将文件 source.txt 中的内容通过 Source 连接器写入 Kafka 主题中,然后将内容写入 srouce.sink.txt 中。

  • FileStreamSource: 从 source.txt 中读取并发布到 Broker 中。
  • FileStreamSink: 从 Broker 中读取数据并写入到 source.sink.txt 文件中。

3、步骤详情

3.1、首先我们来看下 Worker 进程用到的配置文件

${(KAFKA_HOME}/config/connect-standalone.properties

# 切换到 kafka 安装目录 
cd /usr/local/kafka/kafka_2.12-2.8.0/

// Kafka 集群连接的地址(把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128))
# bootstrap.servers=localhost:9092
bootstrap.servers=172.18.30.110:9092

# 打开并编辑 config/connect-standalone.properties 文件
vim config/connect-standalone.properties

# 修改以下几项:

# 启动端口,注意不能冲突
rest.port=8084

// 格式转化类
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

// json消息中是否包含schema
key.converter.schemas.enable=true
value.converter.schemas.enable.true

// 保存偏移量的文件路径
offset.storage.file.filename=/tmp/connect.offsets

// 设定提交偏移量的频率
offset.flush.interval.ms=10808
3.2 其中的 Source 使用到的配置文件是 $KAFKA_HOME}/config/connect-file-source.properties

# 切换到 kafka 安装目录 
cd /usr/local/kafka/kafka_2.12-2.8.0/

# 打开并编辑 config/connect-file-source.properties 文件
vim config/connect-file-source.properties

# 修改以下几项:

//配置连接器的名称
name.local-file-source

//连接器的全限定名称,设置类名称也是可以的
connector.class=FileStreamSource

// task数量
tasks .max=1

//数据源的文件路径(修改为你自己创建source.txt文件实际路径)
file=/tmp/source.txt

// 主题名称(修改为你自己创建的主题,如:heima)
# topic=topic0703
topic=heima
3.3 其中的 Sink 使用到的配置文件是 $KAFKA_HOME}/config/connect-file-sink.properties
# 切换到 kafka 安装目录 
cd /usr/local/kafka/kafka_2.12-2.8.0/

# 打开并编辑 config/connect-file-sink.properties 文件
vim config/connect-file-sink.properties

# 修改以下几项:

name=local-file-sink
connectér.class=FileStreamsink
tasks.max=1

# 修改为你自己创建sink.txt文件实际路径
#file=/tmp/source.sink.txt
file=/tmp/sink.txt

# 修改为你自己创建的主题,如:heima
#topics=topice703
topics=heima
3.4 启动 source 连接器

# 切换到 kafka 安装目录 
cd /usr/local/kafka/kafka_2.12-2.8.0/

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties

3.5 启动 slink 连接器

# 切换到 kafka 安装目录 
cd /usr/local/kafka/kafka_2.12-2.8.0/

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties 

3.6 source 写入文本信息

# 写入消息到 source.txt
echo "hello kafka,coming">> /tmp/source.txt 

# 查询是否写入成功
less /tmp/source.txt 

# 同时查询sink.txt 是否有内容传输过来
cat /tmp/sink.txt 

三、kafka springboot+kafka 001

1、打开 idea 创建 artifactId 名为 kafka_spring_learn 的 maven 工程。


	--> idea --> File 
	--> New --> Project 
	--> Maven 
		Project SDK: ( 1.8(java version "1.8.0_131" ) 
	--> Next 
	--> Groupld : ( djh.it )
		Artifactld : ( kafka_spring_learn )
		Version : 1.0-SNAPSHOT
	--> Name: ( kafka_spring_learn )
		Location: ( ...\kafka_spring_learn\ )	
	--> Finish
	

2、在 kafka_spring_learn 工程的 pom.xml 文件中导入依赖坐标。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>djh.it</groupId>
    <artifactId>kafka_spring_learn</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
<!--        <relativePath></relativePath>-->
    </parent>

    <properties>
        <java.version>8</java.version>
        <!--        <scala.version>2.11</scala.version>-->
        <scala.version>2.12</scala.version>
        <slf4j.version>1.7.21</slf4j.version>
        <!--        <kafka.version>2.0.0</kafka.version>-->
        <kafka.version>2.8.0</kafka.version>
        <lombok.version>1.18.8</lombok.version>
        <junit.version>4.11</junit.version>
        <gson.version>2.2.4</gson.version>
        <protobuff.version>1.5.4</protobuff.version>
        <!--        <spark.version>2.3.1</spark.version>-->
        <spark.version>2.4.8</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.version}</artifactId>
            <version>${kafka.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>${gson.version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-core</artifactId>
            <version>${protobuff.version}</version>
        </dependency>

        <dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-runtime</artifactId>
            <version>${protobuff.version}</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.9.4</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-scala_2.11</artifactId>
            <version>2.9.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

3、在 kafka_spring_learn 工程,创建 application.properties 配置文件。


# kafka_spring_learn\src\main\resources\application.properties

logging.level.root=INFO
# spring.kafka.producer.bootstrap-servers=127.0.0.1:9092

# IP 地址换成你自己的虚拟机 IP 地址
#spring.kafka.producer.bootstrap-servers=192.168.19.128:9092
#spring.kafka.consumer.bootstrap-servers=192.168.19.128:9092

spring.kafka.producer.bootstrap-servers=172.18.30.110:9092
spring.kafka.consumer.bootstrap-servers=172.18.30.110:9092

4、在 kafka_spring_learn 工程,创建 KafkaLearnApplication.java 启动类文件。


/**
 *  kafka_spring_learn\src\main\java\djh\it\kafkalearn\KafkaLearnApplication.java
 *
 *  2024-6-24 创建 KafkaLearnApplication.java 启动类文件
 */
package djh.it.kafkalearn;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@SpringBootApplication
@RequestMapping
//避免Gson版本冲突快捷配置
@EnableAutoConfiguration(exclude = {org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration.class})
public class KafkaLearnApplication {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaLearnApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(KafkaLearnApplication.class, args);
    }
    @RequestMapping("/index")
    public String index(){

        return "hello,kafka spring comming";
    }

    @Autowired
    private KafkaTemplate template;
    private static final String topic = "heima";

    // 消息生产者
    @GetMapping("/send/{input}")
    public String sendToKafka(@PathVariable String input){
        this.template.send(topic, input);

        return "Send success!  " + input;
    }

    //消息的接收
    @KafkaListener(id ="", topics = topic, groupId = "group.demo")
    public void listener(String input){
        LOGGER.info("message input value:{}", input);
    }
}

5、启动 KafkaLearnApplication.java 启动类,进行测试。

1)浏览器地址栏输入: localhost:8080/index
输出: hello,kafka spring comming

2)浏览器地址栏输入: localhost:8080/send/kafka
输出: Send success! kafka

在这里插入图片描述

四、kafka springboot+kafka 事务 001

1、在 kafka_spring_learn 工程中,修改 application.properties 配置文件,添加事务。


# kafka_spring_learn\src\main\resources\application.properties

logging.level.root=INFO
# spring.kafka.producer.bootstrap-servers=127.0.0.1:9092

# IP 地址换成你自己的虚拟机 IP 地址
#spring.kafka.producer.bootstrap-servers=192.168.19.128:9092
#spring.kafka.consumer.bootstrap-servers=192.168.19.128:9092

spring.kafka.producer.bootstrap-servers=172.18.30.110:9092
spring.kafka.consumer.bootstrap-servers=172.18.30.110:9092

# 事务的支持
spring.kafka.producer.transaction-id-prefix=kafka_tx.

2、在 kafka_spring_learn 工程中,修改 KafkaLearnApplication.java 启动类,添加事务。


/**
 *  kafka_spring_learn\src\main\java\djh\it\kafkalearn\KafkaLearnApplication.java
 *
 *  2024-6-24 创建 KafkaLearnApplication.java 启动类文件
 */
package djh.it.kafkalearn;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@SpringBootApplication
@RequestMapping
//避免Gson版本冲突快捷配置
@EnableAutoConfiguration(exclude = {org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration.class})
@Transactional
public class KafkaLearnApplication {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaLearnApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(KafkaLearnApplication.class, args);
    }
    @RequestMapping("/index")
    public String index(){

        return "hello,kafka spring comming";
    }

    @Autowired
    private KafkaTemplate template;
    private static final String topic = "heima";

    // 消息生产者
    @GetMapping("/send/{input}")
    public String sendToKafka(@PathVariable String input){
        this.template.send(topic, input);

        //事务的支持
        template.executeInTransaction(t ->{
            t.send(topic, input);
            if("error".equals(input)){
                throw new RuntimeException("input is error");
            }
            t.send(topic, input+" anthor");
            return true;
        });

        return "Send success!  " + input;
    }

    //消息的接收
    @KafkaListener(id ="", topics = topic, groupId = "group.demo")
    public void listener(String input){
        LOGGER.info("message input value:{}", input);
    }
}

3、启动 KafkaLearnApplication.java 启动类,进行测试。

1)浏览器地址栏输入: localhost:8080/send/kafka
输出: Send success! kafka

2)浏览器地址栏输入: localhost:8080/send/error
会抛出异常。

在这里插入图片描述

五、kafka springboot+kafka 事务 002

1、在 kafka_spring_learn 工程中,修改 application.properties 配置文件,添加事务。

# kafka_spring_learn\src\main\resources\application.properties

logging.level.root=INFO
# spring.kafka.producer.bootstrap-servers=127.0.0.1:9092

# IP 地址换成你自己的虚拟机 IP 地址
#spring.kafka.producer.bootstrap-servers=192.168.19.128:9092
#spring.kafka.consumer.bootstrap-servers=192.168.19.128:9092

spring.kafka.producer.bootstrap-servers=172.18.30.110:9092
spring.kafka.consumer.bootstrap-servers=172.18.30.110:9092

# 事务的支持
spring.kafka.producer.transaction-id-prefix=kafka_tx.

2、在 kafka_spring_learn 工程中,修改 KafkaLearnApplication.java 启动类,添加事务。

/**
 *  kafka_spring_learn\src\main\java\djh\it\kafkalearn\KafkaLearnApplication.java
 *
 *  2024-6-24 创建 KafkaLearnApplication.java 启动类文件
 */
package djh.it.kafkalearn;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@SpringBootApplication
@RequestMapping
//避免Gson版本冲突快捷配置
@EnableAutoConfiguration(exclude = {org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration.class})
@Transactional
public class KafkaLearnApplication {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaLearnApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(KafkaLearnApplication.class, args);
    }
    @RequestMapping("/index")
    public String index(){

        return "hello,kafka spring comming";
    }

    @Autowired
    private KafkaTemplate template;
    private static final String topic = "heima";


    // 消息生产者
    @GetMapping("/send/{input}")
    public String sendToKafka(@PathVariable String input){
        this.template.send(topic, input);

        //事务的支持
        template.executeInTransaction(t ->{
            t.send(topic, input);
            if("error".equals(input)){
                throw new RuntimeException("input is error");
            }
            t.send(topic, input+" anthor");
            return true;
        });

        return "Send success!  " + input;
    }

    // 消息生产者2: 演示事务
    @GetMapping("/send2/{input}")
    @Transactional(rollbackFor = RuntimeException.class)
    public String sendToKafkaTransaction(@PathVariable String input){

        //事务的支持
        template.send(topic, input);
        if("error".equals(input)){
            throw new RuntimeException("input is error2!");
        }
        template.send(topic, input + " anthor2");

        return "Send success2!  " + input;
    }

    //消息的接收
    @KafkaListener(id ="", topics = topic, groupId = "group.demo")
    public void listener(String input){
        LOGGER.info("message input value:{}", input);
    }
}

3、启动 KafkaLearnApplication.java 启动类,进行测试。

1)浏览器地址栏输入: localhost:8080/send2/kafka
输出: Send success2! kafka

2)浏览器地址栏输入: localhost:8080/send2/error
会抛出异常。服务器上重新打开一个客户消费,也收不到消息。

在这里插入图片描述

在这里插入图片描述

4、消息中间件选型对比:

4.1 资料文档

Kafka: 中。有 kafka 作者自己写的书,网上资料也有一些,
rabbitmq: 多。有一些不错的书,网上资料多。
zeromq: 少。没有专门写 zeromg 的书,网上的资料多是一些代码的实现和简单介绍。
rocketmq: 少。没有专门写rocketmq的书,网上的资料良莠不齐,官方文档很简洁,但是对技术细节没有过多的描述。
activemq: 多。没有专门写activemq的书,网上资料多

4.2 开发语言

Kafka: Scala
rabbitmg: Erlang
zeromg:c
rocketmg: java
activemg: java

4.3 支持的协议

Kafka: 自己定义的一套.【基于TCP)
rabbitmg: AMQPI
zeromq: TCP、UDP
rocketmq: 自己定义的一套.
activemq: OpenWire、STOMP、REST、XMPP、AMQP

4.4 消息存储

Kafka: 内存、磁盘、数据库。支持大量堆积。
rocketmq: 磁盘。支持大量堆积。
rabbitmq: 内存、磁盘、支持少量规程。
activemq: 内存、磁盘、数据库。支持少量堆积。
zeromq: 消息发送端的内存或者磁盘中。不支持持久化。

4.5 消息事务

Kafka: 支持
rabbitmq: 支持。客户端将信道设置为事务模式只有当消息被 rabbitMq 接收,事务才能提交成功,否则在捕获异常后进行回滚。使用事务会使得性能有所下降
zeromq: 不支持
rocketmq: 支持
activemq: 支持。

4.6 负载均衡

Kafka: 支持负载均衡。
rabbitmq: 对负载均衡支持不好。
zeromq: 去中心化,不支持负载均衡。本身只是一个多线程网络库。
rocketmq: 支持负载均衡。
activemq: 支持负载均衡。可以基于 zookeeper 实现负载均衡。

4.7 集群方式:

Kafka: 天然的 Leader-Slave 无状态集群。
rabbitmq: 支持简单集群,复制模式,对高级集群模式支持不好。
zeromq: 去中心化,不支持集群。
rocketmq: 常用,多对 Master-Slave 模式。开源版本需手动切换 Slave 模式变成 Master 模式。
activemq: 。支持简单集群模式,比如 主-备,对高级集群模式支持不好。

4.8 管理界面

Kafka: 一般
rabbitmq: 好
zeromq: 无
rocketmq: 无
activemq: 一般

4.9 可用性

Kafka: 非常高(分布式)
rabbitmg: 高(主从)。
zeromg: 高。
rocketmq: 非常高(分布式)
activemq: 高(主从)

4.10 消息重复

Kafka:支持at least once、at most once
rabbitmg:支持at least once、at most once
zeromq:只有重传机制,但是没有持久化,消息丢了重传也没有用。既不是atleastonce、也不是at most
once、更不是exactly only once
rocketmq:支持atleast once
activemq:支持atleast once

4.11 吞吐量 TPS

Kafka: 极大
Kafka 按批次发送消,息和消费消,息。发送端将多个小消,息合并,批量发向 Broker,消费端每次取出一个批次的消,息批量外理。

六、kafka spark+kafka

1、流式处理 Spark

  • Spark 最初诞生于美国加州大学伯克利分校(UCBerkeley)的AMP实验室,是一个可应用于大规模数据处理的快速、通用引擎。

  • 2013年,Spark 加入 Apache 孵化器项目后,开始获得迅猛的发展,如今已成为 Apache 软件基金会最重要的三大分布式计算系统开源项目之一(即 Hadoop、Spark、Storm)。

  • Spark 最初的设计目标是使数据分析更快–不仅运行速度快,也要能快速、容易地编写程序。

  • 为了使程序运行更快,Spark 提供了内存计算,减少了迭代计算时的I0开销;

  • 而为了使编写程序更为容易,Spark 使用简练、优雅的 Scala 语言编写,基于 Scala 提供了交互式的编程体验。

  • 虽然,Hadoop 已成为大数据的事实标准,但其 MapReduce 分布式计算模型仍存在诸多缺陷,

  • 而 Spark 不仅具备 Hadoop MapReduce 所具有的优点,且解决了 Hadoop MapReduce 的缺陷。

  • Spark 正以其结构一体化、功能多元化的优势逐渐成为当今大数据领域最热门的大数据计算平台。

流式处理Spark.png

2、Spark 安装与应用

Spark 官网下载:

http://spark.apache.org/downloads.html
https://spark.apache.org/downloads.html
https://archive.apache.org/dist/spark/

3、下载完成安装包,解压即安装


# 创建目录并切换目录:
mkdir /usr/local/spark/
cd /usr/local/spark/

# 上传安装包到服务器
sftp> put 


# 解压即安装
tar -zxvf apark-2.4.4-bin-hadoop2.7

# 配置 JDK 
vim sbin/spark-config.sh 

# 你的 jdk 安装路径(java 默认安装路径:/usr/lib/jvm/java-8-openjdk-amd64/ )
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64



# 切换到 spark 的安装目录
cd /usr/local/spark/apark-2.4.4-bin-hadoop2.7/

# 启动 spark 
sbin/start-all.sh starting

# 验证
jps -l2819 kafka.Kafka

4、浏览器地址栏输入: http://localhost:8080

http://127.0.0.1:8080
http://172.18.30.110:8080

5、Spark 和 Kafka 整合:

5.1 在 kafka_spring_learn 工程中,创建 Spark 和 Kafka 整合 类 SparkStreamingFromKafka.java
/**
 *  kafka_spring_learn\src\main\java\djh\it\kafkalearn\spark\SparkStreamingFromKafka.java
 *
 *  2024-6-26 创建 Spark 和 Kafka 整合 类 SparkStreamingFromKafka.java
 */
package djh.it.kafkalearn.spark;

import lombok.SneakyThrows;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class SparkStreamingFromKafka {
    @SneakyThrows
    public static void main( String[] args ) {
        SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName(("SparkStreamingFromKafka"));
        JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(10));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "172.18.30.110:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "sparkStreaming");

        Collection<String> topics = Arrays.asList("heima");

        JavaInputDStream<ConsumerRecord<String, String>> javaInputDStream = KafkaUtils.createDirectStream(
                streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topics, kafkaParams));

        JavaPairDStream<String, String> javaPairDStream = javaInputDStream.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call( ConsumerRecord<String, String> consumerRecord ) throws Exception {


                return new Tuple2<>(consumerRecord.key(), consumerRecord.value());
            }
        });
        javaPairDStream.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
            @Override
            public void call( JavaPairRDD<String, String> javaPairRDD ) throws Exception {
                javaPairRDD.foreach(new VoidFunction<Tuple2<String, String>>() {
                    @Override
                    public void call( Tuple2<String, String> tuple2 ) throws Exception {
                        System.out.println(tuple2._2);
                    }
                });
            }
        });
        streamingContext.start();
        streamingContext.awaitTermination();
    }
}

5.2、在服务器端打开一个生产消息发送服务,进行消息发送测试。

# 切换到 kafka 安装目录下:
cd /usr/local/kafka/kafka_2.12-2.8.0

# 打开生产消息服务:
bin/kafka-console-producer.sh --broker-list 172.18.30.110:9092 --topic heima

# 开始发送消息:
>hello nihao
>to^H^H
>producer broker kafka
>


# 在 idea 启动类控制台这边就会收到 消息,这就是 Spark 流式处理。

Spark流式处理.png

上一节关联链接请点击
# Kafka_深入探秘者(7):kafka 稳定性

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/749625.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

WIFI7技术之多重资源单元(Multi-RU)技术

什么是WIFI7及Multi-RU技术&#xff1f; 随着无线通信技术的不断进步&#xff0c;WiFi技术也在不断演变。WIFI7&#xff0c;即IEEE 802.11be&#xff0c;是下一代无线网络标准。它旨在提供更高的带宽、更低的延迟和更好的用户体验。WIFI7引入了许多新特性&#xff0c;其中之一…

PostgreSQL 连接器:在 SeaTunnel 中的应用与优势

在现代企业中&#xff0c;数据已经成为核心资产&#xff0c;基于开源数据集成平台SeaTunnel&#xff0c;工程师如何高效地连接和管理这些数据源&#xff0c;直接关系到企业的竞争力和运营效率。 本文将给大家介绍如何通过 JDBC PostgreSQL 数据源连接器&#xff0c;在 SeaTunne…

使用Flink CDC实时监控MySQL数据库变更

在现代数据架构中&#xff0c;实时数据处理变得越来越重要。Flink CDC&#xff08;Change Data Capture&#xff09;是一种强大的工具&#xff0c;可以帮助我们实时捕获数据库的变更&#xff0c;并进行处理。本文将介绍如何使用Flink CDC从MySQL数据库中读取变更数据&#xff0…

Docker部署常见应用之Oracle数据库

文章目录 安装部署参考文章 安装部署 使用Docker安装Oracle数据库是一个相对简便的过程&#xff0c;可以避免在本地环境中直接安装Oracle数据库的复杂性。 安装Docker环境&#xff1a;确保你的系统上已经安装了Docker&#xff0c;并且Docker服务正在运行。具体的安装方法可以根…

TikTok网页版使用指南:如何登录TikTok网页版?

海外版抖音TikTok&#xff0c;已成为连接全球观众的重要平台。据统计&#xff0c;在美国&#xff0c;TikTok的用户数量已达到近1.3亿&#xff0c;并且在国外的95后用户群体中很受欢迎。 TikTok网页版也提供了一个广阔的平台&#xff0c;让品牌和创作者在电脑端与全球观众互动&…

数据结构与算法基础(王卓)--学习笔记

1 数据结构分类 1.1 逻辑结构分类 集合结构线性结构&#xff1a;线性表、栈、队列、串树形结构图形结构 1.2 物理结构分类 逻辑结构在计算机中的真正表示方式&#xff08;又称为映射&#xff09;称为物理结构&#xff0c;也可叫做存储结构 顺序存储结构&#xff1a;数组链…

【Unity】Excel配置工具

1、功能介绍 通过Excel表配置表数据&#xff0c;一键生成对应Excel配置表的数据结构类、数据容器类、已经二进制数据文件&#xff0c;加载二进制数据文件获取所有表数据 需要使用Excel读取的dll包 2、关键代码 2.1 ExcelTool类 实现一键生成Excel配置表的数据结构类、数据…

Centos7源码方式安装sqle及开发相关

官方文档-源码安装 操作系统&#xff1a;centos:7.9,everything (DVD版应该也可以) (在ubuntu22.04装了两天之后乖乖开了一个新Centos7虚拟机) 镜像&#xff1a;清华大学开源软件镜像站 centos/7.9.2009 安装git sudo yum update -y sudo yum install -y git git --version安…

Sonia索尼娅:填补心理健康护理缺口的创新人工智能治疗师应用APP

聊天机器人可以取代人类治疗师吗&#xff1f;一些初创公司和患者声称他们可以。但这并不是完全确定的科学。 一项引人注目的研究发现&#xff0c;高达80%的使用OpenAI的ChatGPT寻求心理健康建议的人认为&#xff0c;这项技术可作为传统治疗的理想替代方案。与此同时&#xff0…

Android高级面试_2_IPC相关

Android 高级面试-3&#xff1a;语言相关 1、Java 相关 1.1 缓存相关 问题&#xff1a;LruCache 的原理&#xff1f; 问题&#xff1a;DiskLruCache 的原理&#xff1f; LruCache 用来实现基于内存的缓存&#xff0c;LRU 就是最近最少使用的意思&#xff0c;LruCache 基于L…

国外8年联培访学迎来逆袭|国家最高科学技术奖薛其坤成长史

国家最高科技奖花落薛其坤&#xff0c;他是该奖项史上最年轻得主。在追踪其成长史的过程中&#xff0c;知识人网小编注意到&#xff1a;薛其坤的学习研究开局并不顺利&#xff0c;直至到日本做联合培养博士研究生&#xff0c;他才真正迎来了自己学术生涯的重要转折点。后来到美…

面试相关-接口测试常问的问题

1.为什么要做接口测试 (1)现在大多系统都是前后端分离的项目,前端和后端的进度可能不一样,那为了尽早的进入测试,前端界面没有开发完成的情况下,只要后端的接口开发完了,就可以提前做接口测试了; (2)基于安全考虑,只依赖前端进行限制,已经完全不满足系统的安全性…

ELK日志集成

https://www.bilibili.com/video/BV1x94y1674x/?buvidXY705117E90F73A790429C9CFBD5F70F22168&vd_source939ea718db29535a3847d861e5fe37ef

Aigtek:为何要使用电压放大器

电压放大器在现代电子技术中起到了至关重要的作用。它是一种电子设备&#xff0c;用于将输入信号的电压增大到所需的输出电压水平。电压放大器的使用有以下几个方面的原因和优势。 电压放大器可以提高信号的强度和质量。许多实际应用中的输入信号往往很微弱&#xff0c;比如来自…

“管式加热炉简单控制系统和串级控制系统设计与Matlab仿真”,高分资源,匠心制作,下载可用。强烈推荐!!!

“管式加热炉简单控制系统和串级控制系统设计与Matlab仿真”毕业设计&#xff0c;高分资源&#xff0c;匠心制作&#xff0c;下载可用。强烈推荐&#xff01;&#xff01;&#xff01; 1.控制目标 加热炉的任务是把原油加热到一定温度&#xff0c;以保证下道工艺的顺利进行。…

windows安装mysql8.0.35保姆级教程

一、下载mysql安装包 点击mysql安装包下载链接&#xff1a;https://downloads.mysql.com/archives/community/ 选择window版本&#xff0c;点击下载按钮&#xff0c;如下所示&#xff1a; 二、解压安装包并新建my.ini文件 将下面内容复制到新建的my.ini文件里面 [mysqld] #…

阿里云oss存储

文章目录 准备阿里云的OSS控制台创建bucket获取AccessKey java使用oss导入依赖官网demo修改参数运行demo代码 封装工具类Oss下载如何保证指定时间段内可以访问私有权限的图片文件&#xff1f; 准备阿里云的OSS 控制台 访问阿里云官网&#xff0c;登录以后&#xff0c;右上角有…

大众点评根据关键词搜索采集店铺信息

大众点评根据关键词搜索采集店铺信息&#xff0c;包括店铺名称、大中小分类、省市区划分、人均价格、评价数量、团购数量、全部团购名称、全部团购链接&#xff08;团购信息还可解析出每个团购的价格&#xff09; ​​​

【代码安全】如何通过实现代码加密与魔改Python,防止代码泄露、恶意窃取

如何通过实现代码加密与魔改Python&#xff0c;防止代码泄露、恶意窃取 文章目录 如何通过实现代码加密与魔改Python&#xff0c;防止代码泄露、恶意窃取前言概述代码运行演示Step 0: 正常代码运行Step 1: 代码加密Step 2: 加密代码在魔改环境运行Step 3: 加密代码在正常环境运…

matlab编辑稀疏单位方阵

创建 10001000 稀疏单位方阵&#xff0c;并查看稀疏模式。 &#xff08;1&#xff09; I speye(1000); spy(I)&#xff08;2&#xff09; S speye(400,800); spy(S)此命令等同于 speye([400 800])。