03.07 A Look at SkyWalking's spring-cloud-gateway-plugin

This article examines SkyWalking's spring-cloud-gateway-plugin, using the 6.6.0 sources.



NettyRoutingFilterInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/NettyRoutingFilterInstrumentation.java

<code>public class NettyRoutingFilterInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

  @Override
  public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
      return new ConstructorInterceptPoint[0];
  }

  @Override
  public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
      return new InstanceMethodsInterceptPoint[]{
          new InstanceMethodsInterceptPoint() {
              @Override
              public ElementMatcher<MethodDescription> getMethodsMatcher() {
                  return named("filter").and(takesArgumentWithType(0, "org.springframework.web.server.ServerWebExchange"));
              }

              @Override
              public String getMethodsInterceptor() {
                  return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.NettyRoutingFilterInterceptor";
              }

              @Override
              public boolean isOverrideArgs() {
                  return false;
              }
          }
      };
  }

  @Override
  public ClassMatch enhanceClass() {
      return byName("org.springframework.cloud.gateway.filter.NettyRoutingFilter");
  }

  @Override
  protected final String[] witnessClasses() {
      return new String[]{"org.springframework.cloud.gateway.handler.FilteringWebHandler", "reactor.netty.http.client.HttpClientOperations"};
  }
}</code>
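  • NettyRoutingFilterInstrumentation extends ClassInstanceMethodsEnhancePluginDefine. It uses org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.NettyRoutingFilterInterceptor to intercept the filter method of org.springframework.cloud.gateway.filter.NettyRoutingFilter whose first parameter is org.springframework.web.server.ServerWebExchange. Its witnessClasses method lists FilteringWebHandler and reactor.netty.http.client.HttpClientOperations, so the plugin is only activated when both classes are present in the target application.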
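
To make the matcher expression more concrete, here is a minimal sketch of the same idea in plain reflection: select methods by name and by the class name of their first parameter. The matcher in the listing takes the type as a string, which suggests it compares names rather than loaded classes; that reading is an assumption. FakeExchange, FakeRoutingFilter and MethodMatchSketch are hypothetical stand-ins, not SkyWalking or Spring types.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ServerWebExchange.
interface FakeExchange {}

// Hypothetical stand-in for NettyRoutingFilter with one matching and two non-matching methods.
class FakeRoutingFilter {
    public String filter(FakeExchange exchange, String chain) { return "routed"; }
    public String filter(String other) { return "ignored"; }
    public int getOrder() { return 0; }
}

class MethodMatchSketch {
    // Rough equivalent of named(name).and(takesArgumentWithType(0, typeName)),
    // implemented with reflection instead of Byte Buddy matchers.
    static List<Method> match(Class<?> target, String name, String firstArgTypeName) {
        List<Method> matched = new ArrayList<>();
        for (Method m : target.getDeclaredMethods()) {
            Class<?>[] params = m.getParameterTypes();
            if (m.getName().equals(name)
                    && params.length > 0
                    && params[0].getName().equals(firstArgTypeName)) {
                matched.add(m);
            }
        }
        return matched;
    }
}
```

Calling MethodMatchSketch.match(FakeRoutingFilter.class, "filter", FakeExchange.class.getName()) selects only the two-argument filter overload; the single-String overload and getOrder are skipped.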

NettyRoutingFilterInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/NettyRoutingFilterInterceptor.java

<code>public class NettyRoutingFilterInterceptor implements InstanceMethodsAroundInterceptor {


  @Override
  public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                            MethodInterceptResult result) throws Throwable {
      EnhancedInstance instance = NettyRoutingFilterInterceptor.getInstance(allArguments[0]);
      if (instance != null) {
          SWTransmitter swTransmitter = (SWTransmitter) instance.getSkyWalkingDynamicField();
          ContextManager.getRuntimeContext().put(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER, swTransmitter);
      }
  }

  @Override
  public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
                            Class<?>[] argumentsTypes, Object ret) throws Throwable {
      if (ContextManager.getRuntimeContext().get(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER) != null) {
          ContextManager.getRuntimeContext().remove(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER);
      }
      return ret;
  }


  @Override
  public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                    Class<?>[] argumentsTypes, Throwable t) {
  }

  public static EnhancedInstance getInstance(Object o) {
      EnhancedInstance instance = null;
      if (o instanceof ServerWebExchangeDecorator) {
          instance = getEnhancedInstance((ServerWebExchangeDecorator) o);
      } else if (o instanceof DefaultServerWebExchange) {
          instance = (EnhancedInstance) o;
      }
      return instance;
  }


  private static EnhancedInstance getEnhancedInstance(ServerWebExchangeDecorator serverWebExchangeDecorator) {
      Object o = serverWebExchangeDecorator.getDelegate();
      if (o instanceof ServerWebExchangeDecorator) {

          return getEnhancedInstance((ServerWebExchangeDecorator) o);
      } else if (o instanceof DefaultServerWebExchange) {
          return (EnhancedInstance) o;
      } else if (o == null) {
          throw new NullPointerException("The expected class DefaultServerWebExchange is null");
      } else {
          throw new RuntimeException("Unknown parameter types:" + o.getClass());
      }
  }
}</code>
  • NettyRoutingFilterInterceptor implements the InstanceMethodsAroundInterceptor interface. Its beforeMethod reads the SWTransmitter from the exchange's dynamic field and puts it into ContextManager.getRuntimeContext() under the key Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER; its afterMethod calls ContextManager.getRuntimeContext().remove(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER) if the entry is still present.
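
The getInstance helper in the listing above walks through nested ServerWebExchangeDecorator wrappers to reach the underlying DefaultServerWebExchange. The sketch below shows that unwrapping with a loop instead of recursion. SketchExchange, SketchDefaultExchange, SketchDecorator and UnwrapSketch are hypothetical stand-ins, and unlike the original the sketch returns null for an unknown delegate instead of throwing.

```java
// Hypothetical stand-in for ServerWebExchange.
interface SketchExchange {}

// Hypothetical stand-in for the enhanced DefaultServerWebExchange.
class SketchDefaultExchange implements SketchExchange {
    Object dynamicField; // plays the role of the SkyWalking dynamic field
}

// Hypothetical stand-in for ServerWebExchangeDecorator.
class SketchDecorator implements SketchExchange {
    private final SketchExchange delegate;

    SketchDecorator(SketchExchange delegate) {
        this.delegate = delegate;
    }

    SketchExchange getDelegate() {
        return delegate;
    }
}

class UnwrapSketch {
    // Follows the delegate chain until something that is not a decorator is reached.
    static SketchDefaultExchange unwrap(SketchExchange exchange) {
        SketchExchange current = exchange;
        while (current instanceof SketchDecorator) {
            current = ((SketchDecorator) current).getDelegate();
        }
        if (current instanceof SketchDefaultExchange) {
            return (SketchDefaultExchange) current;
        }
        return null; // simplification: the original throws for null or unknown delegates
    }
}
```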

HttpClientOperationsInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/HttpClientOperationsInstrumentation.java

<code>public class HttpClientOperationsInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

  @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
      return new ConstructorInterceptPoint[0];
  }

  @Override
  public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
      return new InstanceMethodsInterceptPoint[]{
          new InstanceMethodsInterceptPoint() {
              @Override
              public ElementMatcher<MethodDescription> getMethodsMatcher() {
                  return named("headers").and(takesArgumentWithType(0, "io.netty.handler.codec.http.HttpHeaders"));
              }

              @Override
              public String getMethodsInterceptor() {
                  return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsHeadersInterceptor";
              }

              @Override
              public boolean isOverrideArgs() {
                  return false;
              }
          },new InstanceMethodsInterceptPoint() {
              @Override
              public ElementMatcher<MethodDescription> getMethodsMatcher() {
                  return named("send").and(takesArgumentWithType(0, "org.reactivestreams.Publisher"));
              }


              @Override
              public String getMethodsInterceptor() {
                  return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsSendInterceptor";
              }

              @Override
              public boolean isOverrideArgs() {
                  return false;
              }
          },
          new InstanceMethodsInterceptPoint() {
              @Override
              public ElementMatcher<MethodDescription> getMethodsMatcher() {
                  return named("status");
              }

              @Override
              public String getMethodsInterceptor() {
                  return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsStatusInterceptor";
              }

              @Override
              public boolean isOverrideArgs() {
                  return false;
              }
          },
      };
  }

  @Override
  public ClassMatch enhanceClass() {
      return byName("reactor.netty.http.client.HttpClientOperations");
  }
}</code>
  • HttpClientOperationsInstrumentation extends ClassInstanceMethodsEnhancePluginDefine and enhances reactor.netty.http.client.HttpClientOperations
  • It uses org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsHeadersInterceptor to intercept the headers method whose first parameter is io.netty.handler.codec.http.HttpHeaders
  • It uses org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsSendInterceptor to intercept the send method whose first parameter is org.reactivestreams.Publisher
  • It uses org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsStatusInterceptor to intercept the status method

HttpClientOperationsHeadersInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/HttpClientOperationsHeadersInterceptor.java

<code>public class HttpClientOperationsHeadersInterceptor implements InstanceMethodsAroundInterceptor {

  private static final ILog logger = LogManager.getLogger(HttpClientOperationsHeadersInterceptor.class);

  @Override
  public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                            MethodInterceptResult result) throws Throwable {
  }

  @Override
  public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
                            Class<?>[] argumentsTypes, Object ret) throws Throwable {
      Object transmitter = ((EnhancedInstance) allArguments[0]).getSkyWalkingDynamicField();
      if (transmitter != null) {
          objInst.setSkyWalkingDynamicField(transmitter);
          ((EnhancedInstance) allArguments[0]).setSkyWalkingDynamicField(null);
      }
      return ret;
  }


  @Override
  public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                    Class<?>[] argumentsTypes, Throwable t) {
  }
}</code>
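  • HttpClientOperationsHeadersInterceptor implements the InstanceMethodsAroundInterceptor interface. Its afterMethod reads the transmitter from the dynamic field of the HttpHeaders argument and, if it is not null, moves it onto the HttpClientOperations instance with objInst.setSkyWalkingDynamicField(transmitter), then clears the field on the argument.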
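
The hand-off described here is a plain ownership transfer between two enhanced objects. A minimal sketch, assuming a hypothetical SketchEnhanced class that mimics the get/set dynamic-field contract (it is not the SkyWalking EnhancedInstance interface):

```java
// Hypothetical stand-in for an enhanced object carrying one dynamic field.
class SketchEnhanced {
    private Object dynamicField;

    Object getDynamicField() {
        return dynamicField;
    }

    void setDynamicField(Object value) {
        dynamicField = value;
    }
}

class FieldHandOffSketch {
    // Moves the payload from the argument (headers) to the receiver (client operations)
    // and clears the source so the payload has a single owner afterwards.
    static void transfer(SketchEnhanced headers, SketchEnhanced operations) {
        Object transmitter = headers.getDynamicField();
        if (transmitter != null) {
            operations.setDynamicField(transmitter);
            headers.setDynamicField(null);
        }
    }
}
```

When the headers object carries no payload, the receiver is left untouched, which mirrors the null check in the listing.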

HttpClientOperationsSendInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/HttpClientOperationsSendInterceptor.java

<code>public class HttpClientOperationsSendInterceptor implements InstanceMethodsAroundInterceptor {

  @Override
  public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                            MethodInterceptResult result) throws Throwable {
      SWTransmitter transmitter = (SWTransmitter) objInst.getSkyWalkingDynamicField();
      if (transmitter != null) {
          HttpClientRequest request = (HttpClientRequest) objInst;

          HttpHeaders header = request.requestHeaders();
          ChannelOperations channelOpt = (ChannelOperations) objInst;

          InetSocketAddress remote = (InetSocketAddress) (channelOpt.channel().remoteAddress());
          String peer = remote.getHostName() + ":" + remote.getPort();

          AbstractSpan span = ContextManager.createExitSpan(transmitter.getOperationName(), peer);
          ContextManager.continued(transmitter.getSnapshot());
          ContextCarrier contextCarrier = new ContextCarrier();
          ContextManager.inject(contextCarrier);

          span.setComponent(ComponentsDefine.SPRING_CLOUD_GATEWAY);
          Tags.URL.set(span, peer + request.uri());
          Tags.HTTP.METHOD.set(span, request.method().name());
          SpanLayer.asHttp(span);

          CarrierItem next = contextCarrier.items();
          while (next.hasNext()) {
              next = next.next();
              header.set(next.getHeadKey(), next.getHeadValue());
          }
          transmitter.setSpanGateway(span.prepareForAsync());
          ContextManager.stopSpan(span);
      }
  }

  @Override
  public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
                            Class<?>[] argumentsTypes, Object ret) throws Throwable {
      return ret;
  }


  @Override
  public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                    Class<?>[] argumentsTypes, Throwable t) {
      ContextManager.activeSpan().errorOccurred().log(t);
  }
}</code>
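  • HttpClientOperationsSendInterceptor implements the InstanceMethodsAroundInterceptor interface. Its beforeMethod builds the peer from the channel's remote address, creates an exit span, continues the snapshot held by the transmitter, sets the component, URL and HTTP method on the span, copies the ContextCarrier items into the request headers, stores span.prepareForAsync() on the transmitter, and finally calls ContextManager.stopSpan(span). Its handleMethodException calls ContextManager.activeSpan().errorOccurred().log(t).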
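
The while loop that writes carrier items into the headers advances the cursor before reading it, which only makes sense if the first element is a data-free head node. The sketch below models that reading with a hypothetical SketchItem chain; it is a simplified assumption about the structure, not the SkyWalking CarrierItem class, and the header names used with it are made up.

```java
import java.util.Map;

// Hypothetical, simplified carrier chain: a sentinel head followed by the real items.
class SketchItem {
    final String key;
    final String value;
    final SketchItem next;

    SketchItem(String key, String value, SketchItem next) {
        this.key = key;
        this.value = value;
        this.next = next;
    }

    boolean hasNext() {
        return next != null;
    }

    SketchItem next() {
        return next;
    }
}

class InjectSketch {
    // Copies every item after the sentinel head into the outgoing headers.
    static Map<String, String> inject(SketchItem head, Map<String, String> headers) {
        SketchItem cursor = head;
        while (cursor.hasNext()) {
            cursor = cursor.next(); // advance first: the head itself carries no data
            headers.put(cursor.key, cursor.value);
        }
        return headers;
    }
}
```

With a chain of head -> ("trace-demo", "abc") -> ("extra-demo", "v"), inject fills the map with exactly those two entries and never writes the empty head key.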

HttpClientOperationsStatusInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/HttpClientOperationsStatusInterceptor.java

<code>public class HttpClientOperationsStatusInterceptor implements InstanceMethodsAroundInterceptor {

  @Override
  public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                            MethodInterceptResult result) throws Throwable {
  }

  @Override
  public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                            Object ret) throws Throwable {

      SWTransmitter transmitter = (SWTransmitter) objInst.getSkyWalkingDynamicField();
      if (transmitter != null) {
          HttpResponseStatus response = (HttpResponseStatus) ret;
          if (response.code() >= 400) {
              Tags.STATUS_CODE.set(transmitter.getSpanGateway().errorOccurred(), String.valueOf(response.code()));
          }
          transmitter.getSpanGateway().asyncFinish();
          objInst.setSkyWalkingDynamicField(null);
      }
      return ret;
  }

  @Override
  public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                    Class<?>[] argumentsTypes, Throwable t) {
      ContextManager.activeSpan().errorOccurred().log(t);
  }
}</code>
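  • HttpClientOperationsStatusInterceptor implements the InstanceMethodsAroundInterceptor interface. Its afterMethod fetches the transmitter; when response.code() is greater than or equal to 400 it marks the gateway span as an error and sets the status code tag, then it calls transmitter.getSpanGateway().asyncFinish() and clears the dynamic field. Its handleMethodException calls ContextManager.activeSpan().errorOccurred().log(t).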
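
The status handling boils down to "tag as error for 4xx/5xx, then finish the span that was kept open asynchronously". A minimal sketch of that decision, using a hypothetical SketchAsyncSpan that only records state and is not the SkyWalking AbstractSpan API:

```java
// Hypothetical stand-in for a span that was prepared for asynchronous completion.
class SketchAsyncSpan {
    boolean error;
    boolean finished;
    String statusCode;

    SketchAsyncSpan errorOccurred() {
        error = true;
        return this;
    }

    void asyncFinish() {
        finished = true;
    }
}

class StatusSketch {
    // Called once the upstream status is known: tag errors, then always finish the span.
    static void onStatus(SketchAsyncSpan span, int code) {
        if (code >= 400) {
            span.errorOccurred().statusCode = String.valueOf(code);
        }
        span.asyncFinish();
    }
}
```

As in the listing, the status code tag is written only for error responses; a 200 response just finishes the span.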

FilteringWebHandlerInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/FilteringWebHandlerInstrumentation.java

<code>public class FilteringWebHandlerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

  @Override
  public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
      return new ConstructorInterceptPoint[0];
  }

  @Override
  public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
      return new InstanceMethodsInterceptPoint[]{
          new InstanceMethodsInterceptPoint() {

              @Override
              public ElementMatcher<MethodDescription> getMethodsMatcher() {
                  return named("handle");
              }

              @Override
              public String getMethodsInterceptor() {
                  return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.FilteringWebHandlerInterceptor";
              }

              @Override
              public boolean isOverrideArgs() {
                  return false;
              }
          }
      };
  }

  @Override
  public ClassMatch enhanceClass() {
      return byName("org.springframework.cloud.gateway.handler.FilteringWebHandler");
  }

}</code>
  • FilteringWebHandlerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine. It uses org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.FilteringWebHandlerInterceptor to intercept the handle method of org.springframework.cloud.gateway.handler.FilteringWebHandler.

FilteringWebHandlerInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/FilteringWebHandlerInterceptor.java

<code>public class FilteringWebHandlerInterceptor implements InstanceMethodsAroundInterceptor {

  private static final String SPRING_CLOUD_GATEWAY_ROUTE_PREFIX = "GATEWAY/";

  @Override
  public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                            MethodInterceptResult result) throws Throwable {
      EnhancedInstance instance = NettyRoutingFilterInterceptor.getInstance(allArguments[0]);
      if (instance == null) {
          return;
      }
      ContextSnapshot contextSnapshot = (ContextSnapshot) instance.getSkyWalkingDynamicField();
      if (contextSnapshot == null) {
          return;
      }

      ServerWebExchange exchange = (ServerWebExchange) allArguments[0];
      String operationName = SPRING_CLOUD_GATEWAY_ROUTE_PREFIX;
      Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
      operationName = operationName + route.getId();
      SWTransmitter transmitter = new SWTransmitter(contextSnapshot, operationName);
      instance.setSkyWalkingDynamicField(transmitter);
  }

  @Override
  public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
                            Class<?>[] argumentsTypes, Object ret) throws Throwable {
      EnhancedInstance instance = NettyRoutingFilterInterceptor.getInstance(allArguments[0]);
      if (instance == null) {
          return ret;
      }
      SWTransmitter swTransmitter = (SWTransmitter) instance.getSkyWalkingDynamicField();
      if (swTransmitter == null) {
          return ret;
      }
      Mono<Void> mono = (Mono) ret;
      return mono.doFinally(d -> {
          ServerWebExchange exchange = (ServerWebExchange) allArguments[0];
          HttpStatus statusCode = exchange.getResponse().getStatusCode();
          if (statusCode == HttpStatus.TOO_MANY_REQUESTS) {
              AbstractSpan localSpan = ContextManager.createLocalSpan(swTransmitter.getOperationName());
              Tags.STATUS_CODE.set(localSpan,statusCode.toString());
              SpanLayer.asHttp(localSpan);
              localSpan.setComponent(ComponentsDefine.SPRING_CLOUD_GATEWAY);
              ContextManager.continued(swTransmitter.getSnapshot());
              ContextManager.stopSpan(localSpan);
          }
      });
  }


  @Override
  public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                    Class<?>[] argumentsTypes, Throwable t) {
  }

}</code>
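  • FilteringWebHandlerInterceptor implements the InstanceMethodsAroundInterceptor interface. Its beforeMethod reads the ContextSnapshot from the exchange's dynamic field, takes the route from the exchange's GATEWAY_ROUTE_ATTR attribute, builds the operationName as "GATEWAY/" plus the route id, creates an SWTransmitter and calls instance.setSkyWalkingDynamicField(transmitter). Its afterMethod fetches the SWTransmitter and registers a doFinally callback on the returned Mono. Inside the callback, only when the response status is HttpStatus.TOO_MANY_REQUESTS, it creates a local span, tags it with the status code, and then calls ContextManager.continued(swTransmitter.getSnapshot()) and ContextManager.stopSpan(localSpan).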
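
The two ideas in this interceptor, naming the operation after the route and reacting once the reactive pipeline terminates, can be approximated without Reactor. The sketch below deliberately swaps Mono.doFinally for CompletableFuture.whenComplete from the JDK, so it is only an analogy (for example, it does not model cancellation). FinallySketch, the status supplier and the recorded list are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntSupplier;

class FinallySketch {
    static final String PREFIX = "GATEWAY/";
    static final int TOO_MANY_REQUESTS = 429;

    // Builds the operation name the same way the listing does: prefix plus route id.
    static String operationName(String routeId) {
        return PREFIX + routeId;
    }

    // Attaches a completion callback. A "span" (here just a string) is recorded only
    // when the response status, read at completion time, is 429.
    static CompletableFuture<Void> instrument(CompletableFuture<Void> pipeline, String routeId,
                                              IntSupplier status, List<String> recorded) {
        return pipeline.whenComplete((ignored, error) -> {
            if (status.getAsInt() == TOO_MANY_REQUESTS) {
                recorded.add(operationName(routeId) + " status=" + TOO_MANY_REQUESTS);
            }
        });
    }
}
```

The status is passed as a supplier because, as in the listing, it is only meaningful when the callback runs, not when the callback is registered.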

DefaultHttpHeadersInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/DefaultHttpHeadersInstrumentation.java

<code>public class DefaultHttpHeadersInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

  @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
      return new ConstructorInterceptPoint[] {
          new ConstructorInterceptPoint() {
              @Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
                  return takesArgumentWithType(0, "io.netty.handler.codec.DefaultHeaders");
              }

              @Override public String getConstructorInterceptor() {
                  return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.DefaultHttpHeadersInterceptor";
              }
          }
      };
  }

  @Override
  public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
      return new InstanceMethodsInterceptPoint[0];
  }

  @Override
  public ClassMatch enhanceClass() {
      return byName("io.netty.handler.codec.http.DefaultHttpHeaders");
  }
}</code>
  • DefaultHttpHeadersInstrumentation extends ClassInstanceMethodsEnhancePluginDefine. It uses org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.DefaultHttpHeadersInterceptor to intercept the constructor of io.netty.handler.codec.http.DefaultHttpHeaders whose first parameter is io.netty.handler.codec.DefaultHeaders; it defines no instance method intercept points.

DefaultHttpHeadersInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/DefaultHttpHeadersInterceptor.java

<code>public class DefaultHttpHeadersInterceptor implements InstanceConstructorInterceptor {
  @Override
  public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
      Object transmitter = ContextManager.getRuntimeContext().get(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER);
      if (transmitter != null) {
          objInst.setSkyWalkingDynamicField(transmitter);
          ContextManager.getRuntimeContext().remove(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER);
      }
  }

}/<code>
  • DefaultHttpHeadersInterceptor implements the InstanceConstructorInterceptor interface. Its onConstruct method takes the transmitter stored in ContextManager.getRuntimeContext(), sets it on objInst, and then removes it from ContextManager.getRuntimeContext().
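
Taken together with NettyRoutingFilterInterceptor, this is a "park a value for the duration of a call, claim it in a constructor that runs during that call" pattern. The sketch below models it with a ThreadLocal map. It assumes that the runtime context is bound to the current thread and that the headers object is created synchronously inside filter; both are assumptions made for illustration. RuntimeContextSketch, its KEY and its Headers class are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class RuntimeContextSketch {
    static final String KEY = "TRANSMITTER"; // hypothetical key name
    private static final ThreadLocal<Map<String, Object>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    // Stand-in for an enhanced headers object whose constructor is intercepted.
    static class Headers {
        Object dynamicField;

        Headers() {
            // Constructor hook: claim the transmitter if one is parked on this thread,
            // removing it so that later header objects do not pick it up again.
            Object transmitter = CONTEXT.get().remove(KEY);
            if (transmitter != null) {
                dynamicField = transmitter;
            }
        }
    }

    // Stand-in for the intercepted filter method: park, run the body, always clean up.
    static Headers filter(Object transmitter) {
        CONTEXT.get().put(KEY, transmitter); // "beforeMethod"
        try {
            return new Headers();            // method body creates the headers
        } finally {
            CONTEXT.get().remove(KEY);       // "afterMethod"
        }
    }
}
```

Only the first Headers created inside filter receives the transmitter; a Headers created later on the same thread finds the context empty.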

Summary

SkyWalking's spring-cloud-gateway-plugin consists of four instrumentations: NettyRoutingFilterInstrumentation, HttpClientOperationsInstrumentation, FilteringWebHandlerInstrumentation and DefaultHttpHeadersInstrumentation.

doc

  • NettyRoutingFilterInstrumentation
  • HttpClientOperationsInstrumentation
  • FilteringWebHandlerInstrumentation
  • DefaultHttpHeadersInstrumentation

