1、对输入输出流、字符字节流的学习,以之前做的批量下载功能为例
批量下载指的是,将多个文件打包到zip文件中,然后下载该zip文件。
1.1下载网络上的文件
代码参考如下:
import java.io.*;
import java.net.URL;
import java.net.URLConnection;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
public class MultiDownloadByUrlTest {
public static void main(String[] args) throws IOException {
//如果zip文件不存在则创建zip
String localZipFile = "D:/temp/testUrl.zip" ;
File file = new File(localZipFile);
if(!file.exists()){
file.createNewFile();
}
ZipOutputStream out = new ZipOutputStream(new FileOutputStream(localZipFile));
byte[] buffer = new byte[1024];
//要批量下载的文件数组
String[] urls = new String[] {"http://www.baidu.com/img/PCfb_5bf082d29588c07f842ccde3f97243ea.png",
"https://img-home.csdnimg.cn/images/20201124032511.png"};
//依次获取批量下载的文件
for(int i =0; i<urls.length;i++){
//从数据库中获取文件的路径和文件名,并放入zip文件中
String urlFile = urls[i];
out.putNextEntry(new ZipEntry(i+".png"));
int len;
URL url = new URL(urlFile);
URLConnection conn = url.openConnection();
InputStream inStream = conn.getInputStream();
//读入需要下载的文件的内容,打包到zip文件
while ((len = inStream.read(buffer)) > 0) {
out.write(buffer, 0, len);
}
out.closeEntry();
inStream.close();
}
out.close();
}
}
1.2、下载磁盘中的多个文件到zip文件中
import java.io.*;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
public class MultiDownloadTest {
public static void main(String[] args) throws IOException {
//如果zip文件不存在则创建zip
String localZipFile = "D:/temp/test.zip" ;
File file = new File(localZipFile);
if(!file.exists()){
file.createNewFile();
}
ZipOutputStream out = new ZipOutputStream(new FileOutputStream(localZipFile));
//要批量下载的文件数组
String[] ids = new String[] {"11.docx","22.xlsx"};
byte[] buffer = new byte[1024];
//依次获取批量下载的文件
for(int i =0; i<ids.length;i++){
String fileName = ids[i];
out.putNextEntry(new ZipEntry(fileName));
int len;
FileInputStream inStream = new FileInputStream(new File("D:/temp/"+fileName));
//读入需要下载的文件的内容,打包到zip文件
while ((len = inStream.read(buffer)) > 0) {
out.write(buffer, 0, len);
}
out.closeEntry();
inStream.close();
}
out.close();
}
}
1.3编写接口,下载该zip文件
Controller类代码如下:
package com.hmblogs.backend.controller;
import com.hmblogs.backend.util.MultiDownloadTest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.*;
@RestController
@Slf4j
public class FileController {
/**
* get方式下载zip文件
* @return
*/
@GetMapping(value = "/downloadZipByGet")
public void downloadZipByGet(HttpServletRequest request,
HttpServletResponse response) throws IOException {
log.info("downloadZipByGet prepare begin.");
MultiDownloadTest.preMultiDownload();
log.info("downloadZipByGet prepare end.");
log.info("downloadZipByGet begin.");
String zipFileName = "D:/temp/test.zip";
InputStream is = new FileInputStream(new File(zipFileName));
// 设置response参数,可以打开下载页面
response.reset();
response.setContentType("application/octet-stream;charset=utf-8");
response.setHeader("Access-Control-Expose-Headers", "content-disposition");
response.setHeader("Content-Disposition", "attachment;filename=test.zip");
ServletOutputStream out = response.getOutputStream();
BufferedInputStream bis = null;
BufferedOutputStream bos = null;
try {
bis = new BufferedInputStream(is);
bos = new BufferedOutputStream(out);
byte[] buff = new byte[2048];
int bytesRead;
while (-1 != (bytesRead = bis.read(buff, 0, buff.length))) {
bos.write(buff, 0, bytesRead);
}
log.info("downloadZipByGet end.");
} catch (final IOException e) {
log.error("downloadZipByGet errors, reason:{}", e.getMessage());
} finally {
if (bis != null){
bis.close();
}
if (is != null){
is.close();
}
if (bos != null){
bos.close();
}
if (out != null){
out.close();
}
}
// 用后删除临时用途的zip文件
File fileTempZip = new File(zipFileName);
if(fileTempZip.exists()){
fileTempZip.delete();
}
}
/**
* post方式下载zip文件
* @return
*/
@RequestMapping(value = "/downloadZipByPost",method = RequestMethod.POST)
public void downloadZipByPost(){
}
}
工具类MultiDownloadTest.java代码如下:
package com.hmblogs.backend.util;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
public class MultiDownloadTest {
public static void preMultiDownload() throws IOException {
//如果zip文件不存在则创建zip
String localZipFile = "D:/temp/test.zip" ;
File file = new File(localZipFile);
if(!file.exists()){
file.createNewFile();
}
ZipOutputStream out = new ZipOutputStream(new FileOutputStream(localZipFile));
//要批量下载的文件数组
String[] ids = new String[] {"11.docx","22.xlsx"};
byte[] buffer = new byte[1024];
//依次获取批量下载的文件
for(int i =0; i<ids.length;i++){
String fileName = ids[i];
out.putNextEntry(new ZipEntry(fileName));
int len;
FileInputStream inStream = new FileInputStream(new File("D:/temp/"+fileName));
//读入需要下载的文件的内容,打包到zip文件
while ((len = inStream.read(buffer)) > 0) {
out.write(buffer, 0, len);
}
out.closeEntry();
inStream.close();
}
out.close();
}
}
1.4访问接口
http://localhost:8080/backend/downloadZipByGet
下载了文件,能正常打开,且文件都是正常的。
二、学习NIO
2.1开发一个简单的服务端接收客户端发过来的消息的功能
服务端Server.java代码如下:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
public class Server {
public static void main(String[] args) {
try {
//1.获取管道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//2.设置非阻塞模式
serverSocketChannel.configureBlocking(false);
//3.绑定端口
serverSocketChannel.bind(new InetSocketAddress(8888));
//4.获取选择器
Selector selector = Selector.open();
//5.将通道注册到选择器上,并且开始指定监听的接收事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//6.轮询已经就绪的事件
while (selector.select() > 0){
System.out.println("开启事件处理");
//7.获取选择器中所有注册的通道中已准备好的事件
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
//8.开始遍历事件
while (it.hasNext()){
SelectionKey selectionKey = it.next();
//9.判断这个事件具体是啥
if (selectionKey.isAcceptable()){
System.out.println("isAcceptable--->"+selectionKey);
//10.获取当前接入事件的客户端通道
SocketChannel socketChannel = serverSocketChannel.accept();
//11.切换成非阻塞模式
socketChannel.configureBlocking(false);
//12.将本客户端注册到选择器
socketChannel.register(selector,SelectionKey.OP_READ);
}else if (selectionKey.isReadable()){
System.out.println("isReadable--->"+selectionKey);
//13.获取当前选择器上的读
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
//14.读取
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len;
while ((len = socketChannel.read(buffer)) > 0){
buffer.flip();
System.out.println(new String(buffer.array(),0,len));
//清除之前的数据(覆盖写入)
buffer.clear();
}
}
//15.处理完毕后,移除当前事件
it.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
客户端Client.java代码如下:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
public class Client {
public static void main(String[] args) {
try {
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",8888));
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
Scanner scanner = new Scanner(System.in);
while (true){
System.out.print("请输入:");
String msg = scanner.nextLine();
buffer.put(msg.getBytes());
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
运行Server
然后运行Client
在Client输入wwww
然后看到Server打印如下内容:
开启事件处理
isAcceptable--->sun.nio.ch.SelectionKeyImpl@42110406
开启事件处理
isReadable--->sun.nio.ch.SelectionKeyImpl@531d72ca
wwww
可以看到, 这时先开启了isAcceptable事件处理,然后开启了isReadable事件处理。
然后再在Client输入qqqq
然后看到Server打印如下内容:
开启事件处理
isReadable--->sun.nio.ch.SelectionKeyImpl@531d72ca
qqqq
这时只开启了isReadable这样的事件处理,不用再开启isAcceptable事件处理
server控制台截图如下
client的控制台如下:?
2.2开发网络编程应用实例-群聊系统
需求:进一步理解 NIO 非阻塞网络编程机制,实现多人群聊
?编写一个 NIO 群聊系统,实现客户端与客户端的通信需求(非阻塞)
服务器端:可以监测用户上线,离线,并实现消息转发功能
客户端:通过 channel 可以无阻塞发送消息给其它所有客户端用户,同时可以接受其它客户端用户通过服务端转发来的消息
Server.java代码如下:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
/**
*
*/
public class Server {
//定义属性
private Selector selector;
private ServerSocketChannel ssChannel;
private static final int PORT = 9999;
//构造器
//初始化工作
public Server() {
try {
// 1、获取通道
ssChannel = ServerSocketChannel.open();
// 2、切换为非阻塞模式
ssChannel.configureBlocking(false);
// 3、绑定连接的端口
ssChannel.bind(new InetSocketAddress(PORT));
// 4、获取选择器Selector
selector = Selector.open();
// 5、将通道都注册到选择器上去,并且开始指定监听接收事件
ssChannel.register(selector , SelectionKey.OP_ACCEPT);
}catch (IOException e) {
e.printStackTrace();
}
}
//监听
public void listen() {
System.out.println("监听线程:" + Thread.currentThread().getName());
try {
while (selector.select() > 0){
// 7、获取选择器中的所有注册的通道中已经就绪好的事件
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
// 8、开始遍历这些准备好的事件
while (it.hasNext()){
// 提取当前这个事件
SelectionKey sk = it.next();
// 9、判断这个事件具体是什么
if(sk.isAcceptable()){
// 10、直接获取当前接入的客户端通道
SocketChannel schannel = ssChannel.accept();
// 11 、切换成非阻塞模式
schannel.configureBlocking(false);
// 12、将本客户端通道注册到选择器
System.out.println(schannel.getRemoteAddress() + " 上线 ");
schannel.register(selector , SelectionKey.OP_READ);
//提示
}else if(sk.isReadable()){
//处理读 (专门写方法..)
readData(sk);
}
it.remove(); // 处理完毕之后需要移除当前事件
}
}
}catch (Exception e) {
e.printStackTrace();
}finally {
//发生异常处理....
}
}
//读取客户端消息
private void readData(SelectionKey key) {
//获取关联的channel
SocketChannel channel = null;
try {
//得到channel
channel = (SocketChannel) key.channel();
//创建buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = channel.read(buffer);
//根据count的值做处理
if(count > 0) {
//把缓存区的数据转成字符串
String msg = new String(buffer.array());
//输出该消息
System.out.println("来自客户端---> " + msg);
//向其它的客户端转发消息(去掉自己), 专门写一个方法来处理
sendInfoToOtherClients(msg, channel);
}
}catch (IOException e) {
try {
System.out.println(channel.getRemoteAddress() + " 离线了..");
e.printStackTrace();
//取消注册
key.cancel();
//关闭通道
channel.close();
}catch (IOException e2) {
e2.printStackTrace();;
}
}
}
//转发消息给其它客户(通道)
private void sendInfoToOtherClients(String msg, SocketChannel self ) throws IOException{
System.out.println("服务器转发消息中...");
System.out.println("服务器转发数据给客户端线程: " + Thread.currentThread().getName());
//遍历 所有注册到selector 上的 SocketChannel,并排除 self
for(SelectionKey key: selector.keys()) {
//通过 key 取出对应的 SocketChannel
Channel targetChannel = key.channel();
//排除自己
if(targetChannel instanceof SocketChannel && targetChannel != self) {
//转型
SocketChannel dest = (SocketChannel)targetChannel;
//将msg 存储到buffer
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
//将buffer 的数据写入 通道
dest.write(buffer);
}
}
}
public static void main(String[] args) {
//创建服务器对象
Server groupChatServer = new Server();
groupChatServer.listen();
}
}
Client.java代码如下:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
public class Client {
//定义相关的属性
private final String HOST = "127.0.0.1"; // 服务器的ip
private final int PORT = 9999; //服务器端口
private Selector selector;
private SocketChannel socketChannel;
private String username;
//构造器, 完成初始化工作
public Client() throws IOException {
selector = Selector.open();
//连接服务器
socketChannel = socketChannel.open(new InetSocketAddress("127.0.0.1", PORT));
//设置非阻塞
socketChannel.configureBlocking(false);
//将channel 注册到selector
socketChannel.register(selector, SelectionKey.OP_READ);
//得到username
username = socketChannel.getLocalAddress().toString().substring(1);
System.out.println(username + " is ok...");
}
//向服务器发送消息
public void sendInfo(String info) {
info = username + " 说:" + info;
try {
socketChannel.write(ByteBuffer.wrap(info.getBytes()));
}catch (IOException e) {
e.printStackTrace();
}
}
//读取从服务器端回复的消息
public void readInfo() {
try {
int readChannels = selector.select();
if(readChannels > 0) {//有可以用的通道
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if(key.isReadable()) {
//得到相关的通道
SocketChannel sc = (SocketChannel) key.channel();
//得到一个Buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取
sc.read(buffer);
//把读到的缓冲区的数据转成字符串
String msg = new String(buffer.array());
System.out.println(msg.trim());
}
}
iterator.remove(); //删除当前的selectionKey, 防止重复操作
} else {
//System.out.println("没有可以用的通道...");
}
}catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
//启动我们客户端
Client chatClient = new Client();
//启动一个线程, 每个3秒,读取从服务器发送数据
new Thread() {
public void run() {
while (true) {
chatClient.readInfo();
try {
Thread.currentThread().sleep(3000);
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
//发送数据给服务器端
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
System.out.print("请输入:");
String s = scanner.nextLine();
chatClient.sendInfo(s);
}
}
}
Client22.java代码如下:就是Client.java的代码复制一份,改下文件名而已。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
public class Client22 {
//定义相关的属性
private final String HOST = "127.0.0.1"; // 服务器的ip
private final int PORT = 9999; //服务器端口
private Selector selector;
private SocketChannel socketChannel;
private String username;
//构造器, 完成初始化工作
public Client22() throws IOException {
selector = Selector.open();
//连接服务器
socketChannel = socketChannel.open(new InetSocketAddress("127.0.0.1", PORT));
//设置非阻塞
socketChannel.configureBlocking(false);
//将channel 注册到selector
socketChannel.register(selector, SelectionKey.OP_READ);
//得到username
username = socketChannel.getLocalAddress().toString().substring(1);
System.out.println(username + " is ok...");
}
//向服务器发送消息
public void sendInfo(String info) {
info = username + " 说:" + info;
try {
socketChannel.write(ByteBuffer.wrap(info.getBytes()));
}catch (IOException e) {
e.printStackTrace();
}
}
//读取从服务器端回复的消息
public void readInfo() {
try {
int readChannels = selector.select();
if(readChannels > 0) {//有可以用的通道
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if(key.isReadable()) {
//得到相关的通道
SocketChannel sc = (SocketChannel) key.channel();
//得到一个Buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取
sc.read(buffer);
//把读到的缓冲区的数据转成字符串
String msg = new String(buffer.array());
System.out.println(msg.trim());
}
}
iterator.remove(); //删除当前的selectionKey, 防止重复操作
} else {
//System.out.println("没有可以用的通道...");
}
}catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
//启动我们客户端
Client22 chatClient = new Client22();
//启动一个线程, 每个3秒,读取从服务器发送数据
new Thread() {
public void run() {
while (true) {
chatClient.readInfo();
try {
Thread.currentThread().sleep(3000);
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
//发送数据给服务器端
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
System.out.print("请输入:");
String s = scanner.nextLine();
chatClient.sendInfo(s);
}
}
}
启动Server、Client、Client22后,在Client输入内容、在Client22输入内容后,对3个控制台截图如下
Server控制台截图如下:
Client控制台如下:
Client22控制台如下:
?
简单的需求目的达到了。?