【MQ系列】Kafka入门

2022/04/26 中间件 MQ 共 5228 字,约 15 分钟

今天来介绍目前非常热门的MQ系统,Kafka,用过的都说好。

Apache Kafka是最流行的开源流处理软件,用于大规模收集、处理、存储和分析数据。它以其卓越的性能、低延迟、容错性和高吞吐量而闻名,能够每秒处理数千条消息。Kafka官网

今天我们不讲kafka的基础概念和架构,直接上手安装使用,然后使用java客户端来演示producer和consumer。

准备工作

下载安装

下载地址,根据需要选择版本下载,值得注意的是v3.0以后的版本不在支持java8。本次我下载的版本是v2.8.1。

解压后

cd kafka_2.13-2.8.1

环境准备

本地需要安装JDK8+。

启动zookeeper服务,不过将来kafka不再依赖zk。

bin/zookeeper-server-start.sh config/zookeeper.properties

然后,在开启一个窗口,运行kafka broker服务。

bin/kafka-server-start.sh config/server.properties

Java客户端

kafka支持java客户端,我们创建一个demo来演示producer生产消息,consumer来监听消息并消费。

首先,创建一个Springboot项目。

pom依赖

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.2.1.RELEASE</version>
</dependency>

yaml配置

spring:
  application:
    name: kafka
  kafka:
    bootstrap-servers: 127.0.0.1:9092 #可以配置集群
    producer: # 生产者配置
      retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
      batch-size: 16384 #16K
      buffer-memory: 33554432 #32M
      acks: 1
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: myTestGroup # 消费者组
      enable-auto-commit: false # 关闭自动提交
      auto-offset-reset: earliest # 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
      # RECORD
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
      # BATCH
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
      # TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
      # COUNT
      # TIME | COUNT 有一个条件满足时提交
      # COUNT_TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
      # MANUAL
      # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
      # MANUAL_IMMEDIATE
      ack-mode: manual_immediate

producer

我们创建一个topic_test的主题,消息的key= key_test,消息内容为字符串。

代码如下:

@RestController
@RequestMapping("")
public class ProducerController {

    private final static String TOPIC_NAME = "topic_test"; //topic的名称

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public void send() {
        String uuid = UUID.randomUUID().toString();
        kafkaTemplate.send(TOPIC_NAME, "key_test", "producer message :" + uuid);
    }
}

我们调用该接口,生产一条消息。

2022-05-03 22:46:14.034  INFO 41689 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 2.0.1
2022-05-03 22:46:14.034  INFO 41689 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fa14705e51bd2ce5
2022-05-03 22:46:14.038  INFO 41689 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: goITNeNZSViGy7BYspNHfw

consumer

consumer用来订阅topic,使用@KafkaListener注解。

代码如下:

@Component
public class KafkaConsumer {

    //kafka的监听器
    @KafkaListener(topics = "topic_test", groupId = "myTestGroup")
    public void listenMyTestGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();

        System.out.println("消费者收到消息record:" + record);

        System.out.println("消费者收到消息value:" + value);
        //手动提交offset
        ack.acknowledge();
    }
}

我们看下日志,确认刚刚生产的消息,被消费了。

消费者收到消息record:ConsumerRecord(topic = topic_test, partition = 0, offset = 1, CreateTime = 1651589174044, serialized key size = 8, serialized value size = 54, headers = RecordHeaders(headers = [], isReadOnly = false), key = key_test, value = producer message :710ac271-acaf-4ea2-af9a-0f7f5e293ad6)
消费者收到消息value:producer message :710ac271-acaf-4ea2-af9a-0f7f5e293ad6

我们在日志里还看到了Producer的配置信息,部分配置是我们自己设置的,其它配置是kafka服务默认配置的。

	acks = 1
	batch.size = 16384
	bootstrap.servers = [127.0.0.1:9092]
	buffer.memory = 33554432
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 3
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

最后

本文介绍了kafka的安装和使用,并写了java客户端的demo,给初学者做个演示。

针对kafka的高级特性和架构我们下回分解。

文档信息

搜索

    Table of Contents