Spark 网页式 开发 (三)

上篇我们讲述到了 设计了存储代码的表,这篇讲下 如何动态加载文本代码。

在此之前,先给大家看下 Janino 的使用。

Janino 是一个超小型,超快的Java编译器。



<code>    import scala.collection.JavaConverters._
val ee = CompilerFactoryFactory.getDefaultCompilerFactory().newExpressionEvaluator()
ee.setExpressionType(classOf[Boolean])
ee.setParameters(Array[String]("total", "count"), Array[Class[_]](classOf[Double], classOf[String]))
ee.cook("count.equals(\"3\") || total >= 200.0 ");

val res = ee.evaluate(List(120, "3").asJava.toArray());
println("Result = " + res);/<code>

这是一个非常基本的例子,

运行的代码就是 count.equals("3") || total >= 200.0

首先定义了一个执行器ee

然后定义了返回类型是Boolean,参数的名称 和 参数的类型,

接着设置了表达式 最后设定了 参数的值,就输出结果了,

因为我们设定的total是120,count是字符串3,所以返回的结果自然就是

Result = true

这虽然是一个小小的例子,但是折射出来的意义就是 我们可以做到解析字符串形式的代码,并将它翻译成运行代码去执行。

那么接下来 我们就是开始准备设计 运行这个spark的job的抽象类 job_BaseRunClass。

然后 实现main方法。

<code>def main(args: Array[String]): Unit = {

//这里的params就是外界传入的参数
val params = if (args == null || args.length == 0) Map[String, String]()
else {
args(0).split(";").map(row => row.split("="))
.filter(_.size == 2).map(row => (row(0) -> row(1))).toMap
}

\t//这里判断下,如果运行的类是 com.oasis.spark.job_CodeGenGenerate 才会去执行读取动态代码
\t//反之 依旧维持原状,保证了兼容性
if (params.getOrElse("mainClass", "") == "com.oasis.spark.job_CodeGenGenerate") {

\t\t\t//这里是打开数据库连接,用于获取数据库中存储的代码
val conn = getConn()()
require(params.contains("projectName"))
require(params.contains("jobName"))



val projectName =params.getOrElse("projectName", "")


val pre = conn.prepareStatement(s"select source_code,email_address from code_source where project_name='${projectName}' and job_name='${params.getOrElse("link_jobName", params.get("jobName").get)}' ")
val result = pre.executeQuery()
var sourceCode = ""

if (result.next()) {
//这里我们就拿到了自己存储在数据库中的代码了
sourceCode = result.getString(1)
receivers = if (result.getString(2) == null || result.getString(2).isEmpty) {
""
} else {

result.getString(2).replace("\\n", "").replace("\\r", "")
}


}
result.close()
pre.close()
conn.close()

val jobName = params.getOrElse("jobName", "XX")
\t\t//那么拿到代码之后肯定是去编译这个代码,这个逻辑在getCodeGenObject中。
getCodeGenObject(jobName, sourceCode +
"""
|
|private class JavaUtil {
| public Seq getSeq(T... values) {
| return (Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(values));
| }
|}
""".stripMargin,
params.getOrElse("printCode", "false").toBoolean).runJob(
if (params.contains("receivers")) {
println("read config email:" + params.getOrElse("receivers", null))
params
} else {
println("read db email:" + receivers)
params.updated("receivers", receivers)
})

} else {
runJob(params)
}
}

/<code>

接下来 看下getCodeGenObject 这个方法,如何去编译的。

<code>private final def getCodeGenObject(jobName: String, codeBody: String, printCode: Boolean) = {
\t\t//这里定义了类的执行器,敲黑板,重点来了
val evaluator = new ClassBodyEvaluator()

evaluator.setDebuggingInformation(true, true, true);
evaluator.setParentClassLoader(getClass.getClassLoader)
//设置类名
evaluator.setClassName(s"com.oasis.spark.${jobName}")

//设置预先加载的类,相当于在代码里写import 。。。。。
evaluator.setDefaultImports(
Array(
classOf[Platform].getName,
classOf[InternalRow].getName,
classOf[UnsafeRow].getName,
classOf[UTF8String].getName,
classOf[Decimal].getName,
classOf[CalendarInterval].getName,
classOf[ArrayData].getName,
classOf[UnsafeArrayData].getName,
classOf[MapData].getName,
classOf[UnsafeMapData].getName,
classOf[Expression].getName,
classOf[TaskContext].getName,
classOf[TaskKilledException].getName,
classOf[InputMetrics].getName,

"java.io.IOException",


"scala.Tuple2",
"scala.collection.mutable.ListBuffer",
"scala.collection.immutable.Seq",
"scala.collection.JavaConverters",
"scala.collection.JavaConversions",
"scala.collection.JavaConverters$",
"scala.collection.immutable.Map",
"scala.Option",
"scala.collection.immutable.Seq$",
"scala.Predef$",
"scala.reflect.ClassTag$",


"java.util.HashMap",
"java.util.ArrayList",
"java.util.List",
"java.util.Arrays",
"java.util.Calendar",
"java.text.ParseException",

"org.apache.hadoop.fs.Path",
"org.apache.hadoop.fs.FileSystem",

"org.apache.hadoop.conf.Configuration",

"org.apache.spark.sql.Encoders",
"org.apache.spark.sql.SaveMode",
"org.apache.spark.sql.Column",
"org.apache.spark.sql.SparkSession",
"org.apache.spark.sql.Dataset",
"org.apache.spark.sql.Row",
"org.apache.spark.sql.Row$",
"org.apache.spark.sql.expressions.Window",
"org.apache.spark.sql.functions",
"org.apache.spark.sql.types.DataTypes",
"org.apache.spark.sql.api.java.UDF1")
)
\t\t//设置继承的类
evaluator.setExtendedClass(classOf[job_BaseRunClass])
evaluator.cook("generated.java", codeBody)
\t\t//最后实现类 job_BaseRunClass 这个抽象类,相当于 job_BaseRunClass 的子类了
\t\t//将这个类实例化出来,变成了一个对象
evaluator.getClazz().newInstance().asInstanceOf[job_BaseRunClass]
}
/<code>

得到是实例化出来的对象后,调用runJob 开启了spark运行的旅程。

<code>  private final def runJob(params: Map[String, String]) = {
//获取SparkSession 这个对象
val spark = getCommonSpark(params = params)
\t\t//设置一些内部处理
runPre(params)
\t\t//程序处理
execute(params.getOrElse("tableName", params.getOrElse("mainTable", null)), spark)
}
/<code>

这样总体的流程就结束了。

而我们在网页端的代码编写就是两处


Spark 网页式 开发 (三)

实现 runPre 方法 和 initWriteDBEntities 这两个方法就可以了。

下次业务说要修改这个job的逻辑,直接在这个网页上修改,那么下次job运行的时候就会去读取这个最新的代码,然后编译加载了。

完美的实现了 不用重新编译代码、生成jar包的操作了


分享到:


相關文章: