今天来介绍目前非常热门的MQ系统,Kafka,用过的都说好。
Apache Kafka是最流行的开源流处理软件,用于大规模收集、处理、存储和分析数据。它以其卓越的性能、低延迟、容错性和高吞吐量而闻名,能够每秒处理数千条消息。Kafka官网
今天我们不讲kafka的基础概念和架构,直接上手安装使用,然后使用java客户端来演示producer和consumer。
准备工作
下载安装
下载地址,根据需要选择版本下载,值得注意的是v3.0以后的版本不在支持java8。本次我下载的版本是v2.8.1。
解压后
cd kafka_2.13-2.8.1
环境准备
本地需要安装JDK8+。
启动zookeeper服务,不过将来kafka不再依赖zk。
bin/zookeeper-server-start.sh config/zookeeper.properties
然后,在开启一个窗口,运行kafka broker服务。
bin/kafka-server-start.sh config/server.properties
Java客户端
kafka支持java客户端,我们创建一个demo来演示producer生产消息,consumer来监听消息并消费。
首先,创建一个Springboot项目。
pom依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.1.RELEASE</version>
</dependency>
yaml配置
spring:
application:
name: kafka
kafka:
bootstrap-servers: 127.0.0.1:9092 #可以配置集群
producer: # 生产者配置
retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
batch-size: 16384 #16K
buffer-memory: 33554432 #32M
acks: 1
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: myTestGroup # 消费者组
enable-auto-commit: false # 关闭自动提交
auto-offset-reset: earliest # 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
# RECORD
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
# BATCH
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
# TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
# COUNT
# TIME | COUNT 有一个条件满足时提交
# COUNT_TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
# MANUAL
# 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
# MANUAL_IMMEDIATE
ack-mode: manual_immediate
producer
我们创建一个topic_test的主题,消息的key= key_test,消息内容为字符串。
代码如下:
@RestController
@RequestMapping("")
public class ProducerController {
private final static String TOPIC_NAME = "topic_test"; //topic的名称
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/send")
public void send() {
String uuid = UUID.randomUUID().toString();
kafkaTemplate.send(TOPIC_NAME, "key_test", "producer message :" + uuid);
}
}
我们调用该接口,生产一条消息。
2022-05-03 22:46:14.034 INFO 41689 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version : 2.0.1
2022-05-03 22:46:14.034 INFO 41689 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : fa14705e51bd2ce5
2022-05-03 22:46:14.038 INFO 41689 --- [ad | producer-1] org.apache.kafka.clients.Metadata : Cluster ID: goITNeNZSViGy7BYspNHfw
consumer
consumer
用来订阅topic,使用@KafkaListener
注解。
代码如下:
@Component
public class KafkaConsumer {
//kafka的监听器
@KafkaListener(topics = "topic_test", groupId = "myTestGroup")
public void listenMyTestGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println("消费者收到消息record:" + record);
System.out.println("消费者收到消息value:" + value);
//手动提交offset
ack.acknowledge();
}
}
我们看下日志,确认刚刚生产的消息,被消费了。
消费者收到消息record:ConsumerRecord(topic = topic_test, partition = 0, offset = 1, CreateTime = 1651589174044, serialized key size = 8, serialized value size = 54, headers = RecordHeaders(headers = [], isReadOnly = false), key = key_test, value = producer message :710ac271-acaf-4ea2-af9a-0f7f5e293ad6)
消费者收到消息value:producer message :710ac271-acaf-4ea2-af9a-0f7f5e293ad6
我们在日志里还看到了Producer的配置信息,部分配置是我们自己设置的,其它配置是kafka服务默认配置的。
acks = 1
batch.size = 16384
bootstrap.servers = [127.0.0.1:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 3
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
最后
本文介绍了kafka的安装和使用,并写了java客户端的demo,给初学者做个演示。
针对kafka的高级特性和架构我们下回分解。
文档信息
- 本文作者:yindongxu
- 本文链接:https://iceblow.github.io/2022/04/26/Kafka%E5%85%A5%E9%97%A8/
- 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)