行业报告 AI展会 数据标注 标注供求
数据标注数据集
主页 > 数据挖掘 > 正文

Spark SQL | Spark,从入门到精通

发家史

熟悉 Spark SQL 的都知道,Spark SQL 是从 Shark 发展而来。Shark 为了实现 Hive 兼容,在 HQL 方面重用了 Hive 中 HQL 的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从 MR 作业替换成了 Spark 作业(辅以内存列式存储等各种和 Hive 关系不大的优化);同时还依赖 Hive Metastore 和 Hive SerDe(用于兼容现有的各种 Hive 存储格式)。

Spark SQL 在 Hive 兼容层面仅依赖 HQL parser、Hive Metastore 和 Hive SerDe。也就是说,从 HQL 被解析成抽象语法树(AST)起,就全部由 Spark SQL 接管了。执行计划生成和优化都由 Catalyst 负责。借助 Scala 的模式匹配等函数式语言特性,利用 Catalyst 开发执行计划优化策略比 Hive 要简洁得多。

 

 

Spark SQL

Spark SQL 提供了多种接口:

纯 Sql 文本;
dataset/dataframe api。

当然,相应的,也会有各种客户端:

sql 文本,可以用 thriftserver/spark-sql;
编码,Dataframe/dataset/sql。

Dataframe/Dataset API 简介

Dataframe/Dataset 也是分布式数据集,但与 RDD 不同的是其带有 schema 信息,类似一张表。

可以用下面一张图详细对比 Dataset/dataframe 和 RDD 的区别:

 

 

Dataset 是在 spark1.6 引入的,目的是提供像 RDD 一样的强类型、使用强大的 lambda 函数,同时使用 Spark SQL 的优化执行引擎。到 spark2.0 以后,DataFrame 变成类型为 Row 的 Dataset,即为:

type DataFrame = Dataset[Row]

 

 

所以,很多移植 spark1.6 及之前的代码到 spark2+的都会报错误,找不到 dataframe 类。

基本操作

val df = spark.read.json(“file:///opt/meitu/bigdata/src/main/data/people.json”)
df.show()
import spark.implicits._
df.printSchema()
df.select("name").show()
df.select($"name", $"age" + 1).show()
df.filter($"age" > 21).show()
df.groupBy("age").count().show()
spark.stop()

分区分桶 排序

分桶排序保存hive表
df.write.bucketBy(42,“name”).sortBy(“age”).saveAsTable(“people_bucketed”)
分区以parquet输出到指定目录
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
分区分桶保存到hive表
df.write .partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("users_partitioned_bucketed")

cube rullup pivot

cube
sales.cube("city", "year”).agg(sum("amount")as "amount”) .show()
rull up
sales.rollup("city", "year”).agg(sum("amount")as "amount”).show()
pivot 只能跟在groupby之后
sales.groupBy("year").pivot("city",Seq("Warsaw","Boston","Toronto")).agg(sum("amount")as "amount”).show()

SQL 编程

Spark SQL 允许用户提交 SQL 文本,支持以下三种手段编写 SQL 文本:

1. spark 代码
2. spark-sql的shell
3. thriftserver

支持 Spark SQL 自身的语法,同时也兼容 HSQL。

1. 编码

要先声明构建 SQLContext 或者 SparkSession,这个是 SparkSQL 的编码入口。早起的版本使用的是 SQLContext 或者 HiveContext,spark2 以后,建议使用的是 SparkSession。

SQLContext

new SQLContext(SparkContext)

HiveContext

new HiveContext(spark.sparkContext)

SparkSession

不使用 hive 元数据:

val spark = SparkSession.builder()
.config(sparkConf) .getOrCreate()

使用 hive 元数据:

val spark = SparkSession.builder()
.config(sparkConf) .enableHiveSupport().getOrCreate()

使用

val df =spark.read.json("examples/src/main/resources/people.json")
df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people").show()

2. spark-sql 脚本

spark-sql 启动的时候类似于 spark-submit 可以设置部署模式资源等,可以使用

bin/spark-sql –help 查看配置参数。

需要将 hive-site.xml 放到 ${SPARK_HOME}/conf/ 目录下,然后就可以测试

show tables;
select count(*) from student;

3. thriftserver

thriftserver jdbc/odbc 的实现类似于 hive1.2.1 的 hiveserver2,可以使用 spark 的 beeline 命令来测试 jdbc server。

安装部署

/1 开启 hive 的 metastore

bin/hive --service metastore

/2 将配置文件复制到spark/conf/目录下

/3 thriftserver

sbin/start-thriftserver.sh --masteryarn --deploy-mode client

对于 yarn 只支持 client 模式。

/4 启动 bin/beeline

/5 连接到 thriftserver

!connect jdbc:hive2://localhost:10001

用户自定义函数

1. UDF

定义一个 udf 很简单,例如我们自定义一个求字符串长度的 udf:

val len = udf{(str:String) => str.length}
spark.udf.register("len",len)
val ds =spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")
ds.createOrReplaceTempView("employees")
ds.show()
spark.sql("select len(name) from employees").show()

2. UserDefinedAggregateFunction

定义一个 UDAF

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._

object MyAverageUDAF extends UserDefinedAggregateFunction {
//Data types of input arguments of this aggregate function
definputSchema:StructType = StructType(StructField("inputColumn", LongType) :: Nil)
//Data types of values in the aggregation buffer
defbufferSchema:StructType = {
StructType(StructField("sum", LongType):: StructField("count", LongType) :: Nil)
}
//The data type of the returned value
defdataType:DataType = DoubleType
//Whether this function always returns the same output on the identical input
defdeterministic: Boolean = true
//Initializes the given aggregation buffer. The buffer itself is a `Row` that inaddition to
// standard methods like retrieving avalue at an index (e.g., get(), getBoolean()), provides
// the opportunity to update itsvalues. Note that arrays and maps inside the buffer are still
// immutable.
definitialize(buffer:MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0L
}
//Updates the given aggregation buffer `buffer` with new input data from `input`
defupdate(buffer:MutableAggregationBuffer, input: Row): Unit ={
if(!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0)+ input.getLong(0)
buffer(1) = buffer.getLong(1)+ 1
}
}
// Mergestwo aggregation buffers and stores the updated buffer values back to `buffer1`
defmerge(buffer1:MutableAggregationBuffer, buffer2: Row): Unit ={
buffer1(0) = buffer1.getLong(0)+ buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1)+ buffer2.getLong(1)
}
//Calculates the final result
defevaluate(buffer:Row): Double =buffer.getLong(0).toDouble /buffer.getLong(1)
}

