How to fix slow reads of a single text file in Spark

1. Reading a single large text file is slow

[Analysis]

1. Fix it by setting a Hadoop parameter: the file read needs more parallelism to get faster.
For text files, Spark was treating one file as one partition.
Reading the source code points to the parameter mapreduce.input.fileinputformat.split.minsize, which controls the split size, i.e. how much data each partition of a single large file holds, and therefore how fast the read goes.
Hadoop configuration is set in Spark through spark.sparkContext.hadoopConfiguration, as sketched below.
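A minimal sketch of the idea (the path, the 256 MB value and the 200 are made up for illustration; the second argument of textFile is minPartitions):

val hadoopConf = spark.sparkContext.hadoopConfiguration
// Raise the minimum split size so that each partition of the big file carries more data;
// lower it (or raise minPartitions) to get more, smaller partitions instead.
hadoopConf.setLong("mapreduce.input.fileinputformat.split.minsize", 256L * 1024 * 1024)

val lines = spark.sparkContext.textFile("hdfs:///tmp/big.txt", 200)   // hypothetical path
println(s"partitions after read: ${lines.partitions.length}")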

As for repartition in Spark: it forcibly re-partitions the data only after the HDFS file has already been read, so it cannot change how much data each partition holds at read time (see the small example below).
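For example (hypothetical path), the shuffle introduced by repartition runs only after the initial read, whose partitioning is already fixed by the input splits:

val raw = spark.sparkContext.textFile("hdfs:///tmp/big.txt")   // hypothetical path
println(raw.partitions.length)      // decided by the input splits at read time
val wider = raw.repartition(200)    // shuffles the already-read data into 200 partitions
println(wider.partitions.length)    // 200, but the read itself was not parallelized further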

Note: whether a file can be split is decided by isSplitable in TextInputFormat:

protected boolean isSplitable(FileSystem fs, Path file) {
  CompressionCodec codec = this.compressionCodecs.getCodec(file);
  return null == codec ? true : codec instanceof SplittableCompressionCodec;
}

For compressed files, the codec is specified through the io.compression.codecs parameter, but the file is only splittable if that codec implements the SplittableCompressionCodec interface (see the check below).
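A quick way to check which codec Hadoop picks for a path and whether it is splittable (the paths are made up; for instance BZip2Codec implements SplittableCompressionCodec while GzipCodec does not):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}

val factory = new CompressionCodecFactory(spark.sparkContext.hadoopConfiguration)
for (p <- Seq("hdfs:///tmp/big.txt.gz", "hdfs:///tmp/big.txt.bz2")) {
  val codec = factory.getCodec(new Path(p))
  val splittable = codec == null || codec.isInstanceOf[SplittableCompressionCodec]
  println(s"$p -> codec=$codec, splittable=$splittable")
}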

2. Read parallelism for parquet files (compared with the above)

 
Reading the file above:
spark-submit --class com.xx.test.xx \
  --master yarn-cluster \
  --queue default \
  --driver-memory 8g \
  --num-executors 20 \
  --executor-memory 12g \
  --executor-cores 1 \
  --conf spark.default.parallelism=50 \
  --conf spark.sql.shuffle.partitions=500 \
  --conf spark.yarn.executor.memoryOverhead=3306 \
  --conf spark.executor.extraJavaOptions="-XX:PermSize=512M -XX:MaxPermSize=512m -XX:+UseParallelOldGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  --files $HIVE_HOME/conf/hive-site.xml \
  --jars /home/mcloud/platform3/spark-csv_2.10-1.4.0.jar,/home/mcloud/platform3/commons-csv-1.1.jar,/home/mcloud/datacenter/jobcenter-job_2.10-1.0.jar,/home/mcloud/platform3/lib-iplocation-0.0.1.jar,/home/mcloud/platform3/common-lib_2.10-1.0.jar,/home/mcloud/platform3/data-api_2.10-1.0.8.jar,/home/mcloud/platform3/JavaEWAH-1.0.2.jar,/home/mcloud/platform3/bitmap-ext_2.10-1.0.jar,$HIVE_HOME/lib/mysql-connector-java-5.1.38.jar,$SPARK_HOME/lib/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/lib/datanucleus-core-3.2.10.jar,$SPARK_HOME/lib/datanucleus-rdbms-3.2.9.jar \
  /home/mcloud/datacenter_test/jobcenter-job_2.10-1.0-asset2.jar
In the run above, the spark.default.parallelism=50 setting takes effect.
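One way to see the resulting read parallelism from inside the job is to check the partition count right after the read (the path is made up):

val df = spark.read.parquet("hdfs:///tmp/some_table")   // hypothetical path
println(s"partitions after the parquet read: ${df.rdd.partitions.length}")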

3. textFile source code

textFile calls hadoopFile:
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}
hadoopFile creates a HadoopRDD; the logic for reading the file lives in HadoopRDD.
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()
  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}

HadoopRDD's getPartitions method returns the RDD's partitions, so the partition count is decided here:
override def getPartitions: Array[Partition] = {
  val jobConf = getJobConf()
  // add the credentials here as this can be called before SparkContext initialized
  SparkHadoopUtil.get.addCredentials(jobConf)
  val inputFormat = getInputFormat(jobConf)
  // The number of RDD partitions comes from the input splits; see getSplits below.
  val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
  val array = new Array[Partition](inputSplits.size)
  for (i <- 0 until inputSplits.size) {
    array(i) = new HadoopPartition(id, i, inputSplits(i))
  }
  array
}
textFile specifies TextInputFormat as the InputFormat.
Look at the getSplits method of that class (the decompiled form is shown below):
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
  Stopwatch sw = (new Stopwatch()).start();
  FileStatus[] files = this.listStatus(job);
  job.setLong("mapreduce.input.fileinputformat.numinputfiles", (long) files.length);

  long totalSize = 0L;
  for (FileStatus file : files) {
    if (file.isDirectory()) {
      throw new IOException("Not a file: " + file.getPath());
    }
    totalSize += file.getLen();
  }

  long goalSize = totalSize / (long) (numSplits == 0 ? 1 : numSplits);
  long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L),
      this.minSplitSize);

  ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
  NetworkTopology clusterMap = new NetworkTopology();
  for (FileStatus file : files) {
    Path path = file.getPath();
    long length = file.getLen();
    if (length == 0L) {
      splits.add(this.makeSplit(path, 0L, length, new String[0]));
    } else {
      FileSystem fs = path.getFileSystem(job);
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        blkLocations = fs.getFileBlockLocations(file, 0L, length);
      }
      if (!this.isSplitable(fs, path)) {
        // Not splittable (e.g. a gzip file): the whole file becomes a single split.
        String[][] splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, 0L, length, clusterMap);
        splits.add(this.makeSplit(path, 0L, length, splitHosts[0], splitHosts[1]));
      } else {
        long blockSize = file.getBlockSize();
        long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);
        long bytesRemaining;
        String[][] splitHosts;
        for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D; bytesRemaining -= splitSize) {
          splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, splitSize, clusterMap);
          splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, splitHosts[0], splitHosts[1]));
        }
        if (bytesRemaining != 0L) {
          splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap);
          splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1]));
        }
      }
    }
  }
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.elapsedMillis());
  }
  return splits.toArray(new FileSplit[splits.size()]);
}
/**
 * Computes the split size.
 */
protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
  return Math.max(minSize, Math.min(goalSize, blockSize));
}
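A worked example of that formula, with made-up numbers (a 10 GB file and a 128 MB HDFS block size), showing how split.minsize and minPartitions move the split size in opposite directions:

def computeSplitSize(goalSize: Long, minSize: Long, blockSize: Long): Long =
  math.max(minSize, math.min(goalSize, blockSize))

val mb = 1024L * 1024L
val totalSize = 10240 * mb   // hypothetical 10 GB file
val blockSize = 128 * mb     // typical HDFS block size

// Defaults: minPartitions = 2, split.minsize = 1 byte  ->  splitSize = 128 MB (~80 partitions)
println(computeSplitSize(totalSize / 2, 1L, blockSize) / mb)
// split.minsize = 512 MB  ->  splitSize = 512 MB (~20 larger partitions, more data each)
println(computeSplitSize(totalSize / 2, 512 * mb, blockSize) / mb)
// minPartitions = 1000    ->  goalSize ~ 10 MB, splitSize = 10 MB (~1000 smaller partitions)
println(computeSplitSize(totalSize / 1000, 1L, blockSize) / mb)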

