scala写入MongoDB

发布时间:2023年12月22日

1.6 写MongoDB

写MongoDB的依赖

<dependency>
 ? ?<groupId>org.apache.spark</groupId>
 ? ?<artifactId>spark-sql_2.12</artifactId>
 ? ?<version>3.1.2</version>
</dependency>
<!-- 加入MongoDB的驱动 -->
<dependency>
 ? ?<groupId>org.mongodb</groupId>
 ? ?<artifactId>casbah-core_2.12</artifactId>
 ? ?<version>3.1.1</version>
</dependency>
?
<!-- 加入Spark读写MongoDB的驱动 -->
<dependency>
 ? ?<groupId>org.mongodb.spark</groupId>
 ? ?<artifactId>mongo-spark-connector_2.12</artifactId>
 ? ?<version>2.4.3</version>
</dependency>

代码

package com.qianfeng.sparksql
?
import com.mongodb.{MongoClientURI, casbah}
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
?
/**
 * 读写MongoDB的数据
 */
//mongoDB的连接配置封装
case class MongoConfig(url:String,db:String,col:String)
//封装学生考试信息
case class Stu(classID:Int,stuName:String,age:Int,sex:String,subject:String,score:Double)
object Demo09_Mongo_RW_Data {
 ?def main(args: Array[String]): Unit = {
 ? ?val configMap = Map(
 ? ? ?"url"->"mongodb://qianfeng01:27017/mydb",
 ? ? ?"db"->"mydb",
 ? ? ?"collection"->"student"
 ?  )
 ? ?//获取SparkSQL的上下文
 ? ?val spark = SparkSession.builder()
 ? ?  .appName("sparksql")
 ? ?  .master("local[*]")
 ? ?  .getOrCreate()
?
 ? ?import spark.implicits._
 ? ?val stuDF = spark.sparkContext.textFile("/Users/liyadong/data/sparkdata/stu.txt")
 ? ?  .filter(_.length > 0)
 ? ?  .map(x => {
 ? ? ? ?val fields = x.split(" ")
 ? ? ? ?//提取字段
 ? ? ? ?Stu(fields(0).trim.toInt, fields(1).trim, fields(2).trim.toInt, fields(3).trim, fields(4).trim, fields(5).trim.toDouble)
 ? ?  }).toDF()
 ? ?//stuDF.show()
?
 ? ?//写入数据
 ? ?val config = MongoConfig(configMap.getOrElse("url",""),configMap.getOrElse("db",""),configMap.getOrElse("collection",""))
 ? ?writeDataToMongoDB(config,stuDF)
?
 ? ?//读取mongoDB的数据
 ? ?readDataFromMongoDB(config, spark)
?
 ? ?/*
 ? ?match case
 ? ? */
 ? ?//值匹配
 ? ?val a = 666
 ? ?val a1 = a match {
 ? ? ?case 666 => print("666-666")
 ? ? ?case 999 => println("999")
 ? ? ?case _ => println("null")
 ?  }
 ? ?println(a1)
?
 ? ?//元组个数匹配
 ? ?val t = (1,3,5)
 ? ?val t1 = t match {
 ? ? ?case (a, b, 3) => a + b + 3
 ? ? ?case (a, b, 5) => a + b
 ? ? ?case _ => 0
 ?  }
 ? ?println(t1)
?
 ? ?//数组个数匹配
 ? ?val l = Array(1,2)
 ? ?val l1 = l match {
 ? ? ?case Array(a) => a
 ? ? ?case Array(a, b) => a + b
 ?  }
 ? ?println(l1)
?
 ? ?"hello spark".foreach(ch=>println(ch match {
 ? ? ?case ' ' => "space"
 ? ? ?case _ => "char:" + ch
 ?  }))
?
?
 ? ?//5、关闭spark对象
 ? ?spark.stop()
  }
?
 ?/**
 ? * 数据写入Mongo的逻辑
 ? * @param config
 ? * @param stuDF
 ? */
 ?def writeDataToMongoDB(config:MongoConfig,stuDF:DataFrame): Unit ={
 ? ?/*//获取Mongo的client
 ? ?val client = MongoClient(casbah.MongoClientURI(config.url))
 ? ?//获取操作集合
 ? ?val collection = client(config.db)(config.col)
?
 ? ?//删除集合
 ? ?collection.dropCollection()*/
 ? ?//将df写入collection中
 ? ?stuDF
 ? ?  .write
 ? ?  .option("uri",config.url)
 ? ?  .option("collection",config.col)
 ? ?  .mode(SaveMode.Overwrite)
 ? ?  .format("com.mongodb.spark.sql")
 ? ?  .save()
?
 ? ?//对数据创建索引
 ? ?//collection.createIndex("name")
  }
?
 ?/**
 ? * 读取数据
 ? * @param config
 ? * @param stuDF
 ? */
 ?def readDataFromMongoDB(config:MongoConfig,spark:SparkSession): Unit ={
 ? ?val frame = spark
 ? ?  .read
 ? ?  .option("uri", config.url)
 ? ?  .option("collection", config.col)
 ? ?  .format("com.mongodb.spark.sql")
 ? ?  .load()
 ? ?//打印
 ? ?frame match {
 ? ? ?case f => f.show()
 ? ? ?case _ => ""
 ?  }
  }
}

1、环境

2、idea中不能运行scala

3、winutil.exe ?null???? ?,,hadoop解压某个目录,将wingtil.exe和hadoop.dll放到hadoop的解压目录下的bin目录中,配置环境变量放到path中。关闭idea重新打开

Config("Hadoop_home_dir","")

Guff_hys_python数据结构,大数据开发学习,python实训项目-CSDN博客

文章来源:https://blog.csdn.net/HYSliuliuliu/article/details/134983673
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。