什么是Apache ZooKeeper?
Apache ZooKeeper是由集群(节点组)使用的一种服务,用于在自身之间协调,并通过稳健的同步技术维护共享数据。ZooKeeper本身是一个分布式应用程序,为写入分布式应用程序提供服务。
ZooKeeper提供的常见服务如下 :
- 命名服务 - 按名称标识集群中的节点。它类似于DNS,但仅对于节点。
- 配置管理 - 加入节点的最近的和最新的系统配置信息。
- 集群管理 - 实时地在集群和节点状态中加入/离开节点。
- 选举算法 - 选举一个节点作为协调目的的leader。
- 锁定和同步服务 - 在修改数据的同时锁定数据。此机制可帮助你在连接其他分布式应用程序(如Apache HBase)时进行自动故障恢复。
- 高度可靠的数据注册表 - 即使在一个或几个节点关闭时也可以获得数据。
分布式应用程序提供了很多好处,但它们也抛出了一些复杂和难以解决的挑战。ZooKeeper框架提供了一个完整的机制来克服所有的挑战。竞争条件和死锁使用故障安全同步方法进行处理。另一个主要缺点是数据的不一致性,ZooKeeper使用原子性解析。
ZooKeeper的好处
以下是使用ZooKeeper的好处:
- 简单的分布式协调过程
- 同步 - 服务器进程之间的相互排斥和协作。此过程有助于Apache HBase进行配置管理。
- 有序的消息
- 序列化 - 根据特定规则对数据进行编码。确保应用程序运行一致。这种方法可以在MapReduce中用来协调队列以执行运行的线程。
- 可靠性
- 原子性 - 数据转移完全成功或完全失败,但没有事务是部分的。
Java环境
最新版zookeeper需要Java1.8.211以上。
比如利用源安装
yum install java-1.8.0-openjdk.x86_64
java -version
openjdk version "1.8.0_232"
ZooKeeper
解压缩,进入conf目录修改zoo-sample.cfg文件为zoo.cfg文件。
打开文件,修改路径,类似如下
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/data/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
保存,到bin目录下执行
[root@VM_0_6_centos bin]# ./zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /root/apache-zookeeper-3.5.6-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
开始服务后,启动cli端。
[root@VM_0_6_centos bin]# ./zkCli.sh
/usr/bin/java
Connecting to localhost:2181
创建Znode节点
create /path /data
eg: [zk: localhost:2181(CONNECTED) 4] create /datapp zookapp
Created /datapp
创建顺序节点
create -e /path /data
eg: [zk: localhost:2181(CONNECTED) 5] create -s /datapp zookapp
Created /datapp0000000003
删除节点
[zk: localhost:2181(CONNECTED) 6] delete /datapp
[zk: localhost:2181(CONNECTED) 7] ls /datapp
Node does not exist: /datapp
#删除带子节点的节点
[zk: localhost:2181(CONNECTED) 30] ls /datapp
[sec]
[zk: localhost:2181(CONNECTED) 33] deleteall /datapp
[zk: localhost:2181(CONNECTED) 34]
获取数据
[zk: localhost:2181(CONNECTED) 11] get /datapp
zookapp
[zk: localhost:2181(CONNECTED) 13] get -s /datapp #查看全部数据
zookapp
cZxid = 0xc
ctime = Thu Nov 07 11:10:41 CST 2019
mZxid = 0xc
mtime = Thu Nov 07 11:10:41 CST 2019
pZxid = 0xc
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 0
[zk: localhost:2181(CONNECTED) 18] set /datapp zookupdate #设置更改
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/datapp
[zk: localhost:2181(CONNECTED) 19] get -w /datapp #查看更改
zookupdate
[zk: localhost:2181(CONNECTED) 20] set /datapp cversion=1
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/datapp
[zk: localhost:2181(CONNECTED) 21] get -w /datapp
cversion=1
设置数据
[zk: localhost:2181(CONNECTED) 18] set /datapp zookupdate
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/datapp
[zk: localhost:2181(CONNECTED) 19] get -w /datapp
zookupdate
创建子节点
[zk: localhost:2181(CONNECTED) 28] create /datapp zookapp
Created /datapp
[zk: localhost:2181(CONNECTED) 29] create /datapp/sec zookapp2
Created /datapp/sec
[zk: localhost:2181(CONNECTED) 30] ls /datapp
[sec]
[zk: localhost:2181(CONNECTED) 31] get /datapp
zookapp
[zk: localhost:2181(CONNECTED) 32] get /datapp/sec
zookapp2
Zookeeper集群部署
此处利用一台主机模拟两个zookeeper部署,在root目录下创建两个文件夹, 一个为zookeeper1,一个为zookeeper2。而实际环境中最小需要三个节点,最好是奇数节点。
创建data目录,在conf配置文件中修改如下,端口由于是在一台主机上,所以需要不同的端口,正常形式下,可以设置为不同IP同端口。
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/data/zookeeper1
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=127.0.0.1:7788:8890
server.2=127.0.0.1:7789:8891
创建data/myid文件
创建文件myid,写入配置文件中的server名。文件的存在位置跟配置文件中的dataDir一致。
[root@VM_0_6_centos zookeeper1]# echo "1" > myid
[root@VM_0_6_centos zookeeper1]# ls
myid
分别启动后查看状态可以发现,已经选举了一个为leader
[root@VM_0_6_centos zookeeper2]# ./bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /root/zookeeper2/bin/../conf/zoo.cfg
Client port found: 2182. Client address: localhost.
Mode: leader
[root@VM_0_6_centos zookeeper1]# ./bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /root/zookeeper1/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
在leader端修改数据,新建znode节点
[zk: localhost:2181(CONNECTED) 0] create /datapp idsign
Created /datapp
[zk: localhost:2181(CONNECTED) 1]
follow端
[zk: localhost:2181(CONNECTED) 0] get /datapp
idsign
[zk: localhost:2181(CONNECTED) 1]
follow端修改数据,leader端同步到数据,向其他follow写入数据
[zk: localhost:2181(CONNECTED) 1] set /datapp idsign=xxxx
[zk: localhost:2181(CONNECTED) 0] get /datapp
idsign=xxxx
[zk: localhost:2181(CONNECTED) 1]
ZooKeeper API
官方提供了绑定Java和C的API,此处不做使用解释。以下以python的API使用为例。
Java版文档:http://zookeeper.apache.org/doc/r3.3.3/api/org/apache/zookeeper/ZooKeeper.html
Java示例:https://blog.csdn.net/u013468915/article/details/80878490
python版文档:https://kazoo.readthedocs.io/en/latest/install.html
pip install kazoo #安装kazoo
连接到ZooKeeper
from kazoo.client import KazooClient
zk = KazooClient(hosts='106.54.181.187:2181') #连接
zk.start()
节点状态
from kazoo.client import KazooState
def my_listener(state):
if state == KazooState.LOST:
print('ZooKeeper Lost') # 状态为丢失时
elif state == KazooState.SUSPENDED:
print('ZooKeeper Suspended') # 状态为暂停
else:
print('ZooKeeper Connect') # 状态为连接中
zk.add_listener(my_listener)
Zookeeper 3.4及更高版本支持只读模式。 必须为Zookeeper集群中的服务器打开此模式,客户端才能使用它。 要将这种模式与Kazoo一起使用,应在read_only选项设置为True的情况下调用KazooClient。 这将使客户端连接到已变为只读的Zookeeper节点,并且客户端将继续扫描其他可读写的节点。
from kazoo.client import KazooClient
from kazoo.client import KazooState
from kazoo.client import KeeperState
zk = KazooClient(hosts='127.0.0.1:2181', read_only=True)
zk.start()
@zk.add_listener
def watch_for_ro(state):
if state == KazooState.CONNECTED:
if zk.client_state == KeeperState.CONNECTED_RO:
print("Read only mode!")
else:
print("Read/Write mode!")
创建节点,使用的方法有ensure_path()
和create()
# 确定路径,在必要时创建
zk.ensure_path("/my/favorite")
# 用数据创建一个节点
zk.create("/my/favorite/node", b"a value")
读取节点信息
exists() 检查节点是否存在。
get() 在ZnodeStat结构中获取节点的数据以及详细的节点信息。
get_children() 获取给定节点的子级列表
eg:
if zk.exists("/my/favorite"): #判断/my/favorite是否存在
data, stat = zk.get("/my/favorite") #查看节点信息
print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))
children = zk.get_children("/my/favorite") #查看节点的子节点信息
print("There are %s children with names %s" % (len(children), children))
更新节点信息
zk.set("/my/favorite", b"some data")
删除节点信息
zk.delete("/my/favorite/node", recursive=True)
重试命令
result = zk.retry(zk.get, "/path/to/node")
相当于执行
zk.get('/path/to/node')
利用如上信息,编写操作ZooKeeper的Python脚本
#coding:utf-8
from kazoo.client import KazooClient
class zoocon:
def __init__(self):
self.zk = KazooClient(hosts='106.54.181.187:2181') #连接
self.zk.start()
self.path = '/datapp'
def get_data(self):
res = self.zk.get(self.path)
print(res)
def exist(self):
self.zk.exists(self.path)
return True
def create_data(self):
self.zk.create(self.path, b'signapp')
def close(self):
self.zk.close()
def delete(self):
self.zk.delete(self.path, recursive=True)
if __name__ == '__main__':
zks = zoocon()
if zks.exist():
zks.delete()
zks.close()
print 'path delete'
else:
zks.create_data()
zks.get_data()
zks.close()
服务端存在datapp节点,运行后删除节点,当在服务器查看时,显示如下
[zk: localhost:2181(CONNECTED) 0] get /datapp
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /datapp