跳到主要内容

基于 AbstractRoutingDataSource + TTL 的动态多数据源架构演进设计

·11160 字·23 分钟

基于 AbstractRoutingDataSource + TTL 的动态多数据源架构演进设计 #

前言 #

去年接手一个多租户业务改造项目,客户要求实现"数据主权"——不同盟市的数据必须物理隔离存储。听起来很简单,不就是根据请求动态切换数据源嘛?结果真正落地时才发现这个"简单"的需求,踩了无数的坑。

从最初版本的内存泄漏,到异步场景下上下文丢失,再到跨服务调用链路断裂,最后还要面对性能瓶颈。每个问题都逼着我深入到 Java 并发、Spring 框架、微服务架构的底层原理。这篇文章记录这段踩坑和演进的历程,希望能给遇到类似问题的同学一些参考。

技术栈背景:

  • Spring Boot 2.1.6
  • Spring Cloud OpenFeign Greenwich.SR2
  • Druid 连接池
  • H2 内存数据库(演示用,生产环境用的 MySQL)

业务场景:

  • 内蒙古自治区有 12 个盟市,每个盟市的数据需要独立存储
  • 服务收到请求后,需要根据盟市编码(cityCode)路由到对应的数据库
  • 涉及同步调用、异步处理、微服务间调用等复杂场景

第一章:原始版本 - 初试 AbstractRoutingDataSource #

1.1 核心思路 #

最开始调研 Spring 官方文档,发现有个 AbstractRoutingDataSource 专门用来做动态路由。思路很清晰:

  1. 定义多个物理数据源 - 为每个盟市配置独立的数据库连接
  2. 基于 ThreadLocal 保存上下文 - 当前线程要访问哪个数据源
  3. AOP 拦截 Service 层 - 在方法执行前从请求中解析出 cityCode 并设置到 ThreadLocal
  4. AbstractRoutingDataSource 路由 - 根据 ThreadLocal 的值返回对应数据源

1.2 AbstractRoutingDataSource 深度剖析 #

先看 Spring 这个类的核心源码:

public abstract class AbstractRoutingDataSource extends AbstractDataSource 
        implements InitializingingBean {
  
    // 保存所有目标数据源的 Map
    private Map<Object, Object> targetDataSources;
  
    // 默认数据源(找不到时的兜底)
    private Object defaultTargetDataSource;
  
    // 解析后的真实 DataSource 对象
    private Map<Object, DataSource> resolvedDataSources;
  
    @Override
    public Connection getConnection() throws SQLException {
        // 关键!每次获取连接时都会调用这个方法
        return determineTargetDataSource().getConnection();
    }
  
    protected DataSource determineTargetDataSource() {
        // 1. 调用子类实现的钩子方法,获取数据源标识
        Object lookupKey = determineCurrentLookupKey();
  
        // 2. 从解析好的 Map 中取出真实 DataSource
        DataSource dataSource = this.resolvedDataSources.get(lookupKey);
  
        if (dataSource == null) {
            // 找不到就用默认的
            dataSource = this.resolvedDefaultDataSource;
        }
  
        return dataSource;
    }
  
    // 子类必须实现这个方法,返回当前应该使用哪个数据源
    protected abstract Object determineCurrentLookupKey();
}

工作原理图:

核心要点:

  1. AbstractRoutingDataSource 本身不存储连接,只是个"路由器"
  2. 每次 getConnection() 都会调用 determineCurrentLookupKey()
  3. 这就是为什么我们可以基于 ThreadLocal 动态切换 - 每次取连接都重新判断

1.3 V0 代码实现 #

Step 1: 自定义动态数据源

public class DynamicRoutingDataSource extends AbstractRoutingDataSource {
  
    @Override
    protected Object determineCurrentLookupKey() {
        // 从 ThreadLocal 中获取当前线程的租户标识
        return TenantContextHolder.getCityCode();
    }
}

就这么简单!重写一个方法,把数据源路由的决策权交给我们的上下文管理器。

Step 2: ThreadLocal 上下文管理

public class TenantContextHolder {
  
    // 使用 ThreadLocal 保存当前线程的租户标识
    private static final ThreadLocal<String> CITY_CONTEXT = new ThreadLocal<>();
  
    public static void setCityCode(String cityCode) {
        CITY_CONTEXT.set(cityCode);
    }
  
    public static String getCityCode() {
        String cityCode = CITY_CONTEXT.get();
        return cityCode != null ? cityCode : "150100"; // 默认呼和浩特
    }
  
    public static void clear() {
        CITY_CONTEXT.remove();
    }
}

Step 3: 配置多数据源

@Configuration
public class DataSourceConfig {
  
    @Bean(name = "city150100DataSource")
    public DataSource city150100DataSource() {
        DruidDataSource ds = new DruidDataSource();
        ds.setUrl("jdbc:mysql://10.1.1.10:3306/hohhot_db");
        ds.setUsername("root");
        ds.setPassword("xxx");
        ds.setInitialSize(5);
        ds.setMinIdle(5);
        ds.setMaxActive(20);
        // ... 其他 Druid 配置
        return ds;
    }
  
    @Bean(name = "city150200DataSource")
    public DataSource city150200DataSource() {
        DruidDataSource ds = new DruidDataSource();
        ds.setUrl("jdbc:mysql://10.1.2.10:3306/baotou_db");
        ds.setUsername("root");
        ds.setPassword("xxx");
        ds.setInitialSize(5);
        ds.setMinIdle(5);
        ds.setMaxActive(20);
        return ds;
    }
  
    @Bean
    @Primary
    public DataSource routingDataSource(
            @Qualifier("city150100DataSource") DataSource ds1,
            @Qualifier("city150200DataSource") DataSource ds2) {
  
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put("150100", ds1); // key 和 determineCurrentLookupKey() 返回值对应
        targetDataSources.put("150200", ds2);
  
        DynamicRoutingDataSource routingDs = new DynamicRoutingDataSource();
        routingDs.setTargetDataSources(targetDataSources);
        routingDs.setDefaultTargetDataSource(ds1); // 默认数据源
        routingDs.afterPropertiesSet(); // 必须调用!解析 targetDataSources
  
        return routingDs;
    }
}

Druid 连接池关键配置说明:

参数作用多租户场景建议
initialSize初始化连接数设置较小值(3-5),避免启动时连接爆炸
minIdle最小空闲连接同上,按实际负载调整
maxActive最大活跃连接单租户 20-30,需考虑数据库最大连接数
maxWait获取连接最大等待时间60000ms(1分钟),避免雪崩
testWhileIdle空闲时检测连接有效性true,防止连接失效
timeBetweenEvictionRunsMillis检测间隔30000ms,定期清理死连接

多租户场景下,假设有 12 个盟市,每个连接池 maxActive=20,理论上最大就是 240 个连接。MySQL 默认 max_connections=151,很容易打满!所以需要:

  1. 降低单个连接池的最大连接数
  2. 调大数据库的 max_connections
  3. 考虑连接复用和池化策略

Step 4: AOP 拦截 Service 设置上下文

@Aspect
@Component
public class DataSourceAspect {
  
    @Around("execution(* com.example.service..*.*(..))")
    public Object around(ProceedingJoinPoint pjp) throws Throwable {
        // 从请求中解析 cityCode(这里简化处理,实际从 Header 或参数获取)
        String cityCode = getCurrentCityCode();
  
        // 设置到 ThreadLocal
        TenantContextHolder.setCityCode(cityCode);
  
        try {
            // 执行目标方法(这时 getConnection 会读取 ThreadLocal)
            return pjp.proceed();
        } finally {
            // 清理上下文
            TenantContextHolder.clear();
        }
    }
  
    private String getCurrentCityCode() {
        // 实际项目中从 Request Header 或 JWT Token 解析
        ServletRequestAttributes attrs = 
            (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = attrs.getRequest();
        return request.getHeader("x-city-code");
    }
}

1.4 流程图 #

完整的请求处理流程:

1.5 V0 版本的优缺点 #

优点:

  1. ✅ 实现简单,代码量少
  2. ✅ 利用 Spring 官方机制,稳定可靠
  3. ✅ Druid 连接池成熟,监控完善

缺点(后面会解决):

  1. ❌ 跨线程场景(异步、线程池)上下文会丢失
  2. ❌ 跨服务调用无法传递租户信息
  3. ❌ AOP 解析 cityCode 存在性能开销
  4. ❌ ThreadLocal 不清理可能导致上下文串数据

第二章:迭代 1 - 异步场景的上下文传递难题 #

2.1 新的挑战 #

上线没几天,业务方提了个需求:职位查询接口需要同时调用多个下游服务聚合数据,串行太慢了,要用异步并行。

开发小李兴冲冲写了这段代码:

@Service
public class JobService {
  
    @Autowired
    private JobRepository jobRepository;
  
    @Async
    public CompletableFuture<List<JobRecord>> queryJobsAsync() {
        // 这里想查询当前租户的数据
        String cityCode = TenantContextHolder.getCityCode();
        System.out.println("异步线程中的 cityCode: " + cityCode);
  
        return CompletableFuture.completedFuture(
            jobRepository.findAll()
        );
    }
}

主线程调用:

@RestController
public class JobController {
  
    @Autowired
    private JobService jobService;
  
    @GetMapping("/jobs")
    public List<JobRecord> getJobs() {
        // 主线程设置了 cityCode = 150200
        TenantContextHolder.setCityCode("150200");
  
        CompletableFuture<List<JobRecord>> future = jobService.queryJobsAsync();
        return future.join(); // 等待异步结果
    }
}

结果:

主线程中的 cityCode: 150200
异步线程中的 cityCode: 150100  // 变成默认值了!

数据查错库了!

2.2 ThreadLocal 的本质局限 #

为什么异步线程拿不到值?

因为 ThreadLocal 的设计初衷就是线程隔离,不同线程之间本来就不应该共享数据。

两个线程各自有自己的 ThreadLocalMap,互不相通。

ThreadLocal 原理速览:

ThreadLocal 不是用来解决多线程资源共享问题的,恰恰相反,它是用来隔离线程间数据的。

每个线程内部都有一个 ThreadLocalMap,key 是 ThreadLocal 对象本身,value 是我们存储的值。

// Thread 类的源码片段
public class Thread {
    // 每个线程实例都有这个变量
    ThreadLocal.ThreadLocalMap threadLocals = null;
}

// ThreadLocal 的 get 方法
public T get() {
    Thread t = Thread.currentThread(); // 获取当前线程
    ThreadLocalMap map = t.threadLocals; // 拿到当前线程的 Map
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this); // this 是 ThreadLocal 对象本身
        if (e != null) {
            return (T)e.value;
        }
    }
    return setInitialValue();
}

常见的异步场景都会遇到这个问题:

  1. @Async 注解的异步方法
  2. CompletableFuture.supplyAsync()
  3. 手动创建的 ExecutorService
  4. Spring 的 @Scheduled 定时任务
  5. 消息队列的消费者线程

2.3 TransmittableThreadLocal 救场 #

阿里开源的 TTL(TransmittableThreadLocal) 专门解决父子线程、线程池场景的上下文传递问题。

引入依赖:

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>transmittable-thread-local</artifactId>
    <version>2.12.5</version>
</dependency>

核心改造:

public class TenantContextHolder {
  
    // 把 ThreadLocal 替换成 TransmittableThreadLocal
    private static final TransmittableThreadLocal<String> CITY_TTL = 
        new TransmittableThreadLocal<>();
  
    public static void setCityCode(String cityCode) {
        CITY_TTL.set(cityCode);
    }
  
    public static String getCityCode() {
        String code = CITY_TTL.get();
        return code != null ? code : "150100";
    }
  
    public static void clear() {
        CITY_TTL.remove();
    }
}

配置线程池(关键!):

TTL 不会自动生效,必须用 TtlExecutors 包装线程池:

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
  
    @Override
    @Bean(name = "ttlExecutor")
    public Executor getAsyncExecutor() {
        // 创建普通线程池
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("ttl-async-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
  
        // 核心!用 TTL 包装器包装
        return TtlExecutors.getTtlExecutor(executor);
    }
}

业务代码指定线程池:

@Service
public class JobService {
  
    @Autowired
    private JobRepository jobRepository;
  
    // 指定使用 TTL 包装的线程池
    @Async("ttlExecutor")
    public CompletableFuture<List<JobRecord>> queryJobsAsync() {
        String cityCode = TenantContextHolder.getCityCode();
        System.out.println("异步线程中的 cityCode: " + cityCode); // 现在能拿到了!
  
        return CompletableFuture.completedFuture(
            jobRepository.findAll()
        );
    }
}

2.4 TTL 的工作原理深度剖析 #

TTL 采用 CRR 模式:

  • Capture(捕获): 任务提交时,捕获主线程的上下文
  • Replay(重放): 任务执行前,将上下文重放到子线程
  • Restore(恢复): 任务执行完,恢复子线程原本的上下文

源码级分析:

public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> {
  
    // 全局注册表,记录所有 TTL 实例
    private static final InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> 
        holder = new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
        @Override
        protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
            return new HashMap<>();
        }
  
        @Override
        protected Map<TransmittableThreadLocal<?>, ?> childValue(
                Map<TransmittableThreadLocal<?>, ?> parentValue) {
            // 父线程的 Map 拷贝给子线程
            return new HashMap<>(parentValue);
        }
    };
  
    @Override
    public void set(T value) {
        super.set(value);
        // 同时注册到全局 holder
        holder.get().put(this, value);
    }
}

TtlExecutor 的包装逻辑:

public class TtlExecutors {
  
    public static Executor getTtlExecutor(Executor executor) {
        return new ExecutorTtlWrapper(executor);
    }
  
    static class ExecutorTtlWrapper implements Executor {
        private final Executor executor;
  
        @Override
        public void execute(Runnable command) {
            // 1. Capture: 捕获当前线程(提交任务的线程)的所有 TTL 值
            Object captured = TransmittableThreadLocal.Transmitter.capture();
  
            // 2. 包装原始 Runnable
            Runnable ttlRunnable = () -> {
                // 3. Replay: 在执行线程中恢复捕获的上下文
                Object backup = TransmittableThreadLocal.Transmitter.replay(captured);
                try {
                    // 4. 执行真正的任务
                    command.run();
                } finally {
                    // 5. Restore: 恢复执行线程原本的上下文
                    TransmittableThreadLocal.Transmitter.restore(backup);
                }
            };
  
            // 提交到真正的线程池
            executor.execute(ttlRunnable);
        }
    }
}

图解 CRR 流程:

关键点理解:

  1. Capture 时机: 在主线程调用 executor.execute() 时,还没进入线程池
  2. Replay 时机: 在工作线程真正执行 task.run() 之前
  3. Restore 时机: 任务执行完毕,保证线程池线程的复用不会被"污染"

2.5 实战案例:CompletableFuture 并行查询 #

改造后的完整代码:

@Service
public class JobAggregationService {
  
    @Autowired
    private JobRepository jobRepository;
  
    @Autowired
    private CompanyClient companyClient; // Feign 客户端
  
    @Autowired
    @Qualifier("ttlExecutor")
    private Executor ttlExecutor;
  
    public JobAggregationResult aggregateJobs() {
        String cityCode = TenantContextHolder.getCityCode();
        log.info("主线程 cityCode: {}", cityCode);
  
        // 并行执行三个任务
        CompletableFuture<List<JobRecord>> jobsFuture = 
            CompletableFuture.supplyAsync(() -> {
                log.info("查询职位,cityCode: {}", TenantContextHolder.getCityCode());
                return jobRepository.findAll();
            }, ttlExecutor);
  
        CompletableFuture<List<CompanyInfo>> companyFuture = 
            CompletableFuture.supplyAsync(() -> {
                log.info("查询公司,cityCode: {}", TenantContextHolder.getCityCode());
                return companyClient.listCompanies();
            }, ttlExecutor);
  
        CompletableFuture<Statistics> statsFuture = 
            CompletableFuture.supplyAsync(() -> {
                log.info("统计数据,cityCode: {}", TenantContextHolder.getCityCode());
                return calculateStats();
            }, ttlExecutor);
  
        // 等待所有任务完成
        CompletableFuture.allOf(jobsFuture, companyFuture, statsFuture).join();
  
        // 聚合结果
        return new JobAggregationResult(
            jobsFuture.join(),
            companyFuture.join(),
            statsFuture.join()
        );
    }
}

日志输出:

主线程 cityCode: 150200
查询职位,cityCode: 150200  ✅
查询公司,cityCode: 150200  ✅
统计数据,cityCode: 150200  ✅

三个异步任务都正确拿到了租户标识!

2.6 性能对比测试 #

测试场景:单个请求内启动 10 个异步任务查询数据库

方案平均耗时CPU 使用率上下文丢失率
普通 ThreadLocal1200ms45%100%(全丢失)
TTL(未包装线程池)1200ms45%100%(仍丢失)
TTL(正确包装)320ms65%0%

分析:

  • TTL 本身没有明显性能损耗,主要开销在 Capture/Replay
  • 并行查询带来的性能提升远大于 TTL 的损耗
  • CPU 使用率上升是因为并发度提高了(这是好事)

2.7 迭代 1 小结 #

解决了什么:

  • ✅ 异步场景下 cityCode 能正确传递
  • ✅ 支持 @AsyncCompletableFuture、线程池等各种异步方式
  • ✅ CRR 模式自动清理子线程上下文,不会造成污染

关键要点:

  • ⚠️ 必须用 TtlExecutors.getTtlExecutor() 包装线程池
  • ⚠️ 主线程仍然需要在 Filter 中手动 clear()(后续会讲)
  • ⚠️ 业务代码使用 @Async 时要指定 TTL 包装的线程池

遗留问题:

  • ❌ 跨服务(Feign)调用时 cityCode 无法传递
  • ❌ AOP 解析 Token 性能开销
  • ❌ 主线程的上下文清理还需要规范

第三章:迭代 2 - 跨服务调用的上下文断层 #

3.1 微服务场景的新挑战 #

系统拆分成微服务后,出现了新问题:

网关 -> 服务A(上游) -> 服务B(下游)
         ↑                ↑
    cityCode=150200   cityCode=???

现象:

  • 网关解析 JWT,把 cityCode=150200 写入 Header x-city-code
  • 服务A 通过 Filter 读取 Header,设置到 TTL,查询数据库 ✅
  • 服务A 调用服务B(通过 Feign),服务B 的 cityCode 丢失 ❌

问题根源:

TTL 只能在同一个 JVM 进程内传递,跨进程调用本质上是 HTTP 请求,需要通过 Header 携带。

但我们的 Feign 客户端代码:

@FeignClient(name = "downstream-service")
public interface DownstreamClient {
  
    @GetMapping("/internal/jobs")
    List<JobRecord> queryJobs();
}

发起请求时根本没有带上 x-city-code Header!

3.2 解决方案:Feign 拦截器自动注入 Header #

核心思路:

  1. 出站(服务A -> 服务B): Feign RequestInterceptor 从 TTL 读取 cityCode,写入 HTTP Header
  2. 入站(服务B 收到请求): Servlet Filter 读取 Header,写入本地 TTL

代码实现:

Step 1: Feign 拦截器(出站)

@Configuration
public class FeignContextInterceptor implements RequestInterceptor {
  
    @Override
    public void apply(RequestTemplate template) {
        // 从当前线程的 TTL 中取出租户标识
        String cityCode = TenantContextHolder.getCityCode();
  
        if (StringUtils.hasText(cityCode)) {
            // 写入 Feign 请求的 Header
            template.header(TenantContextHolder.HEADER_CITY_CODE, cityCode);
            log.debug("Feign 出站请求注入 Header: x-city-code={}", cityCode);
        }
    }
}

Step 2: Servlet Filter(入站,复用之前的)

@Component
public class TenantContextFilter implements Filter {
  
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, 
                         FilterChain chain) throws IOException, ServletException {
  
        HttpServletRequest httpRequest = (HttpServletRequest) request;
  
        // 从 Header 读取租户标识(可能是网关传来的,也可能是上游服务传来的)
        String cityCode = httpRequest.getHeader(TenantContextHolder.HEADER_CITY_CODE);
  
        if (StringUtils.hasText(cityCode)) {
            TenantContextHolder.setCityCode(cityCode);
            log.debug("Filter 入站请求解析 Header: x-city-code={}", cityCode);
        }
  
        try {
            chain.doFilter(request, response);
        } finally {
            TenantContextHolder.clear();
        }
    }
}

3.3 完整调用链路图 #

3.4 Filter vs Interceptor 的深入对比 #

很多同学可能疑惑:为什么出站用 Interceptor,入站用 Filter?能不能都用 Filter 或都用 Interceptor?

执行时机对比:

关键区别:

维度FilterInterceptor(HandlerInterceptor)Interceptor(FeignInterceptor)
所属层次Servlet 规范Spring MVCSpring Cloud Feign
执行时机最早(所有请求)Controller 前后Feign 调用前
访问范围ServletRequestHttpServletRequest + ModelAndViewFeign RequestTemplate
适用场景认证、日志、编码权限、参数校验下游调用透传

为什么入站用 Filter?

  1. 执行时机最早: 在 DispatcherServlet 之前,能覆盖所有请求(包括静态资源、异常处理)
  2. 生命周期完整: doFilter()finally 块能保证清理逻辑一定执行
  3. 不依赖 Spring MVC: 即使是 WebFlux 或其他框架,Filter 仍然有效

为什么出站用 Feign RequestInterceptor?

因为 Feign 调用不走 Servlet Filter 链!

Feign 本质上是 HTTP 客户端,发起的是出站请求,不经过 Servlet 容器:

能不能都用 Filter?

不行!Filter 只能拦截入站请求,拦截不了 Feign 的出站调用

能不能都用 Interceptor?

不推荐!虽然可以用 HandlerInterceptor 拦截入站,但:

  1. 执行时机晚于 Filter,某些场景可能来不及(比如 Spring Security 在 Filter 层)
  2. 静态资源请求不会经过 Interceptor
  3. 异常处理可能绕过 Interceptor 的 afterCompletion

能不能先 Filter 再 Interceptor(入站)?

可以,但没必要。Filter 已经足够早,重复拦截浪费性能。

标准组合:

  • 入站: Servlet Filter(设置上下文 + 清理)
  • 出站: Feign RequestInterceptor(注入 Header)

3.5 边界 Case 处理 #

Case 1: 下游服务自调用

服务B 内部可能也有 Feign 调用自己:

@FeignClient(name = "downstream-service", url = "http://localhost:8081")
public interface SelfClient {
    @GetMapping("/api/xxx")
    String callSelf();
}

这种情况下,Header 仍然会被正确传递,因为 FeignContextInterceptor 是全局生效的。

Case 2: 异步 + 跨服务组合

@Async("ttlExecutor")
public CompletableFuture<List<JobRecord>> asyncCallDownstream() {
    // 1. TTL 保证异步线程能拿到 cityCode
    String cityCode = TenantContextHolder.getCityCode();
  
    // 2. Feign 拦截器把 cityCode 注入 Header
    return CompletableFuture.completedFuture(
        downstreamClient.queryJobs()
    );
}

两个机制协同工作:

  • TTL 负责进程内(主线程 -> 异步线程)
  • Feign Interceptor 负责进程间(服务A -> 服务B)

Case 3: 网关没有传 Header 怎么办?

@Component
public class TenantContextFilter implements Filter {
  
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, 
                         FilterChain chain) throws IOException, ServletException {
  
        String cityCode = extractCityCode(request);
  
        if (!StringUtils.hasText(cityCode)) {
            // 兜底:尝试从 JWT Token 解析
            cityCode = parseFromJwt(request);
        }
  
        if (!StringUtils.hasText(cityCode)) {
            // 再兜底:使用默认值
            cityCode = TenantContextHolder.DEFAULT_CITY;
            log.warn("请求未携带 cityCode,使用默认值: {}", cityCode);
        }
  
        TenantContextHolder.setCityCode(cityCode);
  
        try {
            chain.doFilter(request, response);
        } finally {
            TenantContextHolder.clear();
        }
    }
  
    private String parseFromJwt(ServletRequest request) {
        // 解析 JWT Token 的逻辑(迭代4会详细讲)
        // ...
    }
}

3.6 压测验证 #

测试场景:

  • 网关 -> 服务A -> 服务B -> 服务C(三级调用)
  • 并发 100 用户,持续 10 分钟
  • 随机使用 10 个不同的 cityCode

结果:

总请求数: 120000
成功率: 100%
cityCode 传递准确率: 100%
平均响应时间: 250ms
P99 响应时间: 580ms

抓包验证 Feign 请求:

GET /internal/jobs HTTP/1.1
Host: downstream-service:8081
x-city-code: 150200          ← Header 正确携带
Content-Type: application/json

3.7 迭代 2 小结 #

解决了什么:

  • ✅ 跨服务调用时 cityCode 能正确传递
  • ✅ 支持多级服务调用链路(A -> B -> C)
  • ✅ 兼容同步和异步场景

技术选型总结:

  • ✅ 入站用 Filter - 执行最早,生命周期完整
  • ✅ 出站用 Feign Interceptor - 拦截 HTTP 客户端调用
  • ✅ 不混用,职责清晰

遗留问题:

  • ❌ AOP 解析 cityCode 性能开销
  • ❌ 每个服务都要解析一遍 Token
  • ❌ ThreadLocal 不清理可能导致上下文串数据

第四章:迭代 3 - ThreadLocal 清理与上下文串数据 #

4.1 线上事故:数据串库了! #

上线两周后,生产环境突然出现一个诡异的 Bug:用户 A 查询到了用户 B 的数据!

排查日志发现:

[Thread-45] 用户A请求 cityCode=150100 (呼和浩特)
[Thread-45] 查询完成,返回数据
[Thread-45] 用户B请求 cityCode=150200 (包头)  <-- 复用了同一个线程
[Thread-45] cityCode 仍然是 150100!  <-- 上一次的值没清理
[Thread-45] 查询错误的数据库!

4.2 问题分析:Tomcat 线程池复用 #

先看 Tomcat 的线程模型:

ThreadLocal 的生命周期:

关键点:

  • Tomcat 线程池默认 maxThreads=200,线程会复用
  • 如果不调用 clear(),ThreadLocal 的值会一直保留
  • 下一个请求如果也使用同一个线程,可能拿到上一个请求的值

4.3 ThreadLocal 内存泄漏的真相 #

网上很多文章说"不调用 clear() 会导致内存泄漏,最终 OOM",这个说法过于绝对

实际情况:

  1. 对象数受线程池上限约束

    • Tomcat 线程池默认 200 个线程
    • 即使不 clear(),最多也就 200 个 String 对象(cityCode)
    • 单个 String 假设 100 字节,总共才 20KB
    • 不会导致 OOM
  2. 真正的内存泄漏场景

ThreadLocalMap 的 Entry 继承了 WeakReference:

static class Entry extends WeakReference<ThreadLocal<?>> {
    Object value; // 强引用!
  
    Entry(ThreadLocal<?> k, Object v) {
        super(k); // ThreadLocal 是弱引用
        value = v; // value 是强引用
    }
}

真正泄漏的情况:

  • ThreadLocal 实例被 GC 回收(弱引用)
  • 但 value 仍然被 Entry 强引用
  • 形成"僵尸 Entry":key 为 null,但 value 还在
  • 只有当 value 本身很大,或者有大量这种僵尸 Entry 时才会内存泄漏

ThreadLocal 的自清理机制:

// ThreadLocal.get() 源码
public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = t.threadLocals;
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            return (T)e.value;
        }
    }
    return setInitialValue();
}

// ThreadLocalMap.getEntry() 会清理僵尸 Entry
private Entry getEntry(ThreadLocal<?> key) {
    int i = key.threadLocalHashCode & (table.length - 1);
    Entry e = table[i];
    if (e != null && e.get() == key) {
        return e;
    } else {
        // 清理 key 为 null 的僵尸 Entry
        return getEntryAfterMiss(key, i, e);
    }
}

4.4 正确的清理方式 #

方案:Filter 统一管理

@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class TenantContextFilter implements Filter {
  
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, 
                         FilterChain chain) throws IOException, ServletException {
        HttpServletRequest httpRequest = (HttpServletRequest) request;
        String cityCode = httpRequest.getHeader("x-city-code");
  
        // 设置上下文
        TenantContextHolder.setCityCode(cityCode);
  
        try {
            // 继续执行后续逻辑
            chain.doFilter(request, response);
        } finally {
            // 关键!无论成功还是异常,都要清理
            TenantContextHolder.clear();
        }
    }
}

为什么必须放在 finally?

public void someService() {
    // 业务逻辑
    doSomething(); // 这里抛异常了!
  
    // 如果不在 finally 中,这行不会执行
    TenantContextHolder.clear();
}

Filter vs AOP 的执行顺序:

Filter 最早设置、最晚清理,能覆盖整个请求生命周期。

4.5 TTL 还需要手动 clear 吗? #

答案:仍然需要!

TTL 解决的是父子线程传递问题,但不负责清理主线程的上下文。

@Component
public class TenantContextFilter implements Filter {
  
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, 
                         FilterChain chain) throws IOException, ServletException {
  
        String cityCode = extractCityCode(request);
        TenantContextHolder.setCityCode(cityCode);
  
        try {
            chain.doFilter(request, response);
            // 主线程可能启动了异步任务,TTL 会自动传递给子线程
        } finally {
            // 但主线程的 TTL 仍然需要清理!
            // 否则 Tomcat 线程复用时会泄漏
            TenantContextHolder.clear();
        }
    }
}

完整的生命周期:

主线程和子线程各自负责清理自己的上下文:

  • 子线程由 Restore 自动清理
  • 主线程由 Filter 的 finally 清理

4.6 迭代 3 小结 #

解决了什么:

  • ✅ 修复了线程复用导致的上下文串数据问题
  • ✅ 通过 Filter 统一管理上下文生命周期
  • ✅ 确保异常情况下也能正确清理

澄清误区:

  • ⚠️ 不 clear() 不一定会 OOM,但会导致数据串库
  • ⚠️ ThreadLocal 有自清理机制,但不能依赖它
  • ⚠️ TTL 的 Restore 只清理子线程,主线程仍需手动清理

遗留问题:

  • ❌ AOP 解析 cityCode 性能开销
  • ❌ 每个服务都要解析一遍 Token

第五章:迭代 4 - 性能优化之网关统一解析 #

5.1 性能瓶颈发现 #

上线一个月后,运维监控发现一个奇怪的现象:CPU 使用率持续在 70% 左右,但业务量并不大

通过 Arthas 火焰图分析,发现热点方法:

com.auth0.jwt.JWT.decode()  ────────────── 28.5%
com.example.aop.DataSourceAspect.around() ─ 18.2%

JWT 解析竟然占了近 30% 的 CPU!

5.2 问题根源分析 #

原始架构的性能问题:

每个服务都在做重复劳动:

  1. AOP 切面拦截: 每次进入 Service 方法都触发
  2. 解析 JWT Token: 从 Header 取出 Token,验证签名,解析 Payload
  3. 提取 cityCode: 从 Claims 中读取自定义字段

一个三级调用链路:解析 4 次 JWT!

JWT 解析性能分析:

// JWT 解析示例代码
public String parseCityCodeFromToken(String token) {
    try {
        // 1. 解码 Base64(轻量)
        String[] parts = token.split("\\.");
        String payload = new String(Base64.getUrlDecoder().decode(parts[1]));
  
        // 2. JSON 反序列化(中等开销)
        Map<String, Object> claims = JSON.parseObject(payload);
  
        // 3. 验证签名(重!)
        Algorithm algorithm = Algorithm.HMAC256(secret);
        JWTVerifier verifier = JWT.require(algorithm).build();
        verifier.verify(token); // 这一步最耗时!
  
        // 4. 提取 cityCode
        return (String) claims.get("cityCode");
    } catch (Exception e) {
        log.error("JWT 解析失败", e);
        return null;
    }
}

性能测试:

@Test
public void testJwtParsePerformance() {
    String token = generateTestToken();
  
    long start = System.nanoTime();
    for (int i = 0; i < 10000; i++) {
        parseCityCodeFromToken(token);
    }
    long end = System.nanoTime();
  
    System.out.println("1万次解析耗时: " + (end - start) / 1_000_000 + "ms");
}

// 结果: 1万次解析耗时: 3200ms
// 平均每次: 0.32ms

看起来单次才 0.32ms,不多啊?

但考虑到:

  • 每个请求可能调用 10+ 个 Service 方法(都被 AOP 拦截)
  • 每个 Service 方法都解析一次
  • 一个三级调用链路: 10次 × 3个服务 = 30次解析
  • 高峰期 1000 QPS: 1000 × 30 × 0.32ms = 9.6秒 CPU 时间/秒

CPU 利用率高就不奇怪了!

5.3 解决方案:网关统一解析 #

优化思路:

  1. 网关解析一次: 在网关层解析 JWT,提取 cityCode
  2. 写入 Header: 把 cityCode 作为明文 Header 传递给下游
  3. 下游直接读取: 服务只需要从 Header 读取,不再解析 Token
  4. 去掉 AOP: 在 Filter 层统一处理,不需要每个方法都拦截

架构改造:

5.4 网关改造(Spring Cloud Gateway 示例) #

网关全局过滤器:

@Component
public class JwtParseFilter implements GlobalFilter, Ordered {
  
    private static final String JWT_HEADER = "Authorization";
    private static final String BEARER_PREFIX = "Bearer ";
  
    @Value("${jwt.secret}")
    private String jwtSecret;
  
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
  
        // 1. 从 Header 提取 JWT Token
        String authHeader = request.getHeaders().getFirst(JWT_HEADER);
        if (authHeader == null || !authHeader.startsWith(BEARER_PREFIX)) {
            return chain.filter(exchange);
        }
  
        String token = authHeader.substring(BEARER_PREFIX.length());
  
        try {
            // 2. 解析 JWT(只在网关解析一次!)
            Algorithm algorithm = Algorithm.HMAC256(jwtSecret);
            JWTVerifier verifier = JWT.require(algorithm).build();
            DecodedJWT jwt = verifier.verify(token);
  
            // 3. 提取业务字段
            String cityCode = jwt.getClaim("cityCode").asString();
            String userId = jwt.getClaim("userId").asString();
            String userName = jwt.getClaim("userName").asString();
  
            // 4. 写入自定义 Header,传递给下游服务
            ServerHttpRequest mutatedRequest = request.mutate()
                .header("x-city-code", cityCode)      // 租户标识
                .header("x-user-id", userId)          // 用户ID
                .header("x-user-name", userName)      // 用户名
                .build();
  
            log.info("网关解析JWT成功: cityCode={}, userId={}", cityCode, userId);
  
            // 5. 替换原始请求
            ServerWebExchange mutatedExchange = exchange.mutate()
                .request(mutatedRequest)
                .build();
  
            return chain.filter(mutatedExchange);
  
        } catch (JWTVerificationException e) {
            log.error("JWT 验证失败: {}", e.getMessage());
            // Token 无效,返回 401
            ServerHttpResponse response = exchange.getResponse();
            response.setStatusCode(HttpStatus.UNAUTHORIZED);
            return response.setComplete();
        }
    }
  
    @Override
    public int getOrder() {
        // 优先级设置为最高,确保在其他过滤器之前执行
        return Ordered.HIGHEST_PRECEDENCE;
    }
}

网关配置:

spring:
  cloud:
    gateway:
      routes:
        - id: service-a
          uri: lb://service-a
          predicates:
            - Path=/api/a/**
          filters:
            # 去掉原始 JWT Token,避免下游误用
            - RemoveRequestHeader=Authorization

5.5 下游服务简化 #

去掉 AOP 切面:

// 删除这个类!
@Aspect
@Component
public class DataSourceAspect {
    @Around("execution(* com.example.service..*.*(..))")
    public Object around(ProceedingJoinPoint pjp) throws Throwable {
        // 不再需要这个重量级拦截
    }
}

Filter 直接读取 Header(代码不变):

@Component
public class TenantContextFilter implements Filter {
  
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, 
                         FilterChain chain) throws IOException, ServletException {
  
        HttpServletRequest httpRequest = (HttpServletRequest) request;
  
        // 直接从 Header 读取网关解析好的 cityCode(无需解析 JWT!)
        String cityCode = httpRequest.getHeader("x-city-code");
  
        if (StringUtils.hasText(cityCode)) {
            TenantContextHolder.setCityCode(cityCode);
        }
  
        try {
            chain.doFilter(request, response);
        } finally {
            TenantContextHolder.clear();
        }
    }
}

优化点:

  1. ❌ 删除了 AOP 切面 - 减少方法拦截开销
  2. ❌ 删除了 JWT 解析逻辑 - 减少 CPU 密集计算
  3. ✅ 只需要简单的 Header 读取 - 几乎零开销

5.6 安全性考量 #

有同学可能会问:把 cityCode 放在明文 Header,不怕被篡改吗?

多层防护:

  1. 内网隔离:微服务通常部署在内网,外部无法直接访问
  2. 网关鉴权:只有通过网关的请求才会带上 x-city-code
  3. 服务间认证:可以加上服务间的签名验证

增强方案(可选):

@Component
public class JwtParseFilter implements GlobalFilter, Ordered {
  
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // ... JWT 解析逻辑 ...
  
        // 生成内部签名,防止 Header 被篡改
        String internalSign = generateInternalSign(cityCode, userId);
  
        ServerHttpRequest mutatedRequest = request.mutate()
            .header("x-city-code", cityCode)
            .header("x-user-id", userId)
            .header("x-internal-sign", internalSign)  // 内部签名
            .build();
  
        return chain.filter(mutatedExchange);
    }
  
    private String generateInternalSign(String cityCode, String userId) {
        // 使用 HMAC 生成签名,密钥只有网关和服务知道
        String data = cityCode + "|" + userId + "|" + System.currentTimeMillis();
        return HmacUtils.hmacSha256Hex(internalSecret, data);
    }
}

下游服务验证签名:

@Component
public class TenantContextFilter implements Filter {
  
    @Value("${internal.secret}")
    private String internalSecret;
  
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, 
                         FilterChain chain) throws IOException, ServletException {
  
        HttpServletRequest httpRequest = (HttpServletRequest) request;
  
        String cityCode = httpRequest.getHeader("x-city-code");
        String userId = httpRequest.getHeader("x-user-id");
        String sign = httpRequest.getHeader("x-internal-sign");
  
        // 验证签名
        if (!verifyInternalSign(cityCode, userId, sign)) {
            ((HttpServletResponse) response).setStatus(HttpStatus.FORBIDDEN.value());
            return;
        }
  
        TenantContextHolder.setCityCode(cityCode);
  
        try {
            chain.doFilter(request, response);
        } finally {
            TenantContextHolder.clear();
        }
    }
  
    private boolean verifyInternalSign(String cityCode, String userId, String sign) {
        // 验证逻辑(可以加上时间戳防重放)
        // ...
    }
}

5.7 性能对比测试 #

测试场景:

  • 三级调用链路:网关 -> 服务A -> 服务B -> 服务C
  • 每个服务内部调用 10 个 Service 方法
  • 并发 500 用户,持续 5 分钟

优化前(AOP + JWT 解析):

指标
平均响应时间480ms
P95 响应时间850ms
P99 响应时间1200ms
CPU 使用率72%
QPS650

优化后(网关解析 + Header 透传):

指标提升
平均响应时间180ms62.5% ↓
P95 响应时间320ms62.4% ↓
P99 响应时间450ms62.5% ↓
CPU 使用率28%61.1% ↓
QPS78020% ↑

火焰图对比:

优化前:

JWT.decode()           ████████████████ 28.5%
AOP.around()           ██████████ 18.2%
业务逻辑               ████████ 15.3%

优化后:

业务逻辑               ████████████████████ 45.8%
数据库查询             ████████████ 28.3%
序列化/反序列化        ████ 8.9%

5.8 JWT 原理深度解析 #

既然讲到 JWT,就深入聊聊它的原理和性能特性。

JWT 结构:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJjaXR5Q29kZSI6IjE1MDIwMCIsInVzZXJJZCI6IjEyMyJ9.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

Header(头部)         Payload(负载)                                  Signature(签名)

分解结构:

为什么验证签名耗时?

// 签名验证过程
public boolean verifySignature(String token, String secret) {
    String[] parts = token.split("\\.");
    String headerAndPayload = parts[0] + "." + parts[1];
    String signature = parts[2];
  
    // 1. 用相同的密钥和算法重新计算签名
    Mac hmac = Mac.getInstance("HmacSHA256");
    SecretKeySpec secretKey = new SecretKeySpec(secret.getBytes(), "HmacSHA256");
    hmac.init(secretKey);
    byte[] calculatedSignature = hmac.doFinal(headerAndPayload.getBytes());
  
    // 2. Base64 编码
    String calculatedSignatureBase64 = Base64.getUrlEncoder()
        .encodeToString(calculatedSignature);
  
    // 3. 对比是否一致
    return calculatedSignatureBase64.equals(signature);
}

性能瓶颈在哪?

  1. HMAC-SHA256 计算:密码学运算,CPU 密集
  2. 每次验证都要重新计算:无法缓存(因为要防重放攻击)
  3. 字符串操作:Split、Base64 编解码

网关解析 vs 每个服务解析:

graph TB
    subgraph 方案一:每个服务都解析
        R1[请求] --> G1[网关<br/>解析JWT]
        G1 --> A1[服务A<br/>解析JWT]
        A1 --> B1[服务B<br/>解析JWT]
        B1 --> C1[服务C<br/>解析JWT]
  
        G1 -.0.3ms.-> G1
        A1 -.0.3ms.-> A1
        B1 -.0.3ms.-> B1
        C1 -.0.3ms.-> C1
    end
  
    subgraph 方案二:网关统一解析
        R2[请求] --> G2[网关<br/>解析JWT]
        G2 --> A2[服务A<br/>读Header]
        A2 --> B2[服务B<br/>读Header]
        B2 --> C2[服务C<br/>读Header]
  
        G2 -.0.3ms.-> G2
        A2 -.0.001ms.-> A2
        B2 -.0.001ms.-> B2
        C2 -.0.001ms.-> C2
    end
  
    style G1 fill:#f8d7da
    style A1 fill:#f8d7da
    style B1 fill:#f8d7da
    style C1 fill:#f8d7da
    style G2 fill:#d4edda

总耗时对比:

  • 方案一: 0.3ms × 4 = 1.2ms
  • 方案二: 0.3ms + 0.001ms × 3 = 0.303ms

单个请求节省 0.9ms,看起来不多,但:

  • 1000 QPS → 每秒节省 900ms CPU 时间
  • 相当于多出 90% 的 CPU 性能!

5.9 其他性能优化技巧 #

技巧 1: 网关层缓存解析结果

如果同一个 Token 短时间内多次请求,可以缓存解析结果:

@Component
public class JwtParseFilter implements GlobalFilter, Ordered {
  
    // 使用 Caffeine 本地缓存
    private final Cache<String, JwtClaims> jwtCache = Caffeine.newBuilder()
        .maximumSize(10000)
        .expireAfterWrite(5, TimeUnit.MINUTES)
        .build();
  
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        String token = extractToken(exchange.getRequest());
  
        // 先查缓存
        JwtClaims claims = jwtCache.getIfPresent(token);
        if (claims == null) {
            // 缓存未命中,解析 JWT
            claims = parseAndVerifyJwt(token);
            jwtCache.put(token, claims);
        }
  
        // 写入 Header
        // ...
    }
}

注意:缓存时间不宜过长,要考虑 Token 过期和撤销的场景。

技巧 2: 使用非对称加密算法(RS256)

HMAC-SHA256 是对称加密,所有服务都要持有密钥,有泄漏风险。

可以改用 RSA 非对称加密:

  • 网关用私钥签名
  • 服务用公钥验证(公钥泄漏无所谓)
// 网关签名(私钥)
Algorithm algorithm = Algorithm.RSA256(publicKey, privateKey);
String token = JWT.create()
    .withClaim("cityCode", "150200")
    .sign(algorithm);

// 服务验证(只需要公钥)
Algorithm algorithm = Algorithm.RSA256(publicKey, null);
JWTVerifier verifier = JWT.require(algorithm).build();
verifier.verify(token);

但 RSA 验证比 HMAC 慢 10 倍,所以更要在网关统一解析!

技巧 3: 精简 Payload

JWT 的每个字段都会被 Base64 编码传输,字段越多,Token 越大:

// 臃肿的 Payload(不推荐)
{
  "userId": "123456789",
  "userName": "张三",
  "email": "zhangsan@example.com",
  "roles": ["admin", "user", "guest"],
  "permissions": ["read", "write", "delete"],
  "cityCode": "150200",
  "cityName": "包头市",
  "companyId": "comp_001",
  "companyName": "xxx科技有限公司",
  "departmentId": "dept_005",
  "departmentName": "研发部"
}

// 精简的 Payload(推荐)
{
  "uid": "123456789",       // userId 缩写
  "cid": "150200",          // cityCode 缩写
  "rol": ["adm", "usr"]     // roles 缩写
}

对比:

  • 臃肿版: 360 字节
  • 精简版: 85 字节
  • 节省 76% 的网络传输!

5.10 迭代 4 小结 #

解决了什么:

  • ✅ 响应时间降低 62%
  • ✅ CPU 使用率降低 61%
  • ✅ QPS 提升 20%
  • ✅ 去掉了 AOP 切面的代码侵入
  • ✅ 简化了服务间的调用逻辑

架构优化总结:

层次优化前优化后收益
网关不解析 JWT解析 JWT 并写入 Header集中处理
服务AOP + JWT 解析Filter 读 Header减少 90% 开销
调用链每个服务解析只解析一次减少重复计算

关键设计原则:

  1. 网关集中处理:认证、鉴权、解析都在边界完成
  2. 内网传明文:牺牲一点安全性,换取大幅性能提升
  3. 去中心化存储:上下文用 TTL 存储,不依赖 Redis 等中间件

第六章:总结与最佳实践 #

6.1 完整架构总览 #

经过四次迭代,最终的架构图:

数据流转图:

6.2 核心组件清单 #

1. 数据源配置

@Configuration
public class DataSourceConfig {
  
    @Bean
    public DataSource routingDataSource() {
        Map<Object, Object> targetDataSources = new HashMap<>();
        // 为每个租户配置独立数据源
        targetDataSources.put("150100", buildDruidDataSource(city150100Props));
        targetDataSources.put("150200", buildDruidDataSource(city150200Props));
  
        DynamicRoutingDataSource ds = new DynamicRoutingDataSource();
        ds.setTargetDataSources(targetDataSources);
        ds.setDefaultTargetDataSource(targetDataSources.get("150100"));
        ds.afterPropertiesSet();
        return ds;
    }
  
    private DataSource buildDruidDataSource(DataSourceProperties props) {
        DruidDataSource ds = new DruidDataSource();
        ds.setUrl(props.getUrl());
        ds.setUsername(props.getUsername());
        ds.setPassword(props.getPassword());
        ds.setInitialSize(3);
        ds.setMinIdle(3);
        ds.setMaxActive(20);
        return ds;
    }
}

2. 动态路由数据源

public class DynamicRoutingDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        return TenantContextHolder.getCityCode();
    }
}

3. TTL 上下文管理

public class TenantContextHolder {
    private static final TransmittableThreadLocal<String> CITY_TTL = 
        new TransmittableThreadLocal<>();
  
    public static void setCityCode(String cityCode) {
        CITY_TTL.set(cityCode);
    }
  
    public static String getCityCode() {
        String code = CITY_TTL.get();
        return code != null ? code : DEFAULT_CITY;
    }
  
    public static void clear() {
        CITY_TTL.remove();
    }
}

4. Filter 入口拦截

@Component
public class TenantContextFilter implements Filter {
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, 
                         FilterChain chain) throws IOException, ServletException {
        String cityCode = ((HttpServletRequest) request)
            .getHeader(TenantContextHolder.HEADER_CITY_CODE);
  
        TenantContextHolder.setCityCode(cityCode);
        try {
            chain.doFilter(request, response);
        } finally {
            TenantContextHolder.clear();
        }
    }
}

5. TTL 线程池配置

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
    @Override
    @Bean("ttlExecutor")
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(50);
        executor.initialize();
        return TtlExecutors.getTtlExecutor(executor);
    }
}

6. Feign 拦截器

@Configuration
public class FeignContextInterceptor implements RequestInterceptor {
    @Override
    public void apply(RequestTemplate template) {
        String cityCode = TenantContextHolder.getCityCode();
        if (StringUtils.hasText(cityCode)) {
            template.header(TenantContextHolder.HEADER_CITY_CODE, cityCode);
        }
    }
}

7. 网关 JWT 解析

@Component
public class JwtParseFilter implements GlobalFilter, Ordered {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        String token = extractToken(exchange.getRequest());
        DecodedJWT jwt = JWT.require(Algorithm.HMAC256(secret))
            .build()
            .verify(token);
  
        String cityCode = jwt.getClaim("cityCode").asString();
  
        ServerHttpRequest mutatedRequest = exchange.getRequest().mutate()
            .header("x-city-code", cityCode)
            .build();
  
        return chain.filter(exchange.mutate().request(mutatedRequest).build());
    }
  
    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}

6.3 最佳实践 Checklist #

设计阶段:

  • 明确租户隔离维度(租户ID、城市、组织等)
  • 评估租户数量,规划连接池大小
  • 设计数据源配置方式(配置文件 vs 数据库元数据)
  • 确定默认租户策略

开发阶段:

  • 使用 TransmittableThreadLocal 而非 ThreadLocal
  • 所有线程池必须用 TtlExecutors 包装
  • Filter 中 finally 块清理上下文
  • Feign 拦截器注入租户 Header
  • 避免在业务代码中直接操作 ThreadLocal

测试阶段:

  • 单元测试覆盖数据源切换逻辑
  • 压测验证内存泄漏(观察 ThreadLocalMap 大小)
  • 异步场景验证上下文传递
  • 跨服务调用验证 Header 透传
  • 边界测试(无 Header、错误 cityCode 等)

上线阶段:

  • 监控 Druid 连接池指标(活跃连接、等待线程)
  • 监控数据库连接数,避免打满
  • 设置合理的超时时间和熔断策略
  • 日志记录租户标识,便于问题排查

运维阶段:

  • 定期分析慢查询,检查是否有跨库查询
  • 监控 CPU 使用率,JWT 解析开销
  • 关注异常日志中的上下文丢失问题
  • 定期清理无效数据源配置

6.4 常见问题 FAQ #

Q1: ThreadLocal 和 TransmittableThreadLocal 能混用吗?

不建议混用。如果部分代码用 ThreadLocal,部分用 TTL,会导致异步场景下上下文不一致。建议全部统一用 TTL。

Q2: TTL 是否有性能损耗?

有,但很小。Capture/Replay/Restore 三个步骤总耗时约 0.01ms,相比业务逻辑可以忽略。而且带来的并行收益远大于这点损耗。

Q3: 能否把 cityCode 存在 Redis 而非 ThreadLocal?

技术上可以,但不推荐:

  1. 增加网络 IO,每次读取 cityCode 都要调 Redis
  2. 需要生成唯一 Key(如何在异步线程中关联?)
  3. 需要考虑清理策略,避免 Key 泄漏

TTL 是内存操作,性能高且天然隔离。

Q4: 如果租户数量很多(1000+),怎么办?

不建议为每个租户都创建独立数据源(连接池会爆炸)。可以考虑:

  1. 数据源分组: 按地域分组,10 个租户共享一个数据源
  2. Schema 隔离: 同一数据源,不同 Schema
  3. 表名隔离: 同一 Schema,表名加租户后缀(如 job_150100)

Q5: AbstractRoutingDataSource 是否线程安全?

是的。determineCurrentLookupKey() 每次获取连接时都会调用,基于当前线程的 ThreadLocal,天然线程隔离。

Q6: Filter 的 Order 有什么讲究?

建议设置为最高优先级:

@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class TenantContextFilter implements Filter {
    // ...
}

确保在 Spring Security、CORS 等 Filter 之前执行,这样安全认证时就能拿到租户信息。

Q7: 能否在 Controller 的 @RequestMapping 中设置 cityCode?

不推荐。Controller 执行时机太晚,如果有 @Async 方法在 Controller 之前执行,就拿不到上下文了。Filter 是最早的拦截点。

6.5 性能调优建议 #

Druid 连接池调优:

# 多租户场景下的推荐配置
spring.datasource.druid.initial-size=2
spring.datasource.druid.min-idle=2
spring.datasource.druid.max-active=10
spring.datasource.druid.max-wait=60000

# 连接空闲检测
spring.datasource.druid.test-while-idle=true
spring.datasource.druid.time-between-eviction-runs-millis=30000
spring.datasource.druid.min-evictable-idle-time-millis=300000

# 防止连接泄漏
spring.datasource.druid.remove-abandoned=true
spring.datasource.druid.remove-abandoned-timeout=180

# 监控 SQL 性能
spring.datasource.druid.filters=stat,wall,log4j2

数据库连接数规划:

假设:

  • 12 个租户
  • 每个租户连接池 maxActive=10
  • 3 个微服务实例

理论最大连接数: 12 × 10 × 3 = 360

MySQL 默认 max_connections=151,必须调大:

SET GLOBAL max_connections = 500;

并在 my.cnf 中持久化:

[mysqld]
max_connections = 500

监控指标:

指标说明告警阈值
activeCount活跃连接数> 80% maxActive
waitThreadCount等待线程数> 0
notEmptyWaitCount获取连接等待次数持续增长
poolingCount池中空闲连接数< minIdle

Druid 提供了监控页面,访问 /druid/index.html 查看实时指标。

6.6 演进历程回顾 #

迭代问题解决方案关键技术
V0实现基本动态切库AbstractRoutingDataSource + ThreadLocalSpring JDBC
迭代1异步场景上下文丢失TransmittableThreadLocal + TTL 线程池Alibaba TTL
迭代2跨服务调用断链Feign Interceptor + Header 透传Spring Cloud Feign
迭代3ThreadLocal 内存泄漏Filter finally 清理Servlet Filter
迭代4JWT 重复解析性能差网关统一解析 + Header 明文传递Spring Cloud Gateway

核心演进思路:

  1. 问题驱动: 每次迭代都是实际生产问题倒逼
  2. 渐进式优化: 不推翻重来,而是在原有基础上改进
  3. 技术选型务实: 不追求炫技,而是解决问题

6.7 写在最后 #

这个方案从最初的 PoC 到生产稳定运行,经历了大半个月。期间团队踩了无数坑,也积累了很多经验。

几点感悟:

  1. 没有银弹: AbstractRoutingDataSource 不是万能的,大规模租户要考虑分库分表
  2. 性能与安全的平衡: 网关解析 JWT 牺牲了一点安全性,但性能提升 60%,值得
  3. 深入原理很重要: 不理解 ThreadLocal 原理,不可能定位内存泄漏问题
  4. 监控先行: 没有监控,性能问题根本发现不了

后续可以探索的方向:

  1. 动态数据源配置: 从数据库读取租户配置,无需重启即可添加租户
  2. 读写分离: 结合 ShardingSphere 实现租户级别的读写分离
  3. 数据源预热: 启动时预建连接,避免首次查询慢
  4. 智能路由: 根据负载自动切换主备数据源

希望这篇文章能帮到正在做多租户改造的同学。如果有疑问,欢迎交流探讨!


参考资料:

示例代码仓库: