Java多线程技术9——非阻塞队列

发布时间:2024年01月03日

1 概述

? ? ? ? 非阻塞队列的特色是队列里面没有数据时,返回异常或null。在JDK的并发包中,常见的非阻塞队列有:ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet、ConcurrentLinkedQueue、ConcurrentLinkedDeque、CopyOnWriteArrayList、CopyOnWriteArraySet。本篇将介绍这7个非阻塞队列的特点与使用。

2 ConcurrentHashMap类的使用

2.1 验证HashMap不是线程安全的

public class MyService1 {
    public HashMap map = new HashMap();

    public void testMethod(){
        for (int i = 0; i < 5000; i++) {
            map.put(Thread.currentThread().getName()+" " + (i+1),Thread.currentThread().getName()+" " + (i+1));
            System.out.println(Thread.currentThread().getName() + " " + (i+1));
        }
    }
}
public class Thread1 extends Thread{
    private MyService1 service1;

    public Thread1(MyService1 service1) {
        this.service1 = service1;
    }
    @Override
    public void run(){
        service1.testMethod();
    }
}
public class Thread2 extends Thread{
    private MyService1 service1;

    public Thread2(MyService1 service1) {
        this.service1 = service1;
    }

    @Override
    public void run(){
        service1.testMethod();
    }
}
public class Run1 {
    public static void main(String[] args) {
        MyService1 service1 = new MyService1();
        Thread1 t1 = new Thread1(service1);
        t1.start();
    }
}

如果只创建一个线程Thread1,那么运行结果是正确的。此时控制台运行结果是:

在创建一个线程Thread2,运行结果是:

public class Run1 {
    public static void main(String[] args) {
        MyService1 service1 = new MyService1();
        Thread1 t1 = new Thread1(service1);
        Thread2 t2 = new Thread2(service1);
        t1.start();
        t2.start();
    }
}

? ? ? ? 程序运行后会有很小的概率出现异常(笔者试了很多次没有出现-_-||,读者请自行实验),说明hashMap不能被多个线程操作,也就证明HashMap是非线程安全的。

2.2 验证HashTable是线程安全的

public class MyService1 {
    public Hashtable hashtable = new Hashtable();
    public void method(){
        for (int i = 0; i < 50000; i++) {
            hashtable.put(Thread.currentThread().getName()+ " "+(i+1),
                    Thread.currentThread().getName()+ " "+(i+1));
            System.out.println(Thread.currentThread().getName() + " " + (i+1));
        }
    }
}
public class Thread2 extends Thread{
    private MyService1 service1;

    public Thread2(MyService1 service1) {
        this.service1 = service1;
    }

    @Override
    public void run(){
        service1.method();
    }
}
public class Thread1 extends Thread{
    private MyService1 service1;

    public Thread1(MyService1 service1) {
        this.service1 = service1;
    }
    @Override
    public void run(){
        service1.method();

    }
}
public class Run1 {
    public static void main(String[] args) {
        MyService1 service1 = new MyService1();
        Thread1 t1 = new Thread1(service1);
        Thread1 t2 = new Thread1(service1);
        t1.start();
        t2.start();
    }
}

????????

? ? ? ? 程序运行正确,证明HashTable类在多线程环境中执行put操作不会出错,是线程安全的类。但是,多个线程分别调用该类的iteartor()方法返回Iterator对象,并调用next()方法取得元素,再执行remove()方法时会出现修改并发修改异常,说明HashTable不支持Iterator并发删除。

2.3 验证ConcurrentHashMap线程安全

? ? ? ? ConcurrentHashMap类是JDK并发包中提供的支持并发操作的Map对象。其继承与实现信息如下:

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {
    private static final long serialVersionUID = 7249069246763182397L;
}

? ? ? ? 下面开始验证:?

public class MyService1 {
    public ConcurrentHashMap map = new ConcurrentHashMap();
    public void method(){
        for (int i = 0; i < 50000; i++) {
            map.put(Thread.currentThread().getName()+ " "+(i+1),
                    Thread.currentThread().getName()+ " "+(i+1));
            System.out.println(Thread.currentThread().getName() + " " + (i+1));
        }
    }
}
public class Thread2 extends Thread{
    private MyService1 service1;

    public Thread2(MyService1 service1) {
        this.service1 = service1;
    }

    @Override
    public void run(){
        service1.method();
    }
}
public class Thread1 extends Thread{
    private MyService1 service1;

    public Thread1(MyService1 service1) {
        this.service1 = service1;
    }
    @Override
    public void run(){
        service1.method();

    }
}
public class Run1 {
    public static void main(String[] args) {
        MyService1 service1 = new MyService1();
        Thread1 t1 = new Thread1(service1);
        Thread1 t2 = new Thread1(service1);
        t1.start();
        t2.start();
    }
}

2.4 验证ConcurrentHashMap并发删除

public class MyService2 {
    public ConcurrentHashMap map = new ConcurrentHashMap();

    public MyService2() {
        for (int i = 0; i < 50000; i++) {
            map.put(Thread.currentThread().getName() + (i+1),"abc");
        }
    }

    public void test(){
        Iterator iterator = map.keySet().iterator();
        while(iterator.hasNext()){
            Object next = iterator.next();
            iterator.remove();
            System.out.println(map.size() + " " + Thread.currentThread().getName());
        }
    }
}
public class Thread1 extends Thread{
    public MyService2 myService2;

    public Thread1(MyService2 myService2) {
        this.myService2 = myService2;
    }
    @Override
    public void run(){
        myService2.test();
    }
}
public class Thread2 extends Thread{
    public MyService2 service2;

    public Thread2(MyService2 service2) {
        this.service2 = service2;
    }

    @Override
    public void run(){
        service2.test();
    }
}
public class Run2 {
    public static void main(String[] args) {
        MyService2 myService2 = new MyService2();
        Thread1 t1 = new Thread1(myService2);
        Thread2 t2 = new Thread2(myService2);
        t1.start();
        t2.start();

    }
}

?????????

? ? ? ? 从运行结果看,ConcurrentHashMap在并发情况下支持put和remove。ConcurrentHashMap不支持排序,LinkedHashMap支持key排序,但不支持并发。如果出现既要求并发又要求排序的情况,就可以使用ConcurrentSkipListMap类。

3?ConcurrentSkipListMap类的使用

????????ConcurrentSkipListMap支持排序。

public class UserInfo implements Comparable<UserInfo>{
    private int id;
    private String username;

    public UserInfo(int id, String username) {
        this.id = id;
        this.username = username;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }


    @Override
    public int compareTo(UserInfo o) {
        if(this.getId() > o.getId()){
            return 1;
        }else{
            return -1;
        }
    }
}
public class MyService {
    public ConcurrentSkipListMap<UserInfo,String> map = new ConcurrentSkipListMap<>();
    public MyService(){
        UserInfo userInfo1 = new UserInfo(1,"userinfo1");
        UserInfo userInfo3 = new UserInfo(3,"userinfo1");
        UserInfo userInfo5 = new UserInfo(5,"userinfo1");
        UserInfo userInfo2 = new UserInfo(2,"userinfo1");
        UserInfo userInfo4 = new UserInfo(4,"userinfo1");
        map.put(userInfo1,"u1");
        map.put(userInfo3,"u3");
        map.put(userInfo5,"u5");
        map.put(userInfo2,"u2");
        map.put(userInfo4,"u4");
    }

    public void method(){
        Map.Entry<UserInfo, String> entry = map.pollFirstEntry();
        System.out.println("map size() = " + map.size());
        UserInfo userInfo = entry.getKey();
        System.out.println(userInfo.getId() + " " + userInfo.getUsername() + " " + map.get(userInfo) + " " +entry.getValue());
    }
}
public class MyThread extends Thread{
    private MyService service;

    public MyThread(MyService service) {
        this.service = service;
    }

    @Override
    public void run(){
        service.method();
    }
}
public class Run1 {
    public static void main(String[] args) throws InterruptedException {
        MyService service = new MyService();
        MyThread t1 = new MyThread(service);
        MyThread t2 = new MyThread(service);
        MyThread t3 = new MyThread(service);
        MyThread t4 = new MyThread(service);
        MyThread t5 = new MyThread(service);
        t1.start();
        Thread.sleep(1000);
        t2.start();
        Thread.sleep(1000);
        t3.start();
        Thread.sleep(1000);
        t4.start();
        Thread.sleep(1000);
        t5.start();

    }
}

? ? ? ? 控制台打印出null的值是使用pollFirstEntry()方法将当前的Entry对象从类ConcurrentSkipListMap中删除造成的。

4 ConcurrentSkipListSet类的使用

?????????ConcurrentSkipListSet 类支持排序且不允许元素重复。

public class UserInfo implements Comparable<UserInfo> {
    private int id;
    private String username;

    public UserInfo(int id, String username) {
        this.id = id;
        this.username = username;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    @Override
    public int compareTo(UserInfo userInfo){
        if(this.getId() < userInfo.getId()){
            return -1;
        }
        if(this.getId() > userInfo.getId()){
            return 1;

        }
        return 0;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        UserInfo userInfo = (UserInfo) o;
        return id == userInfo.id && Objects.equals(username, userInfo.username);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, username);
    }
}
public class MyService {
    public ConcurrentSkipListSet set = new ConcurrentSkipListSet();
    public MyService(){
        UserInfo userInfo1 = new UserInfo(1,"username1");
        UserInfo userInfo3 = new UserInfo(3,"username3");
        UserInfo userInfo5 = new UserInfo(5,"username5");
        UserInfo userInfo41 = new UserInfo(4,"username4");
        UserInfo userInfo42 = new UserInfo(4,"username4");
        UserInfo userInfo2 = new UserInfo(2,"username2");
        set.add(userInfo1);
        set.add(userInfo3);
        set.add(userInfo5);
        set.add(userInfo41);
        set.add(userInfo42);
        set.add(userInfo2);
    }
}
public class MyThread extends Thread{
    private MyService service;

    public MyThread(MyService service) {
        this.service = service;
    }

    @Override
    public void run(){
        UserInfo userInfo = (UserInfo) service.set.pollFirst();
        System.out.println(userInfo.getId() + " " + userInfo.getUsername());
    }
}
public class Run1 {
    public static void main(String[] args) throws InterruptedException {
        MyService service = new MyService();
        MyThread t1 = new MyThread(service);
        MyThread t2 = new MyThread(service);
        MyThread t3 = new MyThread(service);
        MyThread t4 = new MyThread(service);
        MyThread t5 = new MyThread(service);
        t1.start();
        Thread.sleep(1000);
        t2.start();
        Thread.sleep(1000);
        t3.start();
        Thread.sleep(1000);
        t4.start();
        Thread.sleep(1000);
        t5.start();
    }
}

? ? ? ? 从运行结果可以看到,排序成功,并且不支持数据重复。?

5 ConcurrentLinkedQueue类的使用

? ? ? ? ConcurrentLinkedQueue类提供了并发环境下的队列操作。

public class MyService {
    public ConcurrentLinkedDeque queue = new ConcurrentLinkedDeque();
}
public class ThreadA extends Thread{
    private MyService service;

    public ThreadA(MyService service) {
        this.service = service;
    }

    @Override
    public void run(){
        for (int i = 0; i < 50; i++) {
            service.queue.add("threadA" + (i +1));
        }
    }
}
public class ThreadB extends Thread{
    private MyService service;

    public ThreadB(MyService service) {
        this.service = service;
    }

    @Override
    public void run(){
        for (int i = 0; i < 50; i++) {
            service.queue.add("ThreadB"+ (i+1));
        }
    }
}
public class Run1 {
    public static void main(String[] args) throws InterruptedException {
        MyService service = new MyService();
        ThreadA a = new ThreadA(service);
        ThreadB b = new ThreadB(service);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(service.queue.size());
    }
}

? ? ? ? ?

文章来源:https://blog.csdn.net/geminigoth/article/details/135196777
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。