整篇文章可能會有點流水賬,相對詳細地介紹怎麼寫一個小型的"框架"。這個精悍的膠水層已經在生產環境服役超過半年,這裡嘗試把耦合業務的代碼去掉,提煉出一個相對簡潔的版本。
之前寫的幾篇文章裡面其中一篇曾經提到過Canal解析MySQL的binlog事件後的對象如下(來源於Canal源碼
com.alibaba.otter.canal.protocol.FlatMessage):
如果直接對此原始對象進行解析,那麼會出現很多解析模板代碼,一旦有改動就會牽一髮動全身,這是我們不希望發生的一件事。於是花了一點點時間寫了一個Canal膠水層,讓接收到的FlatMessage根據表名稱直接轉換為對應的DTO實例,這樣能在一定程度上提升開發效率並且減少模板化代碼,這個膠水層的數據流示意圖如下:
要編寫這樣的膠水層主要用到:
- 反射。
- 註解。
- 策略模式。
- IOC容器(可選)。
項目的模塊如下:
- canal-glue-core:核心功能。
- spring-boot-starter-canal-glue:適配Spring的IOC容器,添加自動配置。
- canal-glue-example:使用例子和基準測試。
下文會詳細分析此膠水層如何實現。
引入依賴
為了不汙染引用此模塊的外部服務依賴,除了JSON轉換的依賴之外,其他依賴的scope定義為provide或者test類型,依賴版本和BOM如下:
<code><
properties
><
project.build.sourceEncoding
>UTF-8project.build.sourceEncoding
><
maven.compiler.source
>1.8maven.compiler.source
><
maven.compiler.target
>1.8maven.compiler.target
><
spring.boot.version
>2.3.0.RELEASEspring.boot.version
><
maven.compiler.plugin.version
>3.8.1maven.compiler.plugin.version
><
lombok.version
>1.18.12lombok.version
><
fastjson.version
>1.2.73fastjson.version
>properties
><
dependencyManagement
><
dependencies
><
dependency
><
groupId
>org.springframework.bootgroupId
><
artifactId
>spring-boot-dependenciesartifactId
><
version
>${spring.boot.version}version
><
scope
>importscope
><
type
>pomtype
>dependency
>dependencies
>dependencyManagement
><
dependencies
><
dependency
><
groupId
>org.projectlombokgroupId
><
artifactId
>lombokartifactId
><
version
>${lombok.version}version
><
scope
>providedscope
>dependency
><
dependency
><
groupId
> org.springframework.bootgroupId
><
artifactId
>spring-boot-starter-testartifactId
><
scope
>testscope
>dependency
><
dependency
><
groupId
>org.springframework.bootgroupId
><
artifactId
>spring-boot-starterartifactId
><
scope
>providedscope
>dependency
><
dependency
><
groupId
>com.alibabagroupId
><
artifactId
>fastjsonartifactId
><
version
>${fastjson.version}version
>dependency
>
dependencies
> /<code>
其中,canal-glue-core模塊本質上只依賴於fastjson,可以完全脫離spring體系使用。
基本架構
這裡提供一個"後知後覺"的架構圖,因為之前為了快速懟到線上,初版沒有考慮這麼多,甚至還耦合了業務代碼,組件是後來抽離出來的:
設計配置模塊(已經移除)
❝
設計配置模塊在設計的時候考慮使用了外置配置文件和純註解兩種方式,前期使用了JSON外置配置文件的方式,純註解是後來增加的,二選一。這一節簡單介紹一下JSON外置配置文件的配置加載,純註解留到後面處理器模塊時候分析。
❞
當初是想快速進行膠水層的開發,所以配置文件使用了可讀性比較高的JSON格式:
<code>{"version"
: 1,"module"
:"canal-glue"
,"databases"
: [ {"database"
:"db_payment_service"
,"processors"
: [ {"table"
:"payment_order"
,"processor"
:"x.y.z.PaymentOrderProcessor"
,"exceptionHandler"
:"x.y.z.PaymentOrderExceptionHandler"
} ] }, { ...... } ] } /<code>
❝
JSON配置在設計的時候儘可能不要使用JSON Array作為頂層配置,因為這樣做設計的對象會比較怪
❞
因為使用該模塊的應用有可能需要處理Canal解析多個上游數據庫的binlog事件,所以配置模塊設計的時候需要以database為KEY,掛載多個table以及對應的表binlog事件處理器以及異常處理器。然後對著JSON文件的格式擼一遍對應的實體類出來:
<code>public
class
CanalGlueProcessorConf {private
String
table;private
String
processor;private
String
exceptionHandler; }public
class
CanalGlueDatabaseConf {private
String
database;private
List processors; }public
class
CanalGlueConf {private
Long version;private
String
module
; private List database; } /<code>
實體編寫完,接著可以編寫一個配置加載器,簡單起見,配置文件直接放ClassPath之下,加載器如下:
<code>public
interface
CanalGlueConfLoader
{CanalGlueConf
load
(String location)
; }public
class
ClassPathCanalGlueConfLoader
implements
CanalGlueConfLoader
{public
CanalGlueConfload
(String location)
{ ClassPathResource resource =new
ClassPathResource(location); Assert.isTrue(resource.exists(), String.format("類路徑下不存在文件%s"
, location));try
{ String content = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);return
JSON.parseObject(content, CanalGlueConf.
class
); }catch
(IOException e) {throw
new
IllegalStateException(e); } } } /<code>
讀取ClassPath下的某個location為絕對路徑的文件內容字符串,然後使用Fasfjson轉成CanalGlueConf對象。這個是默認的實現,使用canal-glue模塊可以覆蓋此實現,通過自定義的實現加載配置。
❝
JSON配置模塊在後來從業務系統抽離此膠水層的時候已經完全廢棄,使用純註解驅動和核心抽象組件繼承的方式實現。
❞
核心模塊開發
主要包括幾個模塊:
- 基本模型定義。
- 適配器層開發。
- 轉換器和解析器層開發。
- 處理器層開發。
- 全局組件自動配置模塊開發(僅限於Spring體系,已經抽取到spring-boot-starter-canal-glue模塊)。
- CanalGlue開發。
基本模型定義
定義頂層的KEY,也就是對於某個數據庫的某一個確定的表,需要一個唯一標識:
<code>public
interface
ModelTable
{String
database
()
;String
table
()
;static
ModelTableof
(String database, String table)
{return
DefaultModelTable.of(database, table); } } (access = AccessLevel.PACKAGE, staticName ="of"
)public
class
DefaultModelTable
implements
ModelTable
{private
final
String database;private
final
String table;public
Stringdatabase
()
{return
database; }public
Stringtable
()
{return
table; }public
boolean
equals
(Object o)
{if
(this
== o) {return
true
; }if
(o ==null
|| getClass() != o.getClass()) {return
false
; } DefaultModelTable that = (DefaultModelTable) o;return
Objects.equals(database, that.database) && Objects.equals(table, that.table); }public
int
hashCode
()
{return
Objects.hash(database, table); } } /<code>
這裡實現類DefaultModelTable重寫了equals()和hashCode()方法便於把ModelTable實例應用為HashMap容器的KEY,這樣後面就可以設計ModelTable -> Processor的緩存結構。
由於Canal投放到Kafka的事件內容是一個原始字符串,所以要定義一個和前文提到的FlatMessage基本一致的事件類CanalBinLogEvent:
<code>public
class
CanalBinLogEvent {private
Long id;private
ListString,String
>> data;private
List<String
> pkNames;private
ListString,String
>> old;private
String
type
;private
Long es;private
Long ts;private
String
sql;private
String
database;private
String
table;private
Map <String
, Integer> sqlType;private
Map<String
,String
> mysqlType;private
Boolean
isDdl; } /<code>
根據此事件對象,再定義解析完畢後的結果對象CanalBinLogResult:
<code>public
enum
BinLogEventType { QUERY("QUERY"
,"查詢"
), INSERT("INSERT"
,"新增"
), UPDATE("UPDATE"
,"更新"
), DELETE("DELETE"
,"刪除"
), ALTER("ALTER"
,"列修改操作"
), UNKNOWN("UNKNOWN"
,"未知"
), ;private
finalString
type
;private
finalString
description;public
static
BinLogEventType fromType(String
type
) {for
(BinLogEventType binLogType : BinLogEventType.values()) {if
(binLogType.getType().equals(type
)) {return
binLogType; } }return
BinLogEventType.UNKNOWN; } }public
enum
OperationType { DML("dml"
,"DML語句"
), DDL("ddl"
,"DDL語句"
), ;private
finalString
type
;private
finalString
description; }public
class
CanalBinLogResult {private
Long primaryKey;private
BinLogEventType binLogEventType;private
T beforeData;private
T afterData;private
String
databaseName;private
String
tableName;private
String
sql;private
OperationType operationType; } /<code>
開發適配器層
定義頂層的適配器SPI接口:
<code>public
interface
SourceAdapter
<SOURCE
,SINK
> {SINK
adapt
(SOURCE source)
; } /<code>
接著開發適配器實現類:
<code> (access = AccessLevel.PACKAGE, staticName ="of"
)class
RawStringSourceAdapterimplements
SourceAdapter<String
,String
> {public
String
adapt(String
source) {return
source; } } (access = AccessLevel.PACKAGE, staticName ="of"
)class
FastJsonSourceAdapterimplements
SourceAdapter<String
, T> {private
final Class klass;public
T adapt(String
source) {if
(StringUtils.isEmpty(source)) {return
null
; }return
JSON
.parseObject(source, klass); } }public
enum
SourceAdapterFacade { X;private
static
final SourceAdapter<String
,String
> I_S_A = RawStringSourceAdapter.of(); ("unchecked"
)public
T adapt(Class klass,String
source) {if
(klass.isAssignableFrom(String
.class)) {return
(T) I_S_A.adapt(source); }return
FastJsonSourceAdapter.of(klass).adapt(source); } } /<code>
最終直接使用SourceAdapterFacade#adapt()方法即可,因為實際上絕大多數情況下只會使用原始字符串和String -> Class實例,適配器層設計可以簡單點。
開發轉換器和解析器層
對於Canal解析完成的binlog事件,data和old屬性是K-V結構,並且KEY都是String類型,需要遍歷解析才能推導出完整的目標實例。
❝
轉換後的實例的屬性類型目前只支持包裝類,int等原始類型不支持
❞
為了更好地通過目標實體和實際的數據庫、表和列名稱、列類型進行映射,引入了兩個自定義註解CanalModel和@CanalField,它們的定義如下:
<code> @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE)public
@interface
CanalModel
{String
database
();String
table
();FieldNamingPolicy
fieldNamingPolicy
()default
FieldNamingPolicy.DEFAULT; } @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.FIELD)public
@interface
CanalField
{String
columnName
( )default
"";JDBCType
sqlType
()default
JDBCType.NULL; Class extends BaseCanalFieldConverter>> converterKlass()default
NullCanalFieldConverter.class
; } /<code>
定義頂層轉換器接口BinLogFieldConverter:
<code>public
interface
BinLogFieldConverter
<SOURCE
,TARGET
> {TARGET
convert
(SOURCE source)
; } /<code>
目前暫定可以通過目標屬性的Class和通過註解指定的SQLType類型進行匹配,所以再定義一個抽象轉換器BaseCanalFieldConverter:
<code>public
abstract
class
BaseCanalFieldConverter
<T
>implements
BinLogFieldConverter
<String
,T
> {private
final
SQLType sqlType;private
final
Class> klass;protected
BaseCanalFieldConverter
(SQLType sqlType, Class> klass)
{this
.sqlType = sqlType;this
.klass = klass; }public
Tconvert
(String source)
{if
(StringUtils.isEmpty(source)) {return
null
; }return
convertInternal(source); }protected
abstract
TconvertInternal
(String source)
;public
SQLTypesqlType
()
{return
sqlType; }public
Class> typeKlass() {return
klass; } } /<code>
BaseCanalFieldConverter是面向目標實例中的單個屬性的,例如對於實例中的Long類型的屬性,可以實現一個BigIntCanalFieldConverter:
<code>public
class
BigIntCanalFieldConverter
extends
BaseCanalFieldConverter
<Long
> {public
static
final
BaseCanalFieldConverter X =new
BigIntCanalFieldConverter();private
BigIntCanalFieldConverter
()
{super
(JDBCType.BIGINT, Long.
class
); }protected
LongconvertInternal
(String source)
{if
(null
== source) {return
null
; }return
Long.valueOf(source); } } /<code>
其他類型以此類推,目前已經開發好的最常用的內建轉換器如下:
所有轉換器實現都設計為無狀態的單例,方便做動態註冊和覆蓋。接著定義一個轉換器工廠
CanalFieldConverterFactory,提供API通過指定參數加載目標轉換器實例:
<code> ("rawtypes"
)public
class
CanalFieldConvertInput
{private
Class> fieldKlass;private
Class extends BaseCanalFieldConverter> converterKlass;private
SQLType sqlType;public
CanalFieldConvertInput
()
{ } }public
class
CanalFieldConvertResult
{private
final
BaseCanalFieldConverter> converter; }public
interface
CanalFieldConverterFactory
{default
void
registerConverter
(BaseCanalFieldConverter> converter)
{ registerConverter(converter,true
); }void
registerConverter
(BaseCanalFieldConverter> converter,
boolean
replace);CanalFieldConvertResult
load
(CanalFieldConvertInput input)
; } /<code>
CanalFieldConverterFactory提供了可以註冊自定義轉化器的registerConverter()方法,這樣就可以讓使用者註冊自定義的轉換器和覆蓋默認的轉換器。
至此,可以通過指定的參數,加載實例屬性的轉換器,拿到轉換器實例,就可以針對目標實例,從原始事件中解析對應的K-V結構。接著需要編寫最核心的解析器模塊,此模塊主要包含三個方面:
- 唯一BIGINT類型主鍵的解析(這一點是公司技術規範的一條鐵規則,MySQL每個表只能定義唯一的BIGINT UNSIGNED自增趨勢主鍵)。
- 更變前的數據,對應於原始事件中的old屬性節點(不一定存在,例如INSERT語句中不存在此屬性節點)。
- 更變後的數據,對應於原始事件中的data屬性節點。
定義解析器接口CanalBinLogEventParser如下:
<code>public
interface
CanalBinLogEventParser
{List
> parse(CanalBinLogEvent event,Class
<T
>klass
,BasePrimaryKeyTupleFunction
primaryKeyFunction
,BaseCommonEntryFunction
<T
>commonEntryFunction
); } /<code>
解析器的解析方法依賴於:
- binlog事件實例,這個是上游的適配器組件的結果。
- 轉換的目標類型。
- BasePrimaryKeyTupleFunction主鍵映射方法實例,默認使用內建的BigIntPrimaryKeyTupleFunction。
- BaseCommonEntryFunction非主鍵通用列-屬性映射方法實例,默認使用內建的ReflectionBinLogEntryFunction( 「這個是非主鍵列的轉換核心,裡面使用到了反射」)。
解析返回結果是一個List,原因是FlatMessage在批量寫入的時候的數據結構本來就是一個List>,這裡只是"順水推舟"。
開發處理器層
處理器是開發者處理最終解析出來的實體的入口,只需要面向不同類型的事件選擇對應的處理方法即可,看起來如下:
<code>public
abstract
class
BaseCanalBinlogEventProcessor
<T
>extends
BaseParameterizedTypeReferenceSupport
<T
> {protected
void
processInsertInternal
(CanalBinLogResult result)
{ }protected
void
processUpdateInternal
(CanalBinLogResult result)
{ }protected
void
processDeleteInternal
(CanalBinLogResult result)
{ }protected
void
processDDLInternal
(CanalBinLogResult result)
{ } } /<code>
例如需要處理Insert事件,則子類繼承
BaseCanalBinlogEventProcessor,對應的實體類(泛型的替換)使用@CanalModel註解聲明,然後覆蓋processInsertInternal()方法即可。期間子處理器可以覆蓋自定義異常處理器實例,如:
<code>@Override
protected
ExceptionHandlerexceptionHandler
()
{return
EXCEPTION_HANDLER; }private
static
final
ExceptionHandler EXCEPTION_HANDLER = (event, throwable) ->log
.error("解析binlog事件出現異常,事件內容:{}"
, JSON.toJSONString(event), throwable); /<code>
另外,有些場景需要對回調前或者回調後的結果做特化處理,因此引入瞭解析結果攔截器(鏈)的實現,對應的類是
BaseParseResultInterceptor:
<code>public
abstract
class
BaseParseResultInterceptor
<T
>extends
BaseParameterizedTypeReferenceSupport
<T
> {public
BaseParseResultInterceptor
()
{super
(); }public
void
onParse
(ModelTable modelTable)
{ }public
void
onBeforeInsertProcess
(ModelTable modelTable, T beforeData, T afterData)
{ }public
void
onAfterInsertProcess
(ModelTable modelTable, T beforeData, T afterData)
{ }public
void
onBeforeUpdateProcess
(ModelTable modelTable, T beforeData, T afterData)
{ }public
void
onAfterUpdateProcess
(ModelTable modelTable, T beforeData, T afterData)
{ }public
void
onBeforeDeleteProcess
(ModelTable modelTable, T beforeData, T afterData)
{ }public
void
onAfterDeleteProcess
(ModelTable modelTable, T beforeData, T afterData)
{ }public
void
onBeforeDDLProcess
(ModelTable modelTable, T beforeData, T afterData, String sql)
{ }public
void
onAfterDDLProcess
(ModelTable modelTable, T beforeData, T afterData, String sql)
{ }public
void
onParseFinish
(ModelTable modelTable)
{ }public
void
onParseCompletion
(ModelTable modelTable)
{ } } /<code>
解析結果攔截器的回調時機可以參看上面的架構圖或者
BaseCanalBinlogEventProcessor的源代碼。
開發全局組件自動配置模塊
如果使用了Spring容器,需要添加一個配置類來加載所有既有的組件,添加一個全局配置類
CanalGlueAutoConfiguration(這個類可以在項目的
spring-boot-starter-canal-glue模塊中看到,這個模塊就只有一個類):
<code>public
class
CanalGlueAutoConfiguration
implements
SmartInitializingSingleton
,BeanFactoryAware
{private
ConfigurableListableBeanFactory configurableListableBeanFactory;public
CanalBinlogEventProcessorFactorycanalBinlogEventProcessorFactory
()
{return
InMemoryCanalBinlogEventProcessorFactory.of(); }public
ModelTableMetadataManagermodelTableMetadataManager
(CanalFieldConverterFactory canalFieldConverterFactory)
{return
InMemoryModelTableMetadataManager.of(canalFieldConverterFactory); }public
CanalFieldConverterFactorycanalFieldConverterFactory
()
{return
InMemoryCanalFieldConverterFactory.of(); }public
CanalBinLogEventParsercanalBinLogEventParser
()
{return
DefaultCanalBinLogEventParser.of(); }public
ParseResultInterceptorManagerparseResultInterceptorManager
(ModelTableMetadataManager modelTableMetadataManager)
{return
InMemoryParseResultInterceptorManager.of(modelTableMetadataManager); }public
CanalGluecanalGlue
(CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory)
{return
DefaultCanalGlue.of(canalBinlogEventProcessorFactory); }public
void
setBeanFactory
(BeanFactory beanFactory)
throws
BeansException {this
.configurableListableBeanFactory = (ConfigurableListableBeanFactory) beanFactory; } ({"rawtypes"
,"unchecked"
})public
void
afterSingletonsInstantiated
()
{ ParseResultInterceptorManager parseResultInterceptorManager = configurableListableBeanFactory.getBean(ParseResultInterceptorManager.
class
); ModelTableMetadataManager modelTableMetadataManager = configurableListableBeanFactory.getBean(ModelTableMetadataManager.
class
); CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory = configurableListableBeanFactory.getBean(CanalBinlogEventProcessorFactory.
class
); CanalBinLogEventParser canalBinLogEventParser = configurableListableBeanFactory.getBean(CanalBinLogEventParser.
class
); Map interceptors = configurableListableBeanFactory.getBeansOfType(BaseParseResultInterceptor.
class
); interceptors.forEach((k, interceptor) -> parseResultInterceptorManager.registerParseResultInterceptor(interceptor)); Map processors = configurableListableBeanFactory.getBeansOfType(BaseCanalBinlogEventProcessor.
class
); processors.forEach((k, processor) -> processor.init(canalBinLogEventParser, modelTableMetadataManager, canalBinlogEventProcessorFactory, parseResultInterceptorManager)); } } /<code>
為了更好地讓其他服務引入此配置類,可以使用spring.factories的特性。新建
resources/META-INF/spring.factories文件,內容如下:
<code>org.springframework.boot.autoconfigure.EnableAutoConfiguration
=cn.throwx.canal.gule.config.CanalGlueAutoConfiguration /<code>
這樣子通過引入
spring-boot-starter-canal-glue就可以激活所有用到的組件並且初始化所有已經添加到Spring容器中的處理器。
CanalGlue開發
CanalGlue其實就是提供binlog事件字符串的處理入口,目前定義為一個接口:
<code>public
interface
CanalGlue
{void
process
(String content
); } /<code>
此接口的實現DefaultCanalGlue也十分簡單:
<code> (access = AccessLevel.PUBLIC, staticName ="of"
)public
class
DefaultCanalGlue
implements
CanalGlue
{private
final
CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory;public
void
process
(String content)
{ CanalBinLogEvent event = SourceAdapterFacade.X.adapt(CanalBinLogEvent.
class
,content
); ModelTable modelTable = ModelTable.of(event.getDatabase(), event.getTable()); canalBinlogEventProcessorFactory.get(modelTable).forEach(processor -> processor.process(event)); } } /<code>
使用源適配器把字符串轉換為CanalBinLogEvent實例,再委託處理器工廠尋找對應的
BaseCanalBinlogEventProcessor列表去處理輸入的事件實例。
使用canal-glue
主要包括下面幾個維度,都在canal-glue-example的test包下:
- [x] 一般情況下使用處理器處理INSERT事件。
- [x] 自定義針對DDL變更的預警父處理器,實現DDL變更預警。
- [x] 單表對應多個處理器。
- [x] 使用解析結果處理器針對特定字段進行AES加解密處理。
- [x] 非Spring容器下,一般編程式使用。
- [ ] 使用openjdk-jmh進行Benchmark基準性能測試。
這裡簡單提一下在Spring體系下的使用方式,引入依賴
spring-boot-starter-canal-glue:
<code><
dependency
><
groupId
>cn.throwxgroupId
><
artifactId
>spring-boot-starter-canal-glueartifactId
><
version
>版本號version
>dependency
> /<code>
編寫一個實體或者DTO類OrderModel:
<code>public
staticclass
OrderModel
{private
Long
id;private
String orderId;private
OffsetDateTime createTime;private
BigDecimal amount; } /<code>
這裡使用了@CanalModel註解綁定了數據庫db_order_service和表t_order,屬性名-列名映射策略為「駝峰轉小寫下劃線」。接著定義一個處理器OrderProcessor和自定義異常處理器(可選,這裡是為了模擬在處理事件的時候拋出自定義異常):
<code>public
class
OrderProcessor
extends
BaseCanalBinlogEventProcessor
<OrderModel
> {protected
void
processInsertInternal
(CanalBinLogResult result)
{ OrderModel orderModel = result.getAfterData(); logger.info("接收到訂單保存binlog,主鍵:{},模擬拋出異常..."
, orderModel.getId());throw
new
RuntimeException(String.format("[id:%d]"
, orderModel.getId())); }protected
ExceptionHandlerexceptionHandler
()
{return
EXCEPTION_HANDLER; }private
static
final
ExceptionHandler EXCEPTION_HANDLER = (event, throwable) -> log.error("解析binlog事件出現異常,事件內容:{}"
, JSON.toJSONString(event), throwable); } /<code>
假設一個寫入訂單數據的binlog事件如下:
<code>{"data"
: [ {"id"
:"1"
,"order_id"
:"10086"
,"amount"
:"999.0"
,"create_time"
:"2020-03-02 05:12:49"
} ],"database"
:"db_order_service"
,"es"
:1583143969000
,"id"
:3
,"isDdl"
:false
,"mysqlType"
: {"id"
:"BIGINT"
,"order_id"
:"VARCHAR(64)"
,"amount"
:"DECIMAL(10,2)"
,"create_time"
:"DATETIME"
},"old"
:null
,"pkNames"
: ["id"
],"sql"
:""
,"sqlType"
: {"id"
:-5
,"order_id"
:12
,"amount"
:3
,"create_time"
:93
},"table"
:"t_order"
,"ts"
:1583143969460
,"type"
:"INSERT"
} /<code>
執行結果如下:
如果直接對接Canal投放到Kafka的Topic也很簡單,配合Kafka的消費者使用的示例如下:
<code>@Slf4j
@Component
@RequiredArgsConstructor
public class CanalEventListeners {private
final
CanalGlue
canalGlue
; @KafkaListener
( id ="${canal.event.order.listener.id:db-order-service-listener}"
, topics ="db_order_service"
, containerFactory ="kafkaListenerContainerFactory"
)public
void
onCrmMessage
(String content) {canalGlue
.process
(content); } } /<code>
小結
筆者開發這個canal-glue的初衷是需要做一個極大提升效率的大型字符串轉換器,因為剛剛接觸到"小數據"領域,而且人手不足,而且需要處理下游大量的報表,因為不可能花大量人力在處理這些不停重複的模板化代碼上。雖然整體設計還不是十分優雅,
「至少在提升開發效率這個點上」,canal-glue做到了。關鍵字: return source ModelTable