所有的kubernetes管理平台,都会用到TTY的功能,既通过前端直接进入到容器内部,这是一个交互式的操作,或者说是一个流式操作,简单的http协议肯定不能满足这个需求,使用websocket就能很好的满足这个需求。
用通俗的话来描述websocket, 其实就三点:
既然决定了使用websocket作为前端进入容器的方式,那么可以看看后端是如何进入容器的。
golang中,一般使用github.com/gorilla/websocket 对websocket进行封装。
package wsconnect
import (
"errors"
"github.com/gorilla/websocket"
"net/http"
"sync"
)
// http升级websocket协议的配置
var wsUpgrader = websocket.Upgrader{
// 允许所有CORS跨域请求
CheckOrigin: func(r *http.Request) bool {
return true
},
}
// websocket消息
type WsMessage struct {
MessageType int
Data []byte
}
// 封装websocket连接
type WsConnection struct {
wsSocket *websocket.Conn // 底层websocket
inChan chan *WsMessage // 读取队列
outChan chan *WsMessage // 发送队列
mutex sync.Mutex // 避免重复关闭管道
isClosed bool
closeChan chan byte // 关闭通知
}
// 读取协程
func (wsConn *WsConnection) wsReadLoop() {
var (
msgType int
data []byte
msg *WsMessage
err error
)
for {
// 读一个message
if msgType, data, err = wsConn.wsSocket.ReadMessage(); err != nil {
goto ERROR
}
msg = &WsMessage{
msgType,
data,
}
// 放入请求队列
select {
case wsConn.inChan <- msg:
case <-wsConn.closeChan:
goto CLOSED
}
}
ERROR:
wsConn.WsClose()
CLOSED:
}
// 发送协程
func (wsConn *WsConnection) wsWriteLoop() {
var (
msg *WsMessage
err error
)
for {
select {
// 取一个应答
case msg = <-wsConn.outChan:
// 写给websocket
if err = wsConn.wsSocket.WriteMessage(msg.MessageType, msg.Data); err != nil {
goto ERROR
}
case <-wsConn.closeChan:
goto CLOSED
}
}
ERROR:
wsConn.WsClose()
CLOSED:
}
func InitWebsocket(resp http.ResponseWriter, req *http.Request) (wsConn *WsConnection, err error) {
var (
wsSocket *websocket.Conn
)
// 应答客户端告知升级连接为websocket
if wsSocket, err = wsUpgrader.Upgrade(resp, req, nil); err != nil {
return
}
wsConn = &WsConnection{
wsSocket: wsSocket,
inChan: make(chan *WsMessage, 1000),
outChan: make(chan *WsMessage, 1000),
closeChan: make(chan byte),
isClosed: false,
}
// 读协程
go wsConn.wsReadLoop()
// 写协程
go wsConn.wsWriteLoop()
return wsConn, nil
}
// 发送消息
func (wsConn *WsConnection) WsWrite(messageType int, data []byte) (err error) {
select {
case wsConn.outChan <- &WsMessage{messageType, data}:
case <-wsConn.closeChan:
err = errors.New("websocket closed")
}
return
}
// 读取消息
func (wsConn *WsConnection) WsRead() (msg *WsMessage, err error) {
select {
case msg = <-wsConn.inChan:
return
case <-wsConn.closeChan:
err = errors.New("websocket closed")
}
return
}
// 关闭连接
func (wsConn *WsConnection) WsClose() {
wsConn.wsSocket.Close()
wsConn.mutex.Lock()
defer wsConn.mutex.Unlock()
if !wsConn.isClosed {
wsConn.isClosed = true
close(wsConn.closeChan)
}
}
上述代码就是一个简单的websocket的封装,来看看这段代码做了哪些事:
上述代码是对websocket的封装,这段代码仅仅是用来获取前端传来的数据,并不会对kubernetes进行任何操作,好在kubernetes的标准库"k8s.io/client-go/tools/remotecommand"提供了一个解决思路,remotecommand提供了连接到容器的方法:
if executor, err = remotecommand.NewSPDYExecutor(restConf, "POST", sshReq.URL()); err != nil {
goto END
}
然后使用Stream方法,将输入输出以流的方式,连接到容器
// 配置与容器之间的数据流处理回调
handler = &streamHandler{wsConn: wsConn, resizeEvent: make(chan remotecommand.TerminalSize)}
if err = executor.Stream(remotecommand.StreamOptions{
Stdin: handler,
Stdout: handler,
Stderr: handler,
TerminalSizeQueue: handler,
Tty: true,
}); err != nil {
goto END
}
return
当然,在实际的执行命令之前,需要提前做一些预处理,比如验证权限,初始化客户端等等,简单的描述一下后端的逻辑:
前端由于需要模拟一个terminal的窗口,这块可以用到大名鼎鼎的xterm,使用xterm可以在前端模拟出一个完整的terminal,包括颜色 ,字体 ,窗口大小等等,都是可以可配置的。
由于我的前端使用的是ts+vue3的框架进行编写的,所以仅需要单独写一个页面即可。
<template>
<div class="container">
<Breadcrumb
:items="[
{
path: '../workload/listpods',
label: $t('menu.dashboard.workload'),
},
{ path: '', label: $t('menu.dashboard.workload.terminal.get') },
]"
/>
<!-- 基础信息 -->
<div
:style="{
width: '100%',
}"
>
<a-card
class="general-card"
:title="$t('menu.dashboard.workload.terminal.get')"
>
<a-row style="margin-bottom: 16px">
<a-col :span="4">
<a-space size="mini">
<a-tag size="large">命名空间:</a-tag>
<p>{{ route.query.namespace }}</p>
</a-space>
</a-col>
<a-col :span="8" :offset="1">
<div>
<a-space size="mini">
<a-tag size="large">Pod:</a-tag>
<p>{{ route.query.podname }}</p>
</a-space>
</div>
</a-col>
<a-col :span="4" :offset="1">
<div>
<a-space size="mini">
<a-tag size="large">Container:</a-tag>
<p>{{ route.query.container }}</p>
</a-space>
</div>
</a-col>
<!-- 选择bash or shell -->
<a-col :span="4" :offset="1">
<div>
<a-space size="mini">
<h4>Bash:</h4>
<a-select :style="{ width: '100px' }" v-model="currentBash">
<a-option>bash</a-option>
<a-option>sh</a-option>
</a-select>
</a-space>
</div>
</a-col>
</a-row>
<div ref="terminal"></div>
</a-card>
</div>
</div>
</template>
<script lang="ts" setup>
import { ref, onMounted, onBeforeUnmount, watch } from "vue";
import useLoading from "@/hooks/loading";
import { debounce } from "lodash";
import { Terminal } from "xterm";
import { FitAddon } from "xterm-addon-fit";
import { useRoute } from "vue-router";
import "xterm/css/xterm.css";
import { LabelDesc } from "@/api/common";
import { getToken } from "@/utils/auth";
const { loading } = useLoading(true);
const route = useRoute();
const descData = ref<LabelDesc[]>([
{ label: "命名空间", value: route.query.namespace as string },
{ label: "POD", value: route.query.podname as string },
{ label: "Container", value: route.query.container as string },
]);
const shrc = ref<string>("b");
const currentBash = ref<string>("bash");
// websocket客户端初始化相关
// 打开terminal
const OpenTerminal = () => {
loading.value = false;
};
// 关闭terminal
const CloseTerminal = () => {
console.log("onclose");
};
// 处理消息
const OnMessage = (event: any) => {
term.value.write(event.data);
};
// 处理terminal错误
const OnError = () => {
console.log("onerror");
};
const terminalSocket = ref();
// 判断连接是否打开
const isWsOpen = () => {
const readyState = terminalSocket.value && terminalSocket.value.readyState;
return readyState === 1;
};
// 创建WS
const createWS = () => {
const token = getToken() as string;
if (currentBash.value === "bash") {
shrc.value = "b"
} else if (currentBash.value === "sh") {
shrc.value = "s"
}
const wsUrl = `wss://${window.location.host}/kubemgr/api/v1/ws/${route.params.clusteruuid}/${route.query.namespace}/${route.query.podname}/${route.query.container}/${shrc.value}/ssh?clusterinfo=${token}`;
terminalSocket.value = new WebSocket(wsUrl);
terminalSocket.value.onopen = OpenTerminal; // WebSocket 连接已建立
terminalSocket.value.onmessage = OnMessage; // 收到服务器消息
terminalSocket.value.onclose = CloseTerminal; // WebSocket 连接已关闭
terminalSocket.value.onerror = OnError; // WebSocket 连接出错
};
// 初始化WS
const initWS = () => {
if (!terminalSocket.value) {
createWS();
}
if (terminalSocket.value && terminalSocket.value.readyState >= 1) {
terminalSocket.value.close();
createWS();
}
};
// terminal初始化相关
const term = ref();
const terminal = ref();
const fitAddon = new FitAddon();
// 尺寸同步 发送给后端,调整后端终端大小,和前端保持一致,不然前端只是范围变大了,命令还是会换行
const resizeRemoteTerminal = () => {
if (isWsOpen()) {
const msg = {
type: "resize",
rows: term.value.rows,
cols: term.value.cols,
};
terminalSocket.value.send(JSON.stringify(msg));
}
};
// 终端输入绑定事件
const termData = () => {
// 输入与粘贴的情况,onData不能重复绑定,不然会发送多次
term.value.onData((data: any) => {
if (isWsOpen()) {
// 写给服务端, 由服务端发给container
const msg = { type: "input", input: data };
terminalSocket.value.send(JSON.stringify(msg));
}
});
// 终端尺寸变化触发
term.value.onResize(() => {
resizeRemoteTerminal();
});
};
const initTerm = () => {
term.value = new Terminal({
lineHeight: 1.2,
fontSize: 12,
fontFamily: "Monaco, Menlo, Consolas, 'Courier New', monospace",
theme: {
background: "#181d28",
},
// 光标闪烁
cursorBlink: true,
cursorStyle: "underline",
scrollback: 100,
tabStopWidth: 4,
});
term.value.open(terminal.value); // 挂载dom窗口,初始化为空数据
term.value.loadAddon(fitAddon); // 自适应尺寸
// 不能初始化的时候fit,需要等terminal准备就绪,可以设置延时操作
setTimeout(() => {
fitAddon.fit();
}, 1000);
termData(); // Terminal 事件挂载
};
const resetTerm = () => {
term.value.reset()
terminal.value.innerHTML = '';
term.value = new Terminal({
lineHeight: 1.2,
fontSize: 12,
fontFamily: "Monaco, Menlo, Consolas, 'Courier New', monospace",
theme: {
background: "#181d28",
},
// 光标闪烁
cursorBlink: true,
cursorStyle: "underline",
scrollback: 100,
tabStopWidth: 4,
});
term.value.open(terminal.value); // 挂载dom窗口,初始化为空数据
term.value.loadAddon(fitAddon); // 自适应尺寸
// 不能初始化的时候fit,需要等terminal准备就绪,可以设置延时操作
setTimeout(() => {
fitAddon.fit();
}, 1000);
termData(); // Terminal 事件挂载
};
// 窗口大小适应相关
// 适应浏览器尺寸变化
const fitTerm = () => {
fitAddon.fit();
};
const onResize = debounce(() => fitTerm(), 500);
const onTerminalResize = () => {
window.addEventListener("resize", onResize);
};
const removeResizeListener = () => {
window.removeEventListener("resize", onResize);
};
onMounted(() => {
loading.value = true;
initWS();
initTerm();
onTerminalResize();
});
onBeforeUnmount(() => {
removeResizeListener();
if (terminalSocket.value) {
terminalSocket.value.close();
}
});
watch(
() => currentBash.value, // 要监视的数据
() => {
// 回调函数
loading.value = true;
initWS();
resetTerm();
// initTerm();
onTerminalResize();
},
{
// immediate: true, // 立即执行回调
deep: true, // 深层监视
}
);
</script>
<script lang="ts">
export default {
name: "GetTerminal",
};
</script>
<style lang="scss" scoped>
.terminal {
width: 90%;
// height: calc(100% - 62px);
// height: 100%;
margin-bottom: 16px;
}
</style>
最终的效果就类似这种
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection upgrade;
proxy_set_header Host $host;
proxy_pass http://127.0.0.1:8082;