分佈式系統中如何優雅地追蹤日誌(原理篇)

分佈式系統中日誌追蹤需要考慮的幾個點?

  1. 需要一個全服務唯一的id,即traceId,如何保證?
  2. traceId如何在服務間傳遞?
  3. traceId如何在服務內部傳遞?
  4. traceId如何在多線程中傳遞?

我們一一來解答:

  1. 全服務唯一的traceId,可以使用uuid生成,正常來說不會出現重複的;
  2. 關於服務間傳遞,對於調用者,在協議頭加上traceId,對於被調用者,通過前置攔截器或者過濾器統一攔截;
  3. 關於服務內部傳遞,可以使用ThreadLocal傳遞traceId,一處放置,隨處可用;
  4. 關於多線程傳遞,分為兩種情況:子線程,可以使用InheritableThreadLocal線程池,需要改造線程池對提交的任務進行包裝,把提交者的traceId包裝到任務中
分佈式系統中如何優雅地追蹤日誌(原理篇)

比如,上面這個系統,系統入口在A處,A調用B的服務,B裡面又起了一個線程B1去訪問D的服務,B本身又去訪問C服務。

我們就可以這麼來跟蹤日誌:

  1. 所有服務都需要一個全局的InheritableThreadLocal保存服務內部traceId的傳遞;
  2. 所有服務都需要一個前置攔截器或者過濾器,檢測如果請求頭沒有traceId就生成一個,如果有就取出來,並把traceId放到全局的InheritableThreadLocal裡面;
  3. 一個服務調用另一個服務的時候把traceId塞到請求頭裡,比如http header,本文來源於工從號彤哥讀源碼;
  4. 改造線程池,在提交的時候包裝任務,這個工作量比較大,因為服務內部可能依賴其它框架,這些框架的線程池有可能也需要修改;

實現

我們模擬A到B這兩個服務來實現一個日誌跟蹤系統。

為了簡單起見,我們使用SpringBoot,它默認使用的日誌框架是logback,而且Slf4j提供了一個包裝了InheritableThreadLocal的類叫MDC,我們只要把traceId放在MDC中,打印日誌的時候統一打印就可以了,不用顯式地打印traceId。

我們分成三個模塊:

  1. 公共包:封裝攔截器,traceId的生成,服務內傳遞,請求頭的傳遞等;
  2. A服務:只依賴於公共包,並提供一個接口接收外部請求;
  3. B服務:依賴於公共包,並內部起一個線程池,用於發送B1->D的請求,當然我們這裡不發送請求,只在線程池中簡單地打印一條日誌;

公共包

  1. TraceFilter.java

前置過濾器,用攔截器實現也是一樣的。

從請求頭中獲取traceId,如果不存在就生成一個,並放入MDC中。

<code>@Slf4j
@WebFilter("/**")
@Component
public class TraceFilter implements Filter {

@Override
public void init(FilterConfig filterConfig) throws ServletException {

}

@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain chain) throws IOException, ServletException {
HttpServletRequest request = (HttpServletRequest) servletRequest;


// 從請求頭中獲取traceId
String traceId = request.getHeader("traceId");
// 不存在就生成一個
if (traceId == null || "".equals(traceId)) {
traceId = UUID.randomUUID().toString();
}
// 放入MDC中,本文來源於工從號彤哥讀源碼
MDC.put("traceId", traceId);
chain.doFilter(servletRequest, servletResponse);
}

@Override
public void destroy() {

}
}
/<code>
  1. TraceThreadPoolExecutor.java

改造線程池,提交任務的時候進行包裝。

<code>public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}

public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}

public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}

@Override
public void execute(Runnable command) {
// 提交者的本地變量

Map<string> contextMap = MDC.getCopyOfContextMap();
super.execute(()->{
if (contextMap != null) {
// 如果提交者有本地變量,任務執行之前放入當前任務所在的線程的本地變量中
MDC.setContextMap(contextMap);
}
try {
command.run();
} finally {
// 任務執行完,清除本地變量,以防對後續任務有影響
MDC.clear();
}
});
}
}
/<string>/<runnable>/<runnable>/<runnable>/<runnable>/<code>
  1. TraceAsyncConfigurer.java

改造Spring的異步線程池,包裝提交的任務。

<code>@Slf4j
@Component
public class TraceAsyncConfigurer implements AsyncConfigurer {

@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("async-pool-");
executor.setTaskDecorator(new MdcTaskDecorator());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (throwable, method, params) -> log.error("asyc execute error, method={}, params={}", method.getName(), Arrays.toString(params));

}

public static class MdcTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
Map<string> contextMap = MDC.getCopyOfContextMap();
return () -> {
if (contextMap != null) {
MDC.setContextMap(contextMap);
}
try {
runnable.run();
} finally {
MDC.clear();
}
};
}
}

}
/<string>/<code>
  1. HttpUtils.java

封裝Http工具類,把traceId加入頭中,帶到下一個服務。

<code>@Slf4j
public class HttpUtils {

public static String get(String url) throws URISyntaxException {
RestTemplate restTemplate = new RestTemplate();
MultiValueMap<string> headers = new HttpHeaders();
headers.add("traceId", MDC.get("traceId"));
URI uri = new URI(url);
RequestEntity requestEntity = new RequestEntity<>(headers, HttpMethod.GET, uri);
ResponseEntity<string> exchange = restTemplate.exchange(requestEntity, String.class);

if (exchange.getStatusCode().equals(HttpStatus.OK)) {
log.info("send http request success");
}

return exchange.getBody();
}

}

/<string>/<string>/<code>

A服務

A服務通過Http調用B服務。

<code>@Slf4j
@RestController
public class AController {

@RequestMapping("a")
public String a(String name) {
log.info("Hello, " + name);

try {
// A中調用B
return HttpUtils.get("http://localhost:8002/b");
} catch (Exception e) {
log.error("call b error", e);
}

return "fail";
}
}
/<code>

A服務的日誌輸出格式:

中間加了[%X{traceId}]一串表示輸出traceId。

<code># 本文來源於工從號彤哥讀源碼
logging:
pattern:
console: '%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(%5p) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr([%X{traceId}]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n%wEx'
/<code>

B服務

B服務內部有兩種跨線程調用:

  • 利用Spring的異步線程池
  • 使用自己的線程池

BController.java

<code>@Slf4j
@RestController
public class BController {

@Autowired
private BService bService;

@RequestMapping("b")
public String b() {
log.info("Hello, b receive request from a");

bService.sendMsgBySpring();

bService.sendMsgByThreadPool();

return "ok";
}
}
/<code>

BService.java

<code>@Slf4j
@Service
public class BService {

public static final TraceThreadPoolExecutor threadPool = new TraceThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));

@Async
public void sendMsgBySpring() {
log.info("send msg by spring success");
}

public void sendMsgByThreadPool() {
threadPool.execute(()->log.info("send msg by thread pool success"));
}
}
/<code>

B服務的日誌輸出格式:

中間加了[%X{traceId}]一串表示輸出traceId。

<code>logging:
pattern:
console: '%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(%5p) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr([%X{traceId}]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n%wEx'
/<code>

測試

打開瀏覽器,輸入http://localhost:8001/a?name=andy。

A服務輸出日誌:

<code>2019-12-26 21:36:29.132  INFO 5132 --- [nio-8001-exec-2] [8a59cb96-bbc8-42a9-aa62-df7a52875447] com.alan.trace.a.AController             : Hello, andy
2019-12-26 21:36:35.380 INFO 5132 --- [nio-8001-exec-2] [8a59cb96-bbc8-42a9-aa62-df7a52875447] com.alan.trace.common.HttpUtils : send http request success
/<code>

B服務輸出日誌:

<code>2019-12-26 21:36:29.244  INFO 2368 --- [nio-8002-exec-1] [8a59cb96-bbc8-42a9-aa62-df7a52875447] com.alan.trace.b.BController             : Hello, b receive request from a
2019-12-26 21:36:29.247 INFO 2368 --- [nio-8002-exec-1] [8a59cb96-bbc8-42a9-aa62-df7a52875447] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService
2019-12-26 21:36:35.279 INFO 2368 --- [ async-pool-1] [8a59cb96-bbc8-42a9-aa62-df7a52875447] com.alan.trace.b.BService : send msg by spring success
2019-12-26 21:36:35.283 INFO 2368 --- [pool-1-thread-1] [8a59cb96-bbc8-42a9-aa62-df7a52875447] com.alan.trace.b.BService : send msg by thread pool success
/<code>

可以看到,A服務成功生成了traceId,並且傳遞給了B服務,且B服務線程間可以保證同一個請求的traceId是可以傳遞的。

文章來源:https://my.oschina.net/u/4108008/blog/3152201

關注我瞭解更多程序員資訊技術,領取豐富架構資料。


分享到:


相關文章: