Spark for Java: SparkCore RDD Value类型方法案例

package /<font>com.pactera.base;

import /<font>com.pactera.base.bean.User;
import /<font>com.pactera.base.utils.BaseUtil;
import /<font>org.apache.spark.SparkConf;
import /<font>org.apache.spark.SparkContext;
import /<font>org.apache.spark.api.java.JavaPairRDD;
import /<font>org.apache.spark.api.java.JavaRDD;
import /<font>org.apache.spark.api.java.JavaSparkContext;
import /<font>org.apache.spark.api.java.function.DoubleFlatMapFunction;
import /<font>org.apache.spark.api.java.function.FlatMapFunction;
import /<font>org.apache.spark.api.java.function.Function;
import /<font>org.apache.spark.api.java.function.Function2;
import /<font>org.apache.spark.rdd.RDD;
import /<font>org.json4s.JsonAST;
import /<font>scala.Function1;
import /<font>scala.collection.TraversableOnce;
import /<font>scala.math.Ordering;
import /<font>java.util.*;

/**
* SparkCore RDD Value类型
*
* /<font>@author /<font>张锐
* /<font>@create /<font>2020/2/2 14:01
*/
/<font>public class /<font>BaseApplication {
public static void /<font>main(String[] args) {
BaseUtil baseUtil = new /<font>BaseUtil();
JavaSparkContext sparkContext = baseUtil.init();
List<integer> data = Arrays.asList(1/<font>, 2/<font>, 3/<font>, 4/<font>, 5/<font>, 6/<font>);
JavaRDD<integer> javaRDD = sparkContext.parallelize(data);
//1. map(func)案例:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
/<font>JavaRDD<integer> mapRDD = javaRDD.map(value -> value * 2/<font>);
//2. mapPartitions(func) 案例:类似于map,但独立地在RDD的每一个分片上运行
/<font>JavaRDD<integer> mapPartitions = javaRDD.mapPartitions(iterator -> {
List<integer> list = new /<font>ArrayList<>();
while /<font>(iterator.hasNext()) {
Integer next = iterator.next();
list.add(next);
}
return /<font>list.iterator();
});
//jdk<1.8
/<font>javaRDD.mapPartitions(new /<font>FlatMapFunction<iterator>, Integer>() {
@Override
/<font>
public /<font>Iterator<integer> call(Iterator<integer> integerIterator) throws /<font>Exception {
List<integer> list = new /<font>ArrayList<>();
while /<font>(integerIterator.hasNext()) {
Integer next = integerIterator.next();
list.add(next);
}
return /<font>list.iterator();
}
});
//3. mapPartitionsWithIndex(func) 案例:类似于mapPartitions,但func带有一个整数参数表示分片的索引值
/<font>javaRDD.mapPartitionsWithIndex((index, integerIterator) -> {
//index表示分区的索引值
/<font>List<integer> list = new /<font>ArrayList<>();
while /<font>(integerIterator.hasNext()) {
Integer next = integerIterator.next();
list.add(next);
}
return /<font>list.iterator();
}, true/<font>);
javaRDD.mapPartitionsWithIndex(new /<font>Function2<integer>, Iterator<integer>>() {
@Override
/<font>public /<font>Iterator<integer> call(Integer integer, Iterator<integer> integerIterator) throws /<font>Exception {
List<integer> list = new /<font>
ArrayList<>();
while /<font>(integerIterator.hasNext()) {
Integer next = integerIterator.next();
list.add(next);
}
return /<font>list.iterator();
}
}, true/<font>);
//4. flatMap(func) 案例:类似于map,但是每一个输入元素可以被映射为0或多个输出元素
/<font>javaRDD.flatMap(value -> Arrays.asList(value).iterator());
JavaRDD<integer> flatMap = javaRDD.flatMap(new /<font>FlatMapFunction<integer>() {
@Override
/<font>public /<font>Iterator<integer> call(Integer integer) throws /<font>Exception {
return /<font>Arrays.asList(integer).iterator();
}
});
//5. glom案例:将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
/<font>JavaRDD<list>> glom = javaRDD.glom();
//6. groupBy(func)案例:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器
//对象需要实现序列化接口
/<font>List<user> users = User.init();
JavaRDD<user> userJavaRDD = sparkContext.parallelize(users);
Map<integer>> integerIterableMap = javaRDD.groupBy(value -> value).collectAsMap();
JavaPairRDD<object>> groupBy = userJavaRDD.groupBy(new /<font>Function<user>() {
@Override

/<font>public /<font>Object call(User user) throws /<font>Exception {
return /<font>user.getAge();
}
});
Map<object>> collectAsMap = groupBy.collectAsMap();
System.out/<font>.println(collectAsMap);
//7. filter(func) 案例:过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
/<font>JavaRDD<integer> filter = javaRDD.filter(value -> value > 2/<font>);
//8. sample(withReplacement, fraction, seed) 案例:以指定的随机种子随机抽样出数量为fraction的数据,
// withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子
/<font>JavaRDD<integer> sample = javaRDD.sample(true/<font>, 3/<font>);
//9. distinct([numTasks])) 案例:对源RDD进行去重后返回一个新的RDD。
// 默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它
/<font>javaRDD.distinct().collect();
//10. coalesce(numPartitions) 案例:缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
/<font>javaRDD.coalesce(2/<font>);
javaRDD.coalesce(2/<font>, true/<font>);
//11. repartition(numPartitions) 案例:根据分区数,重新通过网络随机洗牌所有数据。
/<font>javaRDD.repartition(2/<font>);

//12. coalesce和repartition的区别:
//1. coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
//2. repartition实际上是调用的coalesce,默认是进行shuffle的
//13. sortBy(func,[ascending], [numTasks]) 案例:使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。
/<font>javaRDD.sortBy(value -> value * 2/<font>, true/<font>, 2/<font>);
//14. pipe(command, [envVars]) 案例:管道,针对每个分区,都执行一个shell脚本,返回输出的RDD。
//注意:脚本需要放在Worker节点可以访问到的位置
/<font>javaRDD.pipe("sh demo.sh"/<font>);
sparkContext.stop();
}
}/<integer>/<integer>/<object>/<user>/<object>/<integer>/<user>/<user>/<list>/<integer>/<integer>/<integer>/<integer>/<integer>/<integer>/<integer>/<integer>/<integer>/<integer>/<integer>/<integer>/<iterator>/<integer>/<integer>/<integer>/<integer>/<integer>