目录
????????创建1个生产者(写入程序)1个消费者(读取程序)和1个buff中转,以及1个模拟文本文件。
????????生产者将文件内容读取后写入到buffer中转中(生产),消费者将buffer内容读取后打印(消费)。
??????? 当buffer满时,生产者进入等待状态,当buffer为空时,消费者进入等待状态。
创建1个生产者和3个消费者。
package xyz.jangle.thread.test.n2_6.condition;
import java.util.concurrent.TimeUnit;
/**
*
* 2.6 在一个锁中使用2种条件
* 生产者,即写入程序
* 消费者,即读取程序
* 缓冲,生产者和消费者的数据中转。
*
* @author jangle
* @email jangle@jangle.xyz
* @time 2020年7月26日 上午7:39:31
*
*/
public class M {
// 写进程的数量
private static int wn = 1;
// 读进程的数量
private static int rn = 3;
public static void main(String[] args) throws Exception {
FileMock file = new FileMock(20, 5);
Buffer buf = new Buffer(3);
var sTime = System.currentTimeMillis();
System.out.println(sTime);
// 写入线程
Thread[] wThreads = new Thread[wn];
for (int i = 0; i < wn; i++) {
Thread thread = new Thread(() -> {
buf.setPendingLines(true);
while (file.hasMoreLine()) {
String line = file.getLine();
if(line != null) //如果写进程为2个,则需要加判断 2024年1月5日 09:16:30
buf.insert(line);
}
buf.setPendingLines(false);
});
thread.start();
wThreads[i] = thread;
}
Thread[] rThreads = new Thread[rn];
for (int i = 0; i < rn; i++) {
Thread thread = new Thread(() -> {
while (buf.hasPendingLines()) {
buf.get();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
rThreads[i] = thread;
}
for(Thread w:wThreads) {
w.join();
}
for(Thread r:rThreads) {
r.join();
}
System.out.println(System.currentTimeMillis() - sTime);
}
}
package xyz.jangle.thread.test.n2_6.condition;
/**
*
* 模拟文本文件
*
* @author jangle
* @email jangle@jangle.xyz
* @time 2020年7月25日 下午10:04:30
*
*/
public class FileMock {
// 正文((行)
private String[] content;
// 行
private int index;
public FileMock(int size, int length) {
content = new String[size];
for (int i = 0; i < size; i++) {
StringBuilder row = new StringBuilder(length);
for (int j = 0; j < length; j++) {
int rChar = (int) (Math.random() * 255);
row.append(rChar);
}
content[i] = row.toString();
}
index = 0;
}
/**
* 是否还有行(即末尾了没)
*
* @author jangle
* @time 2020年7月25日 下午10:09:52
* @return
*/
public boolean hasMoreLine() {
return index < content.length;
}
/**
* 获取行
*
* @author jangle
* @time 2020年7月25日 下午10:12:02
* @return
*/
public synchronized String getLine() {//如果写进程为2个,则需要加上同步。2024年1月5日 09:16:07
if (this.hasMoreLine()) {
System.out.println("Mock:" + (content.length - index));
return content[index++];
}
return null;
}
}
package xyz.jangle.thread.test.n2_6.condition;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* 数据缓冲区,建立在生产者和消费者之间。
*
* @author jangle
* @email jangle@jangle.xyz
* @time 2020年7月25日 下午10:12:59
*
*/
public class Buffer {
// 存放共享数据
private final LinkedList<String> buffer;
// 缓冲区长度
private final int maxSize;
private final ReentrantLock lock;
private final Condition insertCondition;
private final Condition getCondition;
private boolean pendingLines;
public Buffer(int maxSize) {
super();
this.maxSize = maxSize;
this.lock = new ReentrantLock();
insertCondition = lock.newCondition();
getCondition = lock.newCondition();
pendingLines = true;
buffer = new LinkedList<String>();
}
// 插入缓冲数据
public void insert(String line) {
lock.lock();
try {
while (buffer.size() == maxSize) {
System.out.println(Thread.currentThread().getName()+"缓冲区已满,进入等待");
insertCondition.await();
}
buffer.offer(line);
System.out.println(Thread.currentThread().getName()+"插入了一行记录"+line);
getCondition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
// 获取缓冲数据
public String get() {
String line = null;
lock.lock();
try {
// 当缓冲区没有内容,但文本文件中还有未写入的内容时,进行等待。
while((buffer.size() == 0) && (hasPendingLines())) {
System.out.println(Thread.currentThread().getName()+"缓冲区已空,进入等待");
getCondition.await();
}
if(hasPendingLines()) {
line = buffer.poll();
System.out.println(Thread.currentThread().getName()+"读取了一行数据"+line+"缓冲区还剩:"+buffer.size());
insertCondition.signalAll();
}
} catch (Exception e) {
}finally {
lock.unlock();
}
return line;
}
/**
* 用于设置是否还有文本内容(在文本文件内容全部写入缓冲后设置为false)
*
* @author jangle
* @time 2020年7月26日 上午7:59:13
* @param pendingLines
*/
public synchronized void setPendingLines(boolean pendingLines) {
this.pendingLines = pendingLines;
}
/**
* 是否还有未处理的文本内容(文本文件内容 || 缓冲文件内容)
*
* @author jangle
* @time 2020年7月26日 上午7:58:32
* @return
*/
public synchronized boolean hasPendingLines() {
return pendingLines || buffer.size() > 0;
}
}
1704423262311
Mock:20
Thread-0插入了一行记录201246130116
Mock:19
Thread-0插入了一行记录131291943819
Mock:18
Thread-3读取了一行数据201246130116缓冲区还剩:1
Thread-0插入了一行记录21723025351228
Mock:17
Thread-1读取了一行数据131291943819缓冲区还剩:1
Thread-2读取了一行数据21723025351228缓冲区还剩:0
Thread-0插入了一行记录16195135235153
Mock:16
Thread-0插入了一行记录214134140210184
Mock:15
Thread-0插入了一行记录1931514111087
Mock:14
Thread-0缓冲区已满,进入等待
Thread-1读取了一行数据16195135235153缓冲区还剩:2
Thread-2读取了一行数据214134140210184缓冲区还剩:1
Thread-3读取了一行数据1931514111087缓冲区还剩:0
Thread-0插入了一行记录861591085822
Mock:13
Thread-0插入了一行记录1487796124227
Mock:12
Thread-0插入了一行记录21624569179235
Mock:11
Thread-0缓冲区已满,进入等待
Thread-1读取了一行数据861591085822缓冲区还剩:2
Thread-2读取了一行数据1487796124227缓冲区还剩:1
Thread-3读取了一行数据21624569179235缓冲区还剩:0
Thread-0插入了一行记录6219799204240
Mock:10
Thread-0插入了一行记录936822910658
Mock:9
Thread-0插入了一行记录201565732128
Mock:8
Thread-0缓冲区已满,进入等待
Thread-2读取了一行数据6219799204240缓冲区还剩:2
Thread-3读取了一行数据936822910658缓冲区还剩:1
Thread-1读取了一行数据201565732128缓冲区还剩:0
Thread-0插入了一行记录1996321252179
Mock:7
Thread-0插入了一行记录21313993322
Mock:6
Thread-0插入了一行记录66215619465
Mock:5
Thread-0缓冲区已满,进入等待
Thread-3读取了一行数据1996321252179缓冲区还剩:2
Thread-1读取了一行数据21313993322缓冲区还剩:1
Thread-2读取了一行数据66215619465缓冲区还剩:0
Thread-0插入了一行记录201215617117
Mock:4
Thread-0插入了一行记录1361563315546
Mock:3
Thread-0插入了一行记录23422921188193
Mock:2
Thread-0缓冲区已满,进入等待
Thread-1读取了一行数据201215617117缓冲区还剩:2
Thread-2读取了一行数据1361563315546缓冲区还剩:1
Thread-3读取了一行数据23422921188193缓冲区还剩:0
Thread-0插入了一行记录2063187172139
Mock:1
Thread-0插入了一行记录11180225171
Thread-3读取了一行数据2063187172139缓冲区还剩:1
Thread-1读取了一行数据11180225171缓冲区还剩:0
7053