Java 服務調用全流程追蹤 簡易實現方案

前言

前段時間,本人一直協助項目組在做系統的重構,系統應用被拆分成了多個服務,部分服務做了集群部署。隨著上述架構的演進,自然而然的引進了ELK + Filebeat 做日誌收集。但是在使用Kibana查看日誌時,由於缺少TraceID,導致開發人員很難篩選出指定請求的相關日誌,也很難追蹤應用對下游服務的調用過程,耗費了很多時間。我自己查過幾次問題之後,實在受不了每次要花這麼久的時間,就趕緊向主管提了這一次的改造。

本篇文章主要是記錄本人對項目TraceID鏈路追蹤改造的解決方案的研究、遇到的問題和具體的實現,同時本次改造也加深了我自己對分佈式服務追蹤的一些理解,我也寫在了裡面。

本文主要內容:

  • 初步實現
  • 異步線程traceId丟失的問題
  • 面向 Dubbo RPC 鏈路追蹤
  • 面向 HTTP Service 鏈路追蹤
  • 思考 SpringCloud Sleuth 的實現
  • 小結

一、初步實現

大體的思路就是藉助slf4j的MDC功能 + Spring Interceptor,當外部請求進入時生成一個traceId放在MDC當中。

MDC

這裡簡單介紹一下MDC。

MDC(Mapped Diagnostic Context,映射調試上下文)是 log4j 和 logback 提供的一種方便在多線程條件下記錄日誌的功能。MDC 可以看成是一個與當前線程綁定的Map,可以往其中添加鍵值對。MDC 中包含的內容可以被同一線程中執行的代碼所訪問。當前線程的子線程會繼承其父線程中的 MDC 的內容。當需要記錄日誌時,只需要從 MDC 中獲取所需的信息即可。MDC 的內容則由程序在適當的時候保存進去。對於一個 Web 應用來說,通常是在請求被處理的最開始保存這些數據。

簡單來說,MDC就是日誌框架提供的一個InheritableThreadLocal,項目代碼中可以將鍵值對放入其中,在打印的時候從ThreadLocal中獲取到對應的值然後打印出來。詳細的原理本文就不贅述了。看下 log4j 和 logback 裡面的實現類就知道了。

實現

  1. 自定義Spring 攔截器 TraceInterceptor
<code>/**
* @author Richard_yyf
*/

public class TraceInterceptor extends HandlerInterceptorAdapter {

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 清空
MDC.clear();

ThreadMdcUtil.setTraceIdIfAbsent();

//後續邏輯... ...
return true;
}
}/<code>
  1. 註冊 攔截器
<code>/**
* @author Richard_yyf
*/
@Configuration
public class WebMvcConfig implements WebMvcConfigurer {

@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(traceInterceptor())
.addPathPatterns("/**")
.order(0);
}

@Bean
public TraceInterceptor traceInterceptor() {
return new TraceInterceptor();
}

}/<code>

ThreadMdcUtil是我自己封裝的一個工具類,包裝了對 TraceId 的一些操作:

<code>public class ThreadMdcUtil {

public static String createTraceId() {
String uuid = UUID.randomUUID().toString();
return DigestUtils.md5Hex(uuid).substring(8, 24);
}

public static void setTraceIdIfAbsent() {
if (MDC.get(TRACE_ID) == null) {
MDC.put(TRACE_ID, createTraceId());
}
}
// 省略了一些方法在後面會展示出來
}/<code>

DigestUtils來自於第三方依賴:

<code><dependency>
\t<groupid>commons-codec/<groupid>
\t<artifactid>commons-codec/<artifactid>
<version>***/<version>
/<dependency>/<code>

TRACE_ID放在 Constant類中方便引用:

<code>public class Constant {
...
public static final String TRACE_ID = "traceId";
...
/<code>
  1. 在日誌配置文件中修改輸出格式,增加TraceID字段的打印 取值方式:%X{traceid}


Java 服務調用全流程追蹤 簡易實現方案


結果

通過上面的步驟之後,你的web應用接收到請求後打印的日誌就會帶上TraceId。


Java 服務調用全流程追蹤 簡易實現方案


二、遇上線程池 TraceID 丟失的問題

前面的方案只是簡單實現了我們的最基礎的需求。但是如果你真的使用起來,會發現異步的任務線程是沒有獲取到TraceID的。

一個成熟的應用肯定會用到很多的線程池。常見的有@Async異步調用的線程池,應用自身定義的一些線程池等等。

前面有稍微提到過,MDC是通過InheritableThreadLocal實現的,創建子線程時,會複製父線程的inheritableThreadLocals屬性。但是在線程池中,線程是複用的,而不是新創建的,所以MDC內容就無法傳遞進去。

所以我們就需要曲線救國,既然線程是複用的,那我們理所當然的就能想到在任務提交至線程池的時候做一些“騷”操作,來講MDC的內容傳遞下去。

改造

這裡就直接放上代碼:

<code>/**
* @author Richard_yyf
*/
public class ThreadMdcUtil {

public static String createTraceId() {

String uuid = UUID.randomUUID().toString();
return DigestUtils.md5Hex(uuid).substring(8, 24);
}

public static void setTraceIdIfAbsent() {
if (MDC.get(TRACE_ID) == null) {
MDC.put(TRACE_ID, createTraceId());
}
}

public static void setTraceId() {
MDC.put(TRACE_ID, createTraceId());
}

public static void setTraceId(String traceId) {
MDC.put(TRACE_ID, traceId);
}

public static Callable wrap(final Callable callable, final Map<string> context) {
return () -> {
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
setTraceIdIfAbsent();
try {
return callable.call();
} finally {
MDC.clear();
}
};
}

public static Runnable wrap(final Runnable runnable, final Map<string> context) {
return () -> {
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
setTraceIdIfAbsent();
try {
runnable.run();
} finally {
MDC.clear();
}

};
}
}/<string>/<string>
/<code>

自己包裝擴展 ThreadPoolExecutor

<code>/**
* @author Richard_yyf
*/
public class ThreadPoolExecutorMdcWrapper extends ThreadPoolExecutor {

public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}

public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}

public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}

@Override
public void execute(Runnable task) {
super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}

@Override
public Future submit(Runnable task, T result) {
return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()), result);
}

@Override
public Future
submit(Callable task) {
return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}

@Override
public Future> submit(Runnable task) {
return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
}
/<runnable>/<runnable>/<runnable>/<runnable>/<code>

使用

具體的使用就是把你原來executor = new ThreadPoolExecutor(...)改成executor = new ThreadPoolExecutorMdcWrapper(...)即可。

比如你是用Spring @Async異步方法的,在配置線程池的時候就這樣聲明:

<code>@SpringBootApplication
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@EnableAsync
@Configuration
class TaskPoolConfig {

@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolExecutorMdcWrapper();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);
executor.setKeepAliveSeconds(60);
return executor;
}
}

}/<code>

結果

按照上述步驟,你的異步任務在打印日誌的時候,就會帶上原本請求的TraceID了。


Java 服務調用全流程追蹤 簡易實現方案


三、面向 Dubbo RPC 鏈路追蹤

我們項目組主要使用Dubbo進行微服務框架的開發。我們想在服務調用之間,傳遞上游服務的TraceID,來達到鏈路追蹤的效果。

Dubbo 提供了這樣的機制,可以通過Dubbo RPC + Dubbo Filter 來設置和傳遞消費者的TraceID。

詳見官網對於這兩個概念的說明。

Dubbo RPC
Dubbo Filter

這邊我直接給出代碼和擴展點配置。

Dubbo Filter for Consumer

消費者應用端:

<code>/**
* @author Richard_yyf
*/
@Activate(group = {Constants.CONSUMER})
public class ConsumerRpcTraceFilter implements Filter {

@Override
public Result invoke(Invoker> invoker, Invocation invocation) throws RpcException {
//如果MDC上下文有追蹤ID,則原樣傳遞給provider端
String traceId = MDC.get(TRACE_ID);
if (StringUtils.isNotEmpty(traceId)) {
RpcContext.getContext().setAttachment(TRACE_ID, traceId);
}
return invoker.invoke(invocation);
}

}/<code>

SPI 配置

在resources目錄下,創建/META-INF/dubbo/com.alibaba.dubbo.rpc.Filter文件.

<code>consumerRpcTraceFilter=com.xxx.xxx.filter.ConsumerRpcTraceFilter/<code>


Java 服務調用全流程追蹤 簡易實現方案


Dubbo Filter for Provider

服務提供者應用端:

<code>/**
* @author Richard_yyf
*/
@Activate(group = {Constants.PROVIDER})
public class ProviderRpcTraceFilter implements Filter {

@Override
public Result invoke(Invoker> invoker, Invocation invocation) throws RpcException {
// 接收消費端的traceId

String traceId = RpcContext.getContext().getAttachment(TRACE_ID);
if (StringUtils.isBlank(traceId)) {
traceId = ThreadMdcUtil.createTraceId();
}

// 設置日誌traceId
ThreadMdcUtil.setTraceId(traceId);

// TODO 如果這個服務還會調用下一個服務,需要再次設置下游參數
// RpcContext.getContext().setAttachment("trace_id", traceId);

try {
return invoker.invoke(invocation);
} finally {
// 調用完成後移除MDC屬性
MDC.remove(TRACE_ID);
}
}

}/<code>

SPI 配置:

<code>providerRpcTraceFilter=com.xxx.xxx.filter.ProviderRpcTraceFilter/<code>

四、面向 HTTP Service 鏈路追蹤

除了Dubbo RPC 的這種方式,常見微服務之間的調用也有通過 HTTP REST 來完成調用的。這種場景下就需要在上游服務在發起HTTP調用的時候自動將 TraceID添加到 HTTP Header 中。

以常用的 Spring RestTemplate 為例,使用攔截器來包裝 HTTP Header。

<code>        RestTemplate restTemplate = new RestTemplate();

List<clienthttprequestinterceptor> list = new ArrayList<>();
list.add((request, body, execution) -> {
String traceId = MDC.get(TRACE_ID);

if (StringUtils.isNotEmpty(traceId)) {
request.getHeaders().add(TRACE_ID, traceId);
}
return execution.execute(request, body);
});

restTemplate.setInterceptors(list);/<clienthttprequestinterceptor>/<code>

下游服務由於是通過HTTP 接口暴露的服務,就添加一個攔截器來獲取就好。

<code>public class TraceInterceptor extends HandlerInterceptorAdapter {

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
MDC.clear();
String traceId = request.getHeader(TRACE_ID);
if (StringUtils.isEmpty(traceId)) {
ThreadMdcUtil.setTraceId();
} else {
MDC.put(TRACE_ID, traceId);
}
return true;
}
}/<code>

五、思考 Spring Cloud Sleuth 的實現

經過上面的幾個步驟,我們相當於是自己形成了一個比較基礎的服務追蹤的解決方案。

Spring Cloud 作為一個一站式 微服務開發框架,提供了Spring Cloud Sleuth 作為 該技術體系下分佈式跟蹤的解決方案。這裡我想拿出來講一講。

Sleuth 是一個成熟的技術解決方案,基於 Google Dapper 為理論基礎實現,裡面的一些術語都來自於那篇論文。在對於TraceID傳遞的問題上,我們上面講的簡單版的解決方案的一些解決問題的思路,實際上在Sleuth 中也有體現。

首先就是分佈式追蹤,Sleuth 會將 SpanID 和 TraceID添加到 Slf4J MDC 中,這樣在打印出來的日誌就會有帶上對應的標識。

在遇到線程池 TraceID 傳遞失效的問題時,我們相當了對提交任務的操作進行包裝,而在Slueth 中,是通過實現HystrixConcurrencyStrategy接口來解決 TraceID異步傳遞的問題。Hystrix在實際調用時,會調用HystrixConcurrencyStrategy的 wrapCallable 方法。通過實現這個接口,在wrapCallable 中將TraceID存放起來(具體參見SleuthHystrixConcurrencyStrategy)。

在面對Dubbo RPC 的調用方式和 Http Service 的調用方式中,我們通過Dubbo RpcContext + Filter和 Http Header + Interceptor 的方式,通過協議或者框架本身提供的擴展點和上下文機制,來傳遞TraceID。而在 Spring Cloud Sleuth中追蹤@Async,RestTemplate,Zuul,Feign等組件時,也是類似的解決思路。比如追蹤RestTemplate就是和上文一樣借用了Spring Client的 Interceptor 機制 (@See TraceRestTemplateInterceptor)。

上述就是將我們的簡單解決方案和 Spring Cloud Sleuth 的對比,想說明日誌追蹤的思想和一些技術解決思路是共通相近的。

當然,Spring Cloud Sleuth 基於 Dapper 實現,提供了一個比較成熟的分佈式系統調用追蹤架構,集成ZipKin + spring-cloud-sleuth-zipkin 依賴之後,能夠搭建一個完整的具有數據收集、數據存儲和數據展示功能的分佈式服務追蹤系統。

通過Sleuth可以很清楚的瞭解到一個服務請求經過了哪些服務,每個服務處理花費了多長。從而讓我們可以很方便的理清各微服務間的調用關係。此外Sleuth可以幫助我們:

  • 耗時分析: 通過Sleuth可以很方便的瞭解到每個採樣請求的耗時,從而分析出哪些服務調用比較耗時;
  • 可視化錯誤: 對於程序未捕捉的異常,可以通過集成Zipkin服務界面上看到;
  • 鏈路優化: 對於調用比較頻繁的服務,可以針對這些服務實施一些優化措施。

PS:spring-cloud-sleth 2.0 中開始 正式支持 Dubbo,思路的話則是通過Dubbo filter 擴展機制。

小結

再講講為什麼不引入Sleuth + ZipKin 這種解決方案呢?因為我們系統的調用鏈路並不複雜,一般只有一層的調用關係,所以並不希望增加第三方的組件,更傾向於使用簡單的解決方案。

本篇文章到這裡就結束了。實現一個簡單的微服務調用追蹤的日誌方案並沒有太大的難度,重要的還是解決問題的思路,並且觸類旁通,去學習一些市面上的已存在的優秀技術解決方案。

如果本文有幫助到你,希望能點個贊,這是對我的最大動力。



分享到:


相關文章: