ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等
Java语言客户端使用zookeeper
下载zookeeper连接工具,方便我们查看zookeeper存的数据。下载地址:
https://pan.baidu.com/s/1UG5_VcYUZUYUkg04QROLYg?pwd=3ych 提取码: 3ych
下载后解压就可以使用了:
使用页面:
Java语言连接z00keeper
首先引入maven 依赖jar包
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.9</version>
</dependency>
编写Java代码
public class Test001 {
private static final String ADDRES = "127.0.0.1:2181";
private static final int TIMAOUT = 5000;
//计数器
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
//zk核心节点+事件通知
//节点路径和界定啊value
/**
* 参数一:连接地址
* 参数二:zk超时时间
* 参数三:事件通知
*/
//1、创建zk链接
ZooKeeper zooKeeper = new ZooKeeper(ADDRES, TIMAOUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
Event.KeeperState state = watchedEvent.getState();
if(state == Event.KeeperState.SyncConnected){
System.out.println("zk链接成功");
countDownLatch.countDown(); //计数器减 1
}
}
});
//计数器结果必须是为0 才能继续执行
System.out.println("zk正在等待连接");
countDownLatch.await();
System.out.println("开始创建我们的连接");
//2、创建我们的节点
/**
* 参数一:路径名称
* 参数二:节点value
* 参数三:节点权限acl
* 蚕食四:节点类型 临时和永久
*/
String s = zooKeeper.create("/kaico/one", "hello,boy".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(s);
zooKeeper.close();
}
}
可以利用连接工具查看操作结果。
zooKeeper类有很多api操作节点,可以创建、删除。
zookeeper Javaapi文档: 点击查看
四种节点类型
第一种:临时节点:会话关闭之后,就自动消失 CreateMode.PERSISTENT_SEQUENTIAL
第二种:临时有序节点 CreateMode.EPHEMERAL
第三种:持久节点:会话关闭之后,持久化到硬盘 CreateMode.PERSISTENT
第四种:持久有序节点 CreateMode.PERSISTENT_SEQUENTIAL
ACL权限
ACL权限模型,实际上就是对树每个节点实现控制.
身份的认证有4种方式:
world:默认方式,相当于全世界都能访问.
auth:代表已经认证通过的用户(cli中可以通过addauth digest user:pwd来添加当前上下文中的授权用户).
digest:即用户名:密码这种方式认证,这也是业务系统中最常用的.
ip:使用lp地址认证。
代码案例:使用账号密码实现权限控制
1、添加有权限控制的节点数据
public class Test002 {
private static final String ADDRES = "127.0.0.1:2181";
private static final int TIMAOUT = 5000;
//计数器
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
/**
* 参数一:连接地址
* 参数二:zk超时时间
* 参数三:事件通知
*/
//1、创建zk链接
ZooKeeper zooKeeper = new ZooKeeper(ADDRES, TIMAOUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
Event.KeeperState state = watchedEvent.getState();
if(state == Event.KeeperState.SyncConnected){
System.out.println("zk链接成功");
countDownLatch.countDown(); //计数器减 1
}
}
});
//计数器结果必须是为0 才能继续执行
System.out.println("zk正在等待连接");
countDownLatch.await();
System.out.println("开始创建我们的连接");
//创建账号 admin 可以实现读写操作
Id admin = new Id("digest", DigestAuthenticationProvider.generateDigest("admin:admin123"));
ACL acl1 = new ACL(ZooDefs.Perms.ALL, admin);
//创建账号 guest 只允许做读操作
Id guest = new Id("digest", DigestAuthenticationProvider.generateDigest("guest:guest123"));
ACL acl2 = new ACL(ZooDefs.Perms.READ, guest);
ArrayList<ACL> acls = new ArrayList<>();
acls.add(acl1);
acls.add(acl2);
//2、创建我们的节点
/**
* 参数一:路径名称
* 参数二:节点value
* 参数三:节点权限acl
* 蚕食四:节点类型 临时和永久
*/
String s = zooKeeper.create("/kaico/acl", "hello,boy".getBytes(), acls, CreateMode.PERSISTENT);
System.out.println(s);
zooKeeper.close();
}
}
2、获取设置了权限的节点数据
public class Test003 {
private static final String ADDRES = "127.0.0.1:2181";
private static final int TIMAOUT = 5000;
//计数器
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
/**
* 参数一:连接地址
* 参数二:zk超时时间
* 参数三:事件通知
*/
//1、创建zk链接
ZooKeeper zooKeeper = new ZooKeeper(ADDRES, TIMAOUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
Event.KeeperState state = watchedEvent.getState();
if(state == Event.KeeperState.SyncConnected){
System.out.println("zk链接成功");
countDownLatch.countDown(); //计数器减 1
}
}
});
//计数器结果必须是为0 才能继续执行
System.out.println("zk正在等待连接");
countDownLatch.await();
System.out.println("开始创建我们的连接");
//设置一下zookeeper 的账号才有权限获取内容
zooKeeper.addAuthInfo("digest", "guest:guest123".getBytes());
//获取节点的内容
byte[] data = zooKeeper.getData("/kaico/acl", null, new Stat());
System.out.println(new String(data));
zooKeeper.close();
}
}
实现事件监听通知
Zookeeper实现基本的总结:类似于文件存储系统,可以帮助我们解决分布式领域中遇到问题
Zookeeper分布式协调工具
特征:
- 定义的节点包含key (路径)和 value ,路径不允许有重复保证唯一性.
- Zookeeper分为四种类型持久、持久序号、临时、临时序号.
- 持久与临时节点区别:连接如果一旦关闭,当前的节点自动删除;
- 事件通知监听节点发生的变化删除、修改、子节点
Java代码案例:
public class Test004 {
private static final String ADDRES = "127.0.0.1:2181";
private static final int TIMAOUT = 5000;
//计数器
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
//1、创建zk 连接
ZkClient zkClient = new ZkClient(ADDRES, TIMAOUT);
String parentPath = "/kaico/jing";
//2、监听节点发生的变化,监听子节点是否发生变化,如果发生变化都可以获取到回调通知。
// zkClient.subscribeChildChanges(parentPath, new IZkChildListener() {
// @Override
// public void handleChildChange(String s, List<String> list) throws Exception {
// System.out.println("s:" + s + ",节点发生了变化");
// list.forEach((t)->{
// System.out.println("子节点:" + t);
// });
// }
// });
//监听节点的内容是否发生变化或删除
zkClient.subscribeDataChanges(parentPath, new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
System.out.println("修改的节点为:" + s + ",修改之后的值:" + o);
}
@Override
public void handleDataDeleted(String s) throws Exception {
System.out.println("节点:" + s + "被删除");
}
});
//修改值内容
zkClient.writeData(parentPath, "666666666666666");
while (true){
}
// zkClient.close();
}
}
微服务使用zookeeper作为注册中心
调用接口逻辑图
使用zookeeper实现逻辑:
根据服务提供方的名称创建对应的节点,服务提供方的接口所有的ip+端口作为子节点的value的值,这样服务调用方根据服务提供方的名称在zookeeper上找到对应的ip+端口从而可以调用对应的接口,再监听该节点,如果提供接口的机器发生宕机于zookeeper断开连接,子节点也相应的减少了,服务调用方也会收到通知。
分布式锁
分布式锁的概念:解决再多个jvm中最终只能有一个jvm 执行。
zookeeper实现分布式锁的思路:
节点保证唯一、事件通知、临时节点(生命周期和Session会关联)﹒
创建分布式锁原理:
1.多个jvm同时在Zookeeper 上创建相同的临时节点(lockPath).
2. 因为临时节点路径保证唯一的性,只要谁能够创建成功谁就能够获取锁,就可以开始执
行业务逻辑;,
3.如果节点已经给其他请求创建的话或者是创建节点失败,当前的请求实现等待;
释放锁的原理
因为我们采用临时节点,当前节点创建成功,表示获取锁成功;正常执行完业务逻辑调用Session关闭连接方法,当前的节点会删除;----释放锁
其他正在等待请求,采用事件监听如果当前节点被删除的话,又重新进入到获取锁流程;
临时节点+事件通知。
代码实现分布式锁
实现分布式锁的方式有多种:数据库、redis、zookeeper,这里使用zookeeper实现。
实现原理:
因为Zookeeper节点路径保持唯一,不允许重复 且有临时节点特性连接关闭后当前节点会自动消失,从而实现分布式锁。
- 多请求同时创建相同的节点(lockPath),只要谁能够创建成功 谁就能够获取到锁;
- 如果创建节点的时候,突然该节点已经被其他请求创建的话则直接等待;
- 只要能够创建节点成功,则开始进入到正常业务逻辑操作,其他没有获取锁进行等待;
- 正常业务逻辑流程执行完后,调用zk关闭连接方式释放锁,从而是其他的请求开始进入到获取锁的资源。
使用zookeeper 实现分们式锁的代码案例
利用模板设计模式实现分布式锁
1、定义锁接口 Lock
public interface Lock {
/**
* 获取锁
*/
public void getLock();
/**
* 释放锁
*/
public void unLock();
}
2、定义抽象类实现锁接口 Lock ,设计其他的方法完成对锁的操作
abstract class AbstractTemplzateLock implements Lock {
@Override
public void getLock() {
// 模版方法 定义共同抽象的骨架
if (tryLock()) {
System.out.println(">>>" + Thread.currentThread().getName() + ",获取锁成功");
} else {
// 开始实现等待
waitLock();// 事件监听
// 重新获取
getLock();
}
}
/**
* 获取锁
* @return
*/
protected abstract boolean tryLock();
/**
* 等待锁
* @return
*/
protected abstract void waitLock();
/**
* 释放锁
* @return
*/
protected abstract void unImplLock();
@Override
public void unLock() {
unImplLock();
}
}
3、利用zookeeper实现锁,继承抽象类 AbstractTemplzateLock
public class ZkTemplzateImplLock extends AbstractTemplzateLock {
//参数1 连接地址
private static final String ADDRES = "192.168.212.147:2181";
// 参数2 zk超时时间
private static final int TIMEOUT = 5000;
// 创建我们的zk连接
private ZkClient zkClient = new ZkClient(ADDRES, TIMEOUT);
/**
* 共同的创建临时节点
*/
private String lockPath = "/lockPath";
private CountDownLatch countDownLatch = null;
@Override
protected boolean tryLock() {
// 获取锁的思想:多个jvm同时创建临时节点,只要谁能够创建成功 谁能够获取到锁
try {
zkClient.createEphemeral(lockPath);
return true;
} catch (Exception e) {
// // 如果创建已经存在的话
// e.printStackTrace();
return false;
}
}
@Override
protected void waitLock() {
// 1.使用事件监听 监听lockPath节点是否已经被删除,如果被删除的情况下 有可以重新的进入到获取锁的权限
IZkDataListener iZkDataListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
if (countDownLatch != null) {
countDownLatch.countDown();// 计数器变为0
}
}
};
zkClient.subscribeDataChanges(lockPath, iZkDataListener);
// 2.使用countDownLatch等待
if (countDownLatch == null) {
countDownLatch = new CountDownLatch(1);
}
try {
countDownLatch.await();// 如果当前计数器不是为0 就一直等待
} catch (Exception e) {
}
// 3. 如果当前节点被删除的情况下,有需要重新进入到获取锁
zkClient.unsubscribeDataChanges(lockPath, iZkDataListener);
}
@Override
protected void unImplLock() {
if (zkClient != null) {
zkClient.close();
System.out.println(Thread.currentThread().getName() + ",释放了锁>>>");
}
}
}
4、编写使用锁的方法
//main方法使用多线程测试分布式锁
public static void main(String[] args) {
// OrderService orderService = new OrderService();
for (int i = 0; i < 100; i++) {
new Thread(new OrderService()).start();
}
// 单个jvm中多线程同时生成订单号码如果发生重复 如何解决 synchronized或者是lock锁
// 如果在多个jvm中同时生成订单号码如果发生重复如何解决
// 注意synchronized或者是lock锁 只能够在本地的jvm中有效
// 分布式锁的概念
}
//多线程run 方法
public class OrderService implements Runnable {
private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
private Lock lock = new ZkTemplzateImplLock();
@Override
public void run() {
getNumber();
}
private void getNumber() {
try {
lock.getLock();
Thread.sleep(50);
String number = orderNumGenerator.getNumber();
System.out.println(Thread.currentThread().getName() + ",获取的number:" + number);
// 如果zk超时了,有做数据库写的操作统一直接回滚
} catch (Exception e) {
} finally {
lock.unLock();
}
}
// ZkTemplzateImplLock父亲 模版类 AbstractTemplzateLock 父亲 Lock
}
//自动生成订单号的类
public class OrderNumGenerator {
/**
* 序号
*/
private static int count;
/**
* 生成我们的时间戳 为订单号码
* @return
*/
public String getNumber() {
SimpleDateFormat simpt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
try {
Thread.sleep(30);
} catch (Exception e) {
}
return simpt.format(new Date()) + "-" + ++count;
}
}
如何防止死锁?
创建zkClient时设置session 连接时间 sessionTimeout。 也就是设置Session连接超时时间,在规定的时间内获取锁后超时啦~自动回滚当前数据库业务逻辑。
注意:等待锁时,zkClient注册的事件最后需要删除。
到此这篇关于Java zookeeper服务的使用详解的文章就介绍到这了,更多相关Java zookeeper服务内容请搜索编程学习网以前的文章希望大家以后多多支持编程学习网!
本文标题为:Java zookeeper服务的使用详解
基础教程推荐
- JDK数组阻塞队列源码深入分析总结 2023-04-18
- Java实现线程插队的示例代码 2022-09-03
- Java文件管理操作的知识点整理 2023-05-19
- Java实现查找文件和替换文件内容 2023-04-06
- java基础知识之FileInputStream流的使用 2023-08-11
- springboot自定义starter方法及注解实例 2023-03-31
- Java并发编程进阶之线程控制篇 2023-03-07
- java实现多人聊天系统 2023-05-19
- ConditionalOnProperty配置swagger不生效问题及解决 2023-01-02
- Java数据结构之对象比较详解 2023-03-07