使用 UDAF

val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")
ds.createOrReplaceTempView("employees")
ds.show()
spark.udf.register("myAverage", MyAverageUDAF)
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()

3. Aggregator

定义一个 Aggregator

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverageAggregator extends Aggregator[Employee, Average, Double] {

// A zero value for this aggregation. Should satisfy the property that any b + zero = b
def zero: Average = Average(0L, 0L)
// Combine two values to produce a new value. For performance, the function may modify `buffer`
// and return it instead of constructing a new object
def reduce(buffer: Average, employee: Employee): Average = {
buffer.sum += employee.salary
buffer.count += 1
buffer
}
// Merge two intermediate values
def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
// Transform the output of the reduction
def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
// Specifies the Encoder for the intermediate value type
def bufferEncoder: Encoder[Average] = Encoders.product
// Specifies the Encoder for the final output value type
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

使用

spark.udf.register("myAverage2", MyAverageAggregator)
import spark.implicits._
val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json").as[Employee]
ds.show()
val averageSalary = MyAverageAggregator.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()

数据源

1. 通用的 laod/save 函数

可支持多种数据格式:json, parquet, jdbc, orc, libsvm, csv, text

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

默认的是 parquet,可以通过 spark.sql.sources.default,修改默认配置。

2. Parquet 文件

val parquetFileDF =spark.read.parquet("people.parquet")
peopleDF.write.parquet("people.parquet")

3. ORC 文件

val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")
ds.write.mode("append").orc("/opt/outputorc/")
spark.read.orc("/opt/outputorc/*").show(1)

 

4. JSON

ds.write.mode("overwrite").json("/opt/outputjson/")
spark.read.json("/opt/outputjson/*").show()

5. Hive 表

spark 1.6 及以前的版本使用 hive 表需要 hivecontext。Spark2 开始只需要创建 sparksession 增加 enableHiveSupport()即可。

val spark = SparkSession
.builder()
.config(sparkConf)
.enableHiveSupport()
.getOrCreate()

spark.sql("select count(*) from student").show()

6. JDBC

写入 mysql

wcdf.repartition(1).write.mode("append").option("user", "root")
.option("password", "mdh2018@#").jdbc("jdbc:mysql://localhost:3306/test","alluxio",new Properties())

从 mysql 里读

val fromMysql = spark.read.option("user", "root")
.option("password", "mdh2018@#").jdbc("jdbc:mysql://localhost:3306/test","alluxio",new Properties())

7. 自定义数据源

自定义 source 比较简单,首先我们要看看 source 加载的方式。指定的目录下,定义一个 DefaultSource 类,在类里面实现自定义 source,就可以实现我们的目标。

import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}

class DefaultSource extends DataSourceV2 with ReadSupport {

def createReader(options: DataSourceOptions) = new SimpleDataSourceReader()
}

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class SimpleDataSourceReader extends DataSourceReader {

def readSchema() = StructType(Array(StructField("value", StringType)))

def createDataReaderFactories = {
val factoryList = new java.util.ArrayList[DataReaderFactory[Row]]
factoryList.add(new SimpleDataSourceReaderFactory())
factoryList
}
}

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}

