阅读次数
标签(空格分隔): 性能优化
在我们的实际使用中,经常需要将原始的文本文件转换为parquet列存储格式,以便后续查询的时候使用。写parquet能提高后续表查询效率这个事情我们不多说,下面讨论一下写parquet文件的效率问题:
我们来看一下两段程序:
1.使用case class作为df转换
package com.yundata.transtoparquet
import java.lang.Exception
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary,Statistics}
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.PairRDDFunctions
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.types.{StructType,StructField,StringType};
import org.apache.spark.sql.Row;
/**
 * One parsed record of the tab-separated source file, used as the schema
 * for the parquet output. A case class already mixes in Product and
 * Serializable automatically, so the original explicit `extends Product`
 * was redundant and has been dropped (no behavioral change).
 * Note: Scala 2.10 limits case classes to 22 fields; this one has 21.
 * user_range / md5_range / event_day are the parquet partition columns.
 */
case class formatrow(fs_id: Long, user_id: Long, app_id: Long, parent_path: String, server_filename: String, s3_handle: String, size: Long, server_mtime: String, server_ctime: Long, local_mtime: Long, local_ctime: Long, isdir: Long, isdelete: Long, status: Long, category: Long, object_key: String, extent_int2: Long, recompute_tag: Long, user_range: Long, md5_range: Long, event_day: String)
/**
* Created by robert on 15-5-18.
*/
class transtoparquet(userConfFile: String, sourceFile: String, destFile: String, event_day: String) extends Serializable {
  /**
   * Reads the tab-separated source file, derives the user_range/md5_range
   * partition keys per record and appends the result to destFile as
   * parquet, partitioned by (user_range, md5_range, event_day).
   *
   * Fixes over the original:
   *  - each line is split exactly once (the original called x.split("\t")
   *    ~20 times per record, once per field access);
   *  - empty-md5 rows use nextInt(100) so the random bucket really covers
   *    0-99 as the comment intended (nextInt(99) never produced 99, while
   *    the md5 hash path buckets with % 100 into 0-99);
   *  - Option/flatMap replaces the null sentinel + filter(_ != null);
   *  - plain 0L / -1L literals replace the odd Long2long(...) calls.
   */
  def run(): Unit = {
    transtoparquetconf(userConfFile)
    val conf = transtoparquetconf.getSparkConf()
    conf.setAppName(conf.get("spark.app.name", this.getClass.getName))
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    sc.hadoopConfiguration.addResource("hdfs-site.xml")
    // Summary metadata generation serializes on the driver when many
    // partition directories are written; disable it.
    sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
    System.out.println(sc.hadoopConfiguration.get("dfs.ha.namenodes.bigfile"))
    System.out.println(sc.hadoopConfiguration.toString)
    val txtfile = sc.textFile(sourceFile)
    txtfile.take(10).foreach(println)
    println(sourceFile)
    val txtdf = txtfile.flatMap { x =>
      // Split once and reuse; also folds the former size==18 pre-filter in.
      val f = x.split("\t")
      if (f.length != 18) {
        None
      } else {
        // (s3_handle, md5_range) depend on whether the md5 column is
        // empty, a 32-char hex digest, or malformed.
        val (s3_handle, md5_range) =
          if (f(5) == "") {
            // Random bucket in [0, 99] so empty-md5 rows spread evenly
            // over all 100 md5_range partitions.
            val bucket = scala.util.Random.nextInt(100)
            (bucket.toString, bucket.toLong)
          } else if (f(5).size == 32) {
            // Base-32 fold of the first 24 hex digits, bucketed mod 100.
            val h = f(5).toLowerCase.substring(0, 24).toList
              .map("0123456789abcdef".indexOf(_)).map(BigInt(_))
              .reduceLeft(_ * 32 + _)
            (f(5), (h % 100).toLong)
          } else {
            // Malformed md5 column: mark the record for dropping.
            (f(5), -1L)
          }
        // Bucket users by id / 1e8, capped at 100.
        val user_range = math.min(f(0).toLong / 100000000, 100L)
        if (md5_range == -1L) None
        else Some(formatrow(f(0).toLong, f(1).toLong, f(2).toLong, f(3), f(4), s3_handle, f(6).toLong, f(7), f(8).toLong, f(9).toLong, f(10).toLong, f(11).toLong, f(12).toLong, f(13).toLong, f(14).toLong, f(15), f(16).toLong, f(17).toLong, user_range, md5_range, event_day))
      }
    }.toDF()
    txtdf.repartition(5).write.mode(org.apache.spark.sql.SaveMode.Append).partitionBy("user_range", "md5_range", "event_day").parquet(destFile)
  }
}
object transtoparquet {
  /** Entry point: expects exactly 4 args — config file, source path, dest path, event day. */
  def main(args: Array[String]): Unit = {
    if (args.length == 4) {
      new transtoparquet(args(0), args(1), args(2), args(3)).run()
    } else {
      println("usage: com.yundata.transtoparquet.transtoparquet config srcfile destfile event_day")
    }
  }
}
这个程序实际测试,原始数据9G,压缩后大概是5.5G左右,使用50个核跑了好几个小时,居然都没有写完数据,看executor日志,发现任务在以几字节几字节的速度往hdfs当中写数据。崩溃,于是,换了一种写法。
2.使用row作为df转换
package com.yundata.transtoparquet
import java.lang.Exception
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary,Statistics}
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.PairRDDFunctions
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.types.{StructType,StructField,StringType,LongType};
import org.apache.spark.sql.Row;
/**
* Created by robert on 15-5-18.
*/
class transtoparquet(userConfFile: String, sourceFile: String, destFile: String, event_day: String) extends Serializable {
  /**
   * Row-based variant: builds RDD[Row] + explicit StructType and converts
   * via createDataFrame (avoids the slow case-class reflection path),
   * then appends to destFile as parquet partitioned by
   * (user_range, md5_range, event_day).
   *
   * Fixes over the original:
   *  - each line is split exactly once (the original called x.split("\t")
   *    ~20 times per record, once per field access);
   *  - empty-md5 rows use nextInt(100) so the random bucket really covers
   *    0-99 as the comment intended (nextInt(99) never produced 99, while
   *    the md5 hash path buckets with % 100 into 0-99);
   *  - Option/flatMap replaces the null sentinel + filter(_ != null);
   *  - plain 0L / -1L literals replace the odd Long2long(...) calls.
   */
  def run(): Unit = {
    transtoparquetconf(userConfFile)
    val conf = transtoparquetconf.getSparkConf()
    conf.setAppName(conf.get("spark.app.name", this.getClass.getName))
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    sc.hadoopConfiguration.addResource("hdfs-site.xml")
    // Summary metadata generation serializes on the driver when many
    // partition directories are written; disable it.
    sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
    System.out.println(sc.hadoopConfiguration.get("dfs.ha.namenodes.bigfile"))
    System.out.println(sc.hadoopConfiguration.toString)
    val txtfile = sc.textFile(sourceFile)
    txtfile.take(10).foreach(println)
    println(sourceFile)
    // Explicit schema; must stay aligned with the Row(...) field order below.
    val schema = StructType(Array(
      StructField("fs_id", LongType, true),
      StructField("user_id", LongType, true),
      StructField("app_id", LongType, true),
      StructField("parent_path", StringType, true),
      StructField("server_filename", StringType, true),
      StructField("s3_handle", StringType, true),
      StructField("size", LongType, true),
      StructField("server_mtime", StringType, true),
      StructField("server_ctime", LongType, true),
      StructField("local_mtime", LongType, true),
      StructField("local_ctime", LongType, true),
      StructField("isdir", LongType, true),
      StructField("isdelete", LongType, true),
      StructField("status", LongType, true),
      StructField("category", LongType, true),
      StructField("object_key", StringType, true),
      StructField("extent_int2", LongType, true),
      StructField("recompute_tag", LongType, true),
      StructField("user_range", LongType, true),
      StructField("md5_range", LongType, true),
      StructField("event_day", StringType, true)))
    val txtrow = txtfile.flatMap { x =>
      // Split once and reuse; also folds the former size==18 pre-filter in.
      val f = x.split("\t")
      if (f.length != 18) {
        None
      } else {
        // (s3_handle, md5_range) depend on whether the md5 column is
        // empty, a 32-char hex digest, or malformed.
        val (s3_handle, md5_range) =
          if (f(5) == "") {
            // Random bucket in [0, 99] so empty-md5 rows spread evenly
            // over all 100 md5_range partitions.
            val bucket = scala.util.Random.nextInt(100)
            (bucket.toString, bucket.toLong)
          } else if (f(5).size == 32) {
            // Base-32 fold of the first 24 hex digits, bucketed mod 100.
            val h = f(5).toLowerCase.substring(0, 24).toList
              .map("0123456789abcdef".indexOf(_)).map(BigInt(_))
              .reduceLeft(_ * 32 + _)
            (f(5), (h % 100).toLong)
          } else {
            // Malformed md5 column: mark the record for dropping.
            (f(5), -1L)
          }
        // Bucket users by id / 1e8, capped at 100.
        val user_range = math.min(f(0).toLong / 100000000, 100L)
        if (md5_range == -1L) None
        else Some(Row(f(0).toLong, f(1).toLong, f(2).toLong, f(3), f(4), s3_handle, f(6).toLong, f(7), f(8).toLong, f(9).toLong, f(10).toLong, f(11).toLong, f(12).toLong, f(13).toLong, f(14).toLong, f(15), f(16).toLong, f(17).toLong, user_range, md5_range, event_day))
      }
    }
    val txtdf = sqlContext.createDataFrame(txtrow, schema)
    txtdf.repartition(5).write.mode(org.apache.spark.sql.SaveMode.Append).partitionBy("user_range", "md5_range", "event_day").parquet(destFile)
  }
}
object transtoparquet {
  /** Entry point: expects exactly 4 args — config file, source path, dest path, event day. */
  def main(args: Array[String]): Unit = {
    if (args.length == 4) {
      new transtoparquet(args(0), args(1), args(2), args(3)).run()
    } else {
      println("usage: com.yundata.transtoparquet.transtoparquet config srcfile destfile event_day")
    }
  }
}
使用了这个程序之后,很快,18分钟,就将5G parquet数据全部写入了。
在上述程序中,我使用的配置是:
spark.app.name = TRANSTOPARQUET-JOB
spark.master = spark://sparkmaster:8650
spark.cores.max=50
spark.executor.instances=50
spark.executor.memory=2g
spark.speculation=true
spark.driver.maxResultSize=2g
spark.ui.port=8221
spark.ui.retainedStages=20
spark.ui.retainedJobs=20
spark.sql.parquet.compression.codec=snappy
但是这个程序还有一个问题是,如果repartition不设置的话,最后写入的文件数会非常多,大概是num(user_range)\*num(md5_range)\*num(event_day)\*num(repartition),很可能会瞬间打爆namenode的内存。因此repartition要设置得非常小,这又导致了整个程序会非常慢。
3)优化一下repartition的方式
原先使用repartition(5)的方式的时候,是随机分区,导致所有的task都基本可能有每一个分区的数据,所以导致每个分区下面都有5个文件;但是如果我按照需要的分区列来作哈希的话,例如repartition(user_range, md5_range, event_day),那么每个分区的数据只会存在于一个最后写入的task任务中,也就保证了整个任务产生的文件数最大是num(user_range)\*num(md5_range)\*num(event_day),
而不是原来的num(repartition)\*num(user_range)\*num(md5_range)\*num(event_day)。
因此我们可以随意启并发数。
将写入代码修改为:
txtdf.repartition(txtdf("user_range"), txtdf("md5_range"), txtdf("event_day")).write.mode(org.apache.spark.sql.SaveMode.Append).partitionBy("user_range","md5_range","event_day").parquet(destFile)
这样修改了之后。数据5分钟左右就全部写入了。
9208 4556 3297227427 /horus/users/chenxiue
数据的大小也比之前小了一些,因为文件更加集中了。
下面我们对比一下gzip压缩和snappy压缩的效果:
1)压缩比率
原始文件大小:
1 517 9359700501 hdfs://namenode:8700/pika_data/file_meta_data_20170101_part4/2017011421/1483873213877
snappy压缩产出的文件格式类似:part-r-00000-b3ff5d89-8885-42f1-bb3d-e8dc6fb692a0.snappy.parquet
压缩后的文件大小:
9206 22157 5501724946 /horus/users/chenxiue
大概花了18分钟左右。
gzip压缩是类似:part-r-00003-be2d54a8-088e-4970-be30-363747930a6e.gz.parquet这样的文件
大概也花了18分钟左右
9206 22157 3958666080 /horus/users/chenxiue1
可以看出gzip的压缩比更加大一些。
2)在随机分区得情况下,我们尝试加大最后写入的并发度,看看会不会有加速?
txtdf.repartition(10).write.mode(org.apache.spark.sql.SaveMode.Append).partitionBy("user_range","md5_range","event_day").parquet(destFile);
结果写入花了19分钟???为啥??
9208 44158 4259882305 /horus/users/chenxiue
发现写入的大小比原先的稍微大一些。
怀疑是数据量太小,主要时间都花在建立文件上;repartition越大的话,产生的文件数就越多,建文件的开销也就越大。