Original: Optimizing Parquet File Writes in Spark



Tags (space-separated): performance optimization


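In practice we often need to convert raw text files into the Parquet columnar format so that later queries can use them. That writing Parquet makes subsequent table queries faster needs no further discussion here; what follows looks at the efficiency of writing the Parquet files themselves.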

Let's look at two programs:

1. Using a case class for the DataFrame conversion

package com.yundata.transtoparquet

import org.apache.spark.SparkContext


case class formatrow(fs_id:Long,user_id:Long,app_id:Long,parent_path:String,server_filename:String,s3_handle:String,size:Long,server_mtime:String,server_ctime:Long,local_mtime:Long,local_ctime:Long,isdir:Long,isdelete:Long,status:Long,category:Long,object_key:String,extent_int2:Long,recompute_tag:Long,user_range:Long,md5_range:Long,event_day:String) extends Product;


/**
 * Created by robert on 15-5-18.
 */
class transtoparquet(userConfFile: String,sourceFile:String,destFile:String,event_day:String) extends Serializable{



  def run(): Unit = {

    transtoparquetconf(userConfFile);

    val conf=transtoparquetconf.getSparkConf();
    conf.setAppName(conf.get("spark.app.name", this.getClass.getName));

    val sc=new SparkContext(conf);
    val sqlContext = new org.apache.spark.sql.SQLContext(sc);
    import sqlContext.implicits._

    sc.hadoopConfiguration.addResource("hdfs-site.xml");
    sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

    System.out.println(sc.hadoopConfiguration.get("dfs.ha.namenodes.bigfile"));

    System.out.println(sc.hadoopConfiguration.toString);



    val txtfile = sc.textFile(sourceFile);

    txtfile.take(10).foreach(println);
    println(sourceFile);


    val txtdf=txtfile.filter(_.split("\t").size==18).map(
      x=>{

            var s3_handle=x.split("\t")(5);
            var md5_range=Long2long(0);
            if(x.split("\t")(5)=="")
              {
                s3_handle=(new scala.util.Random).nextInt(99).toString(); // random bucket so rows with an empty handle are spread out; note nextInt(99) yields 0..98
                md5_range=s3_handle.toLong;
              }
            else if(x.split("\t")(5).size==32)
            {
                md5_range=((s3_handle.toLowerCase.substring(0, 24).toList.map("0123456789abcdef".indexOf(_)).map(BigInt(_)).reduceLeft( _ * 32 + _))%100).toLong
            }
            else
            {
                md5_range = Long2long(-1);
            }
            var user_range=(x.split("\t")(0)).toLong/100000000;


            if(user_range>=100)
            {
                user_range=100;
            }

            if(md5_range == Long2long(-1))
              {
                null
              }


            else {
              formatrow((x.split("\t")(0)).toLong, x.split("\t")(1).toLong, x.split("\t")(2).toLong, x.split("\t")(3), x.split("\t")(4), s3_handle, (x.split("\t")(6)).toLong, x.split("\t")(7), (x.split("\t")(8)).toLong, (x.split("\t")(9)).toLong, (x.split("\t")(10)).toLong, (x.split("\t")(11)).toLong, (x.split("\t")(12)).toLong, (x.split("\t")(13)).toLong, (x.split("\t")(14)).toLong, x.split("\t")(15), (x.split("\t")(16)).toLong, (x.split("\t")(17)).toLong, user_range, md5_range, event_day);
            }
      }
    ).filter(_ != null).toDF();


    txtdf.repartition(5).write.mode(org.apache.spark.sql.SaveMode.Append).partitionBy("user_range","md5_range","event_day").parquet(destFile);

  }



}



object transtoparquet{

  def main(args: Array[String]): Unit = {
    if (args.size != 4) {
      println("usage: com.yundata.transtoparquet.transtoparquet config srcfile destfile event_day")
      return
    }
    new transtoparquet(args(0),args(1),args(2),args(3)).run()
  }
  }

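In an actual test with 9 GB of raw data (about 5.5 GB after compression), this program ran on 50 cores for several hours and still had not finished writing. The executor logs showed data trickling into HDFS a few bytes at a time. That was unbearable, so I switched to a different approach.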

2. Using Row for the DataFrame conversion

package com.yundata.transtoparquet


import org.apache.spark.SparkContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

/**
 * Created by robert on 15-5-18.
 */
class transtoparquet(userConfFile: String,sourceFile:String,destFile:String,event_day:String) extends Serializable{

  def run(): Unit = {

    transtoparquetconf(userConfFile);

    val conf=transtoparquetconf.getSparkConf();
    conf.setAppName(conf.get("spark.app.name", this.getClass.getName));

    val sc=new SparkContext(conf);
    val sqlContext = new org.apache.spark.sql.SQLContext(sc);
    import sqlContext.implicits._

    sc.hadoopConfiguration.addResource("hdfs-site.xml");
    sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

    System.out.println(sc.hadoopConfiguration.get("dfs.ha.namenodes.bigfile"));

    System.out.println(sc.hadoopConfiguration.toString);



    val txtfile = sc.textFile(sourceFile);

    txtfile.take(10).foreach(println);
    println(sourceFile);

    val schema=StructType(Array(StructField("fs_id",LongType,true), StructField("user_id",LongType,true),StructField("app_id",LongType,true),StructField("parent_path",StringType,true),StructField("server_filename",StringType,true),StructField("s3_handle",StringType,true),StructField("size",LongType,true),StructField("server_mtime",StringType,true),StructField("server_ctime",LongType,true),StructField("local_mtime",LongType,true),StructField("local_ctime",LongType,true),StructField("isdir",LongType,true),StructField("isdelete",LongType,true),StructField("status",LongType,true),StructField("category",LongType,true),StructField("object_key",StringType,true),StructField("extent_int2",LongType,true),StructField("recompute_tag",LongType,true),StructField("user_range",LongType,true),StructField("md5_range",LongType,true),StructField("event_day",StringType,true)));

    val txtrow=txtfile.filter(_.split("\t").size==18).map(
      x=>{

            var s3_handle=x.split("\t")(5);
            var md5_range=Long2long(0);
            if(x.split("\t")(5)=="")
              {
                s3_handle=(new scala.util.Random).nextInt(99).toString(); // random bucket so rows with an empty handle are spread out; note nextInt(99) yields 0..98
                md5_range=s3_handle.toLong;
              }
            else if(x.split("\t")(5).size==32)
            {
                md5_range=((s3_handle.toLowerCase.substring(0, 24).toList.map("0123456789abcdef".indexOf(_)).map(BigInt(_)).reduceLeft( _ * 32 + _))%100).toLong
            }
            else
            {
                md5_range = Long2long(-1);
            }
            var user_range=(x.split("\t")(0)).toLong/100000000;


            if(user_range>=100)
            {
                user_range=100;
            }

            if(md5_range == Long2long(-1))
              {
                null
              }
            else {
              Row((x.split("\t")(0)).toLong, x.split("\t")(1).toLong, x.split("\t")(2).toLong, x.split("\t")(3), x.split("\t")(4), s3_handle, (x.split("\t")(6)).toLong, x.split("\t")(7), (x.split("\t")(8)).toLong, (x.split("\t")(9)).toLong, (x.split("\t")(10)).toLong, (x.split("\t")(11)).toLong, (x.split("\t")(12)).toLong, (x.split("\t")(13)).toLong, (x.split("\t")(14)).toLong, x.split("\t")(15), (x.split("\t")(16)).toLong, (x.split("\t")(17)).toLong, user_range, md5_range, event_day);
            }
      }
    ).filter(_ != null);




    val txtdf=sqlContext.createDataFrame(txtrow, schema);

    txtdf.repartition(5).write.mode(org.apache.spark.sql.SaveMode.Append).partitionBy("user_range","md5_range","event_day").parquet(destFile);
  }
}



object transtoparquet{

  def main(args: Array[String]): Unit = {

    if (args.size != 4) {
      println("usage: com.yundata.transtoparquet.transtoparquet config srcfile destfile event_day")
      return
    }
    new transtoparquet(args(0),args(1),args(2),args(3)).run()
  }

}

With this program the write finished quickly: all 5 GB of Parquet data was written in 18 minutes.
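Both programs share the same per-line logic: derive s3_handle, md5_range and user_range, and drop lines that do not fit. As a reading aid, that logic can be sketched as a plain Scala function that splits each line only once and returns an Option instead of null. The names LineParser, Derived, md5Bucket and derive are hypothetical and do not appear in the original programs, and this sketch was not what produced the timings reported here.

```scala
object LineParser {
  // Hypothetical container for the three derived values; not part of the original programs.
  final case class Derived(s3Handle: String, md5Range: Long, userRange: Long)

  private val HexDigits = "0123456789abcdef"

  /** Mirrors the md5_range branch of the programs above for a 32-character handle.
    * The original multiplies by 32 rather than 16; that is kept here because the
    * value only has to spread rows over 100 buckets.
    */
  def md5Bucket(handle: String): Long =
    (handle.toLowerCase.substring(0, 24)
      .map(c => BigInt(HexDigits.indexOf(c)))
      .reduceLeft(_ * 32 + _) % 100).toLong

  /** Returns None for lines the original code drops:
    * wrong field count, or a handle that is neither empty nor 32 characters long.
    */
  def derive(line: String, rng: scala.util.Random = new scala.util.Random): Option[Derived] = {
    val fields = line.split("\t") // split once instead of once per field
    if (fields.length != 18) None
    else {
      // Same cap as the original: anything at or above 100 goes into bucket 100.
      val userRange = math.min(fields(0).toLong / 100000000L, 100L)
      fields(5) match {
        case "" =>
          val bucket = rng.nextInt(99) // 0..98, same call as the original
          Some(Derived(bucket.toString, bucket.toLong, userRange))
        case h if h.length == 32 =>
          Some(Derived(h, md5Bucket(h), userRange))
        case _ => None
      }
    }
  }
}
```

In either program the map/filter pair could then become something like `txtfile.flatMap(line => LineParser.derive(line).map(d => ...))`, which is a suggestion rather than the author's code.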

The configuration I used for the programs above:

spark.app.name = TRANSTOPARQUET-JOB
spark.master = spark://sparkmaster:8650
spark.cores.max=50
spark.executor.instances=50
spark.executor.memory=2g
spark.speculation=true
spark.driver.maxResultSize=2g
spark.ui.port=8221
spark.ui.retainedStages=20
spark.ui.retainedJobs=20
spark.sql.parquet.compression.codec=snappy

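This program still has a problem, though: if repartition is not set, the number of files written at the end becomes very large, roughly num(user_range) * num(md5_range) * num(event_day) * num(repartition), which can easily blow up the NameNode's memory in an instant. The repartition count therefore has to be kept very small, which in turn makes the whole program very slow.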
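To get a feel for the size of that product, here is a minimal arithmetic sketch. The cardinalities are assumptions read off the code above (user_range capped at 100, so up to 101 values for non-negative ids; md5_range in 0..99; one event_day per run), and the task count of 50 is a hypothetical value chosen only for illustration. FileCountEstimate and maxFiles are made-up names.

```scala
object FileCountEstimate {
  /** Upper bound on output files when every write task may hold rows
    * for every partition directory.
    */
  def maxFiles(partitionCardinalities: Seq[Long], writeTasks: Long): Long =
    partitionCardinalities.product * writeTasks

  def main(args: Array[String]): Unit = {
    // Assumed upper bounds: 101 user_range values, 100 md5_range values, 1 event_day.
    val cardinalities = Seq(101L, 100L, 1L)
    println(maxFiles(cardinalities, 5))  // the repartition(5) used above
    println(maxFiles(cardinalities, 50)) // hypothetical: one write task per core
  }
}
```

These are worst-case bounds; the real count depends on how many partition combinations actually contain data.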

3. Optimizing the way we repartition

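With the original repartition(5), rows are distributed without regard to their partition columns, so practically every task can hold data for every partition directory, and each directory ends up with 5 files. If I instead hash on the columns used for partitioning, for example repartition(user_range, md5_range, event_day), then all the data for a given partition directory ends up in a single write task. This guarantees that the job produces at most num(user_range) * num(md5_range) * num(event_day) files, rather than the previous num(repartition) * num(user_range) * num(md5_range) * num(event_day).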

So we can raise the write parallelism freely.

Change the write code to:

txtdf.repartition(txtdf("user_range"), txtdf("md5_range"), txtdf("event_day")).write.mode(org.apache.spark.sql.SaveMode.Append).partitionBy("user_range","md5_range","event_day").parquet(destFile)

After this change, all the data was written in about 5 minutes.

9208 4556 3297227427 /horus/users/chenxiue

The data is also somewhat smaller than before, because the files are more concentrated.
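The difference between the two repartitioning strategies can be illustrated without Spark. The sketch below counts one output file per distinct (write task, partition directory) pair. Round-robin assignment stands in for the key-agnostic spread of repartition(n), and hashing the key stands in for repartition on the partition columns. The sample data and all names (RepartitionSimulation, filesWritten, roundRobin, byKeyHash) are made up for illustration; this is a toy model, not Spark's actual partitioner.

```scala
object RepartitionSimulation {
  // (user_range, md5_range, event_day): the three partitionBy columns.
  type Key = (Long, Long, String)

  /** Files written = distinct (write task, partition directory) pairs. */
  def filesWritten(rows: Seq[Key], assign: (Key, Int) => Int): Int =
    rows.zipWithIndex.map { case (key, i) => (assign(key, i), key) }.distinct.size

  /** Key-agnostic spread, standing in for repartition(n). */
  def roundRobin(numTasks: Int): (Key, Int) => Int =
    (_, i) => i % numTasks

  /** Key-based placement, standing in for repartition(col, ...). */
  def byKeyHash(numTasks: Int): (Key, Int) => Int =
    (key, _) => Math.floorMod(key.hashCode, numTasks)

  // Made-up sample: 7 partition directories, 50 rows each, interleaved.
  val sampleRows: Seq[Key] =
    for (rep <- 0 until 50; k <- 0 until 7)
      yield (k.toLong, (k * 13L) % 100, "20170101")

  def main(args: Array[String]): Unit = {
    println(filesWritten(sampleRows, roundRobin(5))) // every task sees every directory
    println(filesWritten(sampleRows, byKeyHash(5)))  // one task per directory
    println(filesWritten(sampleRows, byKeyHash(50))) // more tasks, same file count
  }
}
```

In this model the hashed variant writes one file per partition directory no matter how many tasks there are, which is why the parallelism can be raised safely. One trade-off worth keeping in mind is that a very large partition directory is then written by a single task.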

Next, let's compare gzip and snappy compression:

1) Compression ratio

Original file size:

1 517 9359700501 hdfs://namenode:8700/pika_data/file_meta_data_20170101_part4/2017011421/1483873213877

Files produced with snappy compression are named like: part-r-00000-b3ff5d89-8885-42f1-bb3d-e8dc6fb692a0.snappy.parquet

Size after compression:
9206 22157 5501724946 /horus/users/chenxiue

This took about 18 minutes.

With gzip compression the files are named like: part-r-00003-be2d54a8-088e-4970-be30-363747930a6e.gz.parquet

This also took about 18 minutes.
9206 22157 3958666080 /horus/users/chenxiue1

As you can see, gzip achieves a higher compression ratio.
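The ratios can be worked out from the sizes listed above. This small sketch assumes that the third column of each listing is the total size in bytes; CompressionRatio and ratio are hypothetical names.

```scala
object CompressionRatio {
  // Sizes copied from the listings above (assumed to be bytes).
  val rawBytes: Long = 9359700501L
  val snappyBytes: Long = 5501724946L
  val gzipBytes: Long = 3958666080L

  /** Output size as a fraction of the raw input size. */
  def ratio(compressed: Long, raw: Long): Double = compressed.toDouble / raw

  def main(args: Array[String]): Unit = {
    println(f"snappy: ${ratio(snappyBytes, rawBytes) * 100}%.1f%% of raw")
    println(f"gzip:   ${ratio(gzipBytes, rawBytes) * 100}%.1f%% of raw")
  }
}
```

By this measure the snappy output is roughly 59% of the raw text and the gzip output roughly 42%, with both runs taking about the same time in this test.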

2) With random partitioning, let's try raising the parallelism of the final write and see whether that speeds things up.

txtdf.repartition(10).write.mode(org.apache.spark.sql.SaveMode.Append).partitionBy("user_range","md5_range","event_day").parquet(destFile);

The write took 19 minutes. Why?
9208 44158 4259882305 /horus/users/chenxiue
The written data also turned out slightly larger than before.

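My suspicion is that the data set is too small, so most of the time goes into creating files: the larger the repartition count, the more files there are.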
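One way to sanity-check that suspicion is to look at the average file size in each run. The sketch below assumes the listings in this post follow the `hadoop fs -count` layout (directory count, file count, total bytes, path), which the post does not state explicitly. The file counts may also include a few non-data files. AverageFileSize and Run are hypothetical names.

```scala
object AverageFileSize {
  final case class Run(label: String, files: Long, bytes: Long) {
    /** Average bytes per file (integer division). */
    def avgBytes: Long = bytes / files
  }

  // Numbers copied from the listings above, under the assumed column layout.
  val runs: Seq[Run] = Seq(
    Run("repartition(5), snappy", 22157L, 5501724946L),
    Run("repartition(10)", 44158L, 4259882305L),
    Run("repartition by partition columns", 4556L, 3297227427L)
  )

  def main(args: Array[String]): Unit =
    runs.foreach(r => println(s"${r.label}: ${r.avgBytes} bytes per file"))
}
```

Under these assumptions the repartition(10) run averages under 100 KB per file, while the run that repartitions by the partition columns averages around 700 KB, which is consistent with per-file overhead dominating when the files are this small.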