1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
| # # 究其本质 #
# # 问题所在 # 1、倾斜:抽样 1.1、内容倾斜 1.2、group by 1.3、小表 join 大表 2、过多 2.1、join过多导致job过多 2.2、小文件过多 2.3、Mapper或Reducer过多 3、使用不当 3.1、count(distinct) 3.2、join ... on ... where 3.3、select sum(field) from TABLE;
# # 解决方案 # # 航母发动一次消耗大,一次任务数据量太小资源浪费 # 模型设计 # 整体最优,考虑全局 合理减少表数量: 数据建模: 【星型】,雪花,星座 维度表(静态数据),事实表(动态数据:4W1H) 维度退化 => 星型 sqoop|maxwell|cancal : query "select ... join ..." ods -> dwd insert into ... select ... join ... # 充分了解业务,提前设计好预聚合 分层=>轻量聚合 分区=>避免交换 on... where... 分桶=>拉链表(分桶表)、抽样 压缩=>减少体量(现在不太强调) 配,压缩格式,表存储格式(是否支持压缩,是否支持切片) hadoop hive spark
# hadoop 内存管理 # mapred set mapreduce.map.memory.mb=256; set mapreduce.reduce.memory.mb=512; set mapreduce.map.java.opts=? set mapreduce.reduce.java.opts=? # yarn set yarn.nodemanager.resource.memory-mb=-1; set yarn.scheduler.minimum-allocation-mb=1024; set yarn.scheduler.maximum-allocation-mb=8192;
# 倾斜:热点数据 内置自动优化 # join: 非大小表 #原因 连接字段在连接表之间分布不均,或缺乏连接关系 【连接键】的值分布不均:值集中的少数分区,其他分区没有值 某张表中数据分布不均 #手动处理 连接键的选择 连接键拆分或随机映射 引入hash分区或分桶使得数据分布均衡 增加或减少任务的并行度 #自动处理 # 默认false,如果join键倾斜则设为true set hive.optimize.skewjoin=true; # 默认join键对应的记录数超过该值则进行倾斜分析 set hive.skewjoin.key=100000; # 默认10000,设置倾斜处理mapper数量,过大会导致过多的任务切割和额外的开销,过小会导致不能优化 set hive.skewjoin.mapjoin.map.tasks=10000; # 默认32M,倾斜最小切片大小,配合上一项使用,避免Mapper数量过多 set hive.skewjoin.mapjoin.min.split=32M; # map join: 大小表 # 默认true,即默认自动开启 mapjoin set hive.auto.convert.join=true; # 默认小表<=25M set hive.mapjoin.smalltable.filesize=25M; # 默认false,分桶表表mapjoin专用 set hive.optimize.bucketmapjoin=true; # combiner: #默认true,即默认开启Mapper端聚合 set hive.map.aggr=true; # groupby:HashPartitioner # 默认-1,倾斜的倍数(倾斜度) n = 倾斜数据总均量/其他数据总均量 + 其他数据的差异数 # 抽样(tablesample(bucket COUNT outof TOTAL))确定是偶倾斜与倾斜程度 set mapreduce.job.reduces=n; (见下面 Reducer 数量控制) # 默认false set hive.groupby.skewindata=true; # sql manunal # 抽样统计,是否倾斜,倾斜程度:20%的键占用了80%的数据 # 倾斜键加分隔前缀加盐,分组统计,去盐后再分组统计
# Map或Reduce输出过多小文件合并 # 若满足以下设置条件,任务结束后会单起MapReduce对输出文件进行合并 # 默认为true,map-only输出是否合并 set hive.merge.mapfiles=true; # 默认为false,mapreduce输出是否合并 set hive.merge.mapredfiles=true; # 默认256M,合并文件操作阈值,如果输入数据超过256M,则触发合并操作 set hive.merge.size.per.task=256M; # 默认16M,合并文件平均大小小于该阈值则将他们合并为大文件 set hive.merge.smallfiles.avgsize=16M;
# 控制Mapper和Reducer数量 # mapper的启动和初始化开销较大,【数量过多】导致开销大于逻辑处理,浪费资源 # # Mapper # # 默认的Mapper数量 int default_num = total_file_size/dfs.block.size; #默认为2, 只有大于2时才会生效 set mapred.map.tasks=2; #旧版 set mapreduce.job.maps=2; #新版 # Mapper数量有限值: # Math.max(min.split.size,Math.min(dfs.block.size,max.split.size)) # 默认128M set dfs.block.size=128M; # 默认单个Mapper处理数据上限256M set mapred.max.split.size=256M; # 默认1字节 set mapred.min.split.size=1; # 默认单个节点处理的数据下限1字节 set mapred.min.split.size.per.node=1; # 默认单个机架处理的数据下限1字节 set mapred.min.split.size.per.rack=1; # Mapper输入多个小文件合并后再切片 set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat # Mapper切片的大小【越接近128M越好】? # 表数据文件过大 set mapred.min.split.size = N; # 需要综合考虑Yarn的内存权限 # 分布式计算的均衡 # 若表A单行内容量大,且处理逻辑复杂,需要将文件拆分(列裁剪,行筛选) # 将数据通过分区表拆分成更小粒度 set mapreduce.job.reduces=3; create table A_SPLITS as select * from A distribute by rand(3); # # Reducer # # 默认-1,可以根据需要在客户端设置 : int n = Math.min(SIZE/bytes.per.reducer, reducers.max) | num_partitions set mapred.reduce.tasks=n; # 旧版本使用 set mapreduce.job.reduces=n; # 新版本使用 ✔ # 若存在数据倾斜,则 Hive 会单独分配 Reducer 处理倾斜数据 # 若未设置 Reducer 数量,自动计算 Reducer 数量 # 默认每个Reducer的数据为256M set hive.exec.reducers.bytes.per.reducer=256M; # 默认单个任务最大Reducer数量 set hive.exec.reducers.max=1009; # Reducer只能为1的情况 # 没有group by,直接使用sum,count,max,min,avg,collect_list,concat_ws等聚合函数 #优化方案 select sum(sum_a) from (select sum(a) from A group by STH)T # 使用了order by #优化方案 #group by 同键分组且必须聚合,distribute by 仅按键分散数据 #orderby 全局排序仅1个Reducer,sort by 仅分区内有序 set mapreduce.job.reduces=N; select * from (select * from A distribute by a sort by a) order by a; # 存在笛卡尔积,尽量不用 # 若出现小表+大表的笛卡尔积 # 小表扩展join key,并根据需求复制 DN_COUNT 份 # 大表扩展join key,根据 DN_COUNT 随机生成 # 关闭自动mapjoin : set hive.auto.convert.join=false # 设置reducer的数量为:set mapreduce.job.reduces=DN_COUNT # 减少数据规模 # 调整存储格式 # create 建表的默认格式和分区表 #默认TextFile, 可选:orc,RCFile,SequenceFile set hive.default.fileformat=orc; # 提升IO性能,但会增加CPU压力 # Mapper压缩 # 开启map输出压缩功能;默认false set mapreduce.map.output.compress=true; # 设置map输出数据的压缩方式;默认DefaultCodec set mapreduce.map.output.compress.codec = org.apache.hadoop.io.compress.SnappyCodec; # 默认false,任务过程输出是否压缩 set hive.exec.compress.intermediate=true; # Reducer压缩 # 开启Reducer输出压缩功能;默认false set hive.exec.compress.output=true; # reduce最终输出数据压缩;默认false set mapreduce.output.fileoutputformat.compress=true; # reduce最终数据输出压缩为块压缩;默认RECORD set mapreduce.output.fileoutputformat.compress.type=BLOCK; # reduce最终数据输出压缩方式;默认DefaultCodec set mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodec; # 动态分区 # 默认开启 set hive.exec.dynamic.partition=true; # 默认strict set hive.exec.dynamic.partition.mode=nonstrict; # 默认最大动态分区数1000 set hive.exec.max.dynamic.partitions=1000; # 默认单节点最大动态分区数100 set hive.exec.max.dynamic.partitions.pernode=100; # 动态添加多分区数据(需要一张源数据表) insert into table TABLE_PARTITION partition(partition_field) select *, partition_field from TABLE_SOURCE where ...; # 静态分区数据挂载 load data [local] inpath 'DATA_PATH' [overwrite|into] table TABLE_PARTITION partition(partition_field=VALUE); # 查看分区 show partitions TABLE_PARTITION; # 添加分区 alter table TABLE_PARTITION add partition(partition_field=VALUE); # 删除分区 alter table TABLE_PARTITION drop partition(partition_field=VALUE); # count(distinct) # 不妥:select count(distict b) from TAB group by a # 稳妥:select count(b) from (select a,b from TAB group by a,b) group by a # CBO (COST BASED OPTIMIZER) # 默认true set hive.cbo.enable=true; # 分区裁剪 # 以 on,where 多条件字段顺序,建【多重】分区表 # 默认开启支持,以分区字段为条件筛选数据 # tez引擎:动态分区剪裁支持 # 默认mr, tez|spark DAG set hive.execution.engine=tez; # 并行执行无依赖job # 默认false(关闭) set hive.exec.parallel=true; # 默认8,最大并行任务数 set hive.exec.parallel.thread.number=8; # JVM重用 : hive3已经取消 #依赖hadoop mapred-site.xml一下配置 #每个jvm运行的任务数 set mapreduce.job.jvm.numtasks = 8; # 本地化运算 # 默认1,启动本地化模式reducer数量必须为0|1 set mapreduce.job.reduces=0/1; # 默认 yarn set mapreduce.framework.name=local; # 开启自动本地化模式 set hive.exec.mode.local.auto=true; # 默认4,本地化文件数量上限 set hive.exec.mode.local.auto.input.files.max=4; # 默认128M,本地化文件大小上限 set hive.exec.mode.local.auto.inputbytes.max=128M; # 可能会导致内存溢出:java.lang.OutofMemoryError : java heap space # 修改 mv hive-env.sh.template hive-env.sh # 去掉注释 ------------------------------ # export HADOOP_HEAPSIZE=1024 ------------------------------ # llap # 默认container,2.0之后扩展此项,llap可选 set hive.execution.mode=llap; # llap为DataNode常驻进程,混合模型,小型任务可以由llap解决,大任务由yarn容器执行 # fetch # 默认more,简单查询不走mr,直接提取 set hive.fetch.task.conversion=more; # ------------------------------------------| # explain extended select ... 查看执行计划 | # ------------------------------------------| # 开启谓词(筛选条件表达式)下推(见以下表格) set hive.optimize.ppd=true; join中谓词:on join后谓词:where 左右外连接 主表:全部显示, on条件不能下推,where可以下推 从表:不存在以NULL填充,where不能下推,on可以下推 内连接:on和where都下推 全外连接:on和where都不下推 # 谓词下推 :过滤条件最接近数据源,换言之即先筛选掉无关数据再做其他处理 select * from A left join B on A.id=B.id where A.id=1; select * from (select * from A where A.id=1)T left join B on T.id=B.id; select * from A left join B on A.id=B.id and A.id=1;
|