且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

分布式改造剧集1

更新时间:2022-08-12 19:14:28

背景介绍

​ 我所在的项目组,使用的技术一直是接近原始社会的:jdk1.6 + SpringMVC + hessian + Mybatis,当前最火的中间件技术Redis、MQ是完全没有用到,更别说什么分布式这种高大上的东西了。开始一直以为可能接下来的日子我都会在原始社会中渡过,直到我跳槽到另一家公司。

​ 事情总是在最不抱希望的时候出现转机,最近老大指派我牵头做分布式改造。作为技术痴的我,刚接到这个消息的时候别提有多激动了。虽然平时空闲时间会去学习最新的一些技术,但总觉得没怎么学进去。一是年纪大了,对于一个东西不用就忘;二是都只是敲一些demo代码,相信各位大神都知道,真正项目中遇到的技术难题永远比demo中的那些小问题复杂的多。

​ 废话不多说,先来说说这次分布式改造的预期:

  • 应用的分布式:这一点很容易理解,就是希望以前的单节点应用能够部署成多节点,一个请求可以转发到多个节点中的任意一个
  • 缓存的分布式:好在我们项目对缓存的依赖性不是特别高,项目中使用的缓存也大部分仅仅是为了提升效率。对于内存缓存(Ehcache),希望在不同节点间能够同步,对数据的实时性和一致性要求不是特别高
  • 锁的分布式:业务互斥其实一直是我们项目的一个复杂所在。因为是金融行业,一旦业务互斥没有做好,就会出现严重的资金风险。对锁的可靠性要求特别高,对于互斥的业务锁,只要一个节点能够拿到,其他节点一定不能拿到。项目以前的实现是直接通过内存中的一个ConcurrentHashMap去实现的。如果多节点部署的话,很显然每个节点都会存在一个内存锁,原来的锁将完全不起作用

​ 当然除了预期之外,考虑到部署环境的复杂性:一共几十套环境,后面可能上百。有的部署在腾讯云上,有的部署在客户自己内部系统中,一个微小的部署变动可能会被放大几十倍。所以领导除了给出预期之外,还给了以下两点要求:

  • 尽量不要升级JDK
  • 尽量不要引入新的中间件或者新的外部应用部署

​ 好了,背景暂且交代到这里。我们基本对这次分布式改造剧集有了了解,下面开始进入正片…...

第一章:纯DIY分布式

第一集: 应用分布式改造

探索之路

​ 对于当今的互联网企业码农,只要公司不处于原始社会。分析了上面的需求之后,对于这种请求多节点转发负载的,很自然地就会想到nginx。没错,我开始也想到了nginx,并且本地测试了,对于当前项目的转发只需下面简单的配置即可:

upstream apps {
    server 127.0.0.1:8380;
    server 127.0.0.1:8480;
    server 127.0.0.1:8580;
}

server {
    listen 9999;
    server_name 127.0.0.1:9999;
    #本机的hessian服务最终都发布成 */service/*.hs的形式
    location /service/ {
        proxy_pass http://apps/myapp-war/;
    }
}

​ 但是各位别忘了要求的第二点,好吧?忘了的话请再次回头看上面 。显然,引入nginx这种方式不行…...

​ 既然不能引入这种转发的应用,那么只有DIY一个调用分发的实现了。那么,究竟如何实现呢?永远不要忘记没有什么问题是看源码解决不了的,如果有,那么请debug源码。

​ hessian与Spring的集成,客户端最终注入到Spring容器中的bean类为org.springframework.remoting.caucho.HessianProxyFactoryBean,我们跟进这个类的源码,发现该类的上下级关系为:

HessianProxyFactoryBean extends HessianClientInterceptor implements FactoryBean<Object>

最终实际客户端的调用时通过HessianClientInterceptor类的invoke方法来实现的,该类的主要代码如下:

public class HessianClientInterceptor extends UrlBasedRemoteAccessor implements MethodInterceptor {

    private HessianProxyFactory proxyFactory = new HessianProxyFactory();

    private Object hessianProxy;
    
    @Override
    public void afterPropertiesSet() {
        super.afterPropertiesSet();
        // 类初始化完成之后会调用prepare方法,对hessianProxy进行初始化
        prepare();
    }

    /**
     * Initialize the Hessian proxy for this interceptor.
     * @throws RemoteLookupFailureException if the service URL is invalid
     */
    public void prepare() throws RemoteLookupFailureException {
        try {
            this.hessianProxy = createHessianProxy(this.proxyFactory);
        }
        catch (MalformedURLException ex) {
            throw new RemoteLookupFailureException("Service URL [" + getServiceUrl() + "] is invalid", ex);
        }
    }

    /**
     * Create the Hessian proxy that is wrapped by this interceptor.
     * @param proxyFactory the proxy factory to use
     * @return the Hessian proxy
     * @throws MalformedURLException if thrown by the proxy factory
     * @see com.caucho.hessian.client.HessianProxyFactory#create
     */
    protected Object createHessianProxy(HessianProxyFactory proxyFactory) throws MalformedURLException {
        Assert.notNull(getServiceInterface(), "'serviceInterface' is required");
        // 根据配置文件中的配置创建代理类
        return proxyFactory.create(getServiceInterface(), getServiceUrl(), getBeanClassLoader());
    }
    
    // 最终hessian调用时调用的方法
    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        if (this.hessianProxy == null) {
            throw new IllegalStateException("HessianClientInterceptor is not properly initialized - " +
                    "invoke 'prepare' before attempting any operations");
        }

        ClassLoader originalClassLoader = overrideThreadContextClassLoader();
        try {
            // 这一句特别关键,最终是使用前面初始化过的hessianProxy的对应方法,最终的hessian地址也存在该对象中
            return invocation.getMethod().invoke(this.hessianProxy, invocation.getArguments());
        }
        catch (InvocationTargetException ex) {
            Throwable targetEx = ex.getTargetException();
            // Hessian 4.0 check: another layer of InvocationTargetException.
            if (targetEx instanceof InvocationTargetException) {
                targetEx = ((InvocationTargetException) targetEx).getTargetException();
            }
            if (targetEx instanceof HessianConnectionException) {
                throw convertHessianAccessException(targetEx);
            }
            else if (targetEx instanceof HessianException || targetEx instanceof HessianRuntimeException) {
                Throwable cause = targetEx.getCause();
                throw convertHessianAccessException(cause != null ? cause : targetEx);
            }
            else if (targetEx instanceof UndeclaredThrowableException) {
                UndeclaredThrowableException utex = (UndeclaredThrowableException) targetEx;
                throw convertHessianAccessException(utex.getUndeclaredThrowable());
            }
            else {
                throw targetEx;
            }
        }
        catch (Throwable ex) {
            throw new RemoteProxyFailureException(
                    "Failed to invoke Hessian proxy for remote service [" + getServiceUrl() + "]", ex);
        }
        finally {
            resetThreadContextClassLoader(originalClassLoader);
        }
    }

​ 仔细分析上面源码和我加的中文注释,不难发现解决问题的关键就在与在实际调用之前替换hessianProxy或者针对同一个hessianProxy替换其指向的url。即我们需要对原来注入到Spring容器中的org.springframework.remoting.caucho.HessianProxyFactoryBean类做定制,替换成我们的类,然后在调用之前动态替换hessianProxy。一种方式是对于需要路由的服务接口xml声明做替换:

<bean id="channelAccTaskServiceFacade" class="com.rampage.distribute.factory.DistributeHessainProxyFactoryBean">
        <!--hessian.rampage.server为分布式之前配置文件配置的固定服务器地址 -->
        <property name="serviceUrl" value="${hessian.rampage.server}/services/helloService.hs" />
        <property name="serviceInterface" value="com.rampage.demo.facade.cle.service.HelloService" />
    </bean>

​ 这种可能对代码的改动较大,而且如果不想实现路由的话又得替换回来。另外一种实现就像前面我写的服务定制一样,在Springbean定义加载完成初始化之前做拦截,对bean进行增强替换,这里我们采用第二种方式。因为这样更灵活,可以自定义替换规则。


实现Demo

​ 我这里给出我的一个实现demo,大致步骤分为如下几步:

  • 定义一个注解,用来添加到hessian服务接口上,表示该服务需要在客户端调用的时候进行分布式增强,分发到不同节点:
  @Target({ElementType.TYPE})
  @Retention(RetentionPolicy.RUNTIME)
  @Documented
  @Inherited
  public @interface Distribute {
  }
  • 定义一个BFP, 在bean初始化之前根据是否增强为分布式bean策略,来决定是否对hessian服务进行增强:
  // 定义一个BFP,在bean初始化之前进行增强
  @Component
  public class DistributeBeanFactoryPostProcessor implements BeanFactoryPostProcessor, Ordered {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(DistributeBeanFactoryPostProcessor.class);
    
    private DistributeStrategy distributeStrategy;
    
    private boolean openDistribute;
    
    {
        // 从配置文件加载是否开启分布式以及分布式分发策略
        Properties properties = new Properties();
        try {
            properties.load(DistributeBeanFactoryPostProcessor.class.getClassLoader().getResource("app-config.properties").openStream());
        } catch (IOException e) {
            LOGGER.error("加载classPath下配置文件【app-config.properties】失败!", e);
        }
        
        openDistribute = Libs.toBoolean(properties.getProperty("open.hessian.client.distribute", "false"), false);
        if (openDistribute) {
            Class<?> clazz = null;
            String strategyClassName = properties.getProperty("hessian.client.distribute.strategy", "com.ramapge.distribute.AnnotationDistributeStrategy");
            try {
                clazz = Class.forName(strategyClassName);
                distributeStrategy = (DistributeStrategy) clazz.newInstance();
            } catch (Exception e) {
                openDistribute = false;
                LOGGER.error("初始化分布式策略类失败!", e);
            }
        }
    }

    @Override
    public int getOrder() {
        return Integer.MAX_VALUE;
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        // 未开启分布式策略,则直接返回
        if (!openDistribute) {
            LOGGER.error("未开启分布式分发策略, 跳过分布式BeanFactory后置处理......");
            return;
        }
        
        LOGGER.info("进入分布式策略BeanFactory后置处理, 分布式策略类为【{}】......", distributeStrategy.getClass().getName());
        String[] beanDefNames = beanFactory.getBeanDefinitionNames();
        if (ArrayUtils.isEmpty(beanDefNames)) {
            return;
        }
        
        BeanDefinition beanDef = null;
        // 替换hessian客户端的实现类为分布式hessian支持类
        for (String beanName : beanDefNames) {
            beanDef = beanFactory.getBeanDefinition(beanName);
            // 如果满足分布式分发策略,则替换hessian客户端工厂类 FIXME: 测试渠道日结分布式,后续删掉该判断条件
            if (distributeStrategy.doDistribute(beanFactory, beanDef)) {
    beanDef.setBeanClassName("com.rampage.distribute.DistributeHessainProxyFactoryBean");
            }
        }
    }
  }

  // 是否转变成分布式策略
  public interface DistributeStrategy {
    boolean doDistribute(ConfigurableListableBeanFactory beanFactory, BeanDefinition beanDef);
  }

  // 注解转发策略,这里是如果有对应的Distribute注解,则将其变成分布式调用
  public class AnnotationDistributeStrategy implements DistributeStrategy {
    @Override
    public boolean doDistribute(ConfigurableListableBeanFactory beanFactory, BeanDefinition beanDef) {
        if (!"org.springframework.remoting.caucho.HessianProxyFactoryBean".equals(beanDef.getBeanClassName())) {
            return false;
        }
        
        // 只分发有@Distribute注解的bean
        MutablePropertyValues pv = beanDef.getPropertyValues();
        if (!pv.contains("serviceInterface")) {
            return false;
        }
        
        TypedStringValue interfaceName = (TypedStringValue) pv.getPropertyValue("serviceInterface").getValue();
        try {
            Class<?> hessianInterface = Thread.currentThread().getContextClassLoader().loadClass(interfaceName.getValue());
            Distribute distribute = hessianInterface.getAnnotation(Distribute.class);
            if (distribute == null) {
                return false;
            }
        } catch (ClassNotFoundException e) {
            return false;
        }   
        return true;
    }

  }
  • 定制的、hessianProxyFactoryBean实现:
  /**
   * 分布式Hessian Bean工厂
   * @author ziyuqi
   *
   */
  public class DistributeHessainProxyFactoryBean extends HessianProxyFactoryBean {
    
    /**
     * 从hessian代理类列表
     */
    private List<Object> slaveHessianProxies = new ArrayList<Object>();
    
    /**
     * 主hessian代理类
     */
    private Object masterHessianProxy;
    
    private HessianProxyFactory proxyFactory;

    @Override
    protected Object createHessianProxy(HessianProxyFactory proxyFactory) throws MalformedURLException {
        // 将配置中的proxy设置为主代理类,并且返回null
        this.masterHessianProxy = super.createHessianProxy(proxyFactory);
        this.proxyFactory = proxyFactory;
        return null;
    }
    
    

    @Override
    public void afterPropertiesSet() {
        super.afterPropertiesSet();
        
        // TODO: 实现从节点可配置,动态读取当前配置信息进行转发
        // 初始化从Hessian代理列表 
        String masterServiceUrl = getServiceUrl();
        int suffixIndex = masterServiceUrl.lastIndexOf("/services/");
          // 配置文件中配置的http://127.0.0.1:8580/myapps-war作为主节点,这里demo写死两个从节点
        String[] slavePrefixes = new String[] {"http://127.0.0.1:8480/myapps-war", "http://127.0.0.1:8580/myapps-war"};
        
        for (String slavePrefix : slavePrefixes) {
            try {
                Object slaveHessianProxy = this.proxyFactory.create(getServiceInterface(), slavePrefix + masterServiceUrl.substring(suffixIndex), getBeanClassLoader());
                slaveHessianProxies.add(slaveHessianProxy);
            } catch (MalformedURLException e) {
                throw new RemoteLookupFailureException("Service URL [" + slavePrefix + getServiceUrl() + "] is invalid", e);

            }
        }
    }


    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        if (this.masterHessianProxy == null && this.slaveHessianProxies.isEmpty()) {
            throw new IllegalStateException("HessianClientInterceptor is not properly initialized - " +
                    "invoke 'prepare' before attempting any operations");
        }
        ClassLoader originalClassLoader = overrideThreadContextClassLoader();
        try {
            return invocation.getMethod().invoke(routeHessianProxy(invocation), invocation.getArguments());
        }
        catch (InvocationTargetException ex) {
            Throwable targetEx = ex.getTargetException();
            // Hessian 4.0 check: another layer of InvocationTargetException.
            if (targetEx instanceof InvocationTargetException) {
                targetEx = ((InvocationTargetException) targetEx).getTargetException();
            }
            if (targetEx instanceof HessianConnectionException) {
                throw convertHessianAccessException(targetEx);
            }
            else if (targetEx instanceof HessianException || targetEx instanceof HessianRuntimeException) {
                Throwable cause = targetEx.getCause();
                throw convertHessianAccessException(cause != null ? cause : targetEx);
            }
            else if (targetEx instanceof UndeclaredThrowableException) {
                UndeclaredThrowableException utex = (UndeclaredThrowableException) targetEx;
                throw convertHessianAccessException(utex.getUndeclaredThrowable());
            }
            else {
                throw targetEx;
            }
        }
        catch (Throwable ex) {
            throw new RemoteProxyFailureException(
                    "Failed to invoke Hessian proxy for remote service [" + getServiceUrl() + "]", ex);
        }
        finally {
            resetThreadContextClassLoader(originalClassLoader);
        }
    }

    /**
     * 路由hessian调用
     * @param invocation 方法调用对象
     * @return 路由hessian代理对象
     */
    private Object routeHessianProxy(MethodInvocation invocation) {
        if (this.slaveHessianProxies.isEmpty()) {
            return this.masterHessianProxy;
        }
        
        // TODO: 修改随机选取算法
        int totalCount = this.slaveHessianProxies.size() + 1;
        int nextIndex = (new Random()).nextInt(totalCount);
        if (nextIndex == 0) {
            return this.masterHessianProxy;
        }
        
        return this.slaveHessianProxies.get(nextIndex - 1);
    }
  }
  • 对于想增强为分布式服务调用的接口,在其上加上@Distribute注解:
  @Distribute
  public interface HelloService {
      void sayHello(String name);
  }

展望

​ 前面的简单实现Demo,虽然简单,但其实也预留了一些可以扩展的点可供后续展望:

  • DistributeStrategy可以定制,如果只针对特定路径下的服务接口做增强,可以只替换策略,不需要对原来代码做改动
  • 如果使用Distribute注解可以考虑增加一个服务组的概念,增对不同的服务组进行不同的地址转发处理
  • 可以考虑增加一个界面对于从节点进行增删该,可以增加权重的概念,根据不同的权重定制不同的路由规则

后续

​ 这只是DIY分布式改造第一篇只应用分布式改造,后续还有缓存分布式及锁的分布式改造,出于篇幅和时间限制,今天暂时写到这里。后续再做更新。

黎明前最黑暗,成功前最绝望!