package com.pactera.base;
import com.pactera.base.bean.User;
import com.pactera.base.utils.BaseUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.DoubleFlatMapFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.rdd.RDD;
import org.json4s.JsonAST;
import scala.Function1;
import scala.collection.TraversableOnce;
import scala.math.Ordering;
import java.util.*;
/**
* SparkCore RDD Value类型
*
* @author 张锐
* @create 2020/2/2 14:01
*/
public class BaseApplication {
public static void main(String[] args) {
BaseUtil baseUtil = new BaseUtil();
JavaSparkContext sparkContext = baseUtil.init();
List
JavaRDD
//1. map(func)案例:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
JavaRDD
//2. mapPartitions(func) 案例:类似于map,但独立地在RDD的每一个分片上运行
JavaRDD
List
while (iterator.hasNext()) {
Integer next = iterator.next();
list.add(next);
}
return list.iterator();
});
//jdk<1.8
javaRDD.mapPartitions(new FlatMapFunction
@Override
public Iterator
List
while (integerIterator.hasNext()) {
Integer next = integerIterator.next();
list.add(next);
}
return list.iterator();
}
});
//3. mapPartitionsWithIndex(func) 案例:类似于mapPartitions,但func带有一个整数参数表示分片的索引值
javaRDD.mapPartitionsWithIndex((index, integerIterator) -> {
//index表示分区的索引值
List
while (integerIterator.hasNext()) {
Integer next = integerIterator.next();
list.add(next);
}
return list.iterator();
}, true );
javaRDD.mapPartitionsWithIndex(new Function2
@Override
public Iterator
List
while (integerIterator.hasNext()) {
Integer next = integerIterator.next();
list.add(next);
}
return list.iterator();
}
}, true);
//4. flatMap(func) 案例:类似于map,但是每一个输入元素可以被映射为0或多个输出元素
javaRDD.flatMap(value -> Arrays.asList(value).iterator());
JavaRDD
@Override
public Iterator
return Arrays.asList(integer).iterator();
}
});
//5. glom案例:将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
JavaRDD> glom = javaRDD.glom();
//6. groupBy(func)案例:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器
//对象需要实现序列化接口
List
JavaRDD
Map
JavaPairRDD
閱讀更多 張銳 的文章