class SimpleDataSourceReaderFactory extends
DataReaderFactory[Row] with DataReader[Row] {
def createDataReader = new SimpleDataSourceReaderFactory()
val values = Array("1", "2", "3", "4", "5")

var index = 0

def next = index < values.length

def get = {
val row = Row(values(index))
index = index + 1
row
}

def close() = Unit
}

使用

val simpleDf = spark.read
.format("bigdata.spark.SparkSQL.DataSources")
.load()

simpleDf.show()

优化器及执行计划

1. 流程简介

 

 

总体执行流程如下:从提供的输入 API(SQL,Dataset, dataframe)开始,依次经过 unresolved 逻辑计划,解析的逻辑计划,优化的逻辑计划,物理计划,然后根据 cost based 优化,选取一条物理计划进行执行。

简单化成四个部分:

/1 analysis

Spark 2.0 以后语法树生成使用的是 antlr4,之前是 scalaparse。

/2 logical optimization

常量合并,谓词下推,列裁剪,boolean 表达式简化,和其它的规则。

/3 physical planning

eg:SortExec 。

/4 Codegen

codegen 技术是用 scala 的字符串插值特性生成源码,然后使用 Janino 编译成 java字节码,Eg: SortExec。

2. 自定义优化器

/1 实现

继承 Rule[LogicalPlan]

object MultiplyOptimizationRule extends Rule[LogicalPlan] {

def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {

case Multiply(left,right) if right.isInstanceOf[Literal] &&

right.asInstanceOf[Literal].value.asInstanceOf[Double] == 1.0 =>

println("=========> optimization of one applied")

left

}

}

spark.experimental.extraOptimizations = Seq(MultiplyOptimizationRule)

val multipliedDFWithOptimization = df.selectExpr("amountPaid * 1")

println("after optimization")

/2 注册

spark.experimental.extraOptimizations= Seq(MultiplyOptimizationRule)

/3 使用

selectExpr("amountPaid* 1")

 

 

3. 自定义执行计划

/1 物理计划

继承 SparkLan 实现 doExecute 方法。

/2 逻辑计划

继承 SparkStrategy 实现 apply。

case class FastOperator(output: Seq[Attribute],child:SparkPlan) extends SparkPlan {

override def children: Seq[SparkPlan] = Nil

override protected def doExecute(): RDD[InternalRow] = {
val row = org.apache.spark.sql.Row("hi",12L)
val unsafeRow = toUnsafeRow(row, Array(org.apache.spark.sql.types.StringType,org.apache.spark.sql.types.LongType))
sparkContext.parallelize(Seq(unsafeRow),1)
}

def toUnsafeRow(row: org.apache.spark.sql.Row, schema: Array[org.apache.spark.sql.types.DataType]): org.apache.spark.sql.catalyst.expressions.UnsafeRow = {
val converter = unsafeRowConverter(schema)
converter(row)
}

def unsafeRowConverter(schema: Array[org.apache.spark.sql.types.DataType]): org.apache.spark.sql.Row => org.apache.spark.sql.catalyst.expressions.UnsafeRow = {
val converter = org.apache.spark.sql.catalyst.expressions.UnsafeProjection.create(schema)
(row: org.apache.spark.sql.Row) => {
converter(org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToCatalyst(row).asInstanceOf[org.apache.spark.sql.catalyst.InternalRow])
}
}
}
case object NeverPlanned extends LeafNode {
override def output: Seq[Attribute] = Nil
}

object TestStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] =
plan match {
case Project(pblist, child) =>
println("mt fastOperator ------------>")
FastOperator(pblist.map(_.toAttribute),planLater(child)) :: Nil
case Union(children) =>
println("mt union ========>")
UnionExec(children.map(planLater)) :: Nil
case LocalRelation(output, data, _) =>
LocalTableScanExec(output, data):: Nil
case _ => Nil
}
}

/3 注册到 Spark 执行策略

spark.experimental.extraStrategies =Seq(countStrategy)

/4 使用

spark.sql("select count(*) fromtest")

 

微信公众号

声明:本站部分作品是由网友自主投稿和发布、编辑整理上传,对此类作品本站仅提供交流平台,转载的目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,不为其版权负责。如果您发现网站上有侵犯您的知识产权的作品,请与我们取得联系,我们会及时修改或删除。

网友评论:

发表评论
请自觉遵守互联网相关的政策法规,严禁发布色情、暴力、反动的言论。
评价:
表情:
用户名: 验证码:点击我更换图片
SEM推广服务

Copyright©2005-2026 Sykv.com 可思数据 版权所有    京ICP备14056871号

关于我们   免责声明   广告合作   版权声明   联系我们   原创投稿   网站地图  

可思数据 数据标注行业联盟

扫码入群
扫码关注

微信公众号

返回顶部