从 WasmEdge 运行环境读写 Rust Wasm 应用的时序数据

发布时间:2023年12月27日

WebAssembly (Wasm) 正在成为一个广受欢迎的编译目标,帮助开发者构建可迁移平台的应用。最近 Greptime 和 WasmEdge 协作,支持了在 WasmEdge 平台上的 Wasm 应用通过 MySQL 协议读写 GreptimeDB 中的时序数据。

什么是 WebAssembly

WebAssembly 是一种新的指令格式,同时具备了跨平台和接近原生机器代码的执行速度。** 通过将 C/C++ 或 Rust 代码编译成 WebAssembly ,可以在浏览器中提升程序的性能。而在浏览器外的其他运行环境,尤其是 CDN 或 IoT 的边缘端,我们也可以利用 WebAssembly 实现沙盒、动态加载的插件机制等高级的功能。

什么是 WasmEdge

WasmEdge 是 CNCF 的沙箱项目,提供上文提到的沙盒能力,允许开发者在 WebAssembly 标准的基础上,进一步扩展其能访问的资源和接口。例如,WasmEdge 为 Wasm 提供了额外的 TLS、网络能力和 AI 能力,大大丰富了使用场景。

WasmEdge GitHub 地址:
https://github.com/WasmEdge/WasmEdge

安装 GreptimeDB 和 WasmEdge

如果你已经安装了 GreptimeDB ,可以跳过这个步骤。

下载 GreptimeDB 并运行:

curl -L https://github.com/GreptimeTeam/greptimedb/raw/develop/scripts/install.sh | sh
./greptime standalone start


安装 WasmEdge:

curl -sSf https://raw.githubusercontent.com/WasmEdge/WasmEdge/master/utils/install.sh | bash -s

编写 GreptimeDB 的 WASM 应用

在 WasmEdge 中,我们可以使用 MySQL 协议,让 Rust 语言编写的应用程序连接到 GreptimeDB。

首先通过 `cargo new`?创建一个新的 Rust 项目,我们的编译目标将是 `wasm32-wasi`,可以在项目根目录下创建 `.cargo/config.toml` 文件,指定默认编译目标,之后就无需在每次 `cargo build` 命令后专门指定 `--target` 了。

# .cargo/config.toml
[build]
target = "wasm32-wasi"

编辑 `Cargo.toml` 增加依赖。`mysql_async` 的应用需要 `tokio` 运行时,WasmEdge 维护了这两个库的修改版本,使他们能够编译成 WebAssembly 代码,并且运行到 WasmEdge 环境中。

[package]
name = "greptimedb"
version = "0.1.0"
edition = "2021"

[dependencies]
mysql_async_wasi = "0.31"
time = "0.3"
tokio_wasi = { version = "1", features = [ "io-util", "fs", "net", "time", "rt", "macros"] }

进一步编辑 `src/main.rs` 文件,加入数据库访问的逻辑。这段代码将演示:

1. 通过环境变量读取数据库地址,并创建连接池;
2. 执行 SQL 语句创建数据表;
3. 插入数据;
4. 查询数据。

定义数据结构:

#[derive(Debug)]
struct CpuMetric {
? ? hostname: String,
? ? environment: String,
? ? usage_user: f64,
? ? usage_system: f64,
? ? usage_idle: f64,
? ? ts: i64,
}

impl CpuMetric {
? ? fn new(
? ? ? ? hostname: String,
? ? ? ? environment: String,
? ? ? ? usage_user: f64,
? ? ? ? usage_system: f64,
? ? ? ? usage_idle: f64,
? ? ? ? ts: i64,
? ? ) -> Self {
? ? ? ? Self {
? ? ? ? ? ? hostname,
? ? ? ? ? ? environment,
? ? ? ? ? ? usage_user,
? ? ? ? ? ? usage_system,
? ? ? ? ? ? usage_idle,
? ? ? ? ? ? ts,
? ? ? ? }
? ? }
}

初始化数据库连接池:

use mysql_async::{
? ? prelude::*, Opts, OptsBuilder, Pool, PoolConstraints, PoolOpts, Result,
};
use time::PrimitiveDateTime;

fn get_url() -> String {
? ? if let Ok(url) = std::env::var("DATABASE_URL") {
? ? ? ? let opts = Opts::from_url(&url).expect("DATABASE_URL invalid");
? ? ? ? if opts
? ? ? ? ? ? .db_name()
? ? ? ? ? ? .expect("a database name is required")
? ? ? ? ? ? .is_empty()
? ? ? ? {
? ? ? ? ? ? panic!("database name is empty");
? ? ? ? }
? ? ? ? url
? ? } else {
? ? ? ? "mysql://root:pass@127.0.0.1:3306/mysql".into()
? ? }
}


#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
? ? // Alternative: The "easy" way with a default connection pool
? ? // let pool = Pool::new(Opts::from_url(&*get_url()).unwrap());
? ? // let mut conn = pool.get_conn().await.unwrap();

? ? // Below we create a customized connection pool
? ? let opts = Opts::from_url(&*get_url()).unwrap();
? ? let builder = OptsBuilder::from_opts(opts);
? ? // The connection pool will have a min of 1 and max of 2 connections.
? ? let constraints = PoolConstraints::new(1, 2).unwrap();
? ? let pool_opts = PoolOpts::default().with_constraints(constraints);

? ? let pool = Pool::new(builder.pool_opts(pool_opts));
? ? let mut conn = pool.get_conn().await.unwrap();
? ??
? ??
? ??
? ? Ok(())
}

创建数据表:

? ? // Create table if not exists
? ? r"CREATE TABLE IF NOT EXISTS wasmedge_example_cpu_metrics (
? ? hostname STRING,
? ? environment STRING,
? ? usage_user DOUBLE,
? ? usage_system DOUBLE,
? ? usage_idle DOUBLE,
? ? ts TIMESTAMP,
? ? TIME INDEX(ts),
? ? PRIMARY KEY(hostname, environment)
);"
? ? .ignore(&mut conn)
? ? .await?;
?

插入数据:

? ? let metrics = vec![
? ? ? ? CpuMetric::new(
? ? ? ? ? ? "host0".into(),
? ? ? ? ? ? "test".into(),
? ? ? ? ? ? 32f64,
? ? ? ? ? ? 3f64,
? ? ? ? ? ? 4f64,
? ? ? ? ? ? 1680307200050,
? ? ? ? ),
? ? ? ? CpuMetric::new(
? ? ? ? ? ? "host1".into(),
? ? ? ? ? ? "test".into(),
? ? ? ? ? ? 29f64,
? ? ? ? ? ? 32f64,
? ? ? ? ? ? 50f64,
? ? ? ? ? ? 1680307200050,
? ? ? ? ),
? ? ? ? CpuMetric::new(
? ? ? ? ? ? "host0".into(),
? ? ? ? ? ? "test".into(),
? ? ? ? ? ? 32f64,
? ? ? ? ? ? 3f64,
? ? ? ? ? ? 4f64,
? ? ? ? ? ? 1680307260050,
? ? ? ? ),
? ? ? ? CpuMetric::new(
? ? ? ? ? ? "host1".into(),
? ? ? ? ? ? "test".into(),
? ? ? ? ? ? 29f64,
? ? ? ? ? ? 32f64,
? ? ? ? ? ? 50f64,
? ? ? ? ? ? 1680307260050,
? ? ? ? ),
? ? ? ? CpuMetric::new(
? ? ? ? ? ? "host0".into(),
? ? ? ? ? ? "test".into(),
? ? ? ? ? ? 32f64,
? ? ? ? ? ? 3f64,
? ? ? ? ? ? 4f64,
? ? ? ? ? ? 1680307320050,
? ? ? ? ),
? ? ? ? CpuMetric::new(
? ? ? ? ? ? "host1".into(),
? ? ? ? ? ? "test".into(),
? ? ? ? ? ? 29f64,
? ? ? ? ? ? 32f64,
? ? ? ? ? ? 50f64,
? ? ? ? ? ? 1680307320050,
? ? ? ? ),
? ? ];

? ? r"INSERT INTO wasmedge_example_cpu_metrics (hostname, environment, usage_user, usage_system, usage_idle, ts)
? ? ? VALUES (:hostname, :environment, :usage_user, :usage_system, :usage_idle, :ts)"
? ? ? ? .with(metrics.iter().map(|metric| {
? ? ? ? ? ? params! {
? ? ? ? ? ? ? ? "hostname" => &metric.hostname,
? ? ? ? ? ? ? ? "environment" => &metric.environment,
? ? ? ? ? ? ? ? "usage_user" => metric.usage_user,
? ? ? ? ? ? ? ? "usage_system" => metric.usage_system,
? ? ? ? ? ? ? ? "usage_idle" => metric.usage_idle,
? ? ? ? ? ? ? ? "ts" => metric.ts,
? ? ? ? ? ? }
? ? ? ? }))
? ? ? ? .batch(&mut conn)
? ? ? ? .await?;

查询数据:

? ? let loaded_metrics = "SELECT * FROM wasmedge_example_cpu_metrics"
? ? ? ? .with(())
? ? ? ? .map(
? ? ? ? ? ? &mut conn,
? ? ? ? ? ? |(hostname, environment, usage_user, usage_system, usage_idle, raw_ts): (
? ? ? ? ? ? ? ? String,
? ? ? ? ? ? ? ? String,
? ? ? ? ? ? ? ? f64,
? ? ? ? ? ? ? ? f64,
? ? ? ? ? ? ? ? f64,
? ? ? ? ? ? ? ? PrimitiveDateTime,
? ? ? ? ? ? )| {
? ? ? ? ? ? ? ? let ts = raw_ts.assume_utc().unix_timestamp() * 1000;
? ? ? ? ? ? ? ? CpuMetric::new(
? ? ? ? ? ? ? ? ? ? hostname,
? ? ? ? ? ? ? ? ? ? environment,
? ? ? ? ? ? ? ? ? ? usage_user,
? ? ? ? ? ? ? ? ? ? usage_system,
? ? ? ? ? ? ? ? ? ? usage_idle,
? ? ? ? ? ? ? ? ? ? ts,
? ? ? ? ? ? ? ? )
? ? ? ? ? ? },
? ? ? ? )
? ? ? ? .await?;
? ? println!("{:?}", loaded_metrics);

WasmEdge 团队提供的 `tokio` 和 `mysql_async` 库与原始版本编程接口完全一致,因此可以无缝地将普通 Rust 应用切换到 WebAssembly 平台上。

编译这个项目,我们可以获得 greptimedb.wasm 文件:

cargo build
ls -lh target/wasm32-wasi/debug/greptimedb.wasm

通过 WasmEdge 运行我们的程序:

wasmedge --env "DATABASE_URL=mysql://localhost:4002/public" target/wasm32-wasi/debug/greptimedb.wasm

上面这段示例程序已经纳入了 WasmEdge 的数据库使用示例,你可以在 GitHub 仓库找到完整的代码:
https://github.com/WasmEdge/wasmedge-db-examples/tree/main/greptimedb。

总结

WasmEdge 为 WebAssembly 应用提供了更多的扩展能力。如果你也将应用部署在 WebAssembly 环境里,未来我们还可以使用 OpenTelemetry SDK 采集指标数据直接存储到 GreptimeDB 。现在就下载 GreptimeDB 或开通 GreptimeCloud 实例运行上面的例子吧。

文章来源:https://blog.csdn.net/beary_tang/article/details/135244244
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。