delta.io (Delta Lake) is one of the three major data lake table formats, the other two being Iceberg and Hudi. Iceberg and Hudi see more use domestically, while Delta is more common at large companies abroad; it comes mainly from Databricks. Companies such as Apple, Adobe, and Alibaba use delta.io, and it is comparatively mature. Below, Delta is operated through Spark from IDEA.
pom.xml dependencies for the IDEA Maven project:
<dependency>
    <groupId>io.minio</groupId>
    <artifactId>minio</artifactId>
    <version>8.5.7</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.5.0</version> <!-- choose the version that matches your Spark installation -->
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>3.3.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.delta/delta-spark -->
<dependency>
    <groupId>io.delta</groupId>
    <artifactId>delta-spark_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk -->
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk</artifactId>
    <version>1.11.375</version>
</dependency>
Implementation code. Delta table data is stored on MinIO here; no Hadoop/HDFS cluster is involved. The target bucket must already exist in MinIO before the write below runs, so a sketch for creating it with the MinIO Java client from the pom comes first, followed by the Spark program.
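A minimal sketch, assuming MinIO runs at http://127.0.0.1:9000 with the minioadmin/minioadmin credentials used below and that bat is the bucket the Spark job writes to; the object name CreateBucket is only for illustration. It uses the io.minio client already declared in the pom:

package spark.delta

import io.minio.{BucketExistsArgs, MakeBucketArgs, MinioClient}

object CreateBucket {
  def main(args: Array[String]): Unit = {
    // Connect to the local MinIO instance (endpoint and credentials are assumptions).
    val client = MinioClient.builder()
      .endpoint("http://127.0.0.1:9000")
      .credentials("minioadmin", "minioadmin")
      .build()

    val bucket = "bat" // bucket used by the Spark program below

    // Create the bucket only if it does not exist yet.
    val exists = client.bucketExists(BucketExistsArgs.builder().bucket(bucket).build())
    if (!exists) {
      client.makeBucket(MakeBucketArgs.builder().bucket(bucket).build())
      println(s"bucket '$bucket' created")
    } else {
      println(s"bucket '$bucket' already exists")
    }
  }
}

The bucket can just as well be created once in the MinIO web console; the programmatic check only keeps the example reproducible. The main Spark program: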
package spark.delta

import org.apache.spark.sql.SparkSession

object delta {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local").appName("test")
      // S3A access to the local MinIO instance
      .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
      .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
      .config("spark.hadoop.fs.s3a.endpoint", "http://127.0.0.1:9000")
      .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
      .config("spark.hadoop.fs.s3a.path.style.access", "true")
      .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
      .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
      // Delta Lake SQL extension and catalog
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    //val testfile = "s3a://datalake/aa.txt"
    //val bucketName = "datalake"
    //val minioPath = "s3a://" + bucketName + "/common/outputData"
    //val df = spark.read.text(testfile)
    //println("total word count: " + df.count())

    // Write a small Delta table to MinIO, then read it back
    val data = spark.range(0, 5)
    data.write.format("delta").save("s3a://bat/zhangshan")
    //spark.range(500).write.format("delta").save("/tmp/delta-table")
    val df = spark.read.format("delta").load("s3a://bat/zhangshan")
    df.show()
    spark.stop()
  }
}
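Once the first write has succeeded, a follow-up sketch can show what makes a Delta table more than a pile of Parquet files: appends create new table versions, and old versions stay readable. This reuses the endpoint, credentials, and table path assumed above; the object name deltaTimeTravel is made up for illustration.

package spark.delta

import org.apache.spark.sql.SparkSession

object deltaTimeTravel {
  def main(args: Array[String]): Unit = {
    // Same MinIO / Delta configuration as in the object above
    val spark = SparkSession.builder().master("local").appName("delta-time-travel")
      .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
      .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
      .config("spark.hadoop.fs.s3a.endpoint", "http://127.0.0.1:9000")
      .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
      .config("spark.hadoop.fs.s3a.path.style.access", "true")
      .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    val path = "s3a://bat/zhangshan" // table written by the object above

    // Append a second batch of rows; Delta records this as a new table version.
    spark.range(5, 10).write.format("delta").mode("append").save(path)

    // Current state: rows from both writes.
    spark.read.format("delta").load(path).show()

    // Time travel: read the table as it was at version 0 (the first write only).
    spark.read.format("delta").option("versionAsOf", 0).load(path).show()

    spark.stop()
  }
}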