dtcms 源码 android开发实战 intellij idea汉化 Netty haskell 管理后台ui 品优购电商系统开发 java设计模式视频 jquery移除子元素 spark大数据处理技术 hash怎么下载 mysql组合索引 nodejs后端开发 axure时间选择控件 python环境搭建 python类和对象 python获取时间戳 java环境搭建 java运算 java中float java怎么配置环境变量 java包名 java接口实例 幽城幻剑录五内 r语言和python begininvoke mpg格式转换 sdm439 茸好珠 苹果手机不弹出信任 绘图软件下载 脚本大师 fastcgi 加速软件 omg小北 qq浏览器全屏 ae怎么复制图层 python游戏ps竖排文字 图层蒙版抠图 1000kbps
当前位置: 首页 > 学习教程  > 编程语言

Soul网关中的Http服务探活

2021/1/28 22:43:41 文章标签:

服务探活机制是为了发现系统中上下游服务的的状态。当有新的服务注册时要通知其他系统,当有服务下线时也要告知其他系统。 Soul网关中有对Http服务处理的探活,所有的服务对象保存在soul-admin的UPSTREAM_MAP中,这里面的服务对象有两个来源&a…

服务探活机制是为了发现系统中上下游服务的的状态。当有新的服务注册时要通知其他系统,当有服务下线时也要告知其他系统。

Soul网关中有对Http服务处理的探活,所有的服务对象保存在soul-adminUPSTREAM_MAP中,这里面的服务对象有两个来源,一个来自于原有的数据库,一个来自于其他服务的注册。

@Component
public class UpstreamCheckService {
	//保存上游服务
    private static final Map<String, List<DivideUpstream>> UPSTREAM_MAP = Maps.newConcurrentMap();
   
    //省略了其他代码
}

UpstreamCheckService类中,在构造器执行完后,会执行setup()方法。里面做了两件事情:

  • 1.读取数据库的服务信息,保存到UPSTREAM_MAP
  • 2.开启定时任务,检查每个方法。
    //在构造器只执行完,执行这个方法
	@PostConstruct
    public void setup() {
        PluginDO pluginDO = pluginMapper.selectByName(PluginEnum.DIVIDE.getName());
        if (pluginDO != null) {
		  //省略了其他代码
            //来源数据库的服务对象
                if (CollectionUtils.isNotEmpty(divideUpstreams)) {
                    UPSTREAM_MAP.put(selectorDO.getName(), divideUpstreams);
                }
            }
        }
        //是否开启探活机制,默认开启
        if (check) {
            //定时任务,每10秒执行一次
            new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), SoulThreadFactory.create("scheduled-upstream-task", false))
                    .scheduleWithFixedDelay(this::scheduled, 10, scheduledTime, TimeUnit.SECONDS);
        }
    }

定时任务每10秒就去检查服务的状态,并且更新服务信息。

 private void scheduled() {
        if (UPSTREAM_MAP.size() > 0) {
            //检查每个选择器
            UPSTREAM_MAP.forEach(this::check);
        }
    }

    private void check(final String selectorName, final List<DivideUpstream> upstreamList) {
         //对每个服务进行检查
        for (DivideUpstream divideUpstream : upstreamList) {
            //检查服务
            final boolean pass = UpstreamCheckUtils.checkUrl(divideUpstream.getUpstreamUrl());
            if (pass) {
			//	成功
            } else {
                //失败
                divideUpstream.setStatus(false);
                log.error("check the url={} is fail ", divideUpstream.getUpstreamUrl());
            }
        }
        //都存活
        if (successList.size() == upstreamList.size()) {
            return;
        }
        //部分存活,只保留存活的,去除失活的服务
        if (successList.size() > 0) {
            UPSTREAM_MAP.put(selectorName, successList);
            updateSelectorHandler(selectorName, successList);
        } else { //没有存活的
            UPSTREAM_MAP.remove(selectorName);
            updateSelectorHandler(selectorName, null);
        }
    }

检查过程是通过socket进行连接,能够连接成功,则服务是好的,否则就认为服务连接失败。

    private static boolean isHostConnector(final String host, final int port) {
        try (Socket socket = new Socket()) {
            //建立socket连接
            socket.connect(new InetSocketAddress(host, port));
        } catch (IOException e) {
            return false;
        }
        return true;
    }

有服务失活的时候,要发送更新事件给网关。

    private void updateSelectorHandler(final String selectorName, final List<DivideUpstream> upstreams) {
        SelectorDO selectorDO = selectorMapper.selectByName(selectorName);
        if (Objects.nonNull(selectorDO)) {
			//省略了其他代码
                // publish change event.
                eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
            }
        }
    }

当有新的服务启动时,SpringMvcClientBeanPostProcessor会处理接口信息,向soul-admin发起注册请求"/soul-client/springmvc-register"。当soul-admin端接收到这个请求时,会做三件事情:

  • 1.更新数据库信息;
  • 2.提交服务信息到UPSTREAM_MAP
  • 3.发布事件到网关。
//接收  /soul-client/springmvc-register 请求的实现接口
public String registerSpringMvc(final SpringMvcRegisterDTO dto) {
        if (dto.isRegisterMetaData()) {
            MetaDataDO exist = metaDataMapper.findByPath(dto.getPath());
            if (Objects.isNull(exist)) {
                saveSpringMvcMetaData(dto);
            }
        }
        //处理请求
        String selectorId = handlerSpringMvcSelector(dto);
        handlerSpringMvcRule(selectorId, dto);
        return SoulResultMessage.SUCCESS;
   }

private String handlerSpringMvcSelector(final SpringMvcRegisterDTO dto) {
	//省略了其他代码
    
    // update db 更新数据库信息
    selectorMapper.updateSelective(selectorDO);
    // submit upstreamCheck 提交服务信息到UPSTREAM_MAP
    upstreamCheckService.submit(contextPath, addDivideUpstream);
    // publish change event. 发布事件到网关
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,Collections.singletonList(selectorData)));

    }

上面的服务探活是在应用服务和soul-admin之间。其实,在网关也有一个UPSTREAM_MAP来保存服务信息。它里面的服务来源是soul-admin发布事件来通知网关的,网关会对信息进行处理,更新可用服务。

public final class UpstreamCacheManager {
		//保存服务信息
    private static final Map<String, List<DivideUpstream>> UPSTREAM_MAP = Maps.newConcurrentMap();
}

在网关这一边,服务信息处理流程是(假设数据同步采用的是websocket):

  • SoulWebsocketClient: 后台 wesocket 信息在这里被监听,并发送给 WebsocketDataHandler 处理;
  • WebsocketDataHandler: 根据事件类型, 选择对应处理器 (PluginDataHandlerRuleDataHandler等);
  • AbstractDataHandler: 根据事件变动类型(refreshupdate等), 调用处理器对应方法, 具体实现类会调用到 CommonPluginDataSubscriber 订阅器;
  • CommonPluginDataSubscriber: 这里存有所有注册为 Bean 的事件处理器,处理主要逻辑在subscribeDataHandler()方法中;
  • DividePluginDataHandler: 更新或移除缓存管理器中服务节点信息。

另外,在网关的UpstreamCacheManager中,也有一个每隔30秒的定时任务scheduled去检查服务的状态信息,但是这个默认是关闭的。

public final class UpstreamCacheManager {   
    
    //省略了其他代码
    
    private UpstreamCacheManager() {
            boolean check = Boolean.parseBoolean(System.getProperty("soul.upstream.check", "false"));
            if (check) {
                new ScheduledThreadPoolExecutor(1, SoulThreadFactory.create("scheduled-upstream-task", false))
                        .scheduleWithFixedDelay(this::scheduled,
                                30, Integer.parseInt(System.getProperty("soul.upstream.scheduledTime", "30")), TimeUnit.SECONDS);
            }
        }
}

小结,本篇文章介绍了在Soulsoul-adminsoul-bootstrap网关对服务的探活机制,主要实现类分别是UpstreamCheckServiceUpstreamCacheManager


本文链接: http://www.dtmao.cc/news_show_650069.shtml

附件下载

相关教程

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?