接上一篇文章【实战】ZLMediaKit部署及使用
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.4</version>
<relativePath/>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>zlm-timer-task</artifactId>
<version>1.0.0</version>
<name>zlm-timer-task</name>
<description>用于定时检查zlm中的拉/推流状态,并重新拉/推流</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.6.13</spring-boot.version>
<hutool.version>5.7.18</hutool.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<!-- 打包插件 -->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<executable>true</executable>
</configuration>
</plugin>
</plugins>
</build>
</project>
# 服务端口
server:
port: 8881
# zlmediakit 配置
zlm:
zlmUrl: http://192.168.1.180:8080 # zlm服务地址
secret: qts123 # zlm鉴权秘钥
app: live
vhost: __defaultVhost__
# 定时任务配置
task:
initialDelay: 5 #初始延迟时间,单位秒
delayBetweenTasks: 10 #任务执行间隔时间,单位秒
# 需要拉流/推设备
devices: hk20,hk33
package com.example.demo.timer;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* rtsp拉流枚举
*
*/
@Getter
@AllArgsConstructor
public enum RtspEnum {
// 摄像头,根据实际情况进行定义
hk20("hk20","实际的rtsp地址","环境实验室2(3-1)"),
hk33("hk33","实际的rtsp地址","环境实验室2(3-3)"),
;
private final String stream;
private final String url;
private final String name;
public static RtspEnum getByStream(String stream){
for (RtspEnum rtspEnum : RtspEnum.values()) {
if(rtspEnum.getStream().equals(stream)){
return rtspEnum;
}
}
return null;
}
}
package com.example.demo.timer;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* rtmp推流枚举
*/
@Getter
@AllArgsConstructor
public enum RtmpEnum {
// 海康摄像头,stream字段与rtsp对应,根据实际情况进行定义
hk20("hk20","实际的rtmp地址","环境实验室2(3-1)"),
hk33("hk33","实际的rtmp地址","环境实验室2(3-3)"),
;
private final String stream;
private final String url;
private final String name;
/**
* 通过 stream获取枚举
*/
public static RtmpEnum getByStream(String stream){
for (RtmpEnum rtmpEnum : RtmpEnum.values()) {
if(rtmpEnum.getStream().equals(stream)){
return rtmpEnum;
}
}
return null;
}
}
package com.example.demo.timer;
import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 定时任务
*
* @author qingtongsheng
* @date 2023年03月21日 16:54
*/
@Slf4j
@Component
public class ScheduleTask {
// 需要拉/推流的设备列表
public static List<String> deviceList = new ArrayList<>();
// 设定任务执行的初始延迟时间(以秒为单位)
public static long initialDelay = 5;
// 设定任务执行的间隔时间(以秒为单位)
public static long delayBetweenTasks = 10;
// 时间单位
public static final TimeUnit TIMEUNIT = TimeUnit.SECONDS;
// @Value获取yml配置
public static String zlmUrl = "http://127.0.0.1:8080";
public static String secret = "qts123";
public static String app = "live";
public static String vhost = "__defaultVhost__";
@Value("#{'${devices}'.split(',')}")
public void setDeviceList(List<String> devices) {
ScheduleTask.deviceList = devices;
}
@Value("${task.initialDelay}")
public void setInitialDelay(long initialDelay) {
ScheduleTask.initialDelay = initialDelay;
}
@Value("${task.delayBetweenTasks}")
public void setDelayBetweenTasks(long delayBetweenTasks) {
ScheduleTask.delayBetweenTasks = delayBetweenTasks;
}
@Value("${zlm.zlmUrl}")
public void setZlmUrl(String zlmUrl) {
ScheduleTask.zlmUrl = zlmUrl;
}
@Value("${zlm.secret}")
public void setSecret(String secret) {
ScheduleTask.secret = secret;
}
@Value("${zlm.app}")
public void setApp(String app) {
ScheduleTask.app = app;
}
@Value("${zlm.vhost}")
public void setVhost(String vhost) {
ScheduleTask.vhost = vhost;
}
public static void task() {
// 创建一个 ScheduledExecutorService 对象
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
// 创建一个 Runnable 任务
Runnable task = () -> {
// 计算程序执行耗时
long startTime = System.currentTimeMillis();
// 执行任务
handleCommand();
long endTime = System.currentTimeMillis();
log.info("任务执行耗时:{} ms" , (endTime - startTime));
};
// 使用 ScheduledExecutorService 对象的 scheduleWithFixedDelay 方法执行任务
scheduledExecutorService.scheduleWithFixedDelay(task, initialDelay, delayBetweenTasks, TIMEUNIT);
shutdownThread(scheduledExecutorService);
}
/**
* 调用 zlm api获取设备列表
*
* @return
*/
private static String getMediaList() {
System.out.println("zlmUrl => "+zlmUrl);
String url = zlmUrl + "/index/api/getMediaList";
HashMap<String, Object> params = new HashMap<>();
params.put("secret", secret);
params.put("schema", "rtmp");
params.put("app", app);
params.put("vhost", vhost);
return HttpUtil.get(url, params,5000);
}
/**
* 任务执行代码
*/
private static void handleCommand() {
// 获取线上设备列表
String body;
try {
body = getMediaList();
} catch (Exception e) {
// 访问zlm失败,打印错误信息,可能是服务器未启动完成
log.warn("访问zlm失败,可能是服务器未启动完成:{}", e.getMessage());
return;
}
JSONObject jsonObject = JSONUtil.parseObj(body);
Integer code = jsonObject.getInt("code");
if (code != 0) {
// 调用失败,打印错误信息
log.info("获取设备列表失败,错误信息: code = {},msg = {}", jsonObject.getInt("code"),jsonObject.getStr("msg"));
return;
}
// 调用成功
JSONArray data = jsonObject.getJSONArray("data");
// zlm中的所有设备ID列表
List<String> streamIdList = new ArrayList<>();
// 没有推流的流ID列表
Set<String> notOutputStreamIdList = new HashSet<>();
if (data != null) {
// 得到所有设备ID
streamIdList = data.stream().map(item -> ((JSONObject) item).getStr("stream")).collect(Collectors.toList());
// 得到没有推流的流ID,即过滤data得到readerCount为0的流ID
notOutputStreamIdList = data.stream()
.filter(item -> ((JSONObject) item).getInt("readerCount") == 0)
.map(item -> ((JSONObject) item).getStr("stream"))
.collect(Collectors.toSet());
}
log.info("当前设备列表: {}",streamIdList);
if (!notOutputStreamIdList.isEmpty()) {
log.info("需要重新推流的流ID列表: {}", notOutputStreamIdList);
// 将未推流的流ID进行推流处理
for (String streamId : notOutputStreamIdList) {
// 推流处理
RtmpEnum rtmpEnum = RtmpEnum.getByStream(streamId);
String pushData = pushStream(rtmpEnum);
JSONObject pushObject = JSONUtil.parseObj(pushData);
Integer pushCode = pushObject.getInt("code");
if (pushCode != 0) {
log.warn("推流失败: 设备名称 = 【{}】, 错误码 = 【{}】,msg = 【{}】", rtmpEnum.getName(), pushCode,pushObject.getStr("msg"));
} else {
log.info("推流成功: 设备名称 = 【{}】", rtmpEnum.getName());
}
}
log.info("重新推流完成");
}
if (streamIdList.size() >= deviceList.size()) {
log.info("设备列表个数正常, 不需要重新拉流");
return;
}
log.info("设备列表缺失,开始重新拉流/推流");
// 需要对缺失的进行重新拉,并重新推流
for (String stream : deviceList) {
// 将不在设备列表中的设备,重新拉流,并推流
if (!streamIdList.contains(stream)) {
RtspEnum rtspEnum = RtspEnum.getByStream(stream);
if (rtspEnum == null) {
continue;
}
try {
// 拉流
String pullData = pullStream(rtspEnum);
JSONObject pullObject = JSONUtil.parseObj(pullData);
Integer pushCode = pullObject.getInt("code");
if (pushCode != 0) {
log.warn("重新拉流失败: 设备名称 = 【{}】, 错误码 = 【{}】,msg = 【{}】", rtspEnum.getName(), pushCode,pullObject.getStr("msg"));
} else {
log.info("重新拉流完成: 设备名称 = 【{}】", rtspEnum.getName());
// 等待拉流完成
Thread.sleep(2000);
}
} catch (Exception e) {
// 重新拉流失败
log.error("重新拉流失败:{} , 错误信息 {}", rtspEnum.getName(),e.getMessage());
}
try {
// 推流
String pushData = pushStream(RtmpEnum.getByStream(rtspEnum.getStream()));
JSONObject pushObject = JSONUtil.parseObj(pushData);
Integer pushCode = pushObject.getInt("code");
if (pushCode != 0) {
log.warn("重新推流失败: 设备名称 = 【{}】, 错误码 = 【{}】,msg = 【{}】", rtspEnum.getName(), pushCode,pushObject.getStr("msg"));
} else {
log.info("重新推流完成: 设备名称 = 【{}】", rtspEnum.getName());
// 等待推流完成
Thread.sleep(2000);
}
} catch (Exception e) {
// 重新推流失败
log.error("重新推流失败:{}, 错误信息 {}", rtspEnum.getName(),e.getMessage());
}
}
}
log.info("结束重新拉流/推流");
}
/**
* 调用 zlm api 拉流
*
* @param rtspEnum 拉流地址枚举
*/
private static String pullStream(RtspEnum rtspEnum) {
String url = zlmUrl + "/index/api/addStreamProxy";
HashMap<String, Object> params = new HashMap<>();
params.put("secret", secret);
params.put("vhost", vhost);
params.put("app", app);
params.put("stream", rtspEnum.getStream());
params.put("url", rtspEnum.getUrl());
return HttpUtil.get(url, params);
}
/**
* 调用 zlm api 推流
*/
private static String pushStream(RtmpEnum rtmpEnum) {
if (rtmpEnum == null) {
throw new RuntimeException("推流地址不能为空");
}
String url = zlmUrl + "/index/api/addStreamPusherProxy";
HashMap<String, Object> params = new HashMap<>();
params.put("secret", secret);
params.put("schema", "rtmp");
params.put("vhost", vhost);
params.put("app", app);
params.put("stream", rtmpEnum.getStream());
params.put("dst_url", rtmpEnum.getUrl());
return HttpUtil.get(url, params);
}
/**
* 程序结束时关闭线程池
*
* @param scheduledExecutorService
*/
private static void shutdownThread(ScheduledExecutorService scheduledExecutorService) {
// 关闭 ScheduledExecutorService 对象
Thread shutdownThread = new Thread(() -> {
System.out.println("正在关闭定时任务线程...");
scheduledExecutorService.shutdown();
System.out.println("定时任务线程已经关闭...");
System.exit(0);
});
// 进程结束的钩子
Runtime.getRuntime().addShutdownHook(shutdownThread);
}
}
package com.example.demo;
import com.example.demo.timer.ScheduleTask;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
// 调用定时任务
ScheduleTask.task();
System.out.println("定时任务已启动");
}
}
Dockerfile
#设置镜像基础: jdk8-jre , 比jdk内存小
FROM java:openjdk-8u111-jre-alpine
#维护人员信息
MAINTAINER qingtongsheng
#设置对外暴露端口 多个端口空格分割
EXPOSE 8881
# VOLUME /tmp
# 复制jar到容器中,并重命名
ADD zlm-timer-task-1.0.0.jar app.jar
# 复制配置文件到容器中
ADD config/application.yml config/application.yml
#执行启动命令
ENTRYPOINT ["java","-Dserver.port=8881","-Dspring.config.location=/config/application.yml","-jar","/app.jar"]
2.1 将jar包 和 Dockerfile 上传到linux的 /home/docker/timer 路径下
2.2 将application.yml配置文件放到 /home/docker/timer/config 路径下
命令执行: docker pull java:openjdk-8u111-jre-alpine
4.1 进入/home/docker/timer目录 cd /home/docker/timer
4.2 打镜像 docker build -t zlm-timer . (注意后面的点.需要带上)
5.0 docker自启动 systemctl enable docker.service
5.1 docker run -it -d --name timer --restart=always -p 8881:8881 -v /home/docker/timer/config:/config zlm-timer /bin/bash
6.1 查看容器控制台日志,看最后20行,一直输出 docker logs -f --tail=20 timer
7.1 修改配置: /home/docker/timer/config中的application.yml
7.2 再重启容器 docker restart timer
修改容器自动重启 docker update --restart=always timer