图片无法加载请跳转CSDN:Spark SQL 概述-CSDN博客
Spark SQL 是 Apache Spark 的一个模块,专门用于处理结构化数据。它集成了 SQL 查询和 Spark 编程的强大功能,使得处理大数据变得更加高效和简便。通过 Spark SQL,用户可以直接在 Spark 中使用 SQL 查询,或者使用 DataFrame 和 DataSet API 进行数据操作。
一、Spark SQL 架构 Spark SQL 的架构主要由以下几个组件组成:
SparkSession :Spark 应用的统一入口点,用于创建 DataFrame、DataSet 和执行 SQL 查询。
Catalyst 优化器 :Spark SQL 的查询优化引擎,负责解析、分析、优化和生成物理执行计划。
DataFrame 和 DataSet API :提供面向对象的编程接口,支持丰富的数据操作方法。
数据源接口 :支持多种数据源,如 HDFS、S3、HBase、Cassandra、Hive 等。
执行引擎 :将优化后的查询计划转换为执行任务,并在分布式集群上并行执行这些任务。
二、Spark SQL 特点
统一数据访问接口 :支持多种数据源(如 CSV、JSON、Parquet、Hive、JDBC、HBase 等)并提供一致的查询接口。
DataFrame 和 Dataset API :提供面向对象的编程接口,支持类型安全的操作,便于数据处理。
Catalyst 优化器 :自动将用户的查询转换为高效的执行计划,提升查询性能。
与 Hive 的集成 :无缝集成 Hive,能够直接访问现存的 Hive 数据,并使用 Hive 的 UDF 和 UDAF。
高性能 :通过 Catalyst 优化器和 Tungsten 执行引擎,实现高效的查询性能和内存管理。
多种操作方式 :支持 SQL 和 API 编程两种操作方式,灵活性高。
外部工具接口 :提供 JDBC/ODBC 接口供第三方工具借助 Spark 进行数据处理。
高级接口 :提供了更高层级的接口,方便地处理数据。
三、Spark SQL 运行原理
查询解析(Query Parsing) :将 SQL 查询解析成抽象语法树(AST)。
逻辑计划生成(Logical Plan Generation) :将 AST 转换为未优化的逻辑计划。
逻辑计划优化(Logical Plan Optimization) :使用 Catalyst 优化器对逻辑计划进行一系列规则优化。
物理计划生成(Physical Plan Generation) :将优化后的逻辑计划转换为一个或多个物理计划,并选择最优的物理计划。
执行(Execution) :将物理计划转换为 RDD,并在集群上并行执行。
四、Spark SQL API 相关概述 SparkContext :SparkContext 是 Spark 应用程序的主入口点,负责连接到 Spark 集群,管理资源和任务调度。在 Spark 2.0 之后,推荐使用 SparkSession 取代 SparkContext。
SQLContext :SQLContext 是 Spark SQL 的编程入口点,允许用户通过 SQL 查询或 DataFrame API 进行数据处理。它提供了基本的 Spark SQL 功能。
HiveContext :HiveContext 是 SQLContext 的子集,增加了对 Hive 的集成支持,可以直接访问 Hive 中的数据和元数据,使用 Hive 的 UDF 和 UDAF。
SparkSession :SparkSession 是 Spark 2.0 引入的新概念,合并了 SQLContext 和 HiveContext 的功能,提供了统一的编程接口。SparkSession 是 Spark SQL 的建议入口点,支持使用 DataFrame 和 Dataset API 进行数据处理。
创建 SparkContext 和 SparkSession 的注意事项 :如果同时需要创建 SparkContext 和 SparkSession,必须先创建 SparkContext,再创建 SparkSession。如果先创建 SparkSession,再创建 SparkContext,会导致异常,因为在同一个 JVM 中只能运行一个 SparkContext。
五、Spark SQL 依赖 1 2 3 4 5 6 7 8 9 <properties > <spark.version > 3.1.2</spark.version > <spark.scala.version > 2.12</spark.scala.version > </properties > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-sql_${spark.scala.version}</artifactId > <version > ${spark.version}</version > </dependency >
六、Spark SQL 数据集
在 Spark SQL 中,数据集主要分为以下几种类型:DataFrame 和 Dataset。它们是处理和操作结构化和半结构化数据的核心抽象。
1、DataFrame
Dataset 是在 Spark 2.0 中引入的新的抽象数据结构,它是强类型的,可以存储 JVM 对象。Dataset API 结合了 DataFrame 的操作简便性和类型安全性,适用于需要更高级别数据类型控制和面向对象编程风格的场景。具体特点如下:
类似于二维表格 :DataFrame 类似于传统的关系数据库中的二维表格。
Schema(数据结构信息) :在 RDD 的基础上加入了 Schema,描述数据结构的信息。
支持嵌套数据类型 :DataFrame 的 Schema 支持嵌套的数据类型,如 struct
、map
和 array
。
丰富的 SQL 操作 API :提供更多类似 SQL 操作的 API,便于进行数据查询和操作。
2、Dataset
Dataset 是在 Spark 2.0 中引入的新的抽象数据结构,它是强类型的,可以存储 JVM 对象。Dataset API 结合了 DataFrame 的操作简便性和类型安全性,适用于需要更高级别数据类型控制和面向对象编程风格的场景。具体特点如下:
强类型 :Spark 1.6中引入的一个更通用的数据集合,Dataset 是强类型的,提供类型安全的操作。
RDD + Schema :可以认为 Dataset 是 RDD 和 Schema 的结合,既有 RDD 的分布式计算能力,又有 Schema 描述数据结构的信息。
适用于特定领域对象 :可以存储和操作特定领域对象的强类型集合。
并行操作 :可以使用函数或者相关操作并行地进行转换和操作。
3、DataFrame 和 Dataset 的关系
DataFrame 是特殊的 Dataset :DataFrame 是 Dataset 的一个特例,即 DataFrame = Dataset[Row]
。
数据抽象和操作方式的统一 :DataFrame 和 Dataset 统一了 Spark SQL 的数据抽象和操作方式,提供了灵活且强大的数据处理能力。
七、Spark Sql 基本用法 1、Scala 创建 SparkSession 对象 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import org.apache.spark.sql.SparkSession object SparkSqlContext { def main (args: Array [String ]): Unit = { val conf: SparkConf = new SparkConf () .setMaster("local[4]" ) .setAppName("spark sql" ) val spark: SparkSession = SparkSession .builder() .config(conf) .getOrCreate() val sc: SparkContext = spark.sparkContext import spark.implicits._ spark.stop() } }
2、DataFrame 和 Dataset 的创建方式 1、从集合创建
1 2 3 4 5 6 7 case class Person (name: String , age: Int ) val data1 = Seq (Person ("Alice" , 25 ), Person ("Bob" , 30 )) val ds: Dataset [Person ] = spark.createDataset(data) val data2 = Seq (("Alice" , 25 ), ("Bob" , 30 ))val df: DataFrame = data.toDF("name" , "age" )
1、从文件系统读取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 val schema = StructType (Seq ( StructField ("name" , StringType , nullable = false ), StructField ("age" , IntegerType , nullable = false ) ))val dsJson: Dataset [Person ] = spark.read.json("/path/to/json/file" ).as[Person ]val dfCsv: DataFrame = spark.read .schema(schema) .option("header" , "true" ) .csv("/path/to/csv/file" )
3、从关系型数据库读取
1 2 3 4 5 6 7 8 val url = "jdbc:mysql://localhost:3306/database" val properties = new java.util.Properties () properties.setProperty("user" , "username" ) properties.setProperty("password" , "password" )val dsDb: Dataset [Person ] = spark.read.jdbc(url, "table" , properties).as[Person ]val dfDb: DataFrame = spark.read.jdbc(url, "table" , properties)
4、从非结构化数据源读取
1 2 3 val dsParquet: Dataset [Person ] = spark.read.parquet("/path/to/parquet/file" ).as[Person ]val dfParquet: DataFrame = spark.read.parquet("/path/to/parquet/file" )
5、手动创建 Dataset
1 2 3 4 5 6 7 8 9 10 11 12 13 import org.apache.spark.sql.types._val schema = StructType (Seq ( StructField ("name" , StringType , nullable = false ), StructField ("age" , IntegerType , nullable = false ) ))val data = Seq (Row ("Alice" , 25 ), Row ("Bob" , 30 ))val dsManual: Dataset [Person ] = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).as[Person ]val dfManual: DataFrame = spark.createDataFrame( spark.sparkContext.parallelize(data), schema )
3、DataFrame API 语法示例一
模拟数据(1000条):
1 2 3 4 5 6 id,name,gender,age,city 1,邵睿,男,12,上海市 2,林子异,男,48,广州市 3,孟秀英,女,46,上海市 4,金嘉伦,男,8,北京市 ...
需求:哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import spark.implicits._val schema = StructType (Seq ( StructField ("id" , LongType ), StructField ("name" , StringType ), StructField ("gender" , StringType ), StructField ("age" , IntegerType ), StructField ("city" , StringType ), ))val WindowSpec : WindowSpec = Window .partitionBy($"gender" ) .orderBy($"avg_age" .desc) spark.read .option("header" , "true" ) .csv("D:\\projects\\sparkSql\\people.csv" ) .select($"id" , $"name" , $"age" , $"city" , $"gender" ) .groupBy($"city" , $"gender" ) .agg( count($"id" ).as("count" ), round(avg($"age" ), 2 ).as("avg_age" ) ) .where($"count" .gt(50 )) .orderBy($"avg_age" .desc) .select($"city" , $"gender" , $"avg_age" , dense_rank().over(Window .partitionBy($"gender" ).orderBy($"avg_age" .desc)).as("gender_avg_age_rank" )) .show()
结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 +------+------+-------+-------------------+ | city|gender|avg_age|gender_avg_age_rank| +------+------+-------+-------------------+ |北京市| 男| 41.05| 1| | 东莞| 男| 42.81| 2| |上海市| 男| 43.92| 3| |成都市| 男| 45.89| 4| | 中山| 男| 47.08| 5| |广州市| 男| 47.47| 6| | 深圳| 男| 48.36| 7| |上海市| 女| 46.02| 1| | 中山| 女| 49.55| 2| +------+------+-------+-------------------+
语法示例二:视图,sql
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 val dfPeople: DataFrame = spark.read .option("header" , "true" ) .csv("D:\\projects\\sparkSql\\people.csv" ) dfPeople.createOrReplaceTempView("people_view" ) spark.sql("SELECT name, age FROM people_view" ).show() spark.sql( """ |select * from people_view |where gender = '男' |""" .stripMargin ).show()
语法示例三:join
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 case class Student (name: String , classId: Int )case class Class (classId: Int , className: String )val frmStu = spark.createDataFrame( Seq ( Student ("张三" , 1 ), Student ("李四" , 1 ), Student ("王五" , 2 ), Student ("赵六" , 2 ), Student ("李明" , 2 ), Student ("王刚" , 4 ), Student ("王朋" , 5 ), ) )val frmClass = spark.createDataFrame( Seq ( Class (1 , "name1" ), Class (2 , "name2" ), Class (3 , "name3" ), Class (4 , "name4" ) ) )
left
左连接,rignt
右连接, full
全外连接,anti
左差集,semi
左交集
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 frmStu.as("S" ) .join(frmClass.as("C" ), $"S.classId" === $"C.classId" ) .show() frmStu .join(frmClass, Seq ("classId" ), "left" ) .show() frmStu .join(frmClass, Seq ("classId" ), "anti" ) .show() frmStu .join(frmClass, Seq ("classId" ), "semi" ) .show()
结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 别名 + inner 内连接 +----+-------+-------+---------+ |name|classId|classId|className| +----+-------+-------+---------+ |张三| 1| 1| name1| |李四| 1| 1| name1| |王五| 2| 2| name2| |赵六| 2| 2| name2| |李明| 2| 2| name2||王刚| 4| 4| name4| +----+-------+-------+---------+ 使用左外连接将df和frmClass根据classId合并 +-------+----+---------+ |classId|name|className| +-------+----+---------+ | 1|张三| name1| | 1|李四| name1| | 2|王五| name2| | 2|赵六| name2| | 2|李明| name2| | 4|王刚| name4|| 5|王朋| null| +-------+----+---------+ 左差集 +-------+----+ |classId|name| +-------+----+ | 5|王朋| +-------+----+ 左交集 +-------+----+ |classId|name| +-------+----+ | 1|张三| | 1|李四| | 2|王五| | 2|赵六| | 2|李明|| 4|王刚| +-------+----+