目录

大数据-spark-hive-总结

大数据 spark hive 总结

Apache Spark

简介

是一个开源的统一分析引擎,专为大规模数据处理而设计。它提供了高级API,支持Java、Scala、Python和R语言,并且包含了一个优化过的执行引擎,该引擎支持循环计算(如机器学习算法)和交互式查询。以下是Spark的一些关键特性和概念

核心特性

  1. 速度 :Spark通过内存计算提高了数据处理的速度,比Hadoop MapReduce快达10到100倍。
  2. 易用性 :提供丰富的高层次API,包括DataFrame和Dataset API,简化了数据操作。
  3. 通用性 :除了Map和Reduce操作之外,还支持SQL查询、流处理、机器学习和图计算等多种工作负载。
  4. 可扩展性 :能够有效地在数千个节点上并行运行。
  5. 容错性 :使用RDD(Resilient Distributed Dataset)抽象层,自动处理节点故障恢复。

基本概念


1. RDD (Resilient Distributed Dataset)

  • 定义

    RDD 是 Spark 的最底层抽象,表示分布在集群节点上的不可变、可分区的数据集合。它提供 低级别的 API ,强调对数据的 细粒度控制

  • 特点

    • 强类型 :存储任意类型的对象(如 RDD[User] )。
    • 手动优化 :需要开发者自行处理序列化、分区、缓存等优化。
    • 函数式操作 :通过 mapfilterreduce 等函数式算子处理数据。
    • 容错性 :通过血统(Lineage)机制重建丢失的分区。
  • 适用场景

    非结构化数据、需要精细控制的分布式计算(如自定义分区策略)。

2. DataFrame

  • 定义

    DataFrame 是基于 RDD 构建的 结构化数据抽象 ,类似于关系型数据库的表或 Pandas 的 DataFrame。在 Spark 1.3 中引入。

  • 特点

    • Schema 约束 :数据具有明确的结构(列名、数据类型),通过 Row 对象表示行。
    • 优化执行 :利用 Catalyst 优化器和 Tungsten 执行引擎,自动优化查询计划。
    • API 类型 :弱类型 API(列名在运行时检查),支持 SQL 语法。
    • 跨语言支持 :在 Java、Scala、Python、R 中接口一致。
  • 适用场景

    结构化/半结构化数据(如 JSON、CSV)、SQL 式查询、需要自动优化的批处理。

3. Dataset

  • 定义

    Dataset 是 Spark 1.6 引入的 API,结合了 RDD 的 强类型特性 和 DataFrame 的 优化引擎 。仅在 Scala 和 Java 中可用。

  • 特点

    • 强类型 + 结构化 :兼具 RDD 的类型安全(如 Dataset[User] )和 DataFrame 的优化能力。
    • 统一 API :与 DataFrame API 兼容(DataFrame = Dataset[Row] )。
    • 编码器(Encoder) :使用高效的二进制序列化(优于 Java 序列化)。
  • 适用场景

    需要类型安全的复杂业务逻辑、结合函数式和关系式操作的场景。

核心区别对比

特性RDDDataFrameDataset
数据类型任意对象(强类型)结构化的 Row 对象(弱类型)强类型对象(如 Dataset[User]
序列化Java 序列化(较慢)Tungsten 二进制编码(高效)Encoder 序列化(高效)
优化无自动优化,需手动调优Catalyst 优化器自动优化Catalyst 优化器自动优化
API 风格函数式( map , filter声明式(SQL/DSL)混合式(强类型 API + DSL)
类型安全编译时类型检查运行时类型检查编译时类型检查
语言支持所有 Spark 语言所有 Spark 语言仅 Scala/Java

演进关系

  • Spark 1.x :RDD → DataFrame(为结构化数据优化)。
  • Spark 2.x :DataFrame 和 Dataset 统一为 Dataset[T] (DataFrame = Dataset[Row] )。

如何选择?

  1. 优先用 DataFrame/Dataset

    大多数场景下,结构化数据处理更高效(Catalyst 优化 + Tungsten)。

  2. 需要类型安全时用 Dataset

    如 Scala/Java 中复杂业务逻辑。

  3. 仅底层控制时用 RDD

    如自定义分区、非结构化数据,或需直接操作分布式数据。

代码示例

https://i-blog.csdnimg.cn/direct/864700a81eae4f89acd3740a50ead132.png

spark-submit 参数

在 Spark 中, spark-submit 是提交作业到集群的核心命令。以下是常用参数及其作用,分为 基础参数资源参数调优参数

一、基础参数

参数说明示例
--master指定集群模式yarn , local[*] , spark://host:port , k8s://...
--deploy-mode部署模式(客户端或集群)client (默认)或 cluster (适合生产)
--class主类名(含包路径)--class com.example.MainApp
--name作业名称(显示在集群UI)--name "My Spark Job"
--files上传文件到 Executor(如配置文件)--files config.json
--jars添加依赖的 JAR 包(逗号分隔)--jars lib1.jar,lib2.jar
--packages从仓库自动下载依赖(Maven格式)--packages org.apache.kafka:kafka-clients:3.4.0

二、资源参数

参数说明示例注意事项
--executor-memory每个 Executor 的内存--executor-memory 4g需预留内存给系统和开销(如总内存的10%)
--driver-memoryDriver 进程的内存--driver-memory 2g客户端模式下需本地足够内存
--num-executorsExecutor 数量--num-executors 10根据集群资源动态调整
--executor-cores每个 Executor 的 CPU 核数--executor-cores 2总核数 = num-executors * executor-cores
--total-executor-cores所有 Executor 的总核数(Standalone 模式)--total-executor-cores 20优先级低于 num-executors

三、调优参数

参数说明示例用途
--conf spark.serializer指定序列化方式--conf spark.serializer=org.apache.spark.serializer.KryoSerializer优化序列化性能
--conf spark.sql.shuffle.partitions调整 Shuffle 分区数--conf spark.sql.shuffle.partitions=200避免数据倾斜或分区过大
--conf spark.default.parallelism默认并行度--conf spark.default.parallelism=100控制 RDD 的分区数
--conf spark.memory.fractionExecutor 内存中用于执行和存储的比例--conf spark.memory.fraction=0.6调整内存分配策略
--conf spark.dynamicAllocation.enabled启用动态资源分配--conf spark.dynamicAllocation.enabled=true按需增减 Executor(需集群支持)

示例命令

https://i-blog.csdnimg.cn/direct/2cf4368048264b5c8fa913741afe5050.png

关键注意事项

  1. 资源分配

    • 总内存和核数不能超过集群资源上限。
    • 在 YARN 模式下, --executor-memory 包含堆外内存,需预留约 10% 的额外内存(如申请 4g ,实际可用约 4g * 0.9 )。
  2. 动态资源分配

    • 启用 spark.dynamicAllocation.enabled=true 时需配置 spark.shuffle.service.enabled=true (YARN 需启动 Shuffle Service)。
  3. 依赖管理

    • 优先使用 --packages 自动下载依赖,避免手动传 JAR。
    • 本地依赖用 --jars ,集群依赖需预先上传到 HDFS 或共享存储。
  4. 日志与调试

    • 添加 --conf spark.eventLog.enabled=true 记录事件日志。
    • 在客户端模式下,Driver 日志输出到控制台;集群模式下需通过集群 UI 查看。

参数优化场景

  • 数据倾斜 :增大 spark.sql.shuffle.partitions 或使用 repartition
  • OOM 错误 :增加 executor-memory 或调整 spark.memory.fraction
  • CPU 瓶颈 :增加 num-executorsexecutor-cores
  • 网络超时 :调整 spark.network.timeout (默认 120s)。

hive

Hive 是构建在 Hadoop 生态系统之上的数据仓库工具,旨在简化大规模数据的查询和管理。它通过类 SQL 语法(HiveQL)将结构化数据操作转化为 MapReduce、Tez 或 Spark 任务,适合处理海量数据(如日志、用户行为等)。以下是其核心概念和用法:

Hive 核心特性

特性描述
SQL-like 语法支持类似 SQL 的查询语言(HiveQL),降低大数据处理的学习成本。
数据存储数据存储在 HDFS(Hadoop 分布式文件系统)中,支持多种文件格式(如 ORC、Parquet)。
元数据管理使用 Metastore(如 MySQL)存储表结构、分区等元信息。
扩展性支持自定义函数(UDF)、SerDe(序列化/反序列化工具)等扩展功能。
批处理基于 MapReduce 或 Tez 引擎,适合离线批处理, 不适用于实时查询

Hive 架构

  1. 用户接口

    CLI、JDBC、Web UI 等工具提交 HiveQL 查询。

  2. Driver

    解析查询,生成执行计划,管理任务生命周期。

  3. 编译器

    将 HiveQL 转换为 MapReduce/Tez/Spark 任务。

  4. 元数据存储

    Metastore 存储表结构、分区、字段类型等信息。

  5. 执行引擎

    运行编译后的任务,读写 HDFS 数据。

Hive 数据模型

表(Table) 类似关系型数据库的表,支持内部表(数据由 Hive 管理)和外部表(数据由用户管理)。

CREATE TABLE users (id INT, name STRING) STORED AS ORC;

分区(Partition)

按某一列的值划分数据目录,加速查询(如按日期分区)。

CREATE TABLE logs (log_time STRING, content STRING)

PARTITIONED BY (dt STRING);

分桶(Bucket)

按哈希值将数据分到多个文件,优化 JOIN 和采样效率。

CREATE TABLE orders (order_id INT, user_id INT)

CLUSTERED BY (user_id) INTO 10 BUCKETS;

Hive 应用场景

场景说明
离线数据分析处理 TB/PB 级历史数据(如用户行为分析、日志统计)。
ETL 流程清洗、转换数据后导入数据仓库(如将 CSV 转换为 ORC 格式)。
数据挖掘结合机器学习库(如 Hive + Mahout)进行聚类、分类等操作。
报表生成定时生成统计报表(如每日销售额汇总)。

Hive 优缺点

优点缺点
易用性强(SQL 语法)延迟高(分钟级响应,不适合实时查询)
可扩展性高(自定义 UDF)不支持事务和行级更新(Hive 3 部分支持)
兼容 Hadoop 生态(HDFS、HBase 等)需要优化分区和存储格式提升性能

Hive vs 传统数据库

对比项Hive传统数据库(如 MySQL)
数据规模支持 PB 级数据适合 GB/TB 级数据
响应速度高延迟(批处理)低延迟(实时查询)
事务支持有限支持(Hive 3+)完整支持 ACID
存储与计算分离(HDFS + 计算引擎)耦合(本地存储 + 计算

Hive 使用示例

创建表并加载数据

CREATE EXTERNAL TABLE user_logs (

ip STRING,

url STRING,

time STRING

) PARTITIONED BY (dt STRING)

ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘\t’

LOCATION ‘/hive/data/user_logs’;

LOAD DATA INPATH ‘/input/log_20231001.txt’ INTO TABLE user_logs PARTITION (dt=‘2023-10-01’);

聚合查询

SELECT dt, COUNT(*) AS pv

FROM user_logs

WHERE dt BETWEEN ‘2023-10-01’ AND ‘2023-10-07’

GROUP BY dt;

连接多个表

SELECT u.name, SUM(o.amount)

FROM orders o

JOIN users u ON o.user_id = u.id

GROUP BY u.name;

生态工具

  • Hive Metastore :独立元数据服务(供 Spark、Presto 等共用)。
  • Hive on Spark :用 Spark 替代 MapReduce 提升计算速度。
  • Hive LLAP (Live Long and Process):低延迟交互式查询。

总结

  • 适用场景 :离线批处理、海量数据仓库管理。
  • 替代方案 :实时查询用 ImpalaPresto ;复杂分析用 Spark SQL
  • 学习建议 :掌握 HiveQL 语法、分区优化和存储格式(ORC/Parquet)。