Spark for Java: SparkCore RDD Value类型方法案例

package

com.pactera.base;
import com.pactera.base.bean.User;
import com.pactera.base.utils.BaseUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.DoubleFlatMapFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import
org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.rdd.RDD;
import org.json4s.JsonAST;
import scala.Function1;
import scala.collection.TraversableOnce;
import scala.math.Ordering;
import java.util.*;
/**
* SparkCore RDD Value类型
*
*
@author 张锐
*
@create
2020/2/2 14:01
*/
public class BaseApplication {
public static void main(String[] args) {
BaseUtil baseUtil = new BaseUtil();
JavaSparkContext sparkContext = baseUtil.init();
List data = Arrays.asList(1, 2, 3, 4, 5, 6);
JavaRDD javaRDD = sparkContext.parallelize(data);
//1. map(func)案例:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
JavaRDD mapRDD = javaRDD.map(value -> value * 2);
//2. mapPartitions(func) 案例:类似于map,但独立地在RDD的每一个分片上运行

JavaRDD mapPartitions = javaRDD.mapPartitions(iterator -> {
List list = new ArrayList<>();
while (iterator.hasNext()) {
Integer next = iterator.next();
list.add(next);
}
return list.iterator();
});
//jdk<1.8
javaRDD.mapPartitions(new FlatMapFunction, Integer>() {
@Override
public Iterator call(Iterator integerIterator) throws Exception {
List list = new
ArrayList<>();
while (integerIterator.hasNext()) {
Integer next = integerIterator.next();
list.add(next);
}
return list.iterator();
}
});
//3. mapPartitionsWithIndex(func) 案例:类似于mapPartitions,但func带有一个整数参数表示分片的索引值
javaRDD.mapPartitionsWithIndex((index, integerIterator) -> {
//index表示分区的索引值
List list = new ArrayList<>();
while (integerIterator.hasNext()) {
Integer next = integerIterator.next();
list.add(next);
}
return list.iterator();
}, true
);
javaRDD.mapPartitionsWithIndex(new Function2, Iterator>() {
@Override
public Iterator call(Integer integer, Iterator integerIterator) throws Exception {
List list = new ArrayList<>();
while (integerIterator.hasNext()) {
Integer next = integerIterator.next();
list.add(next);
}
return list.iterator();
}
}, true);
//4. flatMap(func) 案例:类似于map,但是每一个输入元素可以被映射为0或多个输出元素

javaRDD.flatMap(value -> Arrays.asList(value).iterator());
JavaRDD flatMap = javaRDD.flatMap(new FlatMapFunction() {
@Override
public Iterator call(Integer integer) throws Exception {
return Arrays.asList(integer).iterator();
}
});
//5. glom案例:将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
JavaRDD> glom = javaRDD.glom();
//6. groupBy(func)案例:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器
//对象需要实现序列化接口
List users = User.init();
JavaRDD userJavaRDD = sparkContext.parallelize(users);
Map> integerIterableMap = javaRDD.groupBy(value -> value).collectAsMap();

JavaPairRDD> groupBy = userJavaRDD.groupBy(new Function() {
@Override
public Object call(User user) throws Exception {
return user.getAge();
}
});
Map> collectAsMap = groupBy.collectAsMap();
System.out.println(collectAsMap);
//7. filter(func) 案例:过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
JavaRDD filter = javaRDD.filter(value -> value > 2);
//8. sample(withReplacement, fraction, seed) 案例:以指定的随机种子随机抽样出数量为fraction的数据,
// withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子

JavaRDD sample = javaRDD.sample(true, 3);
//9. distinct([numTasks])) 案例:对源RDD进行去重后返回一个新的RDD。
// 默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它
javaRDD.distinct().collect();
//10. coalesce(numPartitions) 案例:缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
javaRDD.coalesce(2);
javaRDD.coalesce(2, true);
//11. repartition(numPartitions) 案例:根据分区数,重新通过网络随机洗牌所有数据。
javaRDD.repartition(2);
//12. coalesce和repartition的区别:
//1. coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
//2. repartition实际上是调用的coalesce,默认是进行shuffle的

//13. sortBy(func,[ascending], [numTasks]) 案例:使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。
javaRDD.sortBy(value -> value * 2, true, 2);
//14. pipe(command, [envVars]) 案例:管道,针对每个分区,都执行一个shell脚本,返回输出的RDD。
//注意:脚本需要放在Worker节点可以访问到的位置
javaRDD.pipe("sh demo.sh");
sparkContext.stop();
}
}


分享到:


相關文章: