
Spark SQL + Hive ETL


Drawback of plain spark-sql: running `insert overwrite table xx…` statements leaves a large number of small files in the result directory, and the job can easily fail with out-of-memory errors.

Spark SQL + Hive

create table if not exists db.res(
  sum_id string, cnt_id string, dis_id string, cnt_uid string, dis_uid string
) partitioned by (dt string comment 'partition date, yyyy-MM-dd')
stored as orc;

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table db.res partition(dt)
select sum(id) as sum_id, count(id) as cnt_id, count(distinct id) as dis_id,
       count(user_id) as cnt_uid, count(distinct user_id) as dis_uid,
       split(event_date,' ')[0] as dt
from db.test
where user_id is not null and event_date is not null
group by split(event_date,' ')[0];
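One common mitigation for the small-file problem mentioned at the top (a sketch, not from the original post): cap the number of reduce tasks before the insert, so each dynamic partition is written by fewer tasks. The value 20 below is illustrative and should be sized to the data volume.

set spark.sql.shuffle.partitions=20;  -- fewer reduce tasks => fewer output files per partition
insert overwrite table db.res partition(dt)
select sum(id) as sum_id, count(id) as cnt_id, count(distinct id) as dis_id,
       count(user_id) as cnt_uid, count(distinct user_id) as dis_uid,
       split(event_date,' ')[0] as dt
from db.test
where user_id is not null and event_date is not null
group by split(event_date,' ')[0];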


Spark SQL + shell script:

#!/bin/bash
# Variant 1: run a SQL file with spark-sql.
# The two positional args are read but only used by the commented-out query below.
table_name=$1
row_limit=$2
# seconds since 1970-01-01
date_seconds=`date +%s`
echo $date_seconds
# days since the epoch, usable as a date-based suffix
sh_table_date=`expr $date_seconds / 86400`
echo $sh_table_date
# Statement (kept for reference):
#create_sql="select * from ${table_name} limit ${row_limit};"
#echo "${create_sql}"
conf="--master yarn --deploy-mode client --name spark-sh03 --queue q1 --num-executors 3 --executor-cores 2 --executor-memory 2g"
$SPARK_HOME/bin/spark-sql ${conf} -f /home/luolw/t.sql
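Assuming the script above is saved as run_sql_file.sh (the name is illustrative), it runs whatever statements are in /home/luolw/t.sql; the two arguments are accepted but unused in this variant:

sh run_sql_file.sh db.test 100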

#!/bin/bash
# Variant 2: build a query from positional args and run it inline with -e.
# Note: sh_database is read but not referenced in the query; pass a fully
# qualified table name (db.table) or rely on the session's current database.
sh_database=$1
sh_table_name=$2
sh_row_limit=$3
# seconds since 1970-01-01
date_seconds=`date +%s`
echo $date_seconds
# days since the epoch
sh_table_date=`expr $date_seconds / 86400`
echo $sh_table_date
# Build the statement
create_sql="select * from ${sh_table_name} limit ${sh_row_limit};"
echo "${create_sql}"
conf="--master yarn --deploy-mode client --name spark_sh01 --queue q1 --num-executors 3 --executor-cores 2 --executor-memory 2g"
$SPARK_HOME/bin/spark-sql ${conf} -e "${create_sql}"
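Invocation (script name illustrative):

sh run_inline.sh db db.test 100
# prints and runs: select * from db.test limit 100;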

[aa@hd12 ~]$ vim luo2.sh
#!/bin/bash
# Variant 3: create the ORC table and run the dynamic-partition insert in one call.
sh_database=$1
sh_table_name=$2
sh_row_limit=$3
# seconds since 1970-01-01
date_seconds=`date +%s`
echo $date_seconds
# days since the epoch
sh_table_date=`expr $date_seconds / 86400`
echo $sh_table_date
# Build the statements
create_table="create table if not exists db.res(sum_id string,cnt_id string,dis_id string,cnt_uid string,dis_uid string) partitioned by (dt string comment 'partition date, yyyy-MM-dd') stored as orc"

#create_table="create table if not exists db.res(sum_id string,cnt_id string,dis_id string,cnt_uid string,dis_uid string) partitioned by (dt string comment 'partition date, yyyy-MM-dd') row format delimited fields terminated by ',' lines terminated by '\n' stored as orc;"

insert_sql="set hive.exec.dynamic.partition=true;set hive.exec.dynamic.partition.mode=nonstrict;insert overwrite table db.res partition(dt) select sum(id) as sum_id,count(id) cnt_id,count(distinct id) as dis_id,count(user_id) cnt_uid,count(distinct user_id) dis_uid,split(event_date,' ')[0] dt from db.test where user_id is not null and event_date is not null group by split(event_date,' ')[0]"
echo "${insert_sql}"
conf="--master yarn --deploy-mode client --name spark_sh02 --queue q1 --num-executors 3 --executor-cores 2 --executor-memory 2g"
$SPARK_HOME/bin/spark-sql ${conf} -e "${create_table};${insert_sql}"
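Invocation (the args are accepted for consistency with the previous variant; only the hard-coded db.res/db.test names are actually used):

sh luo2.sh db db.test 100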


Spark SQL beeline connection (HiveServer2 discovered via ZooKeeper):
./beeline

!connect jdbc:hive2://hadoop01.com:2181,hadoop02.com:2181,hadoop03.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2

Spark on Hive beeline client test:
[luolw@hd12 bin]$ ./beeline -u jdbc:hive2://hadoop01.com:10000 -n tom
Connecting to jdbc:hive2://hadoop01.com:10000
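Once connected, a quick smoke test against the table used throughout this post (illustrative):

show databases;
select count(*) from db.test;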

spark-sql:

spark-sql --master yarn --deploy-mode client  --executor-memory 4g --executor-cores 2 --num-executors 5 --name tom_spark-sql --queue etl_q

set hive.exec.orc.split.strategy=ETL; -- read ORC file footers when planning splits: more accurate splits for ETL-style scans over large files (BI skips footers for faster planning; HYBRID is the default)


Complex aggregation:
select  split(event_date,' ')[0]  dt,
sum(id),count(id),count(distinct id),count(user_id),count(distinct user_id)
from test
where user_id is not null
and event_date is not null
group by split(event_date,' ')[0];


The same aggregates for a single fixed date:

select '20200202' dt,
sum(id),count(id),count(distinct id),count(user_id),count(distinct user_id)
from test
where user_id is not null
and event_date is not null;


cube:
select grouping__id,split(event_date,' ')[0]  dt,target,action,sum(id) as sum_id,count(id) cnt_id,
count(distinct id) as dis_id,count(user_id) cnt_uid,
count(distinct user_id) dis_uid 
from test 
where user_id is not null and event_date is not null 
group by split(event_date,' ')[0],target,action 
with cube;
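group by … with cube over three keys conceptually expands to grouping sets covering all 2^3 = 8 combinations of the keys. A sketch of the equivalent form (the inner subquery is only there so dt can be referenced by name):

select grouping__id, dt, target, action,
       sum(id) as sum_id, count(distinct user_id) as dis_uid
from (select split(event_date,' ')[0] as dt, target, action, id, user_id
      from test
      where user_id is not null and event_date is not null) t
group by dt, target, action
grouping sets ((dt,target,action),(dt,target),(dt,action),(target,action),(dt),(target),(action),());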


View:
create view if not exists res_view as
select grouping__id as set_id,
       rpad(reverse(bin(cast(grouping__id as bigint))),3,'0') as set_bits,
       split(event_date,' ')[0] as dt,
       target, action,
       sum(id) as sum_id, count(id) as cnt_id, count(distinct id) as dis_id,
       count(user_id) as cnt_uid, count(distinct user_id) as dis_uid
from test
where user_id is not null and event_date is not null
group by split(event_date,' ')[0], target, action
with cube
order by set_id;
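The rpad(reverse(bin(…))) expression (aliased set_bits above; the alias is mine) renders GROUPING__ID as one flag per group-by column, in group-by order. A worked example:

GROUPING__ID = 4  ->  bin(4) = '100'  ->  reverse = '001'  ->  rpad('001',3,'0') = '001'

Reading left to right, the three flags correspond to split(event_date,' ')[0], target and action. Beware that the bit convention flipped in Hive 2.3.0: before 2.3 a 1 means the column participates in that grouping; from 2.3 onward (the SQL-standard behavior) a 1 means the column was aggregated away.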

Spark job configuration tuning (note: several of these, e.g. spark.akka.* and spark.storage.memoryFraction, only apply to Spark 1.x and are ignored or removed in later versions):

spark.akka.frameSize=200
spark.akka.timeout=100
spark.shuffle.manager=SORT
spark.scheduler.mode=FAIR
#spark.scheduler.allocation.file
spark.default.parallelism=200
spark.storage.memoryFraction=0.6
spark.executor.instances=40
spark.streaming.unpersist=true
spark.io.compression.codec=snappy
spark.driver.extraJavaOptions=-XX:+UseG1GC
spark.executor.extraJavaOptions=-XX:+UseG1GC
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.streaming.backpressure.enabled=true
spark.streaming.kafka.maxRatePerPartition=500
spark.streaming.backpressure.initialRate=1000
spark.streaming.kafka.maxRetries=3
spark.streaming.concurrentJobs=1
spark.yarn.executor.memoryOverhead=2048
spark.yarn.driver.memoryOverhead=1024
spark.rdd.compress=true
spark.speculation=false
spark.speculation.interval=100
spark.speculation.quantile=0.75
spark.speculation.multiplier=1.5
spark.scheduler.executorTaskBlacklistTime=30000
spark.streaming.stopGracefullyOnShutdown=true
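These settings can go in $SPARK_HOME/conf/spark-defaults.conf or be passed per job. A minimal sketch of the per-job form (the class and jar names are placeholders):

spark-submit \
  --master yarn --deploy-mode client \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.rdd.compress=true \
  --conf spark.yarn.executor.memoryOverhead=2048 \
  --class com.example.EtlJob etl-job.jar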

