现场运行过程中,出现节点卡死,导致了任务流阻塞,
但是我们点击了任务流实例的停止按钮,woker节点并没有停止,任务仍旧继续卡主
1、单节点点击停止可以立即停止任务
2、任务流实例点击停止,可以让任务流以下的所有任务节点都停止掉
1、找到woker任务节点,具体的执行方法
2、把这个方法用线程的方式执行
3、每次执行一个任务方法,就把这个线程存下来
4、通信方式socket,实现不同服务器之间的操作
5、找到这个线程,然后干掉线程
TaskExecuteThread类 run()方法,里面的 task.handle(); 这一步就是每一个节点执行任务的那一步
把具体的 task.handle(); 方法放进线程里面
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
// 提交任务
Future<Void> future = executor.submit(() -> {
// 在这里执行你的任务
// 如果任务在10秒内没有完成,将抛出TimeoutException
// task handle
task.handle();
return null;
});
activeTaskNode.put(String.valueOf(taskExecutionContext.getTaskInstanceId()),future);
// 等待任务完成,这里会阻塞当前线程
future.get(1*60*60*24, TimeUnit.SECONDS);
}catch (TimeoutException e) {
// 在超时时执行的代码
logger.error("任务执行超时,将终止程序。");
//System.out.println("任务执行超时,将终止程序。");
activeTaskNode.remove(String.valueOf(taskExecutionContext.getTaskInstanceId()));
}catch (Exception e) {
e.printStackTrace();
System.out.println("异常的情况下关闭线程!!!!!!!!!!!!!!!!!!");
} finally {
System.out.println("正常关闭线程!!!!!!!!!!!!!!!!!!");
// 关闭线程池
executor.shutdown();
}
public static final Map<String, Future<Void>> activeTaskNode = ActiveTaskNode.activeTaskNode;
activeTaskNode.put(String.valueOf(taskExecutionContext.getTaskInstanceId()),future);
import java.io.Serializable;
/**
* <p>Project: demo - MyObject
* <p>Powered by WX On 2024-01-17 20:39:20
*
* @author WX [wx@qq.com]
* @version 1.0
* @since 17
*/
public class ScocketObject implements Serializable {
private static final long serialVersionUID = 1L;
private String message;
public ScocketObject(String message) {
this.message = message;
}
public String getMessage(){
return this.message;
}
@Override
public String toString() {
return "MyObject{" +
"message='" + message + '\'' +
'}';
}
}
SocketServer工具类,这个服务需要启动起来
/**
* scoket 服务端
* <p>Project: demo - RemoteServer
* <p>Powered by WX On 2024-01-17 19:46:33
*
* @author WX [wx@qq.com]
* @version 1.0
* @since 17
*/
import org.apache.dolphinscheduler.common.thread.socket.ActiveTaskNode;
import org.apache.dolphinscheduler.common.thread.socket.ScocketObject;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Future;
public class SocketServer {
public static void startServer(int port) {
try (ServerSocket serverSocket = new ServerSocket(port)) {
System.out.println("Server is running on port " + port);
while (true) {
Socket clientSocket = serverSocket.accept();
handleClientRequest(clientSocket);
}
} catch (IOException e) {
e.printStackTrace();
}
}
private static void handleClientRequest(Socket clientSocket) {
try (ObjectInputStream in = new ObjectInputStream(clientSocket.getInputStream())) {
Object request = in.readObject();
if (request instanceof ScocketObject) {
String threadId = ((ScocketObject) request).getMessage();
stopThread(threadId);
} else {
System.out.println("Invalid request");
}
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
}
private static void stopThread(String threadId) {
Future<Void> future = ActiveTaskNode.activeTaskNode.get(threadId);
if (future != null) {
boolean cancelResult = future.cancel(true);
if (cancelResult) {
System.out.println(threadId + " - Thread successfully canceled");
} else {
System.out.println(threadId + " - Thread couldn't be canceled, may have already completed");
}
} else {
System.out.println("Thread not found for id: " + threadId);
}
}
}
所以我把方法放到了启动类那边了
public Map<String, Object> sendCancelRequest(JSONObject json) {
Map<String, Object> result = new HashMap<>();
String host = json.getString("host");
int port = 4546;
String threadId = json.getString("threadId");
try (Socket socket = new Socket(host, port);
ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())) {
ScocketObject myObject = new ScocketObject(threadId);
out.writeObject(myObject);
out.flush();
result.put(Constants.STATUS, Status.SUCCESS);
} catch (IOException e) {
e.printStackTrace();
result.put(Constants.STATUS, Status.TASK_CANNCEL_FAIL);
}
result.put(Constants.DATA_LIST,"");
return result;
}