比pgload更快更方便写入大数据量至Greenplum的Spark Connector

前序

Greenplum是目前比较优秀的mpp数据库,其官方推荐了几种将外部数据写入Greenplum方式,包含:通用的Jdbc,pgcopy和pgload以及Pivotal Greenplum-Spark Connector等。

  • Jdbc:Jdbc方式,写大数据量会很慢。
  • pgcopy:其中pgcopy是及其不推荐的一种,因为其写数据必须经过Greenplum的master,因此也只建议小数据量使用。
  • pgload:适合写大数据量数据,能并行写入。但其缺点是需要安装客户端,包括gpfdist等依赖,安装起来很麻烦。需要了解可以参考pgload。
  • Greenplum-Spark Connector:基于Spark并行处理,并行写入Greenplum,并提供了并行读取的接口。也是接下来该文重点介绍的部分。

1. Greenplum-Spark Connector是啥

Greenplum-Spark Connector(GSC)是Pivotal提供的一个高性能并行的读写Greenplum的工具。不需要去安装麻烦的Greenplum loader客户端,也不用去实现繁琐的copy代码。

2. Greenplum-Spark Connector读数据架构

一个Spark application,是由Driver和Executor节点构成。当Spark application使用Greenplum-Spark Connector加载Greenplum数据时,其Driver端会通过JDBC的方式请求Greenplum的master节点获取相关的元数据信息。Connector将会根据这些元数据信息去决定Spark的Executor去怎样去并行的读取该表的数据。

Greenplum数据库存储数据是按segment组织的,Greenplum-Spark Connector在加载Greenplum数据时,需要指定Greenplum表的一个字段作为Spark的partition字段,Connector会使用这个字段的值来计算,该Greenplum表的某个segment该被哪一个或多个Spark partition读取。

其读取过程如下:

  1. Spark Driver通过Jdbc的方式连接Greenplum master,并读取指定表的相关元数据信息。然后根据指定的分区字段以及分区个数去决定segment怎么分配。
  2. Spark Executor端会通过Jdbc的方式连接Greenplum master,创建Greenplum外部表。
  3. 然后Spark Executor通过Http方式连接Greenplum的数据节点,获取指定的segment的数据。该获取数据的操作在Spark Executor并行执行。

其示意流程图如下:

比pgload更快更方便写入大数据量至Greenplum的Spark Connector

3. Greenplum-Spark Connector写数据流程

  1. GSC在Spark Executor端通过Jetty启动一个Http服务,将该服务封装为支持Greenplum的gpfdist协议。
  2. GSC在Spark Executor端通过Jdbc方式连接Greenplum master,创建Greenplum外部表,该外部表文件地址指向该Executor所启动的gpfdist协议地址。SQL示例如下:
<code>CREATE READABLE EXTERNAL TABLE"public"."spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42" (LIKE "public"."rank_a1")LOCATION ('gpfdist://10.0.8.145:44772/spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42')FORMAT 'CSV'(DELIMITER AS '|' NULL AS '')ENCODING 'UTF-8'/<code>
  1. GSC在Spark Executor端通过Jdbc方式连接Greenplum master,然后执行insert语句至真实的表中,数据来源于这张外部表。SQL示例如下:
<code>INSERT INTO "public"."rank_a1"SELECT *FROM "public"."spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42"/<code>

至于这张外部表的数据,是否落地当前Executor服务器,不清楚。猜测不会落地,而是直接通过Http直接传递给了Greenplum对应的Segment。

  1. GSC监听onApplicationEnd事件,在Spark application结束后,删除创建的外部表。

4. Greenplum-Spark Connector使用

  1. 下载GSC Jar包。 下载地址:Pivotal Network。 可直接下载最新版本的GSC即1.6.2,支持Greenplum5.0之后的版本。greenplum-spark_-.jar,如:
<code>greenplum-spark_2.11-1.6.2.jar/<code>
  1. maven中引入:
<code>        <dependency>            
<groupid>io.pivotal.greenplum.spark/<groupid>
\t\t\t\t\t<artifactid>greenplum-spark_2.11/<artifactid>
\t\t\t\t\t<version>1.6.2/<version>
\t\t\t\t/<dependency>/<code>
  1. spark提交引入:
  • spark-shell或spark-submit时候,通过--jars加入greenplum-spark_2.11-1.6.2.jar。
  • 将greenplum-spark_2.11-1.6.2.jar与Spark application包打成 uber jar 提交。

5. Greenplum-Spark Connector参数

参数名:url

参数描述:Jdbc连接的url。

作用域:读,写


参数名:dbschema

参数描述:Greenplum数据库的schema,GSC创建的临时外部表也在该schema下,默认值为public。

作用域:读,写


参数名:dbtable

参数描述:Greenplum数据库的表名,GSC在读取时,会读取dbschema下的表。GSC在写数据时,如果该表不存在会自动创建。

作用域:读,写


参数名:driver

参数描述:Jdbc driver全类名,非必填,在GSC Jar包中已经包含了driver包。

作用域:读,写


参数名:user

参数描述:用户名

作用域:读,写


参数名:password

参数描述:密码

作用域:读,写


参数名:partitionColumn

参数描述:Greenplum数据表的字段,该字段将作为Spark分区的字段,支持integer, bigint, serial, bigserial4中类型,该字段名需小写。该字段为必填,且必须是Greenplum表建表时 DISTRIBUTED BY ()语句中的字段。

作用域:读


参数名:partitions

参数描述:Spark分区数,非必填,其默认值为Greenplum的primary segments数量。

作用域:读


参数名:truncate

参数描述:当在Spark中指定了输出模式为SaveMode.Overwrite时候,写的目标表存在的时候的策略,非必填。默认为false,即GSC将会先删除然后重新创建目标表,然后在写数据。当为true时,GSC将会先truncates目标表,然后在写入数据。

作用域:写


参数名:iteratorOptimization

参数描述:指定写数据时内存模式,非必填。默认指为true,GSC将会使用 Iterator 方式。当为false时,GSC将会在写数据时将数据存储在内存中。

作用域:写


参数名:server.port

参数描述:指定在Spark Worker端启动gpfdist服务的端口号,非必填。默认情况下会使用随机的端口号。

作用域:读,写


参数名:server.useHostname

参数描述:指定是否使用Spark Worker节点的host name为gpfdis服务的地址,非必填。默认为false。

作用域:读,写


参数名:pool.maxSize

参数描述:GSC连接Greenplum的连接池的最大连接数,默认为64。

作用域:读,写


参数名:pool.timeoutMs

参数描述:非活动连接被认为是空闲连接的时间,毫秒值。默认为10000(10秒)。

作用域:读,写


参数名:pool.minIdle

参数描述:GSC连接Greenplum的连接池的最小空闲连接数,默认为0。

作用域:读,写


6. 从Greenplum读取数据

  1. DataFrameReader.load()方式:
<code>val gscReadOptionMap = Map(      "url" -> "jdbc:postgresql://gpdb-master:5432/testdb",      "user" -> "bill",      "password" -> "changeme",      "dbschema" -> "myschema",      "dbtable" -> "table1",      "partitionColumn" -> "id")val gpdf = spark.read.format("greenplum")      .options(gscReadOptionMap)      .load()/<code>
  1. spark.read.greenplum()方式:
<code>val url = "jdbc:postgresql://gpmaster.domain:15432/tutorial"val tblname = "avgdelay"val jprops = new Properties()jprops.put("user", "user2")jprops.put("password", "changeme")jprops.put("partitionColumn", "airlineid")val gpdf = spark.read.greenplum(url, tblname, jprops)/<code>

然鹅,这种方式必然需要引入一个隐式转换,官网也没介绍。

7. 写数据至Greenplum

7.1. 写数据示例:

<code>val gscWriteOptionMap = Map("url" -> "jdbc:postgresql://gpdb-master:5432/testdb",      "user" -> "bill",      "password" -> "changeme",      "dbschema" -> "myschema",      "dbtable" -> "table2",)dfToWrite.write.format("greenplum")      .options(gscWriteOptionMap)      .save()/<code>

在通过GSC写到Greenplum表时,如果表已经存在或表中已经存在数据,可通过DataFrameWriter.mode(SaveMode savemode)方式指定其输出模式。相关模式行为如下:

SaveMode:ErrorIfExists

行为:如果Greenplum数据表已经存在则GSC直接返回错误,该策略为默认策略。


SaveMode:Append

行为:直接将Spark中数据追加至表中。


SaveMode:Ignore

行为:如果Greenplum数据表已经存在,GSC将不会写数据至表中也不会去修改已经存在的数据。


SaveMode:Overwrite

行为:如果Greenplum数据表已经存在,则truncate参数将会生效。默认为false,即GSC将会先删除然后重新创建目标表,然后在写数据。当为true时,GSC将会先truncates目标表,然后在写入数据。

7.2. GSC自动建表:

  1. 创建的Greenplum表将不会有distribution列,如下为GSC生成的建表语句:
<code>CREATE TABLE "public"."rank_a1" ("id" INTEGER NOT NULL, "rank" TEXT, "year" INTEGER NOT NULL, "gender" INTEGER NOT NULL, "count" INTEGER NOT NULL);/<code>
  1. 创建的Greenplum表的字段名将会使用Spark DataFrame中的字段名。
  2. 在GSC自动建表时,将会为字段名加上双引号,这将使Greenplum区分大小写。
  3. 当Spark DataFrame的字段不为nullable时,GSC自动建表的字段将是 NOT NULL。
  4. 将会对应的Spark DataFrame字段类型映射为Greenplum的字段类型。参考,字段类型映射表。

7.3. 提前手动建表:

  1. 将Spark DataFrame的字段名的数据写至Greenplum表的对应的字段中。值得注意的是,GSC在做映射的时候,是严格区分大小写的。
  2. 写至Greenplum的字段的数据类型,与对应的Spark DataFrame一致,具体参见字段类型映射。
  3. 如果Spark数据中某列包含空数据,需确保对应的Greenplum表的列没有被指定为NOT NULL。
  4. Greenplum表中建表时其字段顺序可以与Spark DataFrame中不一致。但Greenplum表中不能出现不存在在Spark DataFrame中的字段。如下例子:
<code>// Greenplum 中的字段CREATE TABLE public.rank_a1 (    id int4 NOT NULL,    "rank" text NULL,    "year" int4 NOT NULL,    gender int4 NOT NULL,    count int4 NOT NULL)DISTRIBUTED BY (id);// Spark DataFrame中的字段var df = Seq((2, "a|b", 2, 2, 2),(3, "a|b", 3, 3, 3)).toDF("id", "rank", "year", "gender")// 在写数据至public.rank_a1表时,将会报错如下Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.Old column names (5): _1, _2, _3, _4, _5New column names (4): id, rank, year, gender    at scala.Predef$.require(Predef.scala:224)    at org.apache.spark.sql.Dataset.toDF(Dataset.scala:435)    at org.apache.spark.sql.DatasetHolder.toDF(DatasetHolder.scala:44)    at com.lt.spark.greenplum.GreenplumWrite$.main(GreenplumWrite.scala:14)    at com.lt.spark.greenplum.GreenplumWrite.main(GreenplumWrite.scala)/<code>
  1. 确保指定的用户对于该表有读写的权限,自动建表,需要有建表的权限。

8. Troubleshooting

8.1. 端口相关问题

原因解决办法

错误信息:java.lang.RuntimeException: is not a valid port number.

解决办法:通过server.port所指定的端口无效,比如1024以内,为系统使用端口指定端口在[1024-65535]之间


错误信息:java.lang.RuntimeException:Unable to start GpfdistService on any of ports=

解决办法:通过server.port指定的端口已经被占用从新指定一个未被占用的端口,或不指定该参数

8.2. Greenplum连接数问题

当连接Greenplum的连接数接近Greenplum数据库配置的最大连接数(max_connections)时。Spark application将会抛出 connection limit exceeded 错误。

排查过程:

  1. 查询Greenplum数据的最大连接数:
<code>postgres=# show max_connections; max_connections----------------- 250(1 row)/<code>
  1. 查询当前连接Greenplum数据库的连接数:
<code>postgres=# SELECT count(*) FROM pg_stat_activity;/<code>
  1. 查询指定的用户连接Greenplum数据的连接数:
<code>postgres=# SELECT count(*) FROM pg_stat_activity WHERE datname='tutorial';postgres=# SELECT count(*) FROM pg_stat_activity WHERE usename='user1';/<code>
  1. 查询Greenplum数据库空闲和活动的连接数:
<code>postgres=# SELECT count(*) FROM pg_stat_activity WHERE current_query='<idle>';postgres=# SELECT count(*) FROM pg_stat_activity WHERE current_query!='<idle>';/<idle>/<idle>/<code>
  1. 查询连接Greenplum数据库名,用户名,客户端地址,客户端ip,当前查询语句:
<code>postgres=# SELECT datname, usename, client_addr, client_port, current_query FROM pg_stat_activity;/<code>

如果确认是Spark application使用连接数过多,则配置JDBC Connection Pooling相关参数,减少连接数。

8.3. Greenplum Database Data Length Errors

在使用Greenplum 4.x或5.x的时候,可能会报出“data line too long”错误。这是因为在Greenplum数据库中参数项“gpmaxcsvlinelength”默认值是1M。需要登陆Greenplum master修改这个参数值,示例如下,通过gpconfig修改该参数的值为5M:

<code>gpadmin@gpmaster$ gpconfig -c gp_max_csv_line_length -v 5242880gpadmin@gpmaster$ gpstop -u/<code>

9. 参考

  1. Greenplum-Spark Connector官方文档
  2. Greenplum建表语句文档
  3. Greenplum参数配置官方文档


分享到:


相關文章: