1. Reading a single large text file is slow
[Analysis]
1. This can be fixed by setting a Hadoop parameter: the parallelism of the file read has to be increased to speed up reading.
For this text file, Spark ends up treating the whole file as a single partition.
Reading the source code shows which parameter to set: mapreduce.input.fileinputformat.split.minsize. Together with the minPartitions argument of textFile, it controls how much data goes into each read-time partition of a single large file, and therefore how fast the file is read (see the sketch below).
In Spark, the Hadoop configuration is set through spark.sparkContext.hadoopConfiguration.
Calling repartition in Spark only reshuffles the partitions after the HDFS file has already been read; it cannot change how many partitions are used at read time.
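A minimal sketch of this tuning, assuming a SparkSession named spark (the HDFS path, the 16 MB min split size and the 200 minPartitions value are placeholders, not values from the original job):

import org.apache.spark.sql.SparkSession

object SplitSizeTuning {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("split-size-tuning").getOrCreate()

    // splitSize = max(minSize, min(goalSize, blockSize)), where goalSize = totalSize / numSplits
    // (see the getSplits source in section 3). Keep minsize small so it does not floor the split
    // size, and raise minPartitions so goalSize drops below the HDFS block size.
    spark.sparkContext.hadoopConfiguration
      .setLong("mapreduce.input.fileinputformat.split.minsize", 16L * 1024 * 1024)

    // Ask for more read-time partitions; this value becomes numSplits in FileInputFormat.getSplits.
    val lines = spark.sparkContext.textFile("hdfs:///path/to/big_file.txt", 200)
    println(s"read-time partitions: ${lines.partitions.length}")
  }
}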
Note: whether a file can be split is decided by isSplitable in TextInputFormat:

protected boolean isSplitable(FileSystem fs, Path file) {
  CompressionCodec codec = this.compressionCodecs.getCodec(file);
  return null == codec ? true : codec instanceof SplittableCompressionCodec;
}
For compressed files, the codec is specified with the io.compression.codecs parameter, but the file is only splittable if that codec implements the SplittableCompressionCodec interface.
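A sketch of what this means in practice, again assuming a SparkSession named spark and placeholder paths. BZip2Codec implements SplittableCompressionCodec, while GzipCodec does not, so a .gz file always collapses into one partition:

import org.apache.spark.sql.SparkSession

object SplittableCodecCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("splittable-codec-check").getOrCreate()
    val hadoopConf = spark.sparkContext.hadoopConfiguration

    // Register the codecs Hadoop should try (standard Hadoop codec class names).
    hadoopConf.set("io.compression.codecs",
      "org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.GzipCodec")

    // Splittable codec: a large .bz2 file can produce many read-time partitions.
    println(spark.sparkContext.textFile("hdfs:///data/big_file.bz2").partitions.length)
    // Non-splittable codec: the whole .gz file becomes a single partition.
    println(spark.sparkContext.textFile("hdfs:///data/big_file.gz").partitions.length)
  }
}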
2. Parallelism when reading a Parquet file (compared with the above)
Reading the file above:

spark-submit --class com.xx.test.xx \
  --master yarn-cluster \
  --queue default \
  --driver-memory 8g \
  --num-executors 20 \
  --executor-memory 12g \
  --executor-cores 1 \
  --conf spark.default.parallelism=50 \
  --conf spark.sql.shuffle.partitions=500 \
  --conf spark.yarn.executor.memoryOverhead=3306 \
  --conf spark.executor.extraJavaOptions="-XX:PermSize=512M -XX:MaxPermSize=512m -XX:+UseParallelOldGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  --files $HIVE_HOME/conf/hive-site.xml \
  --jars /home/mcloud/platform3/spark-csv_2.10-1.4.0.jar,/home/mcloud/platform3/commons-csv-1.1.jar,/home/mcloud/datacenter/jobcenter-job_2.10-1.0.jar,/home/mcloud/platform3/lib-iplocation-0.0.1.jar,/home/mcloud/platform3/common-lib_2.10-1.0.jar,/home/mcloud/platform3/data-api_2.10-1.0.8.jar,/home/mcloud/platform3/JavaEWAH-1.0.2.jar,/home/mcloud/platform3/bitmap-ext_2.10-1.0.jar,$HIVE_HOME/lib/mysql-connector-java-5.1.38.jar,$SPARK_HOME/lib/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/lib/datanucleus-core-3.2.10.jar,$SPARK_HOME/lib/datanucleus-rdbms-3.2.9.jar \
  /home/mcloud/datacenter_test/jobcenter-job_2.10-1.0-asset2.jar
From the above, the spark.default.parallelism=50 setting takes effect for the Parquet read.
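To compare the two cases, one could print the read-time partition counts of both sources. This is only a sketch using the SparkSession API; the paths are placeholders, not the job's real inputs:

import org.apache.spark.sql.SparkSession

object ReadParallelismCompare {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("read-parallelism-compare").getOrCreate()

    val textRdd = spark.sparkContext.textFile("hdfs:///data/big_file.txt")
    val parquetDf = spark.read.parquet("hdfs:///data/big_file.parquet")

    // How many read-time partitions each source produces under the current
    // spark.default.parallelism / split-size settings.
    println(s"textFile partitions: ${textRdd.partitions.length}")
    println(s"parquet partitions : ${parquetDf.rdd.partitions.length}")
  }
}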
3. textFile source code
textFile calls hadoopFile:

def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}
hadoopFile creates a HadoopRDD; the file-reading logic lives in HadoopRDD:
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()
  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}
HadoopRDD's getPartitions method returns the RDD's partitions, so that is where the partitioning logic lives:
override def getPartitions: Array[Partition] = {
  val jobConf = getJobConf()
  // add the credentials here as this can be called before SparkContext initialized
  SparkHadoopUtil.get.addCredentials(jobConf)
  val inputFormat = getInputFormat(jobConf)
  // The number of RDD partitions equals the number of InputSplits; see getSplits below.
  val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
  val array = new Array[Partition](inputSplits.size)
  for (i <- 0 until inputSplits.size) {
    array(i) = new HadoopPartition(id, i, inputSplits(i))
  }
  array
}

textFile specifies TextInputFormat as the InputFormat, so look at the getSplits method of that class (shown here as decompiled code; var30 is the goal size totalSize / numSplits and var31 is the configured minimum split size):

public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
  Stopwatch sw = (new Stopwatch()).start();
  FileStatus[] files = this.listStatus(job);
  job.setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.length);
  long totalSize = 0L;
  FileStatus[] goalSize = files;
  int len$ = files.length;
  for (int minSize = 0; minSize < len$; ++minSize) {
    FileStatus file = goalSize[minSize];
    if (file.isDirectory()) {
      throw new IOException("Not a file: " + file.getPath());
    }
    totalSize += file.getLen();
  }
  // var30 = goal size per split, var31 = minimum split size
  long var30 = totalSize / (long)(numSplits == 0 ? 1 : numSplits);
  long var31 = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize);
  ArrayList splits = new ArrayList(numSplits);
  NetworkTopology clusterMap = new NetworkTopology();
  FileStatus[] arr$ = files;
  int len$1 = files.length;
  for (int i$ = 0; i$ < len$1; ++i$) {
    FileStatus file1 = arr$[i$];
    Path path = file1.getPath();
    long length = file1.getLen();
    if (length == 0L) {
      splits.add(this.makeSplit(path, 0L, length, new String[0]));
    } else {
      FileSystem fs = path.getFileSystem(job);
      BlockLocation[] blkLocations;
      if (file1 instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus)file1).getBlockLocations();
      } else {
        blkLocations = fs.getFileBlockLocations(file1, 0L, length);
      }
      if (!this.isSplitable(fs, path)) {
        // not splittable: the whole file becomes a single split
        String[][] var29 = this.getSplitHostsAndCachedHosts(blkLocations, 0L, length, clusterMap);
        splits.add(this.makeSplit(path, 0L, length, var29[0], var29[1]));
      } else {
        long splitHosts = file1.getBlockSize();
        long splitSize = this.computeSplitSize(var30, var31, splitHosts);
        long bytesRemaining;
        String[][] splitHosts1;
        for (bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
          splitHosts1 = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, splitSize, clusterMap);
          splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, splitHosts1[0], splitHosts1[1]));
        }
        if (bytesRemaining != 0L) {
          splitHosts1 = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap);
          splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts1[0], splitHosts1[1]));
        }
      }
    }
  }
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.elapsedMillis());
  }
  return (InputSplit[])splits.toArray(new FileSplit[splits.size()]);
}

/** Computes the split size. */
protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
  return Math.max(minSize, Math.min(goalSize, blockSize));
}
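A worked example of the computeSplitSize formula above, with hypothetical numbers (a single 10 GB uncompressed file on 128 MB HDFS blocks); split counts are approximate because getSplits allows the last split up to 1.1x the split size:

object SplitSizeExample {
  // Same formula as FileInputFormat.computeSplitSize above.
  def computeSplitSize(goalSize: Long, minSize: Long, blockSize: Long): Long =
    math.max(minSize, math.min(goalSize, blockSize))

  def main(args: Array[String]): Unit = {
    val totalSize = 10L * 1024 * 1024 * 1024 // 10 GB file
    val blockSize = 128L * 1024 * 1024       // 128 MB HDFS block

    // Default case: minPartitions is typically 2 and split.minsize is 1,
    // so splitSize = blockSize = 128 MB, giving roughly 80 splits.
    println(totalSize / computeSplitSize(totalSize / 2, 1L, blockSize))

    // Asking for 400 partitions: goalSize = 10 GB / 400 ≈ 25.6 MB < blockSize,
    // so splitSize ≈ 25.6 MB, giving roughly 400 splits.
    println(totalSize / computeSplitSize(totalSize / 400, 1L, blockSize))

    // Raising split.minsize to 256 MB floors the split size instead,
    // capping the file at roughly 40 splits regardless of minPartitions.
    println(totalSize / computeSplitSize(totalSize / 400, 256L * 1024 * 1024, blockSize))
  }
}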