Hive 数据倾斜

Hive数据倾斜问题 大表Join大表 大表Join小表 group By

大表Join大表

思路一:SMB Join

smb是sort merge bucket操作,首先进行排序,继而合并,然后放到所对应的bucket中去,bucket是hive中和分区表类似的技术,就是按照key进行hash,相同的hash值都放到相同的buck中去。在进行两个表联合的时候。我们首先进行分桶,在join会大幅度的对性能进行优化。也就是说,在进行联合的时候,是bukect中的一小部分和bukect中的一小部分进行联合,table联合都是等值连接,相同的key都放到了同一个bucket中去了,那么在联合的时候就会大幅度的减小无关项的扫描。

  1. 设置参数
    1
    2
    3
    4
    set hive.auto.convert.sortmerge.join=true;
    set hive.optimize.bucketmapjoin = true;
    set hive.optimize.bucketmapjoin.sortedmerge = true;
    set hive.auto.convert.sortmerge.join.noconditionaltask=true;
  2. 两个表的bucket数量相等
  3. Bucket列、Join列、Sort列、Skewed列为相同的字段
  4. 必须是应用在bucket mapjoin 的场景中
  5. 注意点
    hive并不检查两个join的表是否已经做好bucket且sorted,需要用户自己去保证join的表,否则可能数据不正确。有两个办法
    • hive.enforce.sorting 设置为true
    • 手动生成符合条件的数据,通过在sql中用distributed c1 sort by c1 或者 cluster by c1,表创建时必须是CLUSTERED且SORTED,如下
    • 创建Skewed Table提高有一个或多个列有倾斜值的表的性能,例如:
      1
      2
      3
      create table test_smb_2(mid string,age_id string)
      CLUSTERED BY(mid) SORTED BY(mid) INTO 500 BUCKETS
      SKEWED BY(mid) on (001)
  6. 案例
    1. 设置先关参数
      1
      2
      3
      create table test_smb_2(mid string,age_id string)
      CLUSTERED BY(mid) SORTED BY(mid) INTO 500 BUCKETS
      SKEWED BY(mid) on (001)
    2. 创建桶表
      user表
      1
      2
      3
      4
      5
      create table user_info_bucket(userid string,uname string)
      clustered by(userid) into 4 buckets
      SKEWED BY(userid) on (001)
      row format delimited fields terminated by "\t";
      STORED AS DIRECTORIES
      domain表
      1
      2
      3
      4
      5
      create table domain_info_bucket(userid string,domainid string,domain string)
      clustered by (userid) into 4 buckets
      SKEWED BY(userid) on (001)
      row format delimited fields terminated by "\t";
      STORED AS DIRECTORIES
    3. 分别倒入数据
      1
      2
      insert overwrite table user_info_bucket select userid ,uname from user
      insert overwrite table domain_info_bucket select userid ,domainid,domain from doamin
    4. 查询
      1
      select * from user_info_bucket u  join domain_info_bucket d on(u.userid==d.userid)
思路二:一分为二

选择临时表的方式,将数据一分为二,把倾斜的key,和不倾斜的key分开处理,不倾斜的正常join,倾斜的根据情况选择mapjoin或加盐处理,最后结果union all结果

user表为用户基本表,domain为用户访问域名的宽表

注意:我们其实隐含使用到了mapjoin,hive中的参数为set hive.auto.convert.join=true;,自动开启,默认25M,不能超过1G。

  1. 创建中间表
    1
    2
    3
    4
    create table tmp_table(userid string,uname string)
    SKEWED BY(userid) on (001)
    row format delimited fields terminated by "\t";
    STORED AS DIRECTORIES
  2. count(*)出符合倾斜条件的数据存入中间表
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    insert overwrite table tmp_table
    select
    d.userid,u.uname
    from
    (select userid
    from
    (select
    userid,
    count(userid) u_cunt

    from
    domain
    group by
    userid) t
    where u_cunt>100) d
    left outer join
    (select
    userid,uname
    from
    user) u
    on d.userid = u.userid;
  3. 一分为二,分别查询中出结果,再union all
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    select 
    u1.userid,u1.uname,d1.domain
    from
    (select
    userid,uname
    from
    user) u1
    join
    (select
    d.userid,d.domain
    from
    domain d
    left outer join
    tmp_table t
    on
    d.userid = t.userid
    where
    t.userid is null) d1
    on u1.userid = d1.userid

    union all

    select
    u2.userid,u2.uname,d2.domain
    from
    (select
    userid,uname
    from
    user) u2
    join
    (select
    d.userid,d.domain
    from
    domain d
    left outer join
    tmp_table t
    on
    d.userid = t.userid
    where
    t.userid is not null) d2
    on u2.userid = d2.userid;

大表Join小表

思路:MapJoin

大表Join小表很好解决,把小表放进内存,大表再去匹配即可。

  1. 开启MapJoin
    1
    set hive.auto.convert.join=true;
  2. 调整MapJoin小表大小,默认25M(调整为可以容忍的大小)
    1
    set hive.mapjoin.smalltable.filesize
  3. 如果是MR,小表放进Map,大表进入Mapper匹配Map(使用对象存储结果)

group By解决

思路:加盐去盐
  1. 开启数据倾斜时的负载均衡
    1
    set hive.groupby.skewindata=true
  2. 对groupby的key加盐去盐
    开启上面这个参数,hive会自动拆解成两个MR,加盐去盐,最终输出结果
    MR需要写两个MR,一个对key加盐,一个对key去盐