多线程实践项目

发布时间:2024年01月04日

前言

前面几篇文章分别学习了多线程的基本知识和线程池使用,这篇则为项目实践和整理。

项目参考

这里选择了两个 GitHub 项目作为参考;如果不方便下载,可以在下方留言,我会私信发送。
1.马士兵老师的 JUC 项目:讲解了多线程的基本知识。
2.基本的线程演示:主要是对前面几篇讲解的回顾。
在这里插入图片描述

代码展示

BlockingQueue(阻塞队列)

package com.unicss;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Demo of a bounded {@link BlockingQueue}: ten producers each put one element
 * into a queue of capacity 3 while a single consumer drains it.
 *
 * <p>Each instance is one producer task. The class still extends {@link Thread}
 * so existing callers keep compiling, but instances are only handed to an
 * executor as {@link Runnable}s; they are never started as threads themselves.
 */
public class MyBlockingQueue extends Thread {
    /** Shared bounded queue; {@code put} blocks while it already holds 3 elements. */
    public static BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);

    /** How long the consumer waits on an empty queue before it stops, in milliseconds. */
    private static final long CONSUMER_IDLE_TIMEOUT_MS = 2000L;

    private final int index;

    /**
     * @param i the value this producer will put into the queue
     */
    public MyBlockingQueue(int i) {
        this.index = i;
    }

    /**
     * Puts this producer's index into the shared queue, blocking while the queue is full.
     * If the wait is interrupted nothing is enqueued and the interrupt status is restored.
     */
    @Override
    public void run() {
        try {
            queue.put(String.valueOf(this.index));
            System.out.println("{" + this.index + "} in queue!");
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the pool thread that is running this task.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Starts ten producers and one consumer on a cached thread pool.
     *
     * @param args unused
     */
    public static void main(String args[]) {
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            service.submit(new MyBlockingQueue(i));
        }
        Runnable consumer = new Runnable() {
            @Override
            public void run() {
                try {
                    while (true) {
                        Thread.sleep((int) (Math.random() * 1000));
                        System.out.println("=======" + MyBlockingQueue.queue.size());
                        // A timed poll replaces the isEmpty()/take() pair: the consumer no longer
                        // quits just because it looked at the queue before a producer got to run.
                        String str = MyBlockingQueue.queue.poll(
                                CONSUMER_IDLE_TIMEOUT_MS, java.util.concurrent.TimeUnit.MILLISECONDS);
                        if (str == null) {
                            break;
                        }
                        System.out.println(str + " has take!");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
        };
        service.submit(consumer);
        service.shutdown();
    }
}

CompletionService(并发工具类,获取任务返回结果)

package com.unicss;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demo of {@link CompletionService}: ten tasks each sleep for a random time, and
 * {@code main} prints their results in the order {@code take()} hands them back.
 */
public class MyCompletionService implements Callable<String> {
    private final int id;

    /**
     * @param i identifier printed by this task and included in its result
     */
    public MyCompletionService(int i) {
        this.id = i;
    }

    /**
     * Submits ten tasks and prints each result as it is taken from the completion service.
     *
     * @param args unused
     * @throws Exception if waiting for a result is interrupted or a task fails
     */
    public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newCachedThreadPool();
        try {
            CompletionService<String> completion = new ExecutorCompletionService<String>(service);
            for (int i = 0; i < 10; i++) {
                completion.submit(new MyCompletionService(i));
            }
            for (int i = 0; i < 10; i++) {
                System.out.println(completion.take().get());
            }
        } finally {
            // Shut the pool down even when take()/get() throws, so its threads do not linger.
            service.shutdown();
        }
    }

    /**
     * Sleeps for a random time below one second.
     *
     * @return {@code "<id>:<sleep millis>"}; also returned when the sleep is interrupted,
     *         in which case the interrupt status is restored first
     */
    @Override
    public String call() throws Exception {
        int time = (int) (Math.random() * 1000);
        try {
            System.out.println(this.id + " start");
            Thread.sleep(time);
            System.out.println(this.id + " end");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return this.id + ":" + time;
    }
}

Executor

package com.unicss;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demo of a fixed-size thread pool: ten tasks are executed by four worker threads.
 *
 * <p>The class still extends {@link Thread} so existing callers keep compiling, but
 * instances are only passed to the executor as {@link Runnable}s.
 */
public class MyExecutor extends Thread {
    private final int index;

    /**
     * @param i identifier printed in this task's start and end messages
     */
    public MyExecutor(int i) {
        this.index = i;
    }

    /**
     * Prints a start message, sleeps for a random time below ten seconds, then prints
     * an end message. If the sleep is interrupted the end message is skipped and the
     * interrupt status is restored for the thread running the task.
     */
    @Override
    public void run() {
        try {
            System.out.println("[" + this.index + "] start....");
            Thread.sleep((int) (Math.random() * 10000));
            System.out.println("[" + this.index + "] end.");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Hands ten tasks to a pool of four threads and then requests an orderly shutdown.
     *
     * @param args unused
     */
    public static void main(String args[]) {
        ExecutorService service = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 10; i++) {
            service.execute(new MyExecutor(i));
        }
        System.out.println("submit finish");
        service.shutdown();
    }
}

ReentrantLock(可重入的互斥锁)

package com.unicss;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Task that prints its id through a {@code TestReentrantLock} shared with other tasks.
 * {@code main} submits ten such tasks, all using the same printer, to a cached thread pool.
 */
public class MyReentrantLock extends Thread{
    TestReentrantLock lock;
    private int id;

    /**
     * @param i    id passed to the shared printer
     * @param test printer shared by all tasks
     */
    public MyReentrantLock(int i,TestReentrantLock test){
        this.lock=test;
        this.id=i;
    }

    /** Delegates to the shared printer with this task's id. */
    @Override
    public void run(){
        this.lock.print(this.id);
    }

    /**
     * Creates one shared printer, submits ten tasks that use it, then shuts the pool down.
     *
     * @param args unused
     */
    public static void main(String args[]){
        final TestReentrantLock sharedPrinter=new TestReentrantLock();
        final ExecutorService pool=Executors.newCachedThreadPool();
        int next=0;
        while(next<10){
            pool.submit(new MyReentrantLock(next,sharedPrinter));
            next++;
        }
        pool.shutdown();
    }
}
/**
 * Printer guarded by a {@link ReentrantLock}: only one caller at a time can be
 * between its "acquired" and "released" messages.
 */
class TestReentrantLock{
    private final ReentrantLock lock=new ReentrantLock();

    /**
     * Prints an "acquired" message, sleeps for a random time below one second, then
     * prints a "released" message, all while holding the lock.
     *
     * <p>If the sleep is interrupted the lock is still released and the caller's
     * interrupt status is restored.
     *
     * @param str id shown in both messages
     */
    public void print(int str){
        // Acquire before entering try: if lock() itself failed, the finally block
        // must not call unlock() on a lock this thread does not hold.
        lock.lock();
        try{
            System.out.println(str+"获得");
            Thread.sleep((int)(Math.random()*1000));
        }
        catch(InterruptedException e){
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        finally{
            System.out.println(str+"释放");
            lock.unlock();
        }
    }
}

Semaphore(信号量)

package com.unicss;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
 * Demo of {@link Semaphore}: ten customers compete for two permits ("stalls"),
 * and a cleaner takes both permits once every customer has finished.
 *
 * <p>The class still extends {@link Thread} so existing callers keep compiling, but
 * instances are only handed to the executor as {@link Runnable}s.
 */
public class MySemaphore extends Thread {
    Semaphore position;
    private int id;

    /**
     * @param i customer id shown in the messages
     * @param s semaphore whose permits represent the free stalls
     */
    public MySemaphore(int i, Semaphore s) {
        this.id = i;
        this.position = s;
    }

    /**
     * Takes one permit, holds it for a random time below one second, and releases it.
     *
     * <p>The "free" / "queue" message is decided by {@code tryAcquire()}, so it reflects
     * what actually happened rather than an earlier snapshot of the permit count.
     * The permit is released in {@code finally}, so an interrupted customer cannot leak it.
     */
    @Override
    public void run() {
        boolean acquired = false;
        try {
            if (position.tryAcquire()) {
                acquired = true;
                System.out.println("顾客[" + this.id + "]进入厕所,有空位");
            } else {
                System.out.println("顾客[" + this.id + "]进入厕所,没空位,排队");
                position.acquire();
                acquired = true;
            }
            System.out.println("顾客[" + this.id + "]获得坑位");
            Thread.sleep((int) (Math.random() * 1000));
            System.out.println("顾客[" + this.id + "]使用完毕");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Only give back a permit this task really holds.
            if (acquired) {
                position.release();
            }
        }
    }

    /**
     * Runs ten customers against two permits, waits for them to finish, then lets
     * the cleaner take both permits.
     *
     * @param args unused
     */
    public static void main(String args[]) {
        ExecutorService list = Executors.newCachedThreadPool();
        Semaphore position = new Semaphore(2);
        for (int i = 0; i < 10; i++) {
            list.submit(new MySemaphore(i + 1, position));
        }
        list.shutdown();
        try {
            // Without this wait the cleaner could grab both permits before any customer
            // has run and announce the cleaning too early.
            list.awaitTermination(1, java.util.concurrent.TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        position.acquireUninterruptibly(2);
        System.out.println("使用完毕,需要清扫了");
        position.release(2);
    }
}

CountDownLatch(倒计数器)

package com.unicss;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demo of {@link CountDownLatch} as a start gate and a finish line: ten runners wait
 * for a common start signal, and {@code main} waits until all ten have finished.
 */
public class TestCountDownLatch {
    /**
     * Runs the race.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting for the runners
     */
    public static void main(String[] args) throws InterruptedException {
        // Start latch: released once by main.
        final CountDownLatch begin = new CountDownLatch(1);
        // Finish latch: counted down once per runner.
        final CountDownLatch end = new CountDownLatch(10);
        // One pool thread per runner.
        final ExecutorService exec = Executors.newFixedThreadPool(10);

        try {
            for (int index = 0; index < 10; index++) {
                final int no = index + 1;
                Runnable run = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            begin.await(); // blocks until main counts the start latch down
                            Thread.sleep((long) (Math.random() * 10000));
                            System.out.println("No." + no + " arrived");
                        } catch (InterruptedException e) {
                            // Do not swallow the interrupt; leave it set for the pool thread.
                            Thread.currentThread().interrupt();
                        } finally {
                            // Count down even on interruption so main is never left waiting.
                            end.countDown();
                        }
                    }
                };
                exec.submit(run);
            }
            System.out.println("Game Start");
            begin.countDown(); // start latch reaches 0: all runners proceed

            end.await(); // returns once every runner has counted down
            System.out.println("Game Over");
        } finally {
            // Shut down even if end.await() is interrupted; otherwise the pool's
            // threads would keep the JVM alive.
            exec.shutdown();
        }
    }
}

CyclicBarrier(同步屏障)

package com.unicss;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demo of {@link CyclicBarrier}: three tours travel the same five-city route at
 * different speeds and wait for each other at every city before moving on.
 */
public class TestCyclicBarrier {
    /** Cities on the route, in travel order; each {@code time*} array has one entry per city. */
    private static final String[] CITIES = { "Shenzhen", "Guangzhou", "Shaoguan", "Changsha", "Wuhan" };
    // Seconds needed per leg on foot: Shenzhen, Guangzhou, Shaoguan, Changsha, Wuhan
    private static int[] timeWalk = { 5, 8, 15, 15, 10 };
    // Seconds needed per leg by car
    private static int[] timeSelf = { 1, 3, 4, 4, 5 };
    // Seconds needed per leg by tour bus
    private static int[] timeBus = { 2, 4, 6, 6, 7 };

    /**
     * @return the current time formatted as {@code HH:mm:ss} followed by {@code ": "}
     */
    static String now() {
        // A fresh formatter per call: SimpleDateFormat is not safe to share between threads.
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
        return sdf.format(new Date()) + ": ";
    }

    /** One tour: travels each leg, announces its arrival, then waits at the barrier. */
    static class Tour implements Runnable {
        private final int[] times;
        private final CyclicBarrier barrier;
        private final String tourName;

        /**
         * @param barrier  barrier shared by all tours
         * @param tourName name shown in the arrival messages
         * @param times    travel time in seconds for each leg, one entry per city
         */
        public Tour(CyclicBarrier barrier, String tourName, int[] times) {
            this.times = times;
            this.tourName = tourName;
            this.barrier = barrier;
        }

        /**
         * Travels the route leg by leg. Stops early if the thread is interrupted
         * (interrupt status is restored) or the barrier is broken.
         */
        @Override
        public void run() {
            try {
                for (int leg = 0; leg < CITIES.length; leg++) {
                    Thread.sleep(times[leg] * 1000L);
                    System.out.println(now() + tourName + " Reached " + CITIES[leg]);
                    barrier.await();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println(now() + tourName + " interrupted");
            } catch (BrokenBarrierException e) {
                System.err.println(now() + tourName + " stopped: barrier broken");
            }
        }
    }

    /**
     * Starts the three tours on a pool of three threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Three tours; the barrier opens only when all three have arrived.
        CyclicBarrier barrier = new CyclicBarrier(3);
        ExecutorService exec = Executors.newFixedThreadPool(3);
        exec.submit(new Tour(barrier, "WalkTour", timeWalk));
        exec.submit(new Tour(barrier, "SelfTour", timeSelf));
        // If the next submit is commented out the program blocks: only two of the
        // three required parties ever reach the barrier.
        exec.submit(new Tour(barrier, "BusTour", timeBus));
        // Adding a fourth tour on its own would leave it stuck at the barrier,
        // because the barrier is configured for exactly three parties.
        exec.shutdown();
    }
}

ScheduledThread(定时线程)

package com.unicss;

/**
 * 定时器
 */
import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
/**
 * Demo of {@link ScheduledExecutorService}: the same beeper task is scheduled once at
 * a fixed rate and once with a fixed delay, and both schedules are cancelled after 30 seconds.
 */
public class TestScheduledThread {
    /**
     * Starts both schedules and the delayed shutdown task.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        final Runnable beeper = new Runnable() {
            // The beeper is scheduled twice on a two-thread pool, so two runs can overlap;
            // an atomic counter keeps the increment safe. Fully qualified so that the
            // file's import list does not need to change.
            private final java.util.concurrent.atomic.AtomicInteger count =
                    new java.util.concurrent.atomic.AtomicInteger();

            @Override
            public void run() {
                System.out.println(new Date() + " beep " + count.incrementAndGet());
            }
        };
        // First run after 1 second, then every 2 seconds.
        final ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(beeper, 1, 2, SECONDS);
        // First run after 2 seconds, then 5 seconds after each run completes.
        final ScheduledFuture<?> beeperHandle2 = scheduler.scheduleWithFixedDelay(beeper, 2, 5, SECONDS);
        // After 30 seconds cancel both schedules and shut the scheduler down.
        scheduler.schedule(new Runnable() {
            @Override
            public void run() {
                beeperHandle.cancel(true);
                beeperHandle2.cancel(true);
                scheduler.shutdown();
            }
        }, 30, SECONDS);
    }
}
文章来源:https://blog.csdn.net/weixin_45003206/article/details/135379855
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。