怎么使用SpringCloud负载均衡实现定向路由

这篇文章主要讲解了“怎么使用SpringCloud负载均衡实现定向路由”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“怎么使用SpringCloud负载均衡实现定向路由”吧!

    背景

    随着微服务项目的迭代,可能一个服务会有多个实例,这时候就会涉及到负载均衡。然而我们在开发的时候,肯定希望只启动一个项目。然后调试的时候希望负载均衡把请求分配到我们正在开发测试的服务实例上面。

    实现方式

    基于ip

    这个很好理解,就是开发者本地正在运行的服务在nacos上面肯定显示你本机的ip;那么只要我得到开发者的ip就能够根据这个ip来过滤nacos上面的服务,达到定向路由的效果。

    基于nacos的元数据

    spring:
      cloud:
        nacos:
          discovery:
            metadata: 
              version: "mfine"

    在yaml中配置nacos元数据的version属性。前端在请求header中添加与其对应version属性,就可以实现服务过滤也就是定向路由。

    实现原理

    Gateway服务

    因为gateway底层的不同,所以其负载均衡也与普通服务的不同,因此要特殊处理。先看gateway中load balancer组件调用流程。

    首先在gateway中load balancer本身也是一个过滤器,所以流程如下。

    • ReactiveLoadBalancerClientFilter里面有个LoadBalancerClientFactory属性,通过这个工厂获取具体的负载均衡器

    • LoadBalancerClientFactory会载入LoadBalancerClientConfiguration配置

    • LoadBalancerClientConfiguration会初始化我们需要的RoundRobinLoadBalancer,并且会通过构造函数传入LoadBalancerClientFactory对象。

    那我们要做什么呢?其实就是截胡。

    • 实现自己的LoadBalancerClientFactory,传入自己LoadBalancerClientConfiguration

    • 在自己的LoadBalancerClientConfiguration初始化自己的RoundRobinLoadBalancer

    • 最后在自己的ReactiveLoadBalancerClientFilter里面传入自己的LoadBalancerClientFactory,获得自己的负载均衡器。

    具体源码(只放核心)

    MyRoundRobinLoadBalancer

    private Response<ServiceInstance> getInstanceResponse(
        List<ServiceInstance> instances, ServerWebExchange exchange) {
        if (instances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        }
        try {
            //可重入锁
            if (this.lock.tryLock(10, TimeUnit.SECONDS))
                instances = this.filterServiceInstance(exchange, instances);
            // TODO: enforce order?
            int pos = Math.abs(this.position.incrementAndGet());
            ServiceInstance instance = instances.get(pos % instances.size());
            return new DefaultResponse(instance);
        } catch (InterruptedException e) {
            throw new RuntimeException("自定义负载均衡器,超时等待异常");
        } finally {
            lock.unlock();
        }
    }
    // 根据附加信息过滤服务
    private List<ServiceInstance> filterServiceInstance(ServerWebExchange exchange, List<ServiceInstance> serviceInstances) {
    
        List<ServiceInstance> filteredServices = new ArrayList<>();
        // 自动模式
        if (DevConfigEnum.AUTO.getCode().equals(this.properties.getModel())) {
            filteredServices = autoModel(exchange, serviceInstances);
        }
        // ip 模式
        if (this.properties.getModel().equals(DevConfigEnum.IP.getCode())) {
            filteredServices = ipModel(exchange, serviceInstances);
        }
        // metadata 模式
        if (this.properties.getModel().equals(DevConfigEnum.METADATA.getCode())) {
            filteredServices = metadataModel(exchange, serviceInstances);
        }
        if (filteredServices.isEmpty()) {
            log.info("未发现符合ip或metadata.version服务,将采用原始服务集合");
            return serviceInstances;
        }
        return filteredServices;
    }
    // 自动模式
    private List<ServiceInstance> autoModel(ServerWebExchange exchange, List<ServiceInstance> serviceInstances) {
        List<ServiceInstance> filteredServices;
    
        filteredServices = ipModel(exchange, serviceInstances);
        if (filteredServices.isEmpty()) {
            filteredServices = metadataModel(exchange, serviceInstances);
        }
        return filteredServices;
    }
    //元数据模式
    private List<ServiceInstance> metadataModel(ServerWebExchange exchange, List<ServiceInstance> serviceInstances) {
        String version = exchange.getRequest().getHeaders().getFirst("version");
        List<ServiceInstance> filteredServices = new ArrayList<>();
        if (version != null) {
            log.info("version模式:获取metadata.version成功");
            filteredServices = serviceInstances.stream().filter(instance -> {
                String metaVersion = instance.getMetadata().get("version");
                if (metaVersion == null) {
                    return false;
                }
                return metaVersion.equals(version);
            }).collect(Collectors.toList());
        }
        return filteredServices;
    }
    // ip模式
    private List<ServiceInstance> ipModel(ServerWebExchange exchange, List<ServiceInstance> serviceInstances) {
        List<ServiceInstance> filteredServices = new ArrayList<>();
        try {
            String ipAddress = exchange.getRequest().getHeaders().getFirst("ip");
            if (ipAddress == null) {
                ipAddress = IPUtils.getIpAddress(exchange.getRequest());
            }
            log.warn("ip模式:获取ip成功");
            String finalIpAddress = ipAddress;
            filteredServices = serviceInstances.stream().filter(item -> item.getHost().equals(finalIpAddress))
                .collect(Collectors.toList());
        } catch (UnknownHostException e) {
            log.warn("ip模式:获取ip失败,无法进行定向路由");
        }
        return filteredServices;
    }

    MyLoadBalancerClientFactory

    public class MyLoadBalancerClientFactory extends NamedContextFactory<LoadBalancerClientSpecification>
          implements ReactiveLoadBalancer.Factory<ServiceInstance>{
    
        ................
    
        public MyLoadBalancerClientFactory() {
            // 传入自己的自动配置
            super(MyLoadBalancerClientConfiguration.class, NAMESPACE, PROPERTY_NAME);
        }
    
        ...........
    }

    MyLoadBalancerClientConfiguration

    @Configuration
    public class MyLoadBalancerClientConfiguration {
        @Autowired
        private MicroServiceDevConfigProperties microServiceDevConfigProperties;
        @Bean
        public ReactorServiceInstanceLoadBalancer reactiveLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {
            String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
            // 初始化自己的负载均衡器
            return new MyRoundRobinLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name, 1000,microServiceDevConfigProperties);
        }
    }
    @Configuration
    public class MyReactiveLoadBalancerClientFilter extends ReactiveLoadBalancerClientFilter {
        private static final Log log = LogFactory
                .getLog(ReactiveLoadBalancerClientFilter.class);
        private static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150;
        private final MyLoadBalancerClientFactory clientFactory;
        private LoadBalancerProperties properties;
        // 注入自己的LoadBalancerClientFactory
        public MyReactiveLoadBalancerClientFilter(MyLoadBalancerClientFactory clientFactory, LoadBalancerProperties properties) {
            super(null, null);
            this.clientFactory = clientFactory;
            this.properties = properties;
        }
        ........
        private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {
            URI uri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
            //获得自己load balancer
            MyRoundRobinLoadBalancer loadBalancer = this.clientFactory
                    .getInstance(uri.getHost(), MyRoundRobinLoadBalancer.class);
            if (loadBalancer == null) {
                throw new NotFoundException("No loadbalancer available for " + uri.getHost());
            }
            // 在自己的load balancer里面扩展choose方法,使其接受ServerWebExchange参数
            // 传入ServerWebExchange我们就可以,获取请求信息,方便我们过滤
            return loadBalancer.choose(exchange);
        }
    }

    我的这种实现较为繁琐,可能大家有更好方式,大家要是有更好更简单的方法,也可以直接替换掉。

    普通服务

    普通服务实现自定义负载均衡器就很简单了,实现自定义RoundRobinRule就可以了

    @Configuration
    public class MyRoundRobinLoadBalancer extends RoundRobinRule {
        //不同的使用注入的方式获取请求信息
        @Autowired
        private HttpServletRequest request;
        .....
            private List<Server> filterServers(List<Server> reachableServers) {
            List<Server> servers = new ArrayList<>();
            if (this.properties.getModel().equals(DevConfigEnum.AUTO.getCode())) {
                servers = ipModel(reachableServers);
                if (servers.isEmpty()) {
                    servers = metadataModel(reachableServers);
                }
            }
            if (this.properties.getModel().equals(DevConfigEnum.IP.getCode())) {
                servers = ipModel(reachableServers);
            }
            if (this.properties.getModel().equals(DevConfigEnum.METADATA.getCode())) {
                servers = metadataModel(reachableServers);
            }
            if (servers.isEmpty()) {
                return reachableServers;
            }
            return servers;
        }
    
        private List<Server> metadataModel(List<Server> reachableServers) {
            String version = request.getHeader("version");
            List<Server> servers = new ArrayList<>();
            if (version != null) {
                log.info("metadata模式: 获取version成功");
                servers = reachableServers.stream().filter(item -> {
                    NacosServer nacosServer = (NacosServer) item;
                    String metaVersion = nacosServer.getMetadata().get("version");
                    if (metaVersion == null) {
                        return false;
                    }
                    return metaVersion.equals(version);
                }).collect(Collectors.toList());
            } else {
                log.warn("metadata模式: header中无version字段且未获取到请求者ip");
            }
            return servers;
        }
    
        private List<Server> ipModel(List<Server> reachableServers) {
            List<Server> servers = new ArrayList<>();
            try {
                String ip = this.request.getHeader("ip");
                if (ip == null) {
                    ip = IPUtils.getIpAddress(request);
                }
                String finalIp = ip;
                servers = reachableServers.stream().filter(item -> item.getHost().equals(finalIp)).collect(Collectors.toList());
                log.info("ip模式: 获取请求者ip成功");
            } catch (UnknownHostException e) {
                log.warn("ip模式: 获取ip失败");
            }
            return servers;
        }
        ........
    }

    深入思考一下,通过注入的方式获取request信息是否存在多线程安全问题呢?

    使用方法

    metadata模式

    配置yaml:

    spring:
      application:
        name: cloud-order
      cloud:
        nacos:
          discovery:
            metadata:
            // 重点
              version: mfine
    celi-dev:
      config:
        model: "metadata"

    nacos中服务元数据

    然后请求头中附带version信息

    自定义负载均衡器会通过请求头中的version去nacos中注册服务的元数据里面去比对version信息。

    ip模式

    配置yaml

    celi-dev:
      config:
        model: "ip"
    • 在header中指定IP

    • 依靠请求信息获取ip

    配置yaml就好

    此不指定ip的时候,后台获取的ip可能不对。取决你本地是否存在多张网卡(虚拟网卡也算),有时候nacos中ip显示也会是虚拟网卡的ip。

    使用前请确认你的服务在nacos中的ip是多少,然后在header中指定ip,这样最省事也最稳妥。

    一般是先从header中获取ip信息,获取不到再从request对象中分析。

    auto模式

    配置yaml,其实可以不配置。

    celi-dev:
      config:
        model: "auto"

    自动模式默认先使用ip模式获取不到ip会自动切换metadata模式。

    什么都不配置,默认auto模式

    感谢各位的阅读,以上就是“怎么使用SpringCloud负载均衡实现定向路由”的内容了,经过本文的学习后,相信大家对怎么使用SpringCloud负载均衡实现定向路由这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是蜗牛博客,小编将为大家推送更多相关知识点的文章,欢迎关注!

    免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:niceseo99@gmail.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

    评论

    有免费节点资源,我们会通知你!加入纸飞机订阅群

    ×
    天气预报查看日历分享网页手机扫码留言评论电报频道链接