Kafka 简介
详情:http://kafka.apachecn.org/intro.html
Kafka 使用
简介这种网上很多的,此处不介绍了,可以查看以上ApacheCN的文档,就直接开始使用和python之间的整合。
下载当前的最新版本:
wget http://archive.apache.org/dist/kafka/2.4.0/kafka_2.13-2.4.0.tgz
此处没有搭建zookeeper,就直接使用kafka的便捷脚本创建节点
./bin/zookeeper-server-start.sh config/zookeeper.properties
如果出现错误Unrecognized VM option 'PrintGCDateStamps'
,可能是Java版本和Kafka版本之间的问题,尝试使用更高版本的Kafka或者其他版本的Java。
启动kafka,先复制一份需要的配置文件
cp config/server.properties config/server-1.properties
配置文件中
broker.id=0 #必须唯一,当前只设置一个,所以暂不更改
listeners=PLAINTEXT://:9092 #listeners是broker监听的地址和端口,多broker的时候需要不重复
log.dirs=/tmp/kafka-logs-1 #日志,此处改为kafka-logs-1
zookeeper.connect=localhost:2181 #zookeeper地址,没更改
启动kafka
./bin/kafka-server-start.sh config/server-1.properties
一串输出后,kafka启动成功
创建一个主题
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test #创建test主题,replication-factor副本数,小于集群服务器数
./bin/kafka-topics.sh --list --zookeeper localhost:2181 #查看当前主题
发送消息
当在生产者发送消息的时候,消费者会显示消息
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# 消费者监听,此时有生产者传入消息会显示
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test #生产者发送消息
多代理集群
为每个代理创建一个配置文件
如上的复制配置文件
cp config/server.properties config/server-1.properties
修改配置文件参数
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
启动新节点
./bin/kafka-server-start.sh config/server-1.properties &
重新创建一个主题,然后通过生产者和消费者来处理消息,处理方式和上一样,只是需要更改不同的topic
导入/导出文件
使用官方提高的三个配置文件,首先是Kafka Connect的配置文件,包含常用的配置,如Kafka brokers连接方式和数据的序列化格式。 其余的配置文件均指定一个要创建的连接器。这些文件包括连接器的唯一名称,类的实例,以及其他连接器所需的配置。
./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
一连串输出后,提示没有test.txt文件。
创建test.txt,就可以看到对文件的处理,文件夹下就会生成test.sink.txt。
使用python处理消息
安装包
pip3 install kafka-python
使用文档
https://kafka-python.readthedocs.io/en/master/usage.html
生产者代码:
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(
bootstrap_servers=[
"localhost:9092"
]
)
future = producer.send("test", b'I am rito yan')
try:
record_metadata = future.get(timeout=10)
print(record_metadata)
except KafkaError as e:
print(e)
发送成功后返回
RecordMetadata(topic='test', partition=0, topic_partition=TopicPartition(topic='test', partition=0), offset=3, timestamp=1578905897675, checksum=None, serialized_key_size=-1, serialized_value_size=13, serialized_header_size=-1)
也可以格式化消息格式
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(
bootstrap_servers=[
"localhost:9092"
]
#value_serializer=lambda m: json.dumps(m).encode('ascii') #生产者发送json数据
)
future = producer.send("test", b'I am rito yan')
#future = producer.send('test', {'key': 'value'})
try:
record_metadata = future.get(timeout=10)
print(record_metadata)
except KafkaError as e:
print(e)
消费者代码
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"test",
group_id = "user-test", #群组id,消息只能被同组的一个消费者消费,所以需要定义组名
bootstrap_servers = [
"localhost:9092"
]
#value_deserializer=lambda m: json.loads(m.decode('ascii')) #格式化解析格式
)
for message in consumer:
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
消费这运行后处于监听状态,当运行如上的生产者代码的时候,界面会显示出生产者的消息
项目中引用
kafka在实现过程中,消费者处于监听状态,但是项目运行时,阻塞性的监听并不可用,可以使用多线程或者其他方式来处理。
def search_area():
prints = PrintThread()
prints.setDaemon(True)
prints.start()
for i in range(100,200):
time.sleep(1)
print(i)
import threading
import time
from kafka import KafkaConsumer
class PrintThread(threading.Thread):
def run(self):
print("start.... %s" %self.getName())
consumer = KafkaConsumer(
"test",
group_id = "user-test", #群组id,消息只能被同组的一个消费者消费,所以需要定义组名
bootstrap_servers = [
"localhost:9092"
]
#value_deserializer=lambda m: json.loads(m.decode('ascii')) #格式化解析格式
)
for message in consumer:
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
search_area()
达到主线程不阻塞的情况下仍然可以继续监听
或者不采用监听的方式,采用主动拉取队列数据,这样一次拉取的时候可能是较大的数据,对数据量处理要求高的情况下可能会增加消息延迟堆积
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"test",
group_id='user-test',
bootstrap_servers = [
"localhost:9092"
]
)
consumer.subscribe(topics=('test',))
# consumer.subscribe(topics=('test','test0')) #订阅多个主题
msg = consumer.poll(timeout_ms=2000) # 从kafka获取消息
print(msg)
for tp, messages in msg.items():
for message in messages:
print("%s:%d:%d: key=%s value=%s" % (tp.topic, tp.partition,
message.offset, message.key,
message.value))
但是这样做会出现多次获取重复已消费的信息,因为自动位移提交的动作是在 poll() 方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。单次请求的时候不能提交offset。
添加如下手动提交已消费信息
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"test",
group_id='user-test',
enable_auto_commit = False,
bootstrap_servers = [
"localhost:9092"
]
)
consumer.subscribe(topics=('test',))
# consumer.subscribe(topics=('test','test0')) #订阅多个主题
msg = consumer.poll(timeout_ms=2000) # 从kafka获取消息
print(msg)
for tp, messages in msg.items():
for message in messages:
print("%s:%d:%d: key=%s value=%s" % (tp.topic, tp.partition,
message.offset, message.key,
message.value))
consumer.commit() #同步提交,直到正常或异常返回之前阻塞
consumer.commit_async() #异步提交,不阻塞
读取已消费信息
使用 seek方法从指定的partition和offset开始读取数据,需要记录分区和offset。
#encoding:utf8
from kafka import KafkaConsumer, TopicPartition
my_topic = "my.topic" # 指定需要消费的主题
consumer = KafkaConsumer(
bootstrap_servers = "192.168.70.221:19092,192.168.70.222:19092,192.168.70.223:19092", # kafka集群地址
group_id = "my.group", # 消费组id
enable_auto_commit = True, # 每过一段时间自动提交所有已消费的消息(在迭代时提交)
auto_commit_interval_ms = 5000, # 自动提交的周期(毫秒)
)
consumer.assign([
TopicPartition(topic=my_topic, partition=0),
TopicPartition(topic=my_topic, partition=1),
TopicPartition(topic=my_topic, partition=2)
])
consumer.seek(TopicPartition(topic=my_topic, partition=0), 12) # 指定起始offset为12
consumer.seek(TopicPartition(topic=my_topic, partition=1), 0) # 可以注册多个分区,此分区从第一条消息开始接收
# consumer.seek(TopicPartition(topic=my_topic, partition=2), 32) # 没有注册的分区上的消息不会被消费
for msg in consumer: # 迭代器,等待下一条消息
print msg # 打印消息
部署kafka
配置好需要的参数后
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
就可以看到zookeeper处于监听状态,当然zookeeper这个一般使用集群部署的节点,此处仍然是使用kafka。
./bin/kafka-server-start.sh -daemon config/server-1.properties