Java线程间共享与协作详细介绍

这篇文章主要介绍了Java线程间共享与协作详细介绍,Java 支持多个线程同时访问一个对象或者对象的成员变量,更多相关介绍需要的朋友可以参考一下

线程的共享

synchronized内置锁

Java 支持多个线程同时访问一个对象或者对象的成员变量,关键字synchronized 可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性,又称为内置锁机制。 对象锁和类锁: 对象锁是用于对象实例方法,或者一个对象实例上的,类锁是用于类的静态方法或者一个类的 class 对象上的。我们知道,类的对象实例可以有很多个,但是每个类只有一个 class 对象,所以不同对象实例的对象锁是互不干扰的,但是每个类只有一个类锁。 但是有一点必须注意的是,其实类锁只是一个概念上的东西,并不是真实存在的,类锁其实锁的是每个类的对应的 class 对象。类锁和对象锁之间也是互不干扰的。

代码示例:

 *类说明:synchronized关键字的使用方法
 */
public class SynTest {
	private long count =0;
	private Object obj = new Object();//作为一个锁

	public long getCount() {
		return count;
	}

	public void setCount(long count) {
		this.count = count;
	}
	/*用在同步块上*/
	public void incCount(){
		synchronized (obj){
			count++;
		}
	}
	/*用在方法上*/
	public synchronized void incCount2(){
			count++;
	}
	/*用在同步块上,但是锁的是当前类的对象实例*/
	public void incCount3(){
		synchronized (this){
			count++;
		}
	}

	//线程
	private static class Count extends Thread{

		private SynTest simplOper;

		public Count(SynTest simplOper) {
			this.simplOper = simplOper;
		}

		@Override
		public void run() {
			for(int i=0;i<10000;i++){
				simplOper.incCount();//count = count+10000
			}
		}
	}
	public static void main(String[] args) throws InterruptedException {
		SynTest simplOper = new SynTest();
		//启动两个线程
		Count count1 = new Count(simplOper);
		Count count2 = new Count(simplOper);
		count1.start();
		count2.start();
		Thread.sleep(50);
		System.out.println(simplOper.count);//20000
	}
}
/**
 *类说明:锁的实例不一样,也是可以并行的
 */
public class DiffInstance {
    private static class InstanceSyn implements Runnable{
        private DiffInstance diffInstance;

        public InstanceSyn(DiffInstance diffInstance) {
            this.diffInstance = diffInstance;
        }
        @Override
        public void run() {
            System.out.println("TestInstance is running..."+ diffInstance);
            diffInstance.instance();
        }
    }
    private static class Instance2Syn implements Runnable{
        private DiffInstance diffInstance;

        public Instance2Syn(DiffInstance diffInstance) {
            this.diffInstance = diffInstance;
        }
        @Override
        public void run() {
            System.out.println("TestInstance2 is running..."+ diffInstance);
            diffInstance.instance2();
        }
    }
    private synchronized void instance(){
        SleepTools.second(3);
        System.out.println("synInstance is going..."+this.toString());
        SleepTools.second(3);
        System.out.println("synInstance ended "+this.toString());
    }
    private synchronized void instance2(){
        SleepTools.second(3);
        System.out.println("synInstance2 is going..."+this.toString());
        SleepTools.second(3);
        System.out.println("synInstance2 ended "+this.toString());
    }
    public static void main(String[] args) {
        DiffInstance instance1 = new DiffInstance();
        Thread t3 = new Thread(new Instance2Syn(instance1));
        DiffInstance instance2 = new DiffInstance();
        Thread t4 = new Thread(new InstanceSyn(instance1));
        //先执行完一个才会执行另外一个
        t3.start();
        t4.start();
        SleepTools.second(1);
    }
}

/**
 *类说明:演示实例锁和类锁是不同的,两者可以并行
 */
public class InstanceAndClass {
    private static class SynClass extends Thread{
        @Override
        public void run() {
            System.out.println("TestClass is running...");
            synClass();
        }
    }
    private static class InstanceSyn implements Runnable{
        private InstanceAndClass SynClassAndInstance;

        public InstanceSyn(InstanceAndClass SynClassAndInstance) {
            this.SynClassAndInstance = SynClassAndInstance;
        }
        @Override
        public void run() {
            System.out.println("TestInstance is running..."+SynClassAndInstance);
            SynClassAndInstance.instance();
        }
    }

    private synchronized void instance(){
        SleepTools.second(1);
        System.out.println("synInstance is going..."+this.toString());
        SleepTools.second(1);
        System.out.println("synInstance ended "+this.toString());
    }

    private static synchronized void synClass(){
        SleepTools.second(1);
        System.out.println("synClass going...");
        SleepTools.second(1);
        System.out.println("synClass end");
    }

    public static void main(String[] args) {
        InstanceAndClass synClassAndInstance = new InstanceAndClass();
        Thread t1 = new SynClass();
        Thread t2 = new Thread(new InstanceSyn(synClassAndInstance));
        t2.start();
        SleepTools.second(1);
        t1.start();
    }
}

/**
 *类说明:类锁和锁static变量也是不同的 可以并行
 */
public class StaticAndClass {

    private static class SynClass extends Thread{
        @Override
        public void run() {
            System.out.println(currentThread().getName()
                    +":SynClass is running...");
            synClass();
        }
    }
    private static class SynStatic extends Thread{
        @Override
        public void run() {
            System.out.println(currentThread().getName()
                    +"SynStatic is running...");
            synStatic();
        }
    }
    private static synchronized void synClass(){
        System.out.println(Thread.currentThread().getName()
                +"synClass going...");
        SleepTools.second(1);
        System.out.println(Thread.currentThread().getName()
                +"synClass end");
    }

    private static Object obj = new Object();
    private static void synStatic(){
        synchronized (obj){
            System.out.println(Thread.currentThread().getName()
                    +"synStatic going...");
            SleepTools.second(1);
            System.out.println(Thread.currentThread().getName()
                    +"synStatic end");
        }
    }
    public static void main(String[] args) {
        StaticAndClass synClassAndInstance = new StaticAndClass();
        Thread t1 = new SynClass();
        //Thread t2 = new SynStatic();
        Thread t2 = new SynClass();
        t2.start();
        SleepTools.second(1);
        t1.start();
    }
}

错误的加锁和原因分析

原因:虽然我们对 i 进行了加锁,但是

 但是当我们反编译这个类的 class 文件后,可以看到 i++实际是, 

 本质上是返回了一个新的 Integer 对象。也就是每个线程实际加锁的是不同的 Integer 对象。

volatile,最轻量的同步机制

volatile 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。 

 不加 volatile 时,子线程无法感知主线程修改了 ready 的值,从而不会退出循环,而加了 volatile 后,子线程可以感知主线程修改了 ready 的值,迅速退出循环。

/**
 * 类说明:演示Volatile的提供的可见性
 */
public class VolatileCase {
    private volatile static boolean ready;
    private static int number;

    //
    private static class PrintThread extends Thread{
        @Override
        public void run() {
            System.out.println("PrintThread is running.......");
            while(!ready);//无限循环
            System.out.println("number = "+number);
        }
    }

    public static void main(String[] args) {
        new PrintThread().start();
        SleepTools.second(1);
        number = 51;//如果没有加volatile关键字则主线程都结束了也没有打印number的值,加了关键值后打印出来的值就是主线程修改的值
        ready = true;
        SleepTools.second(5);
        System.out.println("main is ended!");
    }
}

但是 volatile 不能保证数据在多个线程下同时写时的线程安全。

/**
 * 类说明:
 */
public class NotSafe {
    private volatile long count =0;

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }
    //count进行累加
    public void incCount(){
        count++;
    }

    //线程
    private static class Count extends Thread{

        private NotSafe simplOper;

        public Count(NotSafe simplOper) {
            this.simplOper = simplOper;
        }

        @Override
        public void run() {
            for(int i=0;i<10000;i++){
                simplOper.incCount();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        NotSafe simplOper = new NotSafe();
        //启动两个线程
        Count count1 = new Count(simplOper);
        Count count2 = new Count(simplOper);
        count1.start();
        count2.start();
        Thread.sleep(50);
        System.out.println(simplOper.count);//20000?
    }
}

volatile 最适用的场景:一个线程写,多个线程读。

ThreadLocal

与 Synchonized的比较

ThreadLocal 和 Synchonized 都用于解决多线程并发訪问。可是 ThreadLocal与 synchronized 有本质的差别。synchronized 是利用锁的机制,使变量或代码块在某一时该仅仅能被一个线程訪问。而 ThreadLocal 为每个线程都提供了变量的副本 ,使得每个线程在某一时间訪问到的并非同一个对象,这样就隔离了多个线程对数据的数据共享。 Spring 的事务就借助了 ThreadLocal 类。Spring 会从数据库连接池中获得一个connection,然会把connection 放进 ThreadLocal 中,也就和线程绑定了,事务需要提交或者回滚,只要从 ThreadLocal 中拿到 connection 进行操作。为何 Spring的事务要借助 ThreadLocal 类?

以 JDBC 为例,正常的事务代码可能如下:

dbc = new DataBaseConnection();//第 1 行
Connection con = dbc.getConnection();//第 2 行
con.setAutoCommit(false);// //第 3 行
con.executeUpdate(...);//第 4 行
con.executeUpdate(...);//第 5 行
con.executeUpdate(...);//第 6 行
con.commit();////第 7 行

上述代码,可以分成三个部分: 事务准备阶段:第 1~3 行 业务处理阶段:第 4~6 行 事务提交阶段:第 7 行 可以很明显的看到,不管我们开启事务还是执行具体的 sql 都需要一个具体的数据库连接。现在我们开发应用一般都采用三层结构,如果我们控制事务的代码都放在DAO(DataAccessObject)对象中,在 DAO 对象的每个方法当中去打开事务和关闭事务,当 Service 对象在调用 DAO 时,如果只调用一个 DAO,那我们这样实现则效果不错,但往往我们的 Service 会调用一系列的 DAO 对数据库进行多次操作,那么,这个时候我们就无法控制事务的边界了,因为实际应用当中,我们的 Service调用的 DAO 的个数是不确定的,可根据需求而变化,而且还可能出现 Service 调用 Service 的情况。

如果不使用 ThreadLocal,代码大概就会是这个样子: 

 但是需要注意一个问题,如何让三个 DAO 使用同一个数据源连接呢?我们就必须为每个 DAO 传递同一个数据库连接,要么就是在 DAO 实例化的时候作为构造方法的参数传递,要么在每个 DAO 的实例方法中作为方法的参数传递。这两种方式无疑对我们的 Spring 框架或者开发人员来说都不合适。为了让这个数据库连接可以跨阶段传递,又不显示的进行参数传递,就必须使用别的办法。 Web 容器中,每个完整的请求周期会由一个线程来处理。因此,如果我们能将一些参数绑定到线程的话,就可以实现在软件架构中跨层次的参数共享(是隐式的共享)。而 JAVA 中恰好提供了绑定的方法--使用 ThreadLocal。 结合使用 Spring 里的 IOC 和 AOP,就可以很好的解决这一点。 只要将一个数据库连接放入 ThreadLocal 中,当前线程执行时只要有使用数据库连接的地方就从 ThreadLocal 获得就行了。

ThreadLocal的使用

ThreadLocal 类接口很简单,只有 4 个方法,我们先来了解一下: • void set(Object value) 设置当前线程的线程局部变量的值。 • public Object get() 该方法返回当前线程所对应的线程局部变量。 • public void remove() 将当前线程局部变量的值删除,目的是为了减少内存的占用,该方法是 JDK5.0 新增的方法。需要指出的是,当线程结束后,对应该线程的局部变量将自动被垃圾回收,所以显式调用该方法清除线程的局部变量并不是必须的操作,但它可以加快内存回收的速度。 • protected Object initialValue() 返回该线程局部变量的初始值,该方法是一个 protected 的方法,显然是为了让子类覆盖而设计的。这个方法是一个延迟调用方法,在线程第 1 次调用 get()或 set(Object)时才执行,并且仅执行 1 次。ThreadLocal 中的缺省实现直接返回一个 null。

public final static ThreadLocal RESOURCE = new ThreadLocal();RESOURCE 代表一个能够存放 String 类型的 ThreadLocal 对象。此时不论什么一个线程能够并发访问这个变量,对它进行写入、读取操作,都是线程安全的。

代码示例:

/**
 *类说明:演示ThreadLocal的使用
 */
public class UseThreadLocal {
	private static ThreadLocal<Integer> intLocal
            = new ThreadLocal<Integer>(){
        @Override
        protected Integer initialValue() {
            return 1;
        }
    };
    private static ThreadLocal<String> stringThreadLocal;

    /**
     * 运行3个线程
     */
    public void StartThreadArray(){
        Thread[] runs = new Thread[3];
        for(int i=0;i<runs.length;i++){
            runs[i]=new Thread(new TestThread(i));
        }
        for(int i=0;i<runs.length;i++){
            runs[i].start();
        }
    }
    
    /**
     *类说明:测试线程,线程的工作是将ThreadLocal变量的值变化,并写回,看看线程之间是否会互相影响
     */
    public static class TestThread implements Runnable{
        int id;
        public TestThread(int id){
            this.id = id;
        }
        public void run() {
            System.out.println(Thread.currentThread().getName()+":start");
            Integer s = intLocal.get();
            s = s+id;
            intLocal.set(s);
            System.out.println(Thread.currentThread().getName()
                    +":"+ intLocal.get());
            //intLocal.remove();
        }
    }
    public static void main(String[] args){
    	UseThreadLocal test = new UseThreadLocal();
        test.StartThreadArray();
    }
}

/**
 * 类说明:
 */
public class NoThreadLocal {
    static Integer count = new Integer(1);

    /**
     * 运行3个线程
     */
    public void StartThreadArray(){
        Thread[] runs = new Thread[3];
        for(int i=0;i<runs.length;i++){
            runs[i]=new Thread(new TestTask(i));
        }
        for(int i=0;i<runs.length;i++){
            runs[i].start();
        }
    }

    /**
     *类说明:
     */
    public static class TestTask implements Runnable{
        int id;
        public TestTask(int id){
            this.id = id;
        }
        public void run() {
            System.out.println(Thread.currentThread().getName()+":start");
            count = count+id;
            System.out.println(Thread.currentThread().getName()+":"
                    +count);
        }
    }

    public static void main(String[] args){
        NoThreadLocal test = new NoThreadLocal();
        test.StartThreadArray();
    }
}

实现解析

 上面先取到当前线程,然后调用 getMap 方法获取对应的 ThreadLocalMap,ThreadLocalMap 是 ThreadLocal 的静态内部类,然后 Thread 类中有一个这样类型成员,所以 getMap 是直接返回 Thread 的成员。

看下 ThreadLocal 的内部类 ThreadLocalMap 源码: 

 可以看到有个 Entry 内部静态类,它继承了 WeakReference,总之它记录了两个信息,一个是 ThreadLocal<?>类型,一个是 Object 类型的值。getEntry 方法则是获取某个 ThreadLocal 对应的值,set 方法就是更新或赋值相应的 ThreadLocal对应的值。 

 回顾我们的 get 方法,其实就是拿到每个线程独有的 ThreadLocalMap,然后再用 ThreadLocal 的当前实例,拿到 Map 中的相应的 Entry,然后就可以拿到相应的值返回出去。当然,如果 Map 为空,还会先进行 map 的创建,初始化等工作。

引发的内存泄漏分析

引用 Object o = new Object(); 这个 o,我们可以称之为对象引用,而 new Object()我们可以称之为在内存中产生了一个对象实例。 

 当写下 o=null 时,只是表示 o 不再指向堆中 object 的对象实例,不代表这个对象实例不存在了。

强引用就是指在程序代码之中普遍存在的,类似“Object obj=new Object()”这类的引用,只要强引用还存在,垃圾收集器永远不会回收掉被引用的对象实例。

软引用是用来描述一些还有用但并非必需的对象。对于软引用关联着的对象,在系统将要发生内存溢出异常之前,将会把这些对象实例列进回收范围之中进行第二次回收。如果这次回收还没有足够的内存,才会抛出内存溢出异常。在 JDK1.2 之后,提供了 SoftReference 类来实现软引用。

弱引用也是用来描述非必需对象的,但是它的强度比软引用更弱一些,被弱引用关联的对象实例只能生存到下一次垃圾收集发生之前。当垃圾收集器工作时,无论当前内存是否足够,都会回收掉只被弱引用关联的对象实例。在 JDK 1.2 之后,提供了WeakReference 类来实现弱引用。

虚引用也称为幽灵引用或者幻影引用,它是最弱的一种引用关系。一个对象实例是否有虚引用的存在,完全不会对其生存时间构成影响,也无法通过虚引用来取得一个对象实例。为一个对象设置虚引用关联的唯一目的就是能在这个对象实例被收集器回收时收到一个系统通知。在 JDK 1.2 之后,提供了PhantomReference 类来实现虚引用。

内存泄漏的现象

将堆内存大小设置为-Xmx256m 我们启用一个线程池,大小固定为 5 个线程

    final static ThreadPoolExecutor poolExecutor
            = new ThreadPoolExecutor(5, 5,
            1,
            TimeUnit.MINUTES,
            new LinkedBlockingQueue<>());

场景 1,首先任务中不执行任何有意义的代码,当所有的任务提交执行完成后,可以看见,我们这个应用的内存占用基本上为 25M 左右 

 场景 2,然后我们只简单的在每个任务中 new 出一个数组,执行完成后我们可以看见,内存占用基本和场景 1 同 

 场景 3,当我们启用了 ThreadLocal 以后: 

 执行完成后我们可以看见,内存占用变为了 100M 左右场景 4,于是,我们加入一行代码,再执行,看看内存情况:

 可以看见,内存占用基本和场景 1 同。 这就充分说明,场景 3,当我们启用了 ThreadLocal 以后确实发生了内存泄漏。

分析

根据我们前面对 ThreadLocal 的分析,我们可以知道每个 Thread 维护一个ThreadLocalMap,这个映射表的 key 是 ThreadLocal 实例本身,value 是真正需要存储的 Object,也就是说 ThreadLocal 本身并不存储值,它只是作为一个 key来让线程从ThreadLocalMap 获取 value。仔细观察 ThreadLocalMap,这个 map是使用 ThreadLocal 的弱引用作为 Key 的,弱引用的对象在 GC 时会被回收。

因此使用了 ThreadLocal 后,引用链如图所示 :

 图中的虚线表示弱引用。 这样,当把 threadlocal 变量置为 null 以后,没有任何强引用指向 threadlocal实例,所以 threadlocal 将会被 gc 回收。这样一来,ThreadLocalMap 中就会出现key 为 null 的 Entry,就没有办法访问这些 key 为 null 的 Entry 的 value,如果当前线程再迟迟不结束的话,这些 key 为 null 的 Entry 的 value 就会一直存在一条强引用链:Thread Ref -> Thread -> ThreaLocalMap -> Entry -> value,而这块 value 永远不会被访问到了,所以存在着内存泄露。 只有当前 thread 结束以后,current thread 就不会存在栈中,强引用断开,Current Thread、Map value 将全部被 GC 回收。最好的做法是不在需要使用ThreadLocal 变量后,都调用它的 remove()方法,清除数据。 所以回到我们前面的实验场景,场景 3 中,虽然线程池里面的任务执行完毕了,但是线程池里面的 5 个线程会一直存在直到 JVM 退出,我们 set 了线程的localVariable 变量后没有调用 localVariable.remove()方法,导致线程池里面的 5 个线程的 threadLocals 变量里面的 new LocalVariable()实例没有被释放。 其实考察 ThreadLocal 的实现,我们可以看见,无论是 get()、set()在某些时候,调用了 expungeStaleEntry 方法用来清除 Entry 中 Key 为 null 的 Value,但是这是不及时的,也不是每次都会执行的,所以一些情况下还是会发生内存泄露。只有 remove()方法中显式调用了 expungeStaleEntry 方法。 从表面上看内存泄漏的根源在于使用了弱引用,但是另一个问题也同样值得

思考:为什么使用弱引用而不是强引用?

下面我们分两种情况讨论:

key 使用强引用:引用 ThreadLocal 的对象被回收了,但是 ThreadLocalMap还持有 ThreadLocal 的强引用,如果没有手动删除,ThreadLocal 的对象实例不会被回收,导致 Entry 内存泄漏。

key 使用弱引用:引用的 ThreadLocal 的对象被回收了,由于 ThreadLocalMap持有 ThreadLocal 的弱引用,即使没有手动删除,ThreadLocal 的对象实例也会被回收。value 在下一次 ThreadLocalMap 调用 set,get,remove 都有机会被回收。

比较两种情况,我们可以发现:由于 ThreadLocalMap 的生命周期跟 Thread一样长,如果都没有手动删除对应 key,都会导致内存泄漏,但是使用弱引用可以多一层保障。 因此,ThreadLocal 内存泄漏的根源是:由于 ThreadLocalMap 的生命周期跟Thread 一样长,如果没有手动删除对应 key 就会导致内存泄漏,而不是因为弱引用。

总结 JVM 利用设置 ThreadLocalMap 的 Key 为弱引用,来避免内存泄露。 JVM 利用调用 remove、get、set 方法的时候,回收弱引用。 当 ThreadLocal 存储很多 Key 为 null 的 Entry 的时候,而不再去调用 remove、get、set 方法,那么将导致内存泄漏。 使用线程池+ ThreadLocal 时要小心,因为这种情况下,线程是一直在不断的重复运行的,从而也就造成了 value 可能造成累积的情况。

/**
 * 类说明:ThreadLocal造成的内存泄漏演示
 */
public class ThreadLocalOOM {
    private static final int TASK_LOOP_SIZE = 500;

    final static ThreadPoolExecutor poolExecutor
            = new ThreadPoolExecutor(5, 5,
            1,
            TimeUnit.MINUTES,
            new LinkedBlockingQueue<>());

    static class LocalVariable {
        private byte[] a = new byte[1024*1024*5];/*5M大小的数组*/
    }

    final static ThreadLocal<LocalVariable> localVariable
            = new ThreadLocal<>();

    public static void main(String[] args) throws InterruptedException {
        Object o = new Object();
        /*5*5=25*/
        for (int i = 0; i < TASK_LOOP_SIZE; ++i) {
            poolExecutor.execute(new Runnable() {
                public void run() {
                    //localVariable.set(new LocalVariable());
                    new LocalVariable();
                    System.out.println("use local varaible");
                    //localVariable.remove();
                }
            });

            Thread.sleep(100);
        }
        System.out.println("pool execute over");
    }
}

错误使用ThreadLocal导致线程不安全

/**
 * 类说明:ThreadLocal的线程不安全演示
 */
public class ThreadLocalUnsafe implements Runnable {
    public static Number number = new Number(0);
    public void run() {
        //每个线程计数加一
        number.setNum(number.getNum()+1);
      //将其存储到ThreadLocal中
        value.set(number);
        SleepTools.ms(2);
        //输出num值
        System.out.println(Thread.currentThread().getName()+"="+value.get().getNum());
    }
    public static ThreadLocal<Number> value = new ThreadLocal<Number>() {
    };
    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            new Thread(new ThreadLocalUnsafe()).start();
        }
    }
    private static class Number {
        public Number(int num) {
            this.num = num;
        }
        private int num;
        public int getNum() {
            return num;
        }
        public void setNum(int num) {
            this.num = num;
        }
        @Override
        public String toString() {
            return "Number [num=" + num + "]";
        }
    }
}

为什么每个线程都输出 5?难道他们没有独自保存自己的 Number 副本吗?为什么其他线程还是能够修改这个值?仔细考察 ThreadLocal 和 Thead 的代码,我们发现 ThreadLocalMap 中保存的其实是对象的一个引用,这样的话,当有其他线程对这个引用指向的对象实例做修改时,其实也同时影响了所有的线程持有的对象引用所指向的同一个对象实例。这也就是为什么上面的程序为什么会输出一样的结果:5 个线程中保存的是同一 Number 对象的引用,在线程睡眠的时候,其他线程将 num 变量进行了修改,而修改的对象 Number 的实例是同一份,因此它们最终输出的结果是相同的。 而上面的程序要正常的工作,应该的用法是让每个线程中的 ThreadLocal 都应该持有一个新的 Number 对象。

线程间的协作

线程之间相互配合,完成某项工作,比如:一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者,这种模式隔离了“做什么”(what)和“怎么做”(How),简单的办法是让消费者线程不断地循环检查变量是否符合预期在 while 循环中设置不满足的条件,如果条件满足则退出 while 循环,从而完成消费者的工作。却存在如下问题: 1) 难以确保及时性。 2)难以降低开销。如果降低睡眠的时间,比如休眠 1 毫秒,这样消费者能更加迅速地发现条件变化,但是却可能消耗更多的处理器资源,造成了无端的浪费。

等待/通知机制

是指一个线程 A 调用了对象 O 的 wait()方法进入等待状态,而另一个线程 B调用了对象 O 的 notify()或者 notifyAll()方法,线程 A 收到通知后从对象 O 的 wait()方法返回,进而执行后续操作。上述两个线程通过对象 O 来完成交互,而对象上的 wait()和notify/notifyAll()的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。

notify(): 通知一个在对象上等待的线程,使其从 wait 方法返回,而返回的前提是该线程获取到了对象的锁,没有获得锁的线程重新进入 WAITING 状态。

notifyAll(): 通知所有等待在该对象上的线程

wait() 调用该方法的线程进入 WAITING 状态,只有等待另外线程的通知或被中断才会返回.需要注意,调用 wait()方法后,会释放对象的锁

wait(long) 超时等待一段时间,这里的参数时间是毫秒,也就是等待长达 n 毫秒,如果没有通知就超时返回

wait (long,int) 对于超时时间更细粒度的控制,可以达到纳秒

等待和通知的标准范式

等待方遵循如下原则:

  • 1)获取对象的锁。
  • 2)如果条件不满足,那么调用对象的 wait()方法,被通知后仍要检查条件。
  • 3)条件满足则执行对应的逻辑。

通知方遵循如下原则:

  • 1)获得对象的锁。
  • 2)改变条件。
  • 3)通知所有等待在对象上的线程。

在调用 wait()、notify()系列方法之前,线程必须要获得该对象的对象级别锁,即只能在同步方法或同步块中调用 wait()方法、notify()系列方法,进入 wait()方法后,当前线程释放锁,在从 wait()返回前,线程与其他线程竞争重新获得锁,执行 notify()系列方法的线程退出调用了 notifyAll 的 synchronized代码块的时候后,他们就会去竞争。如果其中一个线程获得了该对象锁,它就会继续往下执行,在它退出 synchronized 代码块,释放锁后,其他的已经被唤醒的线程将会继续竞争获取该锁,一直进行下去,直到所有被唤醒的线程都执行完毕。

notify 和 notifyAll 应该用谁

尽可能用 notifyall(),谨慎使用 notify(),因为 notify()只会唤醒一个线程,我们无法确保被唤醒的这个线程一定就是我们需要唤醒的线程

代码示例:

/**
 *类说明:快递实体类
 */
public class Express {
    public final static String CITY = "ShangHai";
    private int km;/*快递运输里程数*/
    private String site;/*快递到达地点*/

    public Express() {
    }

    public Express(int km, String site) {
        this.km = km;
        this.site = site;
    }

    /* 变化公里数,然后通知处于wait状态并需要处理公里数的线程进行业务处理*/
    public synchronized void changeKm(){
        this.km = 101;
        notify();
    }

    /* 变化地点,然后通知处于wait状态并需要处理地点的线程进行业务处理*/
    public  synchronized  void changeSite(){
        this.site = "BeiJing";
        notifyAll();
    }

    /*线程等待公里的变化*/
    public synchronized void waitKm(){
        while(this.km<100){
            try {
                wait();
                System.out.println("Check Site thread["
                                +Thread.currentThread().getId()
                        +"] is be notified");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("the Km is "+this.km+",I will change db");
    }

    /*线程等待目的地的变化*/
    public synchronized void waitSite(){
        while(this.site.equals(CITY)){//快递到达目的地
            try {
                wait();
                System.out.println("Check Site thread["+Thread.currentThread().getId()
                		+"] is be notified");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("the site is "+this.site+",I will call user");
    }
}
/**
 *类说明:测试wait/notify/notifyAll
 */
public class TestWN {
    private static Express express = new Express(0,Express.CITY);

    /*检查里程数变化的线程,不满足条件,线程一直等待*/
    private static class CheckKm extends Thread{
        @Override
        public void run() {
        	express.waitKm();
        }
    }
    /*检查地点变化的线程,不满足条件,线程一直等待*/
    private static class CheckSite extends Thread{
        @Override
        public void run() {
        	express.waitSite();
        }
    }
    public static void main(String[] args) throws InterruptedException {
        for(int i=0;i<3;i++){
            new CheckSite().start();
        }
        for(int i=0;i<3;i++){
            new CheckKm().start();
        }

        Thread.sleep(1000);
        express.changeKm();//快递地点变化
    }
}

等待超时模式实现一个连接池

调用场景:调用一个方法时等待一段时间(一般来说是给定一个时间段),如果该方法能够在给定的时间段之内得到结果,那么将结果立刻返回,反之,超时返回默认结果 假设等待时间段是 T,那么可以推断出在当前时间 now+T 之后就会超时 等待持续时间:REMAINING=T。 超时时间:FUTURE=now+T。

/**
 *类说明:连接池的实现
 */
public class DBPool {
    /*容器,存放连接*/
    private static LinkedList<Connection> pool = new LinkedList<Connection>();
    /*限制了池的大小=20*/
    public DBPool(int initialSize) {
        if (initialSize > 0) {
            for (int i = 0; i < initialSize; i++) {
                pool.addLast(SqlConnectImpl.fetchConnection());
            }
        }
    }
    /*释放连接,通知其他的等待连接的线程*/
    public void releaseConnection(Connection connection) {
        if (connection != null) {
            synchronized (pool){
                pool.addLast(connection);
                //通知其他等待连接的线程
                pool.notifyAll();
            }
        }
    }
    /*获取*/
    // 在mills内无法获取到连接,将会返回null 1S
    public Connection fetchConnection(long mills)
            throws InterruptedException {
        synchronized (pool){
            //永不超时
            if(mills<=0){
                while(pool.isEmpty()){
                    pool.wait();
                }
                return pool.removeFirst();
            }else{
                /*超时时刻*/
                long future = System.currentTimeMillis()+mills;
                /*等待时长*/
                long remaining = mills;
                while(pool.isEmpty()&&remaining>0){
                    pool.wait(remaining);
                    /*唤醒一次,重新计算等待时长*/
                    remaining = future-System.currentTimeMillis();
                }
                Connection connection = null;
                if(!pool.isEmpty()){
                    connection = pool.removeFirst();
                }
                return connection;
            }
        }

    }
}
/**
 *类说明:
 */
public class DBPoolTest {
    static DBPool pool  = new DBPool(10);
    // 控制器:控制main线程将会等待所有Woker结束后才能继续执行
    static CountDownLatch end;
    public static void main(String[] args) throws Exception {
    	// 线程数量
        int threadCount = 50;
        end = new CountDownLatch(threadCount);
        int count = 20;//每个线程的操作次数
        AtomicInteger got = new AtomicInteger();//计数器:统计可以拿到连接的线程
        AtomicInteger notGot = new AtomicInteger();//计数器:统计没有拿到连接的线程
        for (int i = 0; i < threadCount; i++) {
            Thread thread = new Thread(new Worker(count, got, notGot), 
            		"worker_"+i);
            thread.start();
        }
        end.await();// main线程在此处等待
        System.out.println("总共尝试了: " + (threadCount * count));
        System.out.println("拿到连接的次数:  " + got);
        System.out.println("没能连接的次数: " + notGot);
    }
    static class Worker implements Runnable {
        int           count;
        AtomicInteger got;
        AtomicInteger notGot;
        public Worker(int count, AtomicInteger got,
                              AtomicInteger notGot) {
            this.count = count;
            this.got = got;
            this.notGot = notGot;
        }
        public void run() {
            while (count > 0) {
                try {
                    // 从线程池中获取连接,如果1000ms内无法获取到,将会返回null
                    // 分别统计连接获取的数量got和未获取到的数量notGot
                    Connection connection = pool.fetchConnection(1000);
                    if (connection != null) {
                        try {
                            connection.createStatement();
//                            PreparedStatement preparedStatement
//                                    = connection.prepareStatement("");
//                            preparedStatement.execute();
                            connection.commit();
                        } finally {
                            pool.releaseConnection(connection);
                            got.incrementAndGet();
                        }
                    } else {
                        notGot.incrementAndGet();
                        System.out.println(Thread.currentThread().getName()
                        		+"等待超时!");
                    }
                } catch (Exception ex) {
                } finally {
                    count--;
                }
            }
            end.countDown();
        }
    }
}
/**
 *类说明:
 */
public class SqlConnectImpl implements Connection{
	
	/*拿一个数据库连接*/
    public static final Connection fetchConnection(){
        return new SqlConnectImpl();
    }
    .........

 客户端获取连接的过程被设定为等待超时的模式,也就是在 1000 毫秒内如果无法获取到可用连接,将会返回给客户端一个 null。设定连接池的大小为 10个,然后通过调节客户端的线程数来模拟无法获取连接的场景。 它通过构造函数初始化连接的最大上限,通过一个双向队列来维护连接,调用方需要先调用 fetchConnection(long)方法来指定在多少毫秒内超时获取连接,当连接使用完成后,需要调用 releaseConnection(Connection)方法将连接放回线程池

面试题

调用 yield() 、sleep()、wait()、notify()等方法对锁有何影响?

yield() 、sleep()被调用后,都不会释放当前线程所持有的锁。

调用 wait()方法后,会释放当前线程持有的锁,而且当前被唤醒后,会重新去竞争锁,锁竞争到后才会执行 wait 方法后面的代码。 调用 notify()系列方法后,对锁无影响,线程只有在 syn 同步代码执行完后才会自然而然的释放锁,所以 notify()系列方法一般都是 syn 同步代码的最后一行。

到此这篇关于Java线程间共享与协作详细介绍的文章就介绍到这了,更多相关Java共享与协作内容请搜索编程学习网以前的文章希望大家以后多多支持编程学习网!

本文标题为:Java线程间共享与协作详细介绍

基础教程推荐