簡化ETL工作,編寫一個Canal膠水層

整篇文章可能會有點流水賬,相對詳細地介紹怎麼寫一個小型的"框架"。這個精悍的膠水層已經在生產環境服役超過半年,這裡嘗試把耦合業務的代碼去掉,提煉出一個相對簡潔的版本。

之前寫的幾篇文章裡面其中一篇曾經提到過Canal解析MySQL的binlog事件後的對象如下(來源於Canal源碼
com.alibaba.otter.canal.protocol.FlatMessage):

簡化ETL工作,編寫一個Canal膠水層

如果直接對此原始對象進行解析,那麼會出現很多解析模板代碼,一旦有改動就會牽一髮動全身,這是我們不希望發生的一件事。於是花了一點點時間寫了一個Canal膠水層,讓接收到的FlatMessage根據表名稱直接轉換為對應的DTO實例,這樣能在一定程度上提升開發效率並且減少模板化代碼,這個膠水層的數據流示意圖如下:

簡化ETL工作,編寫一個Canal膠水層

要編寫這樣的膠水層主要用到:

  • 反射。
  • 註解。
  • 策略模式。
  • IOC容器(可選)。

項目的模塊如下:

  • canal-glue-core:核心功能。
  • spring-boot-starter-canal-glue:適配Spring的IOC容器,添加自動配置。
  • canal-glue-example:使用例子和基準測試。

下文會詳細分析此膠水層如何實現。

引入依賴

為了不汙染引用此模塊的外部服務依賴,除了JSON轉換的依賴之外,其他依賴的scope定義為provide或者test類型,依賴版本和BOM如下:

<code>

<

properties

>

        

<

project.build.sourceEncoding

>

UTF-8

project.build.sourceEncoding

>

        

<

maven.compiler.source

>

1.8

maven.compiler.source

>

        

<

maven.compiler.target

>

1.8

maven.compiler.target

>

        

<

spring.boot.version

>

2.3.0.RELEASE

spring.boot.version

>

        

<

maven.compiler.plugin.version

>

3.8.1

maven.compiler.plugin.version

>

        

<

lombok.version

>

1.18.12

lombok.version

>

        

<

fastjson.version

>

1.2.73

fastjson.version

>

properties

>

<

dependencyManagement

>

    

<

dependencies

>

        

<

dependency

>

            

<

groupId

>

org.springframework.boot

groupId

>

            

<

artifactId

>

spring-boot-dependencies

artifactId

>

            

<

version

>

${spring.boot.version}

version

>

            

<

scope

>

import

scope

>

            

<

type

>

pom

type

>

        

dependency

>

    

dependencies

>

dependencyManagement

>

<

dependencies

>

    

<

dependency

>

        

<

groupId

>

org.projectlombok

groupId

>

        

<

artifactId

>

lombok

artifactId

>

        

<

version

>

${lombok.version}

version

>

        

<

scope

>

provided

scope

>

    

dependency

>

    

<

dependency

>

        

<

groupId

>

org.springframework.boot

groupId

>

        

<

artifactId

>

spring-boot-starter-test

artifactId

>

        

<

scope

>

test

scope

>

    

dependency

>

    

<

dependency

>

        

<

groupId

>

org.springframework.boot

groupId

>

        

<

artifactId

>

spring-boot-starter

artifactId

>

        

<

scope

>

provided

scope

>

    

dependency

>

    

<

dependency

>

        

<

groupId

>

com.alibaba

groupId

>

        

<

artifactId

>

fastjson

artifactId

>

        

<

version

>

${fastjson.version}

version

>

    

dependency

>

dependencies

>

/<code>

其中,canal-glue-core模塊本質上只依賴於fastjson,可以完全脫離spring體系使用。

基本架構

這裡提供一個"後知後覺"的架構圖,因為之前為了快速懟到線上,初版沒有考慮這麼多,甚至還耦合了業務代碼,組件是後來抽離出來的:

簡化ETL工作,編寫一個Canal膠水層

設計配置模塊(已經移除)

設計配置模塊在設計的時候考慮使用了外置配置文件和純註解兩種方式,前期使用了JSON外置配置文件的方式,純註解是後來增加的,二選一。這一節簡單介紹一下JSON外置配置文件的配置加載,純註解留到後面處理器模塊時候分析。

當初是想快速進行膠水層的開發,所以配置文件使用了可讀性比較高的JSON格式:

<code>{
  

"version"

: 1,   

"module"

"canal-glue"

,   

"databases"

: [     {       

"database"

"db_payment_service"

,       

"processors"

: [         {           

"table"

"payment_order"

,           

"processor"

"x.y.z.PaymentOrderProcessor"

,           

"exceptionHandler"

"x.y.z.PaymentOrderExceptionHandler"

        }       ]     },     {       ......     }   ] } /<code>

JSON配置在設計的時候儘可能不要使用JSON Array作為頂層配置,因為這樣做設計的對象會比較怪

因為使用該模塊的應用有可能需要處理Canal解析多個上游數據庫的binlog事件,所以配置模塊設計的時候需要以database為KEY,掛載多個table以及對應的表binlog事件處理器以及異常處理器。然後對著JSON文件的格式擼一遍對應的實體類出來:

<code> 

public

 

class

 CanalGlueProcessorConf {     

private

 

String

 table;     

private

 

String

 processor;     

private

 

String

 exceptionHandler; }

public

 

class

 CanalGlueDatabaseConf {     

private

 

String

 database;     

private

 List processors; }

public

 

class

 CanalGlueConf {     

private

 Long version;     

private

 

String

 

module

;     private List database; } /<code>

實體編寫完,接著可以編寫一個配置加載器,簡單起見,配置文件直接放ClassPath之下,加載器如下:

<code>

public

 

interface

 

CanalGlueConfLoader

 

{     

CanalGlueConf 

load

(String location)

; }

public

 

class

 

ClassPathCanalGlueConfLoader

 

implements

 

CanalGlueConfLoader

 

{          

public

 CanalGlueConf 

load

(String location)

 

{         ClassPathResource resource = 

new

 ClassPathResource(location);         Assert.isTrue(resource.exists(), String.format(

"類路徑下不存在文件%s"

, location));         

try

 {             String content = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);             

return

 JSON.parseObject(content, CanalGlueConf

.

class

)

;         } 

catch

 (IOException e) {                          

throw

 

new

 IllegalStateException(e);         }     } } /<code>

讀取ClassPath下的某個location為絕對路徑的文件內容字符串,然後使用Fasfjson轉成CanalGlueConf對象。這個是默認的實現,使用canal-glue模塊可以覆蓋此實現,通過自定義的實現加載配置。

JSON配置模塊在後來從業務系統抽離此膠水層的時候已經完全廢棄,使用純註解驅動和核心抽象組件繼承的方式實現。

核心模塊開發

主要包括幾個模塊:

  • 基本模型定義。
  • 適配器層開發。
  • 轉換器和解析器層開發。
  • 處理器層開發。
  • 全局組件自動配置模塊開發(僅限於Spring體系,已經抽取到spring-boot-starter-canal-glue模塊)。
  • CanalGlue開發。

基本模型定義

定義頂層的KEY,也就是對於某個數據庫的某一個確定的表,需要一個唯一標識:

<code> 

public

 

interface

 

ModelTable

 

{     

String 

database

()

;     

String 

table

()

;     

static

 ModelTable 

of

(String database, String table)

 

{         

return

 DefaultModelTable.of(database, table);     } } (access = AccessLevel.PACKAGE, staticName = 

"of"

)

public

 

class

 

DefaultModelTable

 

implements

 

ModelTable

 

{     

private

 

final

 String database;     

private

 

final

 String table;          

public

 String 

database

()

 

{         

return

 database;     }          

public

 String 

table

()

 

{         

return

 table;     }          

public

 

boolean

 

equals

(Object o)

 

{         

if

 (

this

 == o) {             

return

 

true

;         }         

if

 (o == 

null

 || getClass() != o.getClass()) {             

return

 

false

;         }         DefaultModelTable that = (DefaultModelTable) o;         

return

 Objects.equals(database, that.database) &&                 Objects.equals(table, that.table);     }          

public

 

int

 

hashCode

()

 

{         

return

 Objects.hash(database, table);     } } /<code>

這裡實現類DefaultModelTable重寫了equals()和hashCode()方法便於把ModelTable實例應用為HashMap容器的KEY,這樣後面就可以設計ModelTable -> Processor的緩存結構。

由於Canal投放到Kafka的事件內容是一個原始字符串,所以要定義一個和前文提到的FlatMessage基本一致的事件類CanalBinLogEvent:

<code> 

public

 

class

 CanalBinLogEvent {          

private

 Long id;          

private

 ListString, 

String

>> data;          

private

 List<

String

> pkNames;          

private

 ListString, 

String

>> old;          

private

 

String

 

type

;          

private

 Long es;          

private

 Long ts;          

private

 

String

 sql;          

private

 

String

 database;          

private

 

String

 table;          

private

 Map
<

String

, Integer> sqlType;          

private

 Map<

String

String

> mysqlType;          

private

 

Boolean

 isDdl; } /<code>

根據此事件對象,再定義解析完畢後的結果對象CanalBinLogResult:

<code> 
 
 

public

 

enum

 BinLogEventType {          QUERY(

"QUERY"

"查詢"

),     INSERT(

"INSERT"

"新增"

),     UPDATE(

"UPDATE"

"更新"

),     DELETE(

"DELETE"

"刪除"

),     ALTER(

"ALTER"

"列修改操作"

),     UNKNOWN(

"UNKNOWN"

"未知"

),     ;     

private

 final 

String

 

type

;     

private

 final 

String

 description;     

public

 

static

 BinLogEventType fromType(

String

 

type

) {         

for

 (BinLogEventType binLogType : BinLogEventType.values()) {             

if

 (binLogType.getType().equals(

type

)) {                 

return

 binLogType;             }         }         

return

 BinLogEventType.UNKNOWN;     } }

public

 

enum

 OperationType {          DML(

"dml"

"DML語句"

),          DDL(

"ddl"

"DDL語句"

),     ;     

private

 final 

String

 

type

;     

private

 final 

String

 description; }

public

 

class

 CanalBinLogResult {          

private

 Long primaryKey;          

private

 BinLogEventType binLogEventType;          

private

 T beforeData;          

private

 T afterData;          

private

 

String

 databaseName;          

private

 

String

 tableName;          

private

 

String

 sql;          

private

 OperationType operationType; } /<code>

開發適配器層

定義頂層的適配器SPI接口:

<code>

public

 

interface

 

SourceAdapter

<

SOURCE

SINK

{     

SINK 

adapt

(SOURCE source)

; } /<code>

接著開發適配器實現類:

<code> 
 (access = AccessLevel.PACKAGE, staticName = 

"of"

)

class

 RawStringSourceAdapter 

implements

 SourceAdapter<

String

String

> {          

public

 

String

 adapt(

String

 source) {         

return

 source;     } } (access = AccessLevel.PACKAGE, staticName = 

"of"

)

class

 FastJsonSourceAdapter 

implements

 SourceAdapter<

String

, T> {     

private

 final Class klass;          

public

 T adapt(

String

 source) {         

if

 (StringUtils.isEmpty(source)) {             

return

 

null

;         }         

return

 

JSON

.parseObject(source, klass);     } }

public

 

enum

 SourceAdapterFacade {          X;     

private

 

static

 final SourceAdapter<

String

String

> I_S_A = RawStringSourceAdapter.of();      (

"unchecked"

)     

public

  T adapt(Class klass, 

String

 source) {         

if

 (klass.isAssignableFrom(

String

.class)) {             

return

 (T) I_S_A.adapt(source);         }         

return

 FastJsonSourceAdapter.of(klass).adapt(source);     } } /<code>

最終直接使用SourceAdapterFacade#adapt()方法即可,因為實際上絕大多數情況下只會使用原始字符串和String -> Class實例,適配器層設計可以簡單點。

開發轉換器和解析器層

對於Canal解析完成的binlog事件,data和old屬性是K-V結構,並且KEY都是String類型,需要遍歷解析才能推導出完整的目標實例。

轉換後的實例的屬性類型目前只支持包裝類,int等原始類型不支持

為了更好地通過目標實體和實際的數據庫、表和列名稱、列類型進行映射,引入了兩個自定義註解CanalModel和@CanalField,它們的定義如下:

<code> 
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)

public

 @

interface

 

CanalModel

 {          

String 

database

(

)

;          

String 

table

(

)

;          

FieldNamingPolicy 

fieldNamingPolicy

(

default

 FieldNamingPolicy.DEFAULT

; } @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.FIELD)

public

 @

interface

 

CanalField

 {          

String 

columnName

(

default

 ""

;          

JDBCType 

sqlType

(

default

 JDBCType.NULL

;          Class extends BaseCanalFieldConverter>> converterKlass() 

default

 NullCanalFieldConverter.

class

; } /<code>

定義頂層轉換器接口BinLogFieldConverter:

<code>

public

 

interface

 

BinLogFieldConverter

<

SOURCE

TARGET

{     

TARGET 

convert

(SOURCE source)

; } /<code>

目前暫定可以通過目標屬性的Class和通過註解指定的SQLType類型進行匹配,所以再定義一個抽象轉換器BaseCanalFieldConverter:

<code>

public

 

abstract

 

class

 

BaseCanalFieldConverter

<

T

implements

 

BinLogFieldConverter

<

String

T

{     

private

 

final

 SQLType sqlType;     

private

 

final

 Class> klass;     

protected

 

BaseCanalFieldConverter

(SQLType sqlType, Class> klass)

 

{         

this

.sqlType = sqlType;         

this

.klass = klass;     }          

public

 T 

convert

(String source)

 

{         

if

 (StringUtils.isEmpty(source)) {             

return

 

null

;         }         

return

 convertInternal(source);     }          

protected

 

abstract

 T 

convertInternal

(String source)

;          

public

 SQLType 

sqlType

()

 

{         

return

 sqlType;     }          

public

 Class> typeKlass() {         

return

 klass;     } } /<code>

BaseCanalFieldConverter是面向目標實例中的單個屬性的,例如對於實例中的Long類型的屬性,可以實現一個BigIntCanalFieldConverter:

<code>

public

 

class

 

BigIntCanalFieldConverter

 

extends

 

BaseCanalFieldConverter

<

Long

{          

public

 

static

 

final

 BaseCanalFieldConverter X = 

new

 BigIntCanalFieldConverter();     

private

 

BigIntCanalFieldConverter

()

 

{         

super

(JDBCType.BIGINT, Long

.

class

)

;     }          

protected

 Long 

convertInternal

(String source)

 

{         

if

 (

null

 == source) {             

return

 

null

;         }         

return

 Long.valueOf(source);     } } /<code>

其他類型以此類推,目前已經開發好的最常用的內建轉換器如下:


簡化ETL工作,編寫一個Canal膠水層

所有轉換器實現都設計為無狀態的單例,方便做動態註冊和覆蓋。接著定義一個轉換器工廠
CanalFieldConverterFactory,提供API通過指定參數加載目標轉換器實例:

<code> 
 (

"rawtypes"

)

public

 

class

 

CanalFieldConvertInput

 

{     

private

 Class> fieldKlass;     

private

 Class extends BaseCanalFieldConverter> converterKlass;     

private

 SQLType sqlType;          

public

 

CanalFieldConvertInput

()

 

{     } }

public

 

class

 

CanalFieldConvertResult

 

{     

private

 

final

 BaseCanalFieldConverter> converter; }

public

 

interface

 

CanalFieldConverterFactory

 

{     

default

 

void

 

registerConverter

(BaseCanalFieldConverter> converter)

 

{         registerConverter(converter, 

true

);     }     

void

 

registerConverter

(BaseCanalFieldConverter> converter, 

boolean

 replace)

;     

CanalFieldConvertResult 

load

(CanalFieldConvertInput input)

; } /<code>


CanalFieldConverterFactory提供了可以註冊自定義轉化器的registerConverter()方法,這樣就可以讓使用者註冊自定義的轉換器和覆蓋默認的轉換器。

至此,可以通過指定的參數,加載實例屬性的轉換器,拿到轉換器實例,就可以針對目標實例,從原始事件中解析對應的K-V結構。接著需要編寫最核心的解析器模塊,此模塊主要包含三個方面:

  • 唯一BIGINT類型主鍵的解析(這一點是公司技術規範的一條鐵規則,MySQL每個表只能定義唯一的BIGINT UNSIGNED自增趨勢主鍵)。
  • 更變前的數據,對應於原始事件中的old屬性節點(不一定存在,例如INSERT語句中不存在此屬性節點)。
  • 更變後的數據,對應於原始事件中的data屬性節點。

定義解析器接口CanalBinLogEventParser如下:

<code>

public

 

interface

 

CanalBinLogEventParser

 

{           

List

> parse(CanalBinLogEvent event,                                          

Class

<

T

klass

,                                          

BasePrimaryKeyTupleFunction

 

primaryKeyFunction

,                                          

BaseCommonEntryFunction

<

T

commonEntryFunction

); }

/<code>

解析器的解析方法依賴於:

  • binlog事件實例,這個是上游的適配器組件的結果。
  • 轉換的目標類型。
  • BasePrimaryKeyTupleFunction主鍵映射方法實例,默認使用內建的BigIntPrimaryKeyTupleFunction。
  • BaseCommonEntryFunction非主鍵通用列-屬性映射方法實例,默認使用內建的ReflectionBinLogEntryFunction(
    「這個是非主鍵列的轉換核心,裡面使用到了反射」)。

解析返回結果是一個List,原因是FlatMessage在批量寫入的時候的數據結構本來就是一個List>,這裡只是"順水推舟"。

開發處理器層

處理器是開發者處理最終解析出來的實體的入口,只需要面向不同類型的事件選擇對應的處理方法即可,看起來如下:

<code>

public

 

abstract

 

class

 

BaseCanalBinlogEventProcessor

<

T

extends

 

BaseParameterizedTypeReferenceSupport

<

T

{     

protected

 

void

 

processInsertInternal

(CanalBinLogResult result)

 

{     }     

protected

 

void

 

processUpdateInternal

(CanalBinLogResult result)

 

{     }     

protected

 

void

 

processDeleteInternal

(CanalBinLogResult result)

 

{     }     

protected

 

void

 

processDDLInternal

(CanalBinLogResult result)

 

{     } } /<code>

例如需要處理Insert事件,則子類繼承
BaseCanalBinlogEventProcessor,對應的實體類(泛型的替換)使用@CanalModel註解聲明,然後覆蓋processInsertInternal()方法即可。期間子處理器可以覆蓋自定義異常處理器實例,如:

<code>@

Override

protected

 ExceptionHandler 

exceptionHandler

()

 

{     

return

 EXCEPTION_HANDLER; }

private

 

static

 

final

 ExceptionHandler EXCEPTION_HANDLER = (event, throwable)         -> 

log

.error(

"解析binlog事件出現異常,事件內容:{}"

, JSON.toJSONString(event), throwable); /<code>

另外,有些場景需要對回調前或者回調後的結果做特化處理,因此引入瞭解析結果攔截器(鏈)的實現,對應的類是
BaseParseResultInterceptor:

<code>

public

 

abstract

 

class

 

BaseParseResultInterceptor

<

T

extends

 

BaseParameterizedTypeReferenceSupport

<

T

{     

public

 

BaseParseResultInterceptor

()

 

{         

super

();     }     

public

 

void

 

onParse

(ModelTable modelTable)

 

{     }     

public

 

void

 

onBeforeInsertProcess

(ModelTable modelTable, T beforeData, T afterData)

 

{     }     

public

 

void

 

onAfterInsertProcess

(ModelTable modelTable, T beforeData, T afterData)

 

{     }     

public

 

void

 

onBeforeUpdateProcess

(ModelTable modelTable, T beforeData, T afterData)

 

{     }     

public

 

void

 

onAfterUpdateProcess

(ModelTable modelTable, T beforeData, T afterData)

 

{     }     

public

 

void

 

onBeforeDeleteProcess

(ModelTable modelTable, T beforeData, T afterData)

 

{     }     

public

 

void

 

onAfterDeleteProcess

(ModelTable modelTable, T beforeData, T afterData)

 

{     }     

public

 

void

 

onBeforeDDLProcess

(ModelTable modelTable, T beforeData, T afterData, String sql)

 

{     }     

public

 

void

 

onAfterDDLProcess

(ModelTable modelTable, T beforeData, T afterData, String sql)

 

{     }     

public

 

void

 

onParseFinish

(ModelTable modelTable)

 

{     }     

public

 

void

 

onParseCompletion

(ModelTable modelTable)

 

{     } } /<code>

解析結果攔截器的回調時機可以參看上面的架構圖或者
BaseCanalBinlogEventProcessor的源代碼。

開發全局組件自動配置模塊

如果使用了Spring容器,需要添加一個配置類來加載所有既有的組件,添加一個全局配置類
CanalGlueAutoConfiguration(這個類可以在項目的
spring-boot-starter-canal-glue模塊中看到,這個模塊就只有一個類):

<code> 

public

 

class

 

CanalGlueAutoConfiguration

 

implements

 

SmartInitializingSingleton

BeanFactoryAware

 

{     

private

 ConfigurableListableBeanFactory configurableListableBeanFactory;               

public

 CanalBinlogEventProcessorFactory 

canalBinlogEventProcessorFactory

()

 

{         

return

 InMemoryCanalBinlogEventProcessorFactory.of();     }               

public

 ModelTableMetadataManager 

modelTableMetadataManager

(CanalFieldConverterFactory canalFieldConverterFactory)

 

{         

return

 InMemoryModelTableMetadataManager.of(canalFieldConverterFactory);     }               

public

 CanalFieldConverterFactory 

canalFieldConverterFactory

()

 

{         

return

 InMemoryCanalFieldConverterFactory.of();     }               

public

 CanalBinLogEventParser 

canalBinLogEventParser

()

 

{         

return

 DefaultCanalBinLogEventParser.of();     }               

public

 ParseResultInterceptorManager 

parseResultInterceptorManager

(ModelTableMetadataManager modelTableMetadataManager)

 

{         

return

 InMemoryParseResultInterceptorManager.of(modelTableMetadataManager);     }               

public

 CanalGlue 

canalGlue

(CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory)

 

{         

return

 DefaultCanalGlue.of(canalBinlogEventProcessorFactory);     }          

public

 

void

 

setBeanFactory

(BeanFactory beanFactory)

 

throws

 BeansException 

{         

this

.configurableListableBeanFactory = (ConfigurableListableBeanFactory) beanFactory;     }      ({

"rawtypes"

"unchecked"

})          

public

 

void

 

afterSingletonsInstantiated

()

 

{         ParseResultInterceptorManager parseResultInterceptorManager                 = configurableListableBeanFactory.getBean(ParseResultInterceptorManager

.

class

)

;         ModelTableMetadataManager modelTableMetadataManager                 = configurableListableBeanFactory.getBean(ModelTableMetadataManager

.

class

)

;         CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory                 = configurableListableBeanFactory.getBean(CanalBinlogEventProcessorFactory

.

class

)

;         CanalBinLogEventParser canalBinLogEventParser                 = configurableListableBeanFactory.getBean(CanalBinLogEventParser

.

class

)

;         Map interceptors                 = configurableListableBeanFactory.getBeansOfType(BaseParseResultInterceptor

.

class

)

;         interceptors.forEach((k, interceptor) -> parseResultInterceptorManager.registerParseResultInterceptor(interceptor));         Map processors                 = configurableListableBeanFactory.getBeansOfType(BaseCanalBinlogEventProcessor

.

class

)

;         processors.forEach((k, processor) -> processor.init(canalBinLogEventParser, modelTableMetadataManager,                 canalBinlogEventProcessorFactory, parseResultInterceptorManager));     } } /<code>

為了更好地讓其他服務引入此配置類,可以使用spring.factories的特性。新建
resources/META-INF/spring.factories文件,內容如下:

<code>

org.springframework.boot.autoconfigure.EnableAutoConfiguration

=cn.throwx.canal.gule.config.CanalGlueAutoConfiguration /<code>

這樣子通過引入
spring-boot-starter-canal-glue就可以激活所有用到的組件並且初始化所有已經添加到Spring容器中的處理器。

CanalGlue開發

CanalGlue其實就是提供binlog事件字符串的處理入口,目前定義為一個接口:

<code>

public

 

interface

 

CanalGlue

 {     

void

 

process

(

String content

)

; } /<code>

此接口的實現DefaultCanalGlue也十分簡單:

<code> (access = AccessLevel.PUBLIC, staticName = 

"of"

)

public

 

class

 

DefaultCanalGlue

 

implements

 

CanalGlue

 

{     

private

 

final

 CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory;          

public

 

void

 

process

(String content)

 

{         CanalBinLogEvent event = SourceAdapterFacade.X.adapt(CanalBinLogEvent

.

class

content

)

;         ModelTable modelTable = ModelTable.of(event.getDatabase(), event.getTable());         canalBinlogEventProcessorFactory.get(modelTable).forEach(processor -> processor.process(event));     } } /<code>

使用源適配器把字符串轉換為CanalBinLogEvent實例,再委託處理器工廠尋找對應的
BaseCanalBinlogEventProcessor列表去處理輸入的事件實例。

使用canal-glue

主要包括下面幾個維度,都在canal-glue-example的test包下:

  • [x] 一般情況下使用處理器處理INSERT事件。
  • [x] 自定義針對DDL變更的預警父處理器,實現DDL變更預警。
  • [x] 單表對應多個處理器。
  • [x] 使用解析結果處理器針對特定字段進行AES加解密處理。
  • [x] 非Spring容器下,一般編程式使用。
  • [ ] 使用openjdk-jmh進行Benchmark基準性能測試。

這裡簡單提一下在Spring體系下的使用方式,引入依賴
spring-boot-starter-canal-glue:

<code>

<

dependency

>

    

<

groupId

>

cn.throwx

groupId

>

    

<

artifactId

>

spring-boot-starter-canal-glue

artifactId

>

    

<

version

>

版本號

version

>

dependency

>

/<code>

編寫一個實體或者DTO類OrderModel:

<code> 
 

public

 static 

class

 

OrderModel

 

{     

private

 

Long

 id;     

private

 String orderId;     

private

 OffsetDateTime createTime;     

private

 BigDecimal amount; } /<code>

這裡使用了@CanalModel註解綁定了數據庫db_order_service和表t_order,屬性名-列名映射策略為「駝峰轉小寫下劃線」。接著定義一個處理器OrderProcessor和自定義異常處理器(可選,這裡是為了模擬在處理事件的時候拋出自定義異常):

<code> 

public

 

class

 

OrderProcessor

 

extends

 

BaseCanalBinlogEventProcessor

<

OrderModel

{          

protected

 

void

 

processInsertInternal

(CanalBinLogResult result)

 

{         OrderModel orderModel = result.getAfterData();         logger.info(

"接收到訂單保存binlog,主鍵:{},模擬拋出異常..."

, orderModel.getId());         

throw

 

new

 RuntimeException(String.format(

"[id:%d]"

, orderModel.getId()));     }          

protected

 ExceptionHandler 

exceptionHandler

()

 

{         

return

 EXCEPTION_HANDLER;     }          

private

 

static

 

final

 ExceptionHandler EXCEPTION_HANDLER = (event, throwable)             -> log.error(

"解析binlog事件出現異常,事件內容:{}"

, JSON.toJSONString(event), throwable); } /<code>

假設一個寫入訂單數據的binlog事件如下:

<code>{
  

"data"

: [     {       

"id"

"1"

,       

"order_id"

"10086"

,       

"amount"

"999.0"

,       

"create_time"

"2020-03-02 05:12:49"

    }   ],   

"database"

"db_order_service"

,   

"es"

1583143969000

,   

"id"

3

,   

"isDdl"

false

,   

"mysqlType"

: {     

"id"

"BIGINT"

,     

"order_id"

"VARCHAR(64)"

,     

"amount"

"DECIMAL(10,2)"

,     

"create_time"

"DATETIME"

  },   

"old"

null

,   

"pkNames"

: [     

"id"

  ],   

"sql"

""

,   

"sqlType"

: {     

"id"

-5

,     

"order_id"

12

,     

"amount"

3

,     

"create_time"

93

  },   

"table"

"t_order"

,   

"ts"

1583143969460

,   

"type"

"INSERT"

} /<code>

執行結果如下:

簡化ETL工作,編寫一個Canal膠水層

如果直接對接Canal投放到Kafka的Topic也很簡單,配合Kafka的消費者使用的示例如下:

<code>

@Slf4j

@Component

@RequiredArgsConstructor

public class CanalEventListeners {     

private

 

final

 

CanalGlue

 

canalGlue

;     @

KafkaListener

(             id = 

"${canal.event.order.listener.id:db-order-service-listener}"

,             topics = 

"db_order_service"

,              containerFactory = 

"kafkaListenerContainerFactory"

    )     

public

 

void

 

onCrmMessage

(String content) {         

canalGlue

.process

(content);     }     } /<code>

小結

筆者開發這個canal-glue的初衷是需要做一個極大提升效率的大型字符串轉換器,因為剛剛接觸到"小數據"領域,而且人手不足,而且需要處理下游大量的報表,因為不可能花大量人力在處理這些不停重複的模板化代碼上。雖然整體設計還不是十分優雅,

「至少在提升開發效率這個點上」,canal-glue做到了。


分享到:


相關文章: