如何在 Kylin 中优雅地使用 Spark

前言

Kylin 用户在使用 Spark的过程中,经常会遇到任务提交缓慢、构建节点不稳定的问题。为了更方便地向 Spark 提交、管理和监控任务,有些用户会使用 Livy 作为 Spark 的交互接口。在最新的 Apache Kylin 3.0 版本中,Kylin 加入了通过 Apache Livy 递交 Spark 任务的新功能[KYLIN-3795],特此感谢滴滴靳国卫同学对此功能的贡献。

Livy 介绍

Apache Livy 是一个基于 Spark 的开源 REST 服务,是 Apache 基金会的一个孵化项目,它能够通过 REST 的方式将代码片段或是序列化的二进制代码提交到 Spark 集群中去执行。它提供了如下基本功能:

  • 提交 Scala、Python 或是 R 代码片段到远端的 Spark 集群上执行。
  • 提交 Java、Scala、Python 所编写的 Spark 作业到远端的 Spark 集群上执行。
如何在 Kylin 中优雅地使用 Spark

△ Apache Livy 架构

为什么使用 Livy

1. 当前 Spark 存在的问题

Spark 当前支持两种交互方式:

  • 交互式处理用户使用 spark-shell 或 pyspark 脚本启动 Spark 应用程序,伴随应用程序启动的同时,Spark 会在当前终端启动 REPL(Read–Eval–Print Loop) 来接收用户的代码输入,并将其编译成 Spark 作业。
  • 批处理批处理的程序逻辑由用户实现并编译打包成 jar 包,spark-submit 脚本启动 Spark 应用程序来执行用户所编写的逻辑,与交互式处理不同的是批处理程序在执行过程中用户没有与 Spark 进行任何的交互。

两种方式都需要用户登录到 Gateway 节点上通过脚本启动 Spark 进程,但是会出现以下问题:

  • 增加 Gateway 节点的资源使用负担和故障发生的可能性。
  • 同时 Gateway 节点的故障会带来单点问题,造成 Spark 程序的失败。
  • 难以管理、审计以及与已有的权限管理工具的集成。由于 Spark 采用脚本的方式启动应用程序,因此相比于 WEB 方式少了许多管理、审计的便利性,同时也难以与已有的工具结合,如 Apache Knox 等。
  • 将 Gateway 节点上的部署细节以及配置不可避免地暴露给了登陆用户。

2. Livy 优势

一方面,接受并解析用户的 REST 请求,转换成相应的操作;另一方面,它管理着用户所启动的所有的 Spark 集群。

Livy 具有如下功能:

  • 通过 Livy session 实时提交代码片段与 Spark 的 REPL 进行交互。
  • 通过 Livy batch 提交 Scala、Java、Python 编写的二进制包来提交批处理任务。
  • 多用户能够使用同一个服务器(支持用户模拟)。
  • 能够通过 REST 接口在任何设备上提交任务、查看任务执行状态和结果。

Kylin with Livy

1. 引入 Livy 之前 Kylin 是如何使用 Spark 的

Spark 是在 Kylin v2.0 引入的,主要应用于 Cube 构建,构建过程介绍可以查看:https://kylin.apache.org/blog/2017/02/23/by-layer-spark-cubing/

下面是 SparkExecutable 类的 doWork 方法关于提交 Spark job 的一段代码,我们可以看到 Kylin 会从配置中获取 Spark job 包的路径(默认为 $KYLIN_HOME/lib),通过本地指令的形式提交 Spark job,然后循环获取 Spark job 的执行状态和结果。我们可以看到 Kylin 单独开了一个线程在本地向 Spark 客户端发送来 job 请求并且循环获取结果,额外增加了节点系统压力。

@Override

protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {

//略...

String jobJar = config.getKylinJobJarPath(); //获取job jar的路径

//略...

final String cmd = String.format(Locale.ROOT, stringBuilder.toString(), hadoopConf,KylinConfig.getSparkHome(), jars, jobJar, formatArgs()); //构建本地command

//略...

//创建指令执行线程

Callable callable = new Callable>() {

@Override

public Pair call() throws Exception {

Pair result;

try {

result = exec.execute(cmd, patternedLogger);

} catch (Exception e) {

logger.error("error run spark job:", e);

result = new Pair<>(-1, e.getMessage());

}

return result;

}

};

//略...

try {

Future> future = executorService.submit(callable);

Pair result = null;

while (!isDiscarded() && !isPaused()) {

if (future.isDone()) {

result = future.get(); //循环获取指令执行结果

break;

} else {

Thread.sleep(5000); //每隔5秒检查一次job执行状态

}

}

//略...

} catch (Exception e) {

logger.error("Error run spark job:", e);

return ExecuteResult.createError(e);

}

//略...

}

2. Livy for Kylin 详细解析

Livy 向 Spark 提交 job 一共有两种,分别是 Session 和 Batch,Kylin 是通过 Batch 的方式提交 job 的,需要提前构建好 Spark job 对应的 jar 包并上传到 HDFS 中,并且将配置项 kylin.engine.livy-conf.livy-key.file=hdfs:///path-to-kylin-job-jar 加入到 kyiln.properties 中。

Batch 一共具有如下九种状态:

public enum LivyStateEnum { starting, running, success, dead, error, not_started, idle, busy, shutting_down;}下面是 SparkExecutableLivy 类的 doWork 方法和 LivyRestExecutor 类的 execute 方法关于提交 Spark job 的一段代码,Kylin 通过 livyRestBuilder 读取配置文件获取 Spark job 的包路径,然后通过 restClient 向 Livy 发送 Http 请求。在提交 job 之后会每隔 10 秒查询一次 job 执行的结果,直到 job 的状态变为 shutting_down, error, dead, success 中的一种。每一次都是通过 Http 的方式发送请求,相比较于通过本地 Spark 客户端提交任务,更加稳定而且减少了 Kylin 节点系统压力。

下面是 SparkExecutableLivy 类的 doWork 方法和 LivyRestExecutor 类的 execute 方法关于提交 Spark job 的一段代码,Kylin 通过 livyRestBuilder 读取配置文件获取 Spark job 的包路径,然后通过 restClient 向 Livy 发送 Http 请求。在提交 job 之后会每隔 10 秒查询一次 job 执行的结果,直到 job 的状态变为 shutting_down, error, dead, success 中的一种。每一次都是通过 Http 的方式发送请求,相比较于通过本地 Spark 客户端提交任务,更加稳定而且减少了 Kylin 节点系统压力。

@Override

protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {

//略...

livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.job);

executor.execute(livyRestBuilder, patternedLogger); //调用LivyRestExecutor类的execute方法

if (isDiscarded()) {

return new ExecuteResult(ExecuteResult.State.DISCARDED, "Discarded");

}

if (isPaused()) {

return new ExecuteResult(ExecuteResult.State.STOPPED, "Stopped");

}

//略...

}

public void execute(LivyRestBuilder livyRestBuilder, Logger logAppender) {

LivyRestClient restClient = new LivyRestClient();

String result = restClient.livySubmitJobBatches(dataJson); //向Livy发送http请求

JSONObject resultJson = new JSONObject(result);

String state = resultJson.getString("state"); //得到Livy请求结果

final String livyTaskId = resultJson.getString("id");

while (!LivyStateEnum.shutting_down.toString().equalsIgnoreCase(state)

&& !LivyStateEnum.error.toString().equalsIgnoreCase(state)

&& !LivyStateEnum.dead.toString().equalsIgnoreCase(state)

&& !LivyStateEnum.success.toString().equalsIgnoreCase(state)) {

String statusResult = restClient.livyGetJobStatusBatches(livyTaskId); //获取Spark job执行状态

JSONObject stateJson = new JSONObject(statusResult);

if (!state.equalsIgnoreCase(stateJson.getString("state"))) {

logAppender.log("Livy status Result: " + stateJson.getString("state"));

}

state = stateJson.getString("state");

Thread.sleep(10*1000); //每10秒检查一次结果

}

}

3. Livy 在 Kylin 中的应用

构建 Intermediate Flat Hive Table 和 Redistribute Flat Hive Table 原本都是通过 Hive 客户端(Cli 或 Beeline)进行构建的,引入 Livy 之后,Kylin 通过 Livy 来调用 SparkSQL 进行构建,提高了平表的构建速度。在引入 Livy 之后,Cube 的构建主要改变的是以下几个步骤,对应的任务日志输出如下:

  • 构建 Intermediate Flat Hive Table
如何在 Kylin 中优雅地使用 Spark

△ 点击图片查看大图

  • 构建 Redistribute Flat Hive Table
如何在 Kylin 中优雅地使用 Spark

△ 点击图片查看大图

  • 使用 Spark-Submit 的地方都用 Livy 的 Batch API 进行替换

1)构建 Cube

如何在 Kylin 中优雅地使用 Spark

△ 点击图片查看大图

2)转换 Cuboid 为 HFile

如何在 Kylin 中优雅地使用 Spark

△ 点击图片查看大图

4. 引入 Livy 对 Kylin 的好处

  • 无需准备 Spark 的客户端配置,Kylin 部署更加轻量化。
  • Kylin 节点系统压力更低,无需在 Kylin 节点启动 Spark 客户端。
  • 构建 Flat Hive Table 更快,通过 Livy 可以使用 Spark SQL 构建平表,而 Spark SQL 要快于 Hive。
  • 提交 job 更快,job 状态获取更方便。

5. 如何在 Kylin 中启用 Livy

在 Kylin 启用 Livy 前,请先确保 Livy 能够正常工作

1)在 Kylin.properties 中,加入如下配置,并重启使之生效。

其中 livy-key.file 和 livy-arr.jars 地址之间不要有空格,否则可能会出不可预知的错误。

2)Cube 构建引擎选用 Spark。

常见问题

以下问题往往为使用不当和配置错误的原因,非 Kylin 本身存在的问题,此处仅为友情提示。

1. Table or view not found

输出日志:

解决方法:

2. livy request 400 error

解决方法:

3. NoClassDefFoundError

输出日志:

解决方法:

4. livy sql 执行错误

解决方法:

总结

Livy 本质上是在 Spark 上的 REST 服务,对于 Kylin cube 的构建没有本质上的性能提升,但是通过引入 Livy,Kylin 能够直接通过 Spark SQL 代替 Hive 构建 Flat Table,而且管理 Spark job 也更加方便。但是,Livy 当前也存在一些问题,比如使用较低或较高版本的 Spark 无法正常工作以及单点故障等问题,用户可以考虑自身的实际场景选择是否需要在 Kylin 中使用 Livy。

参考文章

  1. https://hortonworks.com/blog/livy-a-rest-interface-for-apache-spark/
  2. https://wiki.apache.org/incubator/LivyProposal
  3. https://kylin.apache.org/blog/2017/02/23/by-layer-spark-cubing/


分享到:


相關文章: