在 Spark 中,您可以使用 write
方法将 DataFrame 中的数据保存到 MySQL 数据库。以下是一些示例 Scala 代码,展示了如何将 DataFrame 写入到 MySQL 数据库:
import org.apache.spark.sql.{SparkSession, SaveMode}
// 创建 SparkSession
val spark = SparkSession.builder
.appName("Save to MySQL")
.master("local") // 根据您的环境调整
.getOrCreate()
// 创建一个示例 DataFrame(替换为您的实际 DataFrame)
val data = Seq(
(1, "John", 25),
(2, "Alice", 30),
(3, "Bob", 28)
)
val columns = Seq("id", "name", "age")
val df = spark.createDataFrame(data).toDF(columns: _*)
// 配置连接信息
val jdbcUrl = "jdbc:mysql://your_mysql_host:3306/your_database"
val connectionProperties = new java.util.Properties()
connectionProperties.put("user", "your_username")
connectionProperties.put("password", "your_password")
// 将 DataFrame 写入 MySQL
df.write
.mode(SaveMode.Append) // 保存模式,可以选择 Append、Overwrite、ErrorIfExists、Ignore
.jdbc(jdbcUrl, "your_table_name", connectionProperties)
// 关闭 SparkSession
spark.stop()
请根据您的实际情况替换以下内容:
your_mysql_host
:MySQL 主机地址your_database
:要写入的数据库名称your_username
:MySQL 用户名your_password
:MySQL 密码your_table_name
:要写入的表名上述代码中,SaveMode.Append
表示如果表已经存在,就追加数据;如果表不存在,则创建表。您可以根据需要选择适当的保存模式。
此外,确保 SparkSession 的 master
参数和其他配置与您的实际环境相匹配。如果您运行在集群上,将 master
参数设置为相应的主节点地址。