Reading and Writing Different File Types in Spark

Spark can read and write a number of data formats, including plain text, JSON, SequenceFile (and MapFile), object files, and CSV.

1. Text files

Reading and writing text files:

sc.textFile(dir,1)

rdd.saveAsTextFile(dir)
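A minimal end-to-end sketch; the /tmp paths are placeholders:

// read every line under the input directory into an RDD[String], with at least one partition
val lines = sc.textFile("/tmp/input", 1)

// transform, then write one part file per partition (the output directory must not already exist)
lines.map(_.toUpperCase).saveAsTextFile("/tmp/output")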

2. JSON

Sample input file a.json:

{"uid":1, "uname":"kyrie", "age":19}

{"uid":2, "uname":"jame", "age":25}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setAppName("Jsontest")

val sc = new SparkContext(conf)

val sqlContext = new SQLContext(sc)

val df1 = sqlContext.read.json("a.json")

df1.select("uid","uname","age").show(10,false)
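For reference, the same DataFrame can also be queried with SQL; a minimal sketch, where the table name users is arbitrary:

df1.registerTempTable("users")

sqlContext.sql("SELECT uname, age FROM users WHERE age > 20").show()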

3. SequenceFile

3.1. Reading a SequenceFile

Spark provides a dedicated interface for reading SequenceFiles: on SparkContext you can call sequenceFile(path, keyClass, valueClass, minPartitions). As mentioned earlier, SequenceFiles use Writable classes, so keyClass and valueClass must be the appropriate Writable classes.

Example: reading a SequenceFile

import org.apache.hadoop.io.{BytesWritable, IntWritable, Text}

// sequenceFile takes Class objects, not class-name strings
val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable])

sc.sequenceFile("/datascience/data/data-writer/adt/dmp_click/data/2017/09/26/12/05/0", classOf[Text], classOf[BytesWritable]).map {
  case (k, v) =>
    // a BytesWritable's backing array may be padded, so copy only the first getLength bytes
    val len = v.getLength
    val value = new String(v.getBytes, 0, len, "UTF-8")
    k.toString -> value
}.take(100).foreach(println)

3.2. Saving a SequenceFile

In Scala you need a PairRDD whose key and value types can be written out to a SequenceFile. If they are native Scala types, you can call saveAsSequenceFile(path) directly. If the keys and values cannot be converted to Writable automatically, or you want to use variable-length types, map over the data and convert the types before saving.
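A minimal sketch of both cases, with made-up data and placeholder /tmp paths:

// native Scala types: (String, Int) pairs are converted to (Text, IntWritable) automatically
val pairs = sc.parallelize(Seq(("kyrie", 19), ("jame", 25)))

pairs.saveAsSequenceFile("/tmp/seq-native")

// map first when you want a different on-disk representation, here (Text, Text)
pairs.map { case (k, v) => (k, v.toString) }.saveAsSequenceFile("/tmp/seq-strings")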

4. objectFile

From the Spark source comment for SparkContext.objectFile:

"Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and BytesWritable values that contain a serialized partition. This is still an experimental storage format and may not be supported exactly as is in future Spark releases. It will also be pretty slow if you use the default serializer (Java serialization), though the nice thing about it is that there's very little effort required to save arbitrary objects."
As the comment indicates, what is saved is itself a SequenceFile, with NullWritable keys and BytesWritable values.
sc.objectFile[T](path)  // T: ClassTag — pass the concrete element type
The corresponding save method is documented as "Save this RDD as a SequenceFile of serialized objects.":

rdd.saveAsObjectFile(path: String)
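A minimal round-trip sketch; the Click class and the /tmp path are illustrative:

case class Click(uid: Int, url: String)

val events = sc.parallelize(Seq(Click(1, "/a"), Click(2, "/b")))

// written under the hood as a SequenceFile of NullWritable -> BytesWritable (serialized partitions)
events.saveAsObjectFile("/tmp/clicks-obj")

// the explicit type argument supplies the ClassTag needed on read
val restored = sc.objectFile[Click]("/tmp/clicks-obj")

restored.collect().foreach(println)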

5. CSV

CSV input can be read from compressed files or plain text files.

Add the spark-csv dependency:

libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.4.0" withSources()

Approach 1:

val data = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load(s"$dataInput/impression")

Approach 2:

import com.databricks.spark.csv._
sqlContext.csvFile(s"$dataInput/impression")

Option reference:

1. path: the directory of CSV files to parse; the path supports wildcards.
2. header: defaults to false. The first line of a CSV file usually holds the column names; if you don't want that line loaded as data, set this option to true.
3. delimiter: CSV files are comma-delimited by default; set this option if yours use a different delimiter.
4. quote: the quote character defaults to '"'; set this option to support a different quote character.
5. mode: the parsing mode, PERMISSIVE by default. Supported values:
(1) PERMISSIVE: tries to parse all lines; nulls are inserted for missing tokens and extra tokens are ignored.
(2) DROPMALFORMED: drops lines which have fewer or more tokens than expected.
(3) FAILFAST: aborts with a RuntimeException if any malformed line is encountered.
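spark-csv can also write a DataFrame back out. A minimal sketch, assuming Spark 1.4+ (for the DataFrame write API) and a placeholder output path:

data.write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("/tmp/impression-out")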

6. Hadoop input and output formats

With the new Hadoop API, read files with newAPIHadoopFile and write with saveAsNewAPIHadoopFile.

With the old Hadoop API, read files with hadoopFile and write with saveAsHadoopFile.

6.1. Reading files

# text file (note: this example uses the old-API hadoopFile)

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

sc.hadoopFile("/user/yu.guan/xueyuan/1005484_1_check.tar.gz", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 1)
  .map(p => new String(p._2.getBytes, 0, p._2.getLength, "GBK"))  // decode each line's Text as GBK
  .take(10)
  .foreach(println)

# key-value SequenceFile (new-API newAPIHadoopFile)

import org.apache.hadoop.io.{BytesWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

sc.newAPIHadoopFile[Text, BytesWritable, SequenceFileInputFormat[Text, BytesWritable]](path).flatMap {
  case (_key, value) =>
    val key = _key.toString
    if (filter(key)) {  // filter and BitmapUtil are project-specific helpers from the original code
      try {
        val _bitmap = BitmapUtil.fromWritable(value)
        Some(key -> _bitmap)
      } catch {
        case e: Exception =>
          println(s"${e.getMessage}")
          println(s"$key\t$path")
          None
      }
    } else None
}

6.2. Writing files with the new interface

import org.apache.hadoop.io.{BytesWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.{MapFileOutputFormat, SequenceFileOutputFormat}
import org.apache.spark.rdd.UnionRDD

// SerializeText, filterBitmaps, br_topKeys and partition_num are project-specific values from the original code
new UnionRDD(sc, filterBitmaps)
  .reduceByKey(_ or _, partition_num)
  .map { case (k, v) => SerializeText(k.toString).writable -> new BytesWritable(v.toBytes) }
  .saveAsNewAPIHadoopFile(out, classOf[Text], classOf[BytesWritable], classOf[SequenceFileOutputFormat[Text, BytesWritable]])

rdd1.flatMap { case (key, bitmap) =>
  if (!br_topKeys.value.isEmpty) {
    if (bitmap.cardinality > 0) {
      // keep hot keys as-is and bucket the rest under "others"
      val _key = if (br_topKeys.value.contains(key)) key else "others"
      Some(_key -> bitmap)
    } else None
  } else Some(key -> bitmap)
}
  .reduceByKey(_ or _)
  .map { case (id, bitmap) => SerializeText(id.toString) -> bitmap.toBytes }
  .sortByKey(true)
  .map[(Text, BytesWritable)] { case (k, v) => k.writable -> new BytesWritable(v) }
  .saveAsNewAPIHadoopFile(output, classOf[Text], classOf[BytesWritable], classOf[MapFileOutputFormat])
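Note the sortByKey(true) before the final save: MapFileOutputFormat writes a MapFile, whose index requires the keys to arrive in sorted order.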

