Nmap 单例模式 OpenCV4 docker容器 npm search redis常用语句 grunt Way.js android实战项目 jquery触发点击事件 jquery清除子元素 html好看的字体样式 mysql合并结果集 python代码示例 python中re模块 java集合框架 java的正则表达式 javafor循环 java线程死锁 java绝对值 tmac修改器 金山wps2003 忧思华光玉 lol语音包 qq钱包实名认证 苹果双微信 vue路由跳转 cubase下载 粉碎文件工具 幽灵推 租房管理软件 iphone组装机 套料软件 如何查看端口是否开放 ps怎么复制粘贴 只狼月隐糖 只狼二段跳 战地4配置 topaz滤镜
当前位置: 首页 > 学习教程  > 编程语言

二十.SpringCloud源码剖析-Zuul使用Ribbon负载均衡-RibbonRoutingFilter

2021/1/28 23:22:19 文章标签:

前言 经过前面几章的学习,我们对Zuul的的详细执行流程,以及内置的Filter都有了一些认识,本篇文章是针对RibbonRoutingFilter做一个详细的分析,跟一下它是如何使用Ribbon对下游的微服务进行负载均衡的。注意:这篇文章是…

前言

经过前面几章的学习,我们对Zuul的的详细执行流程,以及内置的Filter都有了一些认识,本篇文章是针对RibbonRoutingFilter做一个详细的分析,跟一下它是如何使用Ribbon对下游的微服务进行负载均衡的。注意:这篇文章是在 《zuul的执行流程》基础上进行延伸的,另外Ribbon的原理见:《Ribbon负载均衡原理》


回顾一下zuul的执行流程,Zuul的执行流程是这样的

  • 首先请求进来会先到达ZuulControllerZuulController把请求交给ZuulServlet去处理
  • ZuulServelt会调用 ZuulRunner 依次执行: init初始化,pre前置filter,route路由filter,post后置filter, 出现异常会执行 error 异常filter
  • ZuulRunner通过 FilterProcessor 去执行各种Filter,FilterProcessor通过 FilterLoader 加载 各种filters

今天的主角儿 RibbonRoutingFilter 是“route”类型的Filter ,它的作用是使用Ribbon对后端的微服务做负载均衡,实现服务的调用,

RibbonRoutingFilter#run

我们直接看它的源码,见RibbonRoutingFilter#shouldFilter

	@Override
	public boolean shouldFilter() {
		RequestContext ctx = RequestContext.getCurrentContext();
		//上下文中必须有 serviceId 
		return (ctx.getRouteHost() == null && ctx.get(SERVICE_ID_KEY) != null
				&& ctx.sendZuulResponse());
	}

这里做了简单判断,上下文中必须有 serviceId ,默认的true ,然后会执行run方法 RibbonRoutingFilter#run

public Object run() {
		RequestContext context = RequestContext.getCurrentContext();
		this.helper.addIgnoredHeaders();
		try {
			//构建一个Ribbon的命令上下文
			RibbonCommandContext commandContext = buildCommandContext(context);
			//执行Ribbon,发送请求
			ClientHttpResponse response = forward(commandContext);
			//设置结果
			setResponse(response);
			return response;
		}
		catch (ZuulException ex) {
			throw new ZuulRuntimeException(ex);
		}
		catch (Exception ex) {
			throw new ZuulRuntimeException(ex);
		}
	}
	//把响应结果设置到RequestContext上下文对象中
	protected void setResponse(ClientHttpResponse resp)
			throws ClientException, IOException {
			//把响应结果设置到RequestContext上下文对象中
		RequestContext.getCurrentContext().set("zuulResponse", resp);
		this.helper.setResponse(resp.getRawStatusCode(),
				resp.getBody() == null ? null : resp.getBody(), resp.getHeaders());
	}

RibbonCommandContext

在run方法中构建了一个 RibbonCommandContext Ribbon的上下文对象,然后调用 forward 方法转发请求 ,通过 setResponse方法设置结果

先看一下buildCommandContext方法是如何构建RibbonCommandContext

protected RibbonCommandContext buildCommandContext(RequestContext context) {
		HttpServletRequest request = context.getRequest();
		//获取请求头
		MultiValueMap<String, String> headers = this.helper
				.buildZuulRequestHeaders(request);
		//获取参数
		MultiValueMap<String, String> params = this.helper
				.buildZuulRequestQueryParams(request);
		//获取请求的方式		
		String verb = getVerb(request);
		//获取请求体中的内容
		InputStream requestEntity = getRequestBody(request);
		if (request.getContentLength() < 0 && !verb.equalsIgnoreCase("GET")) {
			context.setChunkedRequestBody();
		}
		//调用的服务ID
		String serviceId = (String) context.get(SERVICE_ID_KEY);
		//是否重试
		Boolean retryable = (Boolean) context.get(RETRYABLE_KEY);
		Object loadBalancerKey = context.get(LOAD_BALANCER_KEY);
		//请求的资源路径
		String uri = this.helper.buildZuulRequestURI(request);

		// remove double slashes
		uri = uri.replace("//", "/");
		//getContentLength 内容长度
		long contentLength = useServlet31 ? request.getContentLengthLong(): request.getContentLength();
		//创建一个 RibbonCommandContext
		return new RibbonCommandContext(serviceId, verb, uri, retryable, headers, params,
				requestEntity, this.requestCustomizers, contentLength, loadBalancerKey);
	}

这里从请求中获取了请求头,请求参数,请求体内容,以及服务ID,请求的资源URL等信息封装分 RibbonCommandContext ,那RibbonCommandContext 其实就是对下游微服务的请求的上下文数据的封装,下面是 RibbonCommandContext 的源码

public class RibbonCommandContext {
	private final String serviceId;	//服务id
	private final String method;	//请求方式	
	private final String uri;		//请求资源URI
	private final Boolean retryable;	//重试
	private final MultiValueMap<String, String> headers; //请求头
	private final MultiValueMap<String, String> params;	//请求产妇
	private final List<RibbonRequestCustomizer> requestCustomizers;
	private InputStream requestEntity;	//请求体内容
	private Long contentLength;	//内容长度	
	private Object loadBalancerKey;	//负载均衡器

HttpClientRibbonCommand的创建

接着我们看一下 forward 方法是如何调用下游微服务的 RibbonRoutingFilter#forward

protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
		Map<String, Object> info = this.helper.debug(context.getMethod(),
				context.getUri(), context.getHeaders(), context.getParams(),
				context.getRequestEntity());
		//使用 ribbonCommandFactory 工厂根据context上下文 创建 RibbonCommand 
		RibbonCommand command = this.ribbonCommandFactory.create(context);
		try {
			//执行请求
			ClientHttpResponse response = command.execute();
			this.helper.appendDebug(info, response.getRawStatusCode(), response.getHeaders());
			return response;
		}
		catch (HystrixRuntimeException ex) {
			return handleException(info, ex);
		}

	}

这里 使用 ribbonCommandFactory 工厂根据context上下文 创建 RibbonCommand ,然后执行 RibbonCommand.execute ,我们看一下 ribbonCommandFactory.create


	@Override
	public HttpClientRibbonCommand create(final RibbonCommandContext context) {
		//获取服务对应的降级
		FallbackProvider zuulFallbackProvider = getFallbackProvider(context.getServiceId());
		final String serviceId = context.getServiceId();
		//获取Ribbon的负载均衡Http客户端
		final RibbonLoadBalancingHttpClient client = this.clientFactory.getClient(
				serviceId, RibbonLoadBalancingHttpClient.class);
	
		//获取 ILoadBalancer 负载均衡器,设置给 RibbonLoadBalancingHttpClient 
		client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId));
		//创建一个 HttpClientRibbonCommand 
		return new HttpClientRibbonCommand(serviceId, client, context, zuulProperties, zuulFallbackProvider,
				clientFactory.getClientConfig(serviceId));
	}

HttpClientRibbonCommandFactory#create 方法主要是 把服务的降级FallbackProvider,以及Ribbon的负载均衡Http客户端,以及ILoadBalancer 负载均衡器设置给 HttpClientRibbonCommand 对象并返回

HttpClientRibbonCommand 是Ribbon的Http客户端,是用来执行相关请求的,看一下它的继承体系它是HystrixCommand的子类
在这里插入图片描述

HttpClientRibbonCommand 的执行

代码回到 RibbonRoutingFilter#forward 方法,接下来分析 RibbonCommand.execute 具体执行流程

ClientHttpResponse response = command.execute();

进入这个方法,此时代码来到 com.netflix.hystrix.HystrixCommand#execute

   public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }

进入qeueu方法

public Future<R> queue() {
        /*
         * The Future returned by Observable.toBlocking().toFuture() does not implement the
         * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
         * thus, to comply with the contract of Future, we must wrap around it.
         */
        // 这里采用了观察者设计模式,使用了异步阻塞方式去执行请求
        final Future<R> delegate = toObservable().toBlocking().toFuture();
    	
        final Future<R> f = new Future<R>() {

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
              	...省略...
                return res;
			}

            @Override
            public boolean isCancelled() {
                return delegate.isCancelled();
			}

            @Override
            public boolean isDone() {
                return delegate.isDone();
			}

            @Override
            public R get() throws InterruptedException, ExecutionException {
                return delegate.get();
            }

            @Override
            public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return delegate.get(timeout, unit);
            }
        	
        };

        /* special handling of error states that throw immediately */
        if (f.isDone()) {
            try {
            	//获取结果
                f.get();
                return f;
            } catch (Exception e) {
                Throwable t = decomposeException(e);
                if (t instanceof HystrixBadRequestException) {
                    return f;
                } else if (t instanceof HystrixRuntimeException) {
                    HystrixRuntimeException hre = (HystrixRuntimeException) t;
                    switch (hre.getFailureType()) {
					case COMMAND_EXCEPTION:
					case TIMEOUT:
						// we don't throw these types from queue() only from queue().get() as they are execution errors
						return f;
					default:
						// these are errors we throw from queue() as they as rejection type errors
						throw hre;
					}
                } else {
                    throw Exceptions.sneakyThrow(t);
                }
            }
        }

        return f;
    }

这里采用了观察者设计模式,使用了异步阻塞方式去执行请求,final Future<R> delegate = toObservable().toBlocking().toFuture(); 这个可以好好去研究一下,最代码会来到 AbstractRibbonCommand#run

@Override
	protected ClientHttpResponse run() throws Exception {
		final RequestContext context = RequestContext.getCurrentContext();
		//创建请求,实现类是 RibbonApacheHttpRequest
		RQ request = createRequest();
		RS response;
		//可重试的客户端 
		boolean retryableClient = this.client instanceof AbstractLoadBalancingClient
				&& ((AbstractLoadBalancingClient)this.client).isClientRetryable((ContextAwareRequest)request);
		
		if (retryableClient) {
			response = this.client.execute(request, config);
		} else {
			//默认走这里使用负载均衡器执行
			response = this.client.executeWithLoadBalancer(request, config);
		}
		//把结果设置到RequestContext 上下文
		context.set("ribbonResponse", response);

		// Explicitly close the HttpResponse if the Hystrix command timed out to
		// release the underlying HTTP connection held by the response.
		//响应超时,关闭response
		if (this.isResponseTimedOut()) {
			if (response != null) {
				response.close();
			}
		}
		//返回结果
		return new RibbonHttpResponse(response);
	}

RibbonLoadBalancingHttpClient Http客户端

关键代码在 this.client.execute(request, config); 这里的 client是一个 RibbonLoadBalancingHttpClient ,继续跟踪 execute的代码来到 `AbstractLoadBalancerAwareClient#executeWithLoadBalancer

 public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

        try {
        	//1.调用 LoadBalancerCommand的submit执行请求
            return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                    	//使用服务器重构URI
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                        //2.这里调用了 RibbonLoadBalancingHttpClient.execute方法执行请求
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } 
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
            Throwable t = e.getCause();
            if (t instanceof ClientException) {
                throw (ClientException) t;
            } else {
                throw new ClientException(e);
            }
        }
        
    }

方法中有两个重要的代码

  • command.submit : 调用 LoadBalancerCommand的submit,它创建一个Observable 观察者,一旦订阅,该Observable将与负载均衡器选择的服务器异步执行网络调用。
  • AbstractLoadBalancerAwareClient.this.execute:执行具体的服务

LoadBalancerCommand.submit 提交请求

先看LoadBalancerCommand.submit

 public Observable<T> submit(final ServerOperation<T> operation) {
        final ExecutionInfoContext context = new ExecutionInfoContext();
        
        if (listenerInvoker != null) {
            try {
                listenerInvoker.onExecutionStart();
            } catch (AbortExecutionException e) {
                return Observable.error(e);
            }
        }

        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

        // Use the load balancer
        //这里在做负载均衡 selectServer
        Observable<T> o = 
                (server == null ? selectServer() : Observable.just(server))
                .concatMap(new Func1<Server, Observable<T>>() {
                    @Override
                    // Called for each server being selected
                    //为每个选定的服务器调用
                    public Observable<T> call(Server server) {
   ...省略...

很关键的一行代码 server == null ? selectServer() : Observable.just(server))它在使用负载均衡选择服务,跟一下 selectServer方法

    private Observable<Server> selectServer() {
        return Observable.create(new OnSubscribe<Server>() {
            @Override
            public void call(Subscriber<? super Server> next) {
                try {
                //负载均衡
                    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                    next.onNext(server);
                    next.onCompleted();
                } catch (Exception e) {
                    next.onError(e);
                }
            }
        });
    }

继续跟 loadBalancerContext.getServerFromLoadBalancer 方法

 public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
        String host = null;
        int port = -1;
        if (original != null) {
            host = original.getHost();
        }
        if (original != null) {
            Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);        
            port = schemeAndPort.second();
        }

        // Various Supported Cases
        // The loadbalancer to use and the instances it has is based on how it was registered
        // In each of these cases, the client might come in using Full Url or Partial URL
        ILoadBalancer lb = getLoadBalancer();
        if (host == null) {
            // Partial URI or no URI Case
            // well we have to just get the right instances from lb - or we fall back
            if (lb != null){
            	//通过 ILoadBalancer的chooseServer方法选择服务
                Server svc = lb.chooseServer(loadBalancerKey);
                if (svc == null){
                    throw new ClientException(ClientException.ErrorType.GENERAL,
                            "Load balancer does not have available server for client: "
                                    + clientName);
                }
                host = svc.getHost();
                if (host == null){
                    throw new ClientException(ClientException.ErrorType.GENERAL,
                            "Invalid Server for :" + svc);
                }
                logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
                return svc;
                ...省略...

这里调用了 ILoadBalancer 的 chooseServer方法,继续跟下去会调用 IRule 负载均衡策略的的choose方法选择服务。

RibbonLoadBalancingHttpClient Http客户端执行请求

选择好服务之后原路返回,请求回到 AbstractLoadBalancerAwareClient#executeWithLoadBalancer(S, com.netflix.client.config.IClientConfig),我们来跟第二个比较重要的代码RibbonLoadBalancingHttpClient#execute , 该方法在执行具体的服务了

	@Override
	public RibbonApacheHttpResponse execute(RibbonApacheHttpRequest request,
											final IClientConfig configOverride) throws Exception {
		IClientConfig config = configOverride != null ? configOverride : this.config;
		RibbonProperties ribbon = RibbonProperties.from(config);
		//封装请求配置,超时时间
		RequestConfig requestConfig = RequestConfig.custom()
				.setConnectTimeout(ribbon.connectTimeout(this.connectTimeout))
				.setSocketTimeout(ribbon.readTimeout(this.readTimeout))
				.setRedirectsEnabled(ribbon.isFollowRedirects(this.followRedirects))
				.build();
		//创建请求对象
		request = getSecureRequest(request, configOverride);
		final HttpUriRequest httpUriRequest = request.toRequest(requestConfig);
		//委派给 InternalHttpClient 客户去执行
		final HttpResponse httpResponse = this.delegate.execute(httpUriRequest);
		//封装结果返回
		return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
	}

这里封装了请求配置,创建请求对象,然后交给apacheInternalHttpClient 去执行请求,最后把结果封装成RibbonApacheHttpResponse返回。

最后响应结果会在RibbonRoutingFilter中被设置到RequestContext 上下文对象中 ,会通过SendResponseFilter去处理把上下文对象中的响应结果写给客户端

到这里整理执行流程结束,我们来这里一下流程
在这里插入图片描述


本文链接: http://www.dtmao.cc/news_show_650195.shtml

附件下载

相关教程

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?