返回

Spring Cloud——OpenFeign源码解析

发布时间:2023-04-10 15:07:56 165
# java# spring# java# 负载均衡# 软件

一、简介

Feign是一个http请求调用的轻量级框架,可以以Java接口注解的方式调用Http请求,而不用像Java中通过封装HTTP请求报文的方式直接调用。Feign通过处理注解,将请求模板化,当实际调用的时候,传入参数,根据参数再应用到请求上,进而转化成真正的请求,这种请求相对而言比较直观。

 

Feign被广泛应用在Spring Cloud 的解决方案中,是学习基于Spring Cloud 微服务架构不可或缺的重要组件。

二、工作原理

  • 主程序入口添加了@EnableFeignClients注解开启对FeignClient扫描加载处理。根据Feign Client的开发规范,定义接口并加@FeignClient注解。

  • 当程序启动时,会进行包扫描,扫描所有@FeignClients的注解的类,并且将这些信息注入Spring IOC容器中,当定义的的Feign接口中的方法被调用时,通过JDK动态代理方式,来生成具体的RequestTemplate。当生成代理时,Feign会为每个接口方法创建一个RequestTemplate对象,该对象封装可HTTP请求需要的全部信息,如请求参数名,请求方法等信息都是在这个过程中确定的。

  • 然后RequestTemplate生成Request,然后把Request交给Client去处理,这里指的Client可以是JDK原生的URLConnection、Apache的HttpClient、也可以是OKhttp,最后Client被封装到LoadBalanceClient类,这个类结合Ribbon负载均衡发起服务之间的调用。

三、源码分析

使用openfegin我们可以不用在yaml文件添加任何关于openfeign的配置,而只需要在一个被@Configuration注释的配置类上或者Application启动类上添加@EnableFeignClients注解。例如:

@EnableFeignClients(basePackages = {"com.yibo.order.center"})
public class OrderCenterApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderCenterApplication.class,args);
    }
}

basePackages属性用于指定被@FeignClient注解注释的接口所在的包的包名,或者也可以直接指定clients属性,clients属性可以直接指定一个或多个被@FeignClient注释的类。

 

basePackages是一个数组,如果被@FeignClient注解注释的接口比较分散,可以指定多个包名,而不使用一个大的包名,这样可以减少包扫描耗费的时间,不拖慢应用的启动速度。

EnableFeignClients

  • 该注解引入了FeignClientsRegistrar类
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(FeignClientsRegistrar.class)
public @interface EnableFeignClients {

	//basePackages 属性的别名,允许使用更简洁的注释声明
	String[] value() default {};

	//扫描包下带注释的组件
	String[] basePackages() default {};

	//basePackages() 的类型安全的替代方法,用于指定要扫描带注释的组件的软件包,指定类别的包装将被扫描。
	Class[] basePackageClasses() default {};

	//适用于所有自定义@Configuration,可以包含组成客户端的部分的@Bean 
	Class[] defaultConfiguration() default {};

	//用@FeignClient注释的类的列表,如果不为空,则禁用类路径*扫描。
	Class[] clients() default {};

}

FeignClientsRegistrar实现了ImportBeanDefinitionRegistrar,它是一个动态注入bean的接口,Spring Boot启动的时候,会去调用这个类中的registerBeanDefinitions来实现动态Bean的装载。它的作用类似于ImportSelector。

FeignClientsRegistrar

@EnableFeignClients注解使用@Import导入FeignClientsRegistrar类,这是一个ImportBeanDefinitionRegistrar,因此我们重点关注它的registerBeanDefinitions方法。

class FeignClientsRegistrar
		implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware {
		
	@Override
	public void registerBeanDefinitions(AnnotationMetadata metadata,
			BeanDefinitionRegistry registry) {
		//注册@EnableFeignClients中定义defaultConfiguration属性下的类,包装成FeignClientSpecification,注册到Spring容器。
  	//在@FeignClient中有一个属性:configuration,这个属性是表示各个FeignClient自定义的配置类,
		//后面也会通过调用registerClientConfiguration方法来注册成FeignClientSpecification到容器。
  	//所以,这里可以完全理解在@EnableFeignClients中配置的是做为兜底的配置,在各个@FeignClient配置的就是自定义的情况。	
		registerDefaultConfiguration(metadata, registry);
		
		
		//该方法负责读取@EnableFeignClients的属性,获取需要扫描的包名,
		//然后扫描指定的所有包名下的被@FeignClient注解注释的接口,
		//将扫描出来的接口调用registerFeignClient方法注册到spring容器。
		registerFeignClients(metadata, registry);
	}
}		

registerDefaultConfiguration

  • 解析EnableFeignClients属性,注册FeignClientSpecification类型的Bean
class FeignClientsRegistrar
		implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware {

	private void registerDefaultConfiguration(AnnotationMetadata metadata,
			BeanDefinitionRegistry registry) {
		//解析EnableFeignClients属性
		Map defaultAttrs = metadata
				.getAnnotationAttributes(EnableFeignClients.class.getName(), true);

		if (defaultAttrs != null && defaultAttrs.containsKey("defaultConfiguration")) {
			String name;
			if (metadata.hasEnclosingClass()) {
				name = "default." + metadata.getEnclosingClassName();
			}
			else {
				name = "default." + metadata.getClassName();
			}
			//注册客户端配置
			registerClientConfiguration(registry, name,
					defaultAttrs.get("defaultConfiguration"));
		}
	}
	
	private void registerClientConfiguration(BeanDefinitionRegistry registry, Object name,
			Object configuration) {
		//加载FeignClientSpecification bean	
		BeanDefinitionBuilder builder = BeanDefinitionBuilder
				.genericBeanDefinition(FeignClientSpecification.class);
		builder.addConstructorArgValue(name);
		builder.addConstructorArgValue(configuration);
		//注册FeignClientSpecification类型的Bean
		registry.registerBeanDefinition(
				name + "." + FeignClientSpecification.class.getSimpleName(),
				builder.getBeanDefinition());
	}
}

registerFeignClients

重点关注registerFeignClients方法,该方法负责读取@EnableFeignClients的属性,获取需要扫描的包名,然后扫描指定的所有包名下的被@FeignClient注解注释的接口,将扫描出来的接口调用registerFeignClient方法注册到spring容器。

class FeignClientsRegistrar
		implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware {
		
	public void registerFeignClients(AnnotationMetadata metadata,
			BeanDefinitionRegistry registry) {
		ClassPathScanningCandidateComponentProvider scanner = getScanner();
		scanner.setResourceLoader(this.resourceLoader);

		Set basePackages;

		Map attrs = metadata
				.getAnnotationAttributes(EnableFeignClients.class.getName());
		// 扫描带有FeignClient注解的类		
		AnnotationTypeFilter annotationTypeFilter = new AnnotationTypeFilter(
				FeignClient.class);
		//获取@EnableFeignClients 中clients的值		
		final Class[] clients = attrs == null ? null
				: (Class[]) attrs.get("clients");
		if (clients == null || clients.length == 0) {
			//如果没有设置,那么设置类型过滤器
			scanner.addIncludeFilter(annotationTypeFilter);
			// 如果没有设置,则扫描的包路径为 @EnableFeignClients 注解所在的包
			basePackages = getBasePackages(metadata);
		}
		else {
			final Set clientClasses = new HashSet<>();
			basePackages = new HashSet<>();
			//设置了则使用注解属性来进行扫描注册
			for (Class clazz : clients) {
				basePackages.add(ClassUtils.getPackageName(clazz));
				clientClasses.add(clazz.getCanonicalName());
			}
			AbstractClassTestingTypeFilter filter = new AbstractClassTestingTypeFilter() {
				@Override
				protected boolean match(ClassMetadata metadata) {
					String cleaned = metadata.getClassName().replaceAll("\\$", ".");
					return clientClasses.contains(cleaned);
				}
			};
			scanner.addIncludeFilter(
					new AllTypeFilter(Arrays.asList(filter, annotationTypeFilter)));
		}
		//循环扫描注册
		for (String basePackage : basePackages) {
			Set candidateComponents = scanner
					.findCandidateComponents(basePackage);
			for (BeanDefinition candidateComponent : candidateComponents) {
				if (candidateComponent instanceof AnnotatedBeanDefinition) {
					// verify annotated class is an interface
					// 验证带注释的类必须是接口
					AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
					AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
					Assert.isTrue(annotationMetadata.isInterface(),
							"@FeignClient can only be specified on an interface");

					Map attributes = annotationMetadata
							.getAnnotationAttributes(
									FeignClient.class.getCanonicalName());

					String name = getClientName(attributes);
					//注册被调用客户端配置
					//注册(微服务名).FeignClientSpecification类型的bean
					//beanname: order-center.FeignClientSpecification
					registerClientConfiguration(registry, name,
							attributes.get("configuration"));
					 //注册 FeignClient
					registerFeignClient(registry, annotationMetadata, attributes);
				}
			}
		}
	}
}

registerFeignClient

  • 注册 FeignClient,组装BeanDefinition,实质是一个FeignClientFactoryBean,然后注册到Spring IOC容器。
class FeignClientsRegistrar
		implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware {
		
	private void registerFeignClient(BeanDefinitionRegistry registry,
			AnnotationMetadata annotationMetadata, Map attributes) {
		String className = annotationMetadata.getClassName();
		//构建FeignClientFactoryBean类型的BeanDefinitionBuilder
		BeanDefinitionBuilder definition = BeanDefinitionBuilder
				.genericBeanDefinition(FeignClientFactoryBean.class);
		validate(attributes);
		//将属性设置到 FeignClientFactoryBean 中
		definition.addPropertyValue("url", getUrl(attributes));
		definition.addPropertyValue("path", getPath(attributes));
		String name = getName(attributes);
		definition.addPropertyValue("name", name);
		String contextId = getContextId(attributes);
		definition.addPropertyValue("contextId", contextId);
		definition.addPropertyValue("type", className);
		definition.addPropertyValue("decode404", attributes.get("decode404"));
		definition.addPropertyValue("fallback", attributes.get("fallback"));
		definition.addPropertyValue("fallbackFactory", attributes.get("fallbackFactory"));
		//设置 Autowire注入的类型,按类型注入
		definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);

		String alias = contextId + "FeignClient";
		AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
		beanDefinition.setAttribute(FactoryBean.OBJECT_TYPE_ATTRIBUTE, className);

		// has a default, won't be null
		boolean primary = (Boolean) attributes.get("primary");

		beanDefinition.setPrimary(primary);

		String qualifier = getQualifier(attributes);
		if (StringUtils.hasText(qualifier)) {
			alias = qualifier;
		}
		//将BeanDefinition包装成BeanDefinitionHolder,用于注册
		BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className,
				new String[] { alias });
		//注册 BeanDefinition		
		BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
	}
}	

至此,注册完成,注意此时仅仅只是注册到 DefaultListableBeanFactory容器的 beanDefinitionMap中,并没有实例化!

实例化

  • Spring容器启动,调用AbstractApplicationContext#refresh方法,
  • 在refresh方法内部调用finishBeanFactoryInitialization方法对单例bean进行初始化,
  • finishBeanFactoryInitialization方法调用getBean获取name对应的bean实例,如果不存在,则创建一个,即调用doGetBean方法。
  • doGetBean调用createBean方法,createBean方法调用doCreateBean方法。
  • doCreateBean()方法主要是根据 beanName、mbd、args,使用对应的策略创建 bean 实例,并返回包装类 BeanWrapper。
  • doCreateBean方法中调用populateBean对 bean 进行属性填充;其中,可能存在依赖于其他 bean 的属性,则会递归初始化依赖的 bean 实例。

getObjectForBeanInstance

在doGetBean方法中会调用getObjectForBeanInstance方法获取beanName对应的实例对象(主要用于FactoryBean的特殊处理,普通Bean会直接返回sharedInstance本身)

public abstract class AbstractBeanFactory extends FactoryBeanRegistrySupport implements ConfigurableBeanFactory {

    protected Object getObjectForBeanInstance(
            Object beanInstance, String name, String beanName, @Nullable RootBeanDefinition mbd) {

        // Don't let calling code try to dereference the factory if the bean isn't a factory.
        if (BeanFactoryUtils.isFactoryDereference(name)) {
            if (beanInstance instanceof NullBean) {
                return beanInstance;
            }
            // 1.如果name以“&”为前缀,但是beanInstance不是FactoryBean,则抛异常
            if (!(beanInstance instanceof FactoryBean)) {
                throw new BeanIsNotAFactoryException(beanName, beanInstance.getClass());
            }
            if (mbd != null) {
                mbd.isFactoryBean = true;
            }
            return beanInstance;
        }

        // Now we have the bean instance, which may be a normal bean or a FactoryBean.
        // If it's a FactoryBean, we use it to create a bean instance, unless the
        // caller actually wants a reference to the factory.
        // 2.1 如果beanInstance不是FactoryBean(也就是普通bean),则直接返回beanInstance
        if (!(beanInstance instanceof FactoryBean)) {
            return beanInstance;
        }
        //FactoryBean创建出bean实例返回
        // 3.走到这边,代表beanInstance是FactoryBean,但name不带有“&”前缀,表示想要获取的是FactoryBean创建的对象实例
        Object object = null;
        if (mbd != null) {
            mbd.isFactoryBean = true;
        }
        else {
            // 4.如果mbd为空,则尝试从factoryBeanObjectCache缓存中获取该FactoryBean创建的对象实例
            object = getCachedObjectForFactoryBean(beanName);
        }
        if (object == null) {
            // Return bean instance from factory.
            // 5.只有beanInstance是FactoryBean才能走到这边,因此直接强转
            FactoryBean factory = (FactoryBean) beanInstance;
            // Caches object obtained from FactoryBean if it is a singleton.
            if (mbd == null && containsBeanDefinition(beanName)) {
                // 6.mbd为空,但是该bean的BeanDefinition在缓存中存在,则获取该bean的MergedBeanDefinition
                mbd = getMergedLocalBeanDefinition(beanName);
            }
            // 7.mbd是否是合成的(这个字段比较复杂,mbd正常情况都不是合成的,也就是false,有兴趣的可以自己查阅资料看看)
            boolean synthetic = (mbd != null && mbd.isSynthetic());
            // 8.从FactoryBean获取对象实例
            object = getObjectFromFactoryBean(factory, beanName, !synthetic);
        }
        // 9.返回对象实例
        return object;
    }
}

getObjectFromFactoryBean

从FactoryBean获取对象实例 getObjectForBeanInstance方法中会调用getObjectFromFactoryBean从FactoryBean获取对象实例,即调用FactoryBean的getObject方法获取对象实例。

public abstract class FactoryBeanRegistrySupport extends DefaultSingletonBeanRegistry {

    protected Object getObjectFromFactoryBean(FactoryBean factory, String beanName, boolean shouldPostProcess) {
        // 1.如果是单例,并且已经存在于单例对象缓存中
        if (factory.isSingleton() && containsSingleton(beanName)) {
            //又见双重检查锁机制,尝试再从缓存中获取,防止多线程下可能有别的线程已完成该单例Bean的创建
            synchronized (getSingletonMutex()) {
                // 2.从FactoryBean创建的单例对象的缓存中获取该bean实例
                Object object = this.factoryBeanObjectCache.get(beanName);
                if (object == null) {
                    // 3.调用FactoryBean的getObject方法获取对象实例
                    object = doGetObjectFromFactoryBean(factory, beanName);
                    // Only post-process and store if not put there already during getObject() call above
                    // (e.g. because of circular reference processing triggered by custom getBean calls)
                    Object alreadyThere = this.factoryBeanObjectCache.get(beanName);
                    // 4.如果该beanName已经在缓存中存在,则将object替换成缓存中的
                    if (alreadyThere != null) {
                        object = alreadyThere;
                    }
                    else {
                        if (shouldPostProcess) {
                            if (isSingletonCurrentlyInCreation(beanName)) {
                                // Temporarily return non-post-processed object, not storing it yet..
                                return object;
                            }
                            beforeSingletonCreation(beanName);
                            try {
                                // 5.对bean实例进行后置处理,执行所有已注册的BeanPostProcessor的postProcessAfterInitialization方法
                                //触发BeanPostProcessor,第三方框架可以在此用AOP来包装Bean实例
                                object = postProcessObjectFromFactoryBean(object, beanName);
                            }
                            catch (Throwable ex) {
                                throw new BeanCreationException(beanName,
                                        "Post-processing of FactoryBean's singleton object failed", ex);
                            }
                            finally {
                                //创建完成后,从缓存锁定的名字里清除
                                afterSingletonCreation(beanName);
                            }
                        }
                        if (containsSingleton(beanName)) {
                            // 6.将beanName和object放到factoryBeanObjectCache缓存中
                            this.factoryBeanObjectCache.put(beanName, object);
                        }
                    }
                }
                // 7.返回object对象实例
                return object;
            }
        }
        else {
            // 8.调用FactoryBean的getObject方法获取对象实例
            Object object = doGetObjectFromFactoryBean(factory, beanName);
            if (shouldPostProcess) {
                try {
                    // 9.对bean实例进行后置处理,执行所有已注册的BeanPostProcessor的postProcessAfterInitialization方法
                    object = postProcessObjectFromFactoryBean(object, beanName);
                }
                catch (Throwable ex) {
                    throw new BeanCreationException(beanName, "Post-processing of FactoryBean's object failed", ex);
                }
            }
            // 10.返回object对象实例
            return object;
        }
    }
}

主要步骤:

  • 调用 FactoryBean 的 getObject 方法获取对象实例。
  • 对 bean 实例进行后续处理,执行所有已注册的 BeanPostProcessor 的 postProcessAfterInitialization 方法。

doGetObjectFromFactoryBean

用FactoryBean的getObject方法获取对象实例

public abstract class FactoryBeanRegistrySupport extends DefaultSingletonBeanRegistry {

    private Object doGetObjectFromFactoryBean(final FactoryBean factory, final String beanName)
            throws BeanCreationException {

        Object object;
        try {
            // 1.调用FactoryBean的getObject方法获取bean对象实例
            if (System.getSecurityManager() != null) {
                AccessControlContext acc = getAccessControlContext();
                try {
                    // 1.1 带有权限验证的
                    object = AccessController.doPrivileged((PrivilegedExceptionAction) factory::getObject, acc);
                }
                catch (PrivilegedActionException pae) {
                    throw pae.getException();
                }
            }
            else {
                // 1.2 不带权限
                object = factory.getObject();
            }
        }
        catch (FactoryBeanNotInitializedException ex) {
            throw new BeanCurrentlyInCreationException(beanName, ex.toString());
        }
        catch (Throwable ex) {
            throw new BeanCreationException(beanName, "FactoryBean threw exception on object creation", ex);
        }

        // Do not accept a null value for a FactoryBean that's not fully
        // initialized yet: Many FactoryBeans just return null then.
        // 2.getObject返回的是空值,并且该FactoryBean正在初始化中,则直接抛异常,
        // 不接受一个尚未完全初始化的FactoryBean的getObject返回的空值
        if (object == null) {
            if (isSingletonCurrentlyInCreation(beanName)) {
                throw new BeanCurrentlyInCreationException(
                        beanName, "FactoryBean which is currently in creation returned null from getObject");
            }
            object = new NullBean();
        }
        // 3.返回创建好的bean对象实例
        return object;
    }
}

很简单的方法,就是直接调用 FactoryBean 的 getObject 方法来获取到对象实例。

 

细心的同学可以发现,该方法是以 do 开头,以 do 开头的方法是最终进行实际操作的方法,例如本方法就是 FactoryBean 最终实际进行创建 bean 对象实例的方法。

factory.getObject()

该方法会调用到FeignClientFactoryBean的getObject方法

class FeignClientFactoryBean
		implements FactoryBean, InitializingBean, ApplicationContextAware {
		
	@Override
	public Object getObject() throws Exception {
		return getTarget();
	}
}

getObject调用的是getTarget方法,它从applicationContext取出FeignContext,FeignContext继承了NamedContextFactory,它是用来统一维护feign中各个feign客户端相互隔离的上下文。

 

FeignContext注册到容器是在FeignAutoConfiguration上完成的。

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Feign.class)
@EnableConfigurationProperties({ FeignClientProperties.class,
		FeignHttpClientProperties.class })
@Import(DefaultGzipDecoderConfiguration.class)
public class FeignAutoConfiguration {

	@Autowired(required = false)
	private List configurations = new ArrayList<>();
	
	@Bean
	public FeignContext feignContext() {
		FeignContext context = new FeignContext();
		context.setConfigurations(this.configurations);
		return context;
	}
}	

在初始化FeignContext时,会把configurations在容器中放入FeignContext中。configurations 的来源就是在前面registerFeignClients方法中将@FeignClient的配置 configuration。

FeignClientFactoryBean#getTarget

  • 构建feign.builder,在构建时会向FeignContext获取配置的Encoder,Decoder等各种信息。FeignContext在上文中已经提到会为每个Feign客户端分配了一个容器,它们的父容器就是spring容器。

  • 配置完Feign.Builder之后,再判断是否需要LoadBalance,如果需要,则通过LoadBalance的方法来设置。实际上他们最终调用的是Target.target()方法。

class FeignClientFactoryBean
		implements FactoryBean, InitializingBean, ApplicationContextAware {
		
	 T getTarget() {
		//实例化Feign上下文对象FeignContext
		FeignContext context = applicationContext.getBean(FeignContext.class);
		//生成Builder对象,用来生成Feign
		Feign.Builder builder = feign(context);
		
		//如果url为空,则走负载均衡,生成有负载均衡功能的代理类
		if (!StringUtils.hasText(url)) {
			if (!name.startsWith("http")) {
				url = "http://" + name;
			}
			else {
				url = name;
			}
			url += cleanPath();
			//@FeignClient没有配置url属性,返回有负载均衡功能的代理对象
			return (T) loadBalance(builder, context,
					new HardCodedTarget<>(type, name, url));
		}
		//如果指定了url,则生成默认的代理类
		if (StringUtils.hasText(url) && !url.startsWith("http")) {
			url = "http://" + url;
		}
		String url = this.url + cleanPath();
		Client client = getOptional(context, Client.class);
		if (client != null) {
			if (client instanceof LoadBalancerFeignClient) {
				// not load balancing because we have a url,
				// but ribbon is on the classpath, so unwrap
				client = ((LoadBalancerFeignClient) client).getDelegate();
			}
			if (client instanceof FeignBlockingLoadBalancerClient) {
				// not load balancing because we have a url,
				// but Spring Cloud LoadBalancer is on the classpath, so unwrap
				client = ((FeignBlockingLoadBalancerClient) client).getDelegate();
			}
			builder.client(client);
		}
		Targeter targeter = get(context, Targeter.class);
		
		//生成默认代理类
		return (T) targeter.target(this, builder, context,
				new HardCodedTarget<>(type, name, url));
	}
}

feign(context)

构建feign.builder,在构建时会向FeignContext获取配置的Encoder,Decoder等各种信息。FeignContext在上文中已经提到会为每个Feign客户端分配了一个容器,它们的父容器就是spring容器。

class FeignClientFactoryBean
		implements FactoryBean, InitializingBean, ApplicationContextAware {
		
	protected Feign.Builder feign(FeignContext context) {
		FeignLoggerFactory loggerFactory = get(context, FeignLoggerFactory.class);
		Logger logger = loggerFactory.create(type);

		// @formatter:off
		Feign.Builder builder = get(context, Feign.Builder.class)
				// required values
				.logger(logger)	//日志
				.encoder(get(context, Encoder.class))	//编码器
				.decoder(get(context, Decoder.class))	//解码器
				.contract(get(context, Contract.class));//验证器
		// @formatter:on
		//处理了链接超时、读取超时等配置项
		configureFeign(context, builder);

		return builder;
	}
}

FeignClientFactoryBean#loadBalance

  • 生成具备负载均衡能力的feign客户端,为feign客户端构建起绑定负载均衡客户端
class FeignClientFactoryBean
		implements FactoryBean, InitializingBean, ApplicationContextAware {
		
	protected  T loadBalance(Feign.Builder builder, FeignContext context,
			HardCodedTarget target) {
		Client client = getOptional(context, Client.class);
		if (client != null) {
			builder.client(client);
			Targeter targeter = get(context, Targeter.class);
			return targeter.target(this, builder, context, target);
		}

		throw new IllegalStateException(
				"No Feign Client for loadBalancing defined. Did you forget to include spring-cloud-starter-netflix-ribbon?");
	}
}	

FeignClientFactoryBean#getOptional(context, Client.class)

Client client = (Client)this.getOptional(context, Client.class); 从上下文中获取一个 Client,默认是LoadBalancerFeignClient。它是在FeignRibbonClientAutoConfiguration这个自动装配类中,通过Import实现的

@Import({ HttpClientFeignLoadBalancedConfiguration.class,
		OkHttpFeignLoadBalancedConfiguration.class,
		DefaultFeignLoadBalancedConfiguration.class })
public class FeignRibbonClientAutoConfiguration {
	//......
}

DefaultFeignLoadBalancedConfiguration

这里的通过 DefaultFeignLoadBalancedConfiguration 注入客户端 Client 的实现

@Configuration(proxyBeanMethods = false)
class DefaultFeignLoadBalancedConfiguration {

	@Bean
	@ConditionalOnMissingBean
	public Client feignClient(CachingSpringLoadBalancerFactory cachingFactory,
			SpringClientFactory clientFactory) {
		return new LoadBalancerFeignClient(new Client.Default(null, null), cachingFactory,
				clientFactory);
	}

}

targeter.target

接下去进入targeter.target(this, builder, context, target) ,携带着构建好的这些对象去创建代理实例 ,这里有两个实现 HystrixTargeter 、DefaultTargeter 很显然,我们没有配置 Hystrix ,这里会走 DefaultTargeter

class DefaultTargeter implements Targeter {

	@Override
	public  T target(FeignClientFactoryBean factory, Feign.Builder feign,
			FeignContext context, Target.HardCodedTarget target) {
		return feign.target(target);
	}

}

feign.target

然后会来到 feign.Feign.Builder#target(feign.Target)

public abstract class Feign {

	public  T target(Target target) {
		return build().newInstance(target);
	}
	
	public Feign build() {
		Client client = Capability.enrich(this.client, capabilities);
		Retryer retryer = Capability.enrich(this.retryer, capabilities);
		List requestInterceptors = this.requestInterceptors.stream()
				.map(ri -> Capability.enrich(ri, capabilities))
				.collect(Collectors.toList());
		Logger logger = Capability.enrich(this.logger, capabilities);
		Contract contract = Capability.enrich(this.contract, capabilities);
		Options options = Capability.enrich(this.options, capabilities);
		Encoder encoder = Capability.enrich(this.encoder, capabilities);
		Decoder decoder = Capability.enrich(this.decoder, capabilities);
		InvocationHandlerFactory invocationHandlerFactory =
				Capability.enrich(this.invocationHandlerFactory, capabilities);
		QueryMapEncoder queryMapEncoder = Capability.enrich(this.queryMapEncoder, capabilities);

		SynchronousMethodHandler.Factory synchronousMethodHandlerFactory =
				new SynchronousMethodHandler.Factory(client, retryer, requestInterceptors, logger,
				logLevel, decode404, closeAfterDecode, propagationPolicy, forceDecoding);
		ParseHandlersByName handlersByName =
				new ParseHandlersByName(contract, options, encoder, decoder, queryMapEncoder,
				errorDecoder, synchronousMethodHandlerFactory);
		return new ReflectiveFeign(handlersByName, invocationHandlerFactory, queryMapEncoder);
	}
}

ReflectiveFeign.newInstance

最终会调用 ReflectiveFeign.newInstance

 

这个方法是用来创建一个动态代理的方法,在生成动态代理之前,会根据Contract协议(协议解析规则,解析接口类的注解信息,解析成内部的MethodHandler的处理方式。

 

从实现的代码中可以看到熟悉的Proxy.newProxyInstance方法产生代理类。而这里需要对每个定义的接口方法进行特定的处理实现,所以这里会出现一个MethodHandler的概念,就是对应方法级别的InvocationHandler。

public class ReflectiveFeign extends Feign {

	@Override
	public  T newInstance(Target target) {
		// 解析接口注解信息
		//根据接口类和Contract协议解析方式,解析接口类上的方法和注解,转换成内部的MethodHandler处理方式
		Map nameToHandler = targetToHandlersByName.apply(target);
		Map methodToHandler = new LinkedHashMap();
		List defaultMethodHandlers = new LinkedList();
		
		// 根据方法类型
		for (Method method : target.type().getMethods()) {
			if (method.getDeclaringClass() == Object.class) {
				continue;
			} else if (Util.isDefault(method)) {
				DefaultMethodHandler handler = new DefaultMethodHandler(method);
				defaultMethodHandlers.add(handler);
				methodToHandler.put(method, handler);
			} else {
				methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
			}
		}
		InvocationHandler handler = factory.create(target, methodToHandler);
		
		// 基于Proxy.newProxyInstance 为接口类创建动态实现,将所有的请求转换给InvocationHandler 处理。
		T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(),
				new Class[] {target.type()}, handler);

		for (DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
			defaultMethodHandler.bindTo(proxy);
		}
		return proxy;
	}
}

targetToHandlersByName.apply

targetToHandlersByName.apply(target) :根据Contract协议规则,解析接口类的注解信息,解析成内部表现:targetToHandlersByName.apply(target);会解析接口方法上的注解,从而解析出方法粒度的特定的配置信息,然后生产一个SynchronousMethodHandler 然后需要维护一个的map,放入InvocationHandler的实现FeignInvocationHandler中。

public class ReflectiveFeign extends Feign {

	static final class ParseHandlersByName {
		public Map apply(Target target) {
			List metadata = contract.parseAndValidateMetadata(target.type());
			Map result = new LinkedHashMap();
			for (MethodMetadata md : metadata) {
				BuildTemplateByResolvingArgs buildTemplate;
				if (!md.formParams().isEmpty() && md.template().bodyTemplate() == null) {
				buildTemplate =
						new BuildFormEncodedTemplateFromArgs(md, encoder, queryMapEncoder, target);
				} else if (md.bodyIndex() != null) {
					buildTemplate = new BuildEncodedTemplateFromArgs(md, encoder, queryMapEncoder, target);
				} else {
					buildTemplate = new BuildTemplateByResolvingArgs(md, queryMapEncoder, target);
				}
				if (md.isIgnored()) {
					result.put(md.configKey(), args -> {
						throw new IllegalStateException(md.configKey() + " is not a method handled by feign");
					});
				} else {
					result.put(md.configKey(),
						factory.create(target, md, buildTemplate, options, decoder, errorDecoder));
				}
			}
			return result;
		}
	}
}  

SpringMvcContract:当前Spring Cloud 微服务解决方案中,为了降低学习成本,采用了Spring MVC的部分注解来完成 请求协议解析,也就是说 ,写客户端请求接口和像写服务端代码一样:客户端和服务端可以通过SDK的方式进行约定,客户端只需要引入服务端发布的SDK API,就可以使用面向接口的编码方式对接服务。

OpenFeign调用过程 :

在前面的分析中,我们知道OpenFeign最终返回的是一个 ReflectiveFeign.FeignInvocationHandler 的对象。那么当客户端发起请求时,会进入到 FeignInvocationHandler.invoke 方法中,这个大家都知道,它是一个动态代理的实现。

public class ReflectiveFeign extends Feign {

	@Override
	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
		if ("equals".equals(method.getName())) {
			try {
				Object otherHandler =
						args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
				return equals(otherHandler);
			} catch (IllegalArgumentException e) {
				return false;
			}
		} else if ("hashCode".equals(method.getName())) {
			return hashCode();
		} else if ("toString".equals(method.getName())) {
			return toString();
		}
		// 利用分发器筛选方法,找到对应的handler 进行处理
		return dispatch.get(method).invoke(args);
	}
}  

SynchronousMethodHandler#invoke

而接着,在invoke方法中,会调用 this.dispatch.get(method)).invoke(args) 。this.dispatch.get(method) 会返回一个SynchronousMethodHandler,进行拦截处理。这个方法会根据参数生成完成的RequestTemplate对象,这个对象是Http请求的模版,代码如下。

final class SynchronousMethodHandler implements MethodHandler {

	@Override
	public Object invoke(Object[] argv) throws Throwable {
		RequestTemplate template = buildTemplateFromArgs.create(argv);
		Options options = findOptions(argv);
		Retryer retryer = this.retryer.clone();
		while (true) {
			try {
				return executeAndDecode(template, options);
			} catch (RetryableException e) {
				try {
					retryer.continueOrPropagate(e);
				} catch (RetryableException th) {
					Throwable cause = th.getCause();
					if (propagationPolicy == UNWRAP && cause != null) {
						throw cause;
					} else {
						throw th;
					}
				}
				if (logLevel != Logger.Level.NONE) {
					logger.logRetry(metadata.configKey(), logLevel);
				}
				continue;
			}
		}
	}
}

SynchronousMethodHandler#executeAndDecode

经过上述的代码,我们已经将restTemplate拼装完成,上面的代码中有一个 executeAndDecode() 方法,该方法通过RequestTemplate生成Request请求对象,然后利用Http Client获取response,来获取响应信息。

final class SynchronousMethodHandler implements MethodHandler {

	Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
		//转化为Http请求报文
		Request request = targetRequest(template);

		if (logLevel != Logger.Level.NONE) {
			logger.logRequest(metadata.configKey(), logLevel, request);
		}

		Response response;
		long start = System.nanoTime();
		try {
			//发起远程通信
			response = client.execute(request, options);
			
			//获取返回结果
			response = response.toBuilder()
					.request(request)
					.requestTemplate(template)
					.build();
		} catch (IOException e) {
			if (logLevel != Logger.Level.NONE) {
				logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
			}
			throw errorExecuting(request, e);
		}
		long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);


		if (decoder != null)
			return decoder.decode(response, metadata.returnType());

		CompletableFuture resultFuture = new CompletableFuture<>();
		asyncResponseHandler.handleResponse(resultFuture, metadata.configKey(), response,
					metadata.returnType(),elapsedTime);

		try {
			if (!resultFuture.isDone())
				throw new IllegalStateException("Response handling not done");

			return resultFuture.join();
		} catch (CompletionException e) {
			Throwable cause = e.getCause();
			if (cause != null)
				throw cause;
			throw e;
		}
	}
}

经过上面的分析,这里的 client.execute 的 client 的类型是LoadBalancerFeignClient

LoadBalancerFeignClient#execute

这里就很自然的进入 LoadBalancerFeignClient#execute

public class LoadBalancerFeignClient implements Client {

	@Override
	public Response execute(Request request, Request.Options options) throws IOException {
		try {
			URI asUri = URI.create(request.url());
			String clientName = asUri.getHost();
			URI uriWithoutHost = cleanUrl(request.url(), clientName);
			FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
					this.delegate, request, uriWithoutHost);

			IClientConfig requestConfig = getClientConfig(options, clientName);
			return lbClient(clientName)
					.executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
		}
		catch (ClientException e) {
			IOException io = findIOException(e);
			if (io != null) {
				throw io;
			}
			throw new RuntimeException(e);
		}
	}
}

其实这个execute里面得流程就是 Ribbon 的那一套。我们可以简单的看一下。首先是构造URI,构造RibbonRequest,选择 LoadBalance,发起调用。

 

来看一下lbClient 选择负载均衡器的时候做了什么

public class LoadBalancerFeignClient implements Client {

	private FeignLoadBalancer lbClient(String clientName) {
		return this.lbClientFactory.create(clientName);
	}
	
	public FeignLoadBalancer create(String clientName) {
		FeignLoadBalancer client = this.cache.get(clientName);
		if (client != null) {
			return client;
		}
		IClientConfig config = this.factory.getClientConfig(clientName);
		ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
		ServerIntrospector serverIntrospector = this.factory.getInstance(clientName,
				ServerIntrospector.class);
		client = this.loadBalancedRetryFactory != null
				? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
						this.loadBalancedRetryFactory)
				: new FeignLoadBalancer(lb, config, serverIntrospector);
		this.cache.put(clientName, client);
		return client;
	}	
}

可以得出的结论就是 this.factory.getLoadBalancer(clientName) 跟Ribbon 源码里的获取方式一样,无疑这里获取的就是默认的 ZoneAwareLoadBalancer。然后包装成一个 FeignLoadBalancer 进行返回。

 

既然负载均衡器选择完了,那么一定还有个地方通过该负载去选择一个服务,接着往下看:

public abstract class AbstractLoadBalancerAwareClient extends LoadBalancerContext implements IClient, IClientConfigAware {

    public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        LoadBalancerCommand command = buildLoadBalancerCommand(request, requestConfig);

        try {
            return command.submit(
                new ServerOperation() {
                    @Override
                    public Observable call(Server server) {
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } 
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
            Throwable t = e.getCause();
            if (t instanceof ClientException) {
                throw (ClientException) t;
            } else {
                throw new ClientException(e);
            }
        }
    }
}

上面这段代码就是通过获取到的负载进行执行请求,但是这个时候 服务还没有选择,我们跟进去 submit 请求看一看究竟:

public class LoadBalancerCommand {

    public Observable submit(final ServerOperation operation) {
        final ExecutionInfoContext context = new ExecutionInfoContext();
        
        if (listenerInvoker != null) {
            try {
                listenerInvoker.onExecutionStart();
            } catch (AbortExecutionException e) {
                return Observable.error(e);
            }
        }

        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

        // Use the load balancer
        Observable o = 
                (server == null ? selectServer() : Observable.just(server))
                .concatMap(new Func1>() {
					//省略......
                });
            
        //省略......
    }
}

可以看到这里有个 selectServer的方法 ,跟进去:

public class LoadBalancerCommand {

    private final LoadBalancerContext loadBalancerContext;

    private Observable selectServer() {
        return Observable.create(new OnSubscribe() {
            @Override
            public void call(Subscriber next) {
                try {
                    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                    next.onNext(server);
                    next.onCompleted();
                } catch (Exception e) {
                    next.onError(e);
                }
            }
        });
    }
}


public class LoadBalancerContext implements IClientConfigAware {

    public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
        String host = null;
        int port = -1;
        if (original != null) {
            host = original.getHost();
        }
        if (original != null) {
            Pair schemeAndPort = deriveSchemeAndPortFromPartialUri(original);        
            port = schemeAndPort.second();
        }

        // Various Supported Cases
        // The loadbalancer to use and the instances it has is based on how it was registered
        // In each of these cases, the client might come in using Full Url or Partial URL
        ILoadBalancer lb = getLoadBalancer();
        if (host == null) {
            // Partial URI or no URI Case
            // well we have to just get the right instances from lb - or we fall back
            if (lb != null){
                Server svc = lb.chooseServer(loadBalancerKey);
                if (svc == null){
                    throw new ClientException(ClientException.ErrorType.GENERAL,
                            "Load balancer does not have available server for client: "
                                    + clientName);
                }
                host = svc.getHost();
                if (host == null){
                    throw new ClientException(ClientException.ErrorType.GENERAL,
                            "Invalid Server for :" + svc);
                }
                logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
                return svc;
            } else {
                // No Full URL - and we dont have a LoadBalancer registered to
                // obtain a server
                // if we have a vipAddress that came with the registration, we
                // can use that else we
                // bail out
                if (vipAddresses != null && vipAddresses.contains(",")) {
                    throw new ClientException(
                            ClientException.ErrorType.GENERAL,
                            "Method is invoked for client " + clientName + " with partial URI of ("
                            + original
                            + ") with no load balancer configured."
                            + " Also, there are multiple vipAddresses and hence no vip address can be chosen"
                            + " to complete this partial uri");
                } else if (vipAddresses != null) {
                    try {
                        Pair hostAndPort = deriveHostAndPortFromVipAddress(vipAddresses);
                        host = hostAndPort.first();
                        port = hostAndPort.second();
                    } catch (URISyntaxException e) {
                        throw new ClientException(
                                ClientException.ErrorType.GENERAL,
                                "Method is invoked for client " + clientName + " with partial URI of ("
                                + original
                                + ") with no load balancer configured. "
                                + " Also, the configured/registered vipAddress is unparseable (to determine host and port)");
                    }
                } else {
                    throw new ClientException(
                            ClientException.ErrorType.GENERAL,
                            this.clientName
                            + " has no LoadBalancer registered and passed in a partial URL request (with no host:port)."
                            + " Also has no vipAddress registered");
                }
            }
        } else {
            // Full URL Case
            // This could either be a vipAddress or a hostAndPort or a real DNS
            // if vipAddress or hostAndPort, we just have to consult the loadbalancer
            // but if it does not return a server, we should just proceed anyways
            // and assume its a DNS
            // For restClients registered using a vipAddress AND executing a request
            // by passing in the full URL (including host and port), we should only
            // consult lb IFF the URL passed is registered as vipAddress in Discovery
            boolean shouldInterpretAsVip = false;

            if (lb != null) {
                shouldInterpretAsVip = isVipRecognized(original.getAuthority());
            }
            if (shouldInterpretAsVip) {
                Server svc = lb.chooseServer(loadBalancerKey);
                if (svc != null){
                    host = svc.getHost();
                    if (host == null){
                        throw new ClientException(ClientException.ErrorType.GENERAL,
                                "Invalid Server for :" + svc);
                    }
                    logger.debug("using LB returned Server: {} for request: {}", svc, original);
                    return svc;
                } else {
                    // just fall back as real DNS
                    logger.debug("{}:{} assumed to be a valid VIP address or exists in the DNS", host, port);
                }
            } else {
                // consult LB to obtain vipAddress backed instance given full URL
                //Full URL execute request - where url!=vipAddress
                logger.debug("Using full URL passed in by caller (not using load balancer): {}", original);
            }
        }
        // end of creating final URL
        if (host == null){
            throw new ClientException(ClientException.ErrorType.GENERAL,"Request contains no HOST to talk to");
        }
        // just verify that at this point we have a full URL

        return new Server(host, port);
    }
}

可以看到的是这里获取到了之前构造好的 ZoneAwareLoadBalancer 然后调用 chooseServer 方法获取server ,这个是跟Ribbon 中是一样的流程。

 

获取到了server 后,会回调先前 executeWithLoadBalancer 方法里构造的 ServerOperation 的 call 方法:

return command.submit(
	new ServerOperation() {
		@Override
		public Observable call(Server server) {
			URI finalUri = reconstructURIWithServer(server, request.getUri());
			S requestForServer = (S) request.replaceUri(finalUri);
			try {
				return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
			} 
			catch (Exception e) {
				return Observable.error(e);
			}
		}
	})
	.toBlocking()
	.single();

然后会执行 AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig) 进行最后的调用,实际上这里走的是 FeignLoadBalancer#execute

public class FeignLoadBalancer extends
		AbstractLoadBalancerAwareClient {

	@Override
	public RibbonResponse execute(RibbonRequest request, IClientConfig configOverride)
			throws IOException {
		Request.Options options;
		if (configOverride != null) {
			RibbonProperties override = RibbonProperties.from(configOverride);
			options = new Request.Options(override.connectTimeout(this.connectTimeout),
					override.readTimeout(this.readTimeout));
		}
		else {
			options = new Request.Options(this.connectTimeout, this.readTimeout);
		}
		Response response = request.client().execute(request.toRequest(), options);
		return new RibbonResponse(request.getUri(), response);
	}
}

而这里调用的request.client().execute(request.toRequest(), options) 则是 DefaultFeignLoadBalancedConfiguration 注入的 LoadBalancerFeignClient ,在构造 LoadBalancerFeignClient 的时候 ,传递了个 feign.Client.Default ,然后利用 feign.Client.Default 构造了一个 RibbonRequest。

 

所以这里走 feign.Client.Default#execute :

public interface Client {

	class Default implements Client {
		@Override
		public Response execute(Request request, Options options) throws IOException {
		  HttpURLConnection connection = convertAndSend(request, options);
		  return convertResponse(connection, request);
		}
	}
}

利用 JDK 提供的 HttpURLConnection 发起远程的 HTTP通讯。至此发起请求的流程就完成了。下面附上一张这个过程的流程图:

对于Ribbon的调用过程请参考:https://www.jianshu.com/p/f3db11f045cc

OpenFeign Configuration :

针对 feign 的 Configuration,官方给我们提供了很多的个性化配置,具体可以参考 org.springframework.cloud.openfeign.FeignClientProperties.FeignClientConfiguration

public static class FeignClientConfiguration {

	// 日志
	private Logger.Level loggerLevel;
	// 连接超时
	private Integer connectTimeout;

	private Integer readTimeout;
	//重试
	private Class retryer;
	//解码
	private Class errorDecoder;

	private List> requestInterceptors;
	// 编码
	private Boolean decode404;

	private Class decoder;

	private Class encoder;
	// 解析
	private Class contract;

	private ExceptionPropagationPolicy exceptionPropagationPolicy;
}

这里举个简单的例子,以Logger 为例。我们想为每个不同的 FeignClient 设置日志级别。

1、添加配置类:

@Configuration
public class FooConfiguration {
	@Bean
	Logger.Level feignLoggerLevel() {
		return Logger.Level.FULL;
	}
}

2、配置日志级别 ,logging.level + FeignClient 包的全路径。

logging.level.com.wuzz.FeignClientService: DEBUG

就这样就配置完成了。重启服务就可以看到效果。

 

参考: https://www.cnblogs.com/lucky-yqy/p/14589434.html

https://www.cnblogs.com/wuzhenzhao/p/13680807.html

特别声明:以上内容(图片及文字)均为互联网收集或者用户上传发布,本站仅提供信息存储服务!如有侵权或有涉及法律问题请联系我们。
举报
评论区(0)
按点赞数排序
用户头像
精选文章
thumb 中国研究员首次曝光美国国安局顶级后门—“方程式组织”
thumb 俄乌线上战争,网络攻击弥漫着数字硝烟
thumb 从网络安全角度了解俄罗斯入侵乌克兰的相关事件时间线
下一篇
Spring Cloud——Feign设计原理 2023-04-10 12:12:02