目录
- 引言
- InheritableThreadLocal 的问题
- TransmittableThreadLocal 使用
- 简单应用
- 原理
- 总结
引言
前面在介绍分布式链路追踪时讲过异步调用会丢失链路信息,最终的解决方案是使用对应的包装类重新包装一下,如下:
RunnableWrapper
CallableWrapper
SupplierWrapper
还有openFeign异步请求丢失上文的问题,这些问题追根究底都是ThreadLocal惹得祸。
由于ThreadLocal只能保存当前线程的信息,不能实现父子线程的继承。
说到这,很多人想到了InheritableThreadLocal,确实InheritableThreadLocal能够实现父子线程间传递本地变量,但是.....
但是你的程序如果采用线程池,则存在着线程复用的情况,这时就不一定能够实现父子线程间传递了,因为在线程在线程池中的存在不是每开发者_开发学习次使用都会进行创建,InheritableThreadlocal
是在线程初始化时intertableThreadLocals=true
才会进行拷贝传递。
所以若本次使用的子线程是已经被池化的线程,从线程池中取出线下进行使用,是没有经过初始化的过程,也就不会进行父子线程的本地变量拷贝。
由于在日常应用场景中,绝大多数都是会采用线程池的方式进行资源的有效管理。
今天就来聊一聊阿里的ThansmittableThreadLocal是如何解决线程池中父子线程本地变量传递。
B站链接:https://b23.tv/RI06iZl
InheritableThreadLocal 的问题
在介绍ThansmittableThreadLocal
之前先来看一下InheritableThreadLocal
在线程池中的问题,如下代码:
@Test publicvoidtest()throwsException{ //单一线程池 ExecutorServiceexecutorService=Executors.newSingleThreadExecutor(); //InheritableThreadLocal存储 InheritableThreadLocal<String>username=newInheritableThreadLocal<>(); for(inti=0;i<10;i++){ username.set("公众号:我们—"+i); Thread.sleep(3000); CompletableFuture.runAsync(()->System.out.println(username.get()),executorService); } }
上述代码中创建了一个单一线程池,循环异步调用,打印一下username,由于核心线程数是1,势必存在线程的复用。
打印信息如下:
公众号:我们—0
公众号:我们—0公众号:我们—0公众号:我们—0公众号:我们—0公众号:我们—0公众号:我们—0公众号:我们—0公众号:我们—0公众号:我们—0
看到了吗?这里并没有实现父子线程间的变量传递,这也就是InheritableThreadLocal 的局限性。
TransmittableThreadLocal 使用
TransmittableThreadLocal
(TTL
):在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal
值的传递功能,解决异步执行时上下文传递的问题。
整个TransmittableThreadLocal
库的核心功能(用户API
与框架/中间件的集成API
、线程池ExecutorService
/ForkJoinPool
/TimerTask
及其线程工厂的Wrapper
)。
需求场景:
- 分布式跟踪系统 或 全链路压测(即链路打标)
- 日志收集记录系统上下文
官网地址:https://github.com/alibaba/transmittable-thread-local
下面就以上面的例子改造成TransmittableThreadLocal试一下效果。
首选需要引入对应的依赖,如下:
<dependency> <groupId>com.alibaba</groupId> <artifactId>transmittable-thread-local</artifactId> </dependency
改造后的代码如下:
@Test publicvoidtest()throwsException{ //单一线程池 ExecutorServiceexecutorService=Executors.newSingleThreadExecutor(); //需要使用TtlExecutors对线程池包装一下 executorService=TtlExecutors.getTtlExecutorService(executorService); //TransmittableThreadLocal创建 TransmittableThreadLocal<String>username=newTransmittableThreadLocal<>(); for(inti=0;i<10;i++){ username.set("公众号:https://github.com/alibaba/transmittable-thread-local—"+i); Thread.sleep(3000); CompletableFuture.runAsync(()->System.out.println(username.get()),executorService); } }
需要注意的是需要使用TtlExecutors
对线程池进行包装,代码如下:
executorService=TtlExecutors.getTtlExecutorService(executorService);
运行效果如下:
公众号:我们—0
公众号:我们—1公众号:我们—2公众号:我们—3公众号:我们—4公众号:我们—5公众号:我们—6公众号:我们—7公众号:我们—8公众号:我们—9
可以看到已经能够实现了线程池中的父子线程的数据传递。
在每次调用任务的时,都会将当前的主线程的TTL数据copy到子线程里面,执行完成后,再清除掉。同时子线程里面的修改回到主线程时其实并没有生效。这样可以保证每次任务执行的时候都是互不干涉。
简单应用
在 Spring Security 往往需要存储用户登录的详细信息,这样在业务方法中能够随时获取用户的信息。
在前面的Spring Cloud Gateway整合OAuth2.0实现统一认证鉴权 文章中笔者是将用户信息直接存储在Request中,这样每次请求都能获取到对应的信息。
其实Request中的信息存储也是通过ThreadLocal完成的,在异步执行的时候还是需要重新转存,这样一来代码就变得复杂。
那么了解了TransmittableThreadLocal 之后,完全可以使用这个存储用户的登录信息,实现如下:
/** *@description使用TransmittableThreadLocal存储用户身份信息LoginVal */ publicclassSecurityContextHolder{ //使用TTL存储身份信息 privatestaticfinalTransmittableThreadLocal<LoginVal>THREAD_LOCAL=newTransmittableThreadLocal<>(); publicstaticvoidset(LoginValloginVal){ THREAD_LOCAL.set(loginVal); } publicstaticLoginValget(){ returnTHREAD_LOCAL.get(); } publicstaticvoidremove(){ THREAD_LOCAL.remove(); } }
由于mvc中的一次请求对应一个线程,因此只需要在拦截器中的设置和移除TransmittableThreadLocal中的信息,代码如下:
/** *@description拦截器,在preHandle中解析请求头的中的token信息,将其放入SecurityContextHolder中 *在afterCompletion方法中移除对应的ThreadLocal中信息 *确保每个请求的用户信息独立 */ @Component publicclassAuthInterceptorimplementsAsyncHandlerInterceptor{ /** *在执行controller方法之前将请求头中的token信息解析出来,放入SecurityContextHolder中(TransmittableThreadLocal) */ @Override publicbooleanpreHandle(HttpServletRequestrequest,HttpServletResponseresponse,Objecthandler){ if(!(handlerinstanceofHandlerMethod)) returntrue; //获取请求头中的加密的用户信息 Stringtoken=request.getHeader(OAuthConstant.TOKEN_NAME); if(StrUtil.isBlank(token)) returntrue; //解密 Stringjson=Base64.decodeStr(token); //将json解析成LoginVal LoginValloginVal=TokenUtils.parseJsonToLoginVal(json); //封装数据到ThreadLocal中 SecurityContextHolder.set(loginVal); returntrue; } /** *在视图渲染之后执行,意味着一次请求结束,清除TTL中的身份信息 */ @Override publicvoidafterCompletion(HttpServletRequestrequest,HttpServletResponseresponse,Objecthandler,Exceptionex){ SecurityContextHolder.remove(); } }
原理
从定义来看,TransimittableThreadLocal
继承于InheritableThreadLocal
,并实现TtlCopier
接口,它里面只有一个copy
方法。所以主要是对InheritableThreadLocal
的扩展。
publicclassTransmittableThreadLocal<T>extendsInheritableThreadLocal<T>implementsTtlCopier<T>
在TransimittableThreadLocal
中添加holder
属性。这个属性的作用就是被标记为具备线程传递资格的对象都会被添加到这个对象中。
要标记一个类,比较容易想到的方式,就是给这个类新增一个Type
字段,还有一个方法就是将具备这种类型的的对象都添加到一个静态全局集合中。之后使用时,这个集合里的所有值都具备这个标记。
//1.holder本身是一个InheritableThreadLocal对象 //2.这个holder对象的value是WeakHashMap<TransmittableThreadLocal<Object>,?> // 2.1 WeekHashMap的value总是null,且不可能被使用。 //2.2WeekHasshMap支持value=null privatestaticInheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>,?>>holder=newInheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>,?>>(){ @Override protectedweakHashMap<TransmittableThreadLocal<Object>,?>initialValue(){ returnnewWeakHashMap<TransmittableThreadLocal<Object>,Object>(); } /** *重写了childValue方法,实现上直接将父线程的属性作为子线程的本地变量对象。 */ @Override protectedWeakHashMap<TransmittableThreadLocal<Object>,?>childValue(WeakHashMap<TransmittableThreadLocal<Object>,?>parentValue){ returnnewWeakHashMap<TransmittableThreadLocal<Object>,Object>(parentValue); } };
应用代码是通过TtlExecutors
工具类对线程池对象进行包装。工具类只是简单的判断,输入的线程池是否已经被包装过、非空校验等,然后返回包装类ExecutorServiceTtlWrapper
。根据不同的线程池类型,有不同和的包装类。
@Nullable publicstaticExecutorServicegetTtlExecutorService(@NullableExecutorServiceexecutorService){ if(TtlAgent.isTtlAgentLoaded()||executorService==null||executorServiceinstanceofTtlEnhanced){ returnexecutorService; } returnnewExecutorServiceTtlWrapper(executorService); }
进入包装类ExecutorServiceTtlWrapper
。可以注意到不论是通过ExecutorServiceTtlWrapper#submit
方法或者是ExecutorTtlWrapper#execute
方法,都会将线程对象包装成TtlCallable
或者TtlRunnable
,用于在真正执行run
方法前做一些业务逻辑。
/** *在ExecutorServiceTtlWrapper实现submit方法 */ @NonNull @Override public<T>Future<T>submit(@NonNullCallable<T>task){ returnexecutorService.submit(TtlCallable.get(task)); } /** *在ExecutorTtlWrapper实现execute方法 */ @Override publicvoidexecute(@NonNullRunnablecommand){ executor.execute(TtlRunnable.get(command)); }
所以,重点的核心逻辑应该是在TtlCallable#call()
或者TtlRunnable#run()
中。以下以TtlCallable
为例,TtlRunnable
同理类似。在分析call()
方法之前,先看一个类Transmitter
publicstaticclassTransmitter{ /** *捕获当前线程中的是所有TransimittableThreadLocal和注册ThreadLocal的值。 */ @NonNull publicstaticObjectcapture(){ returnnewSnapshot(captureTtlValues(),captureThreadLocalValues()); } /** *捕获TransimittableThreadLocal的值,将holder中的所有值都添加到HashMap后返回。 */ privatestaticHashMap<TransmittableThreadLocal<Object>,Object>captureTtlValues(){ HashMap<TransmittableThreadLocal<Object>,Object>ttl2Value= newHashMap<TransmittableThreadLocal<Object>,Object>(); for(TransmittableThreadLocal<Object>threadLocal:holder.get().keySet()){ ttl2Value.put(threadLocal,threadLocal.copyValue()); } returnttl2Value; } /** *捕获注册的ThreadLocal的值,也就是原本线程中的ThreadLocal,可以注册到TTL中,在 *进行线程池本地变量传递时也会被传递。 */ privatestaticHashMap<ThreadLocal<Object>,Object>captureThreadLocalValues(){ finalHashMap<ThreadLocal<Object>,Object>threadLocal2Value= newHashMap<ThreadLocal<Object>,Object>(); for(Map.Entry<ThreadLocal<Object>,TtlCopier<Object>>entry:threadLocalHolder.entrySet()){ finalThreadLocal<Object>threadLocal=entry.getKey(); finalTtlCopier<Object>copier=entry.getValue(); threadLocal2Value.put(threadLocal,copier.copy(threadLocal.get())); } returnthreadLocal2Value; } /** *将捕获到的本地变量进行替换子线程的本地变量,并且返回子线程现有的本地变量副本backup。 *用于在执行run/call方法之后,将本地变量副本恢复。 */ @NonNull publicstaticObjectreplay(@NonNullObjectcaptured){ finalSnapshotcapturedSnapshot=(Snapshot)captured; returnnewSnapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value)); } /** *替换TransmittableThreadLocal */ @NonNull privatestaticHashMap<TransmittableThreadLocal<Object>,Object>replayTtlValues(@NonNullHashMap<TransmittableThreadLocal<Object>,Object>captured){ //创建副本backup HashMap<TransmittableThreadLocal<Object>,Object>backup= newHashMap<TransmittableThreadLocal<Object>,Object>(); for(finalIterator<TransmittableThreadLocal<Object>>iterator=holder.get().keySet().iterator();iterator.hasNext();){ TransmittableThreadLocal<Object>threadLocal=iterator.next(); //对当前线程的本地变量进行副本拷贝 backup.put(threadLocal,threadLocal.get()); //若出现调用线程中不存在某个线程变量,而线程池中线程有,则删除线程池中对应的本地变量 if(!captured.containsKey(threadLocal)){ iterator.remove(); threadLocal.superRemove(); } } //将捕获的TTL值打入线程池获取到的线程TTL中。 setTtlValuesTo(captured); //是一个扩展点,调用TTL的beforeExecute方法。默认实现为空 doExecuteCallback(true); returnbackup; } privatestaticHashMap<ThreadLocal<Object>,Object>replayThreadLocalValues(@NonNullHashMap<ThreadLocal<Object>,Object>captured){ finalHashMap<ThreadLocal<Object>,Object>backup= newHashMap<ThreadLocal<Object>,Object>(); for(Map.Entry<ThreadLocal<Object>,Object>entry:captured.entrySet()){ finalThreadLocal<Object>threadLocal=entry.getKey(); backup.put(threadLocal,threadLocal.get()); finalObjectvalue=entry.getValue(); if(value==threadLocalClearMark)threadLocal.remove(); elsethreadLocal.set(value); } returnbackup; } /** *清除单线线程的所有TTL和TL,并返回清除之气的backup */ @NonNull publicstaticObjectclear(){ finalHashMap<TransmittableThreadLocal<Object>,Object>ttl2Value= newHashMap<TransmittableThreadLocal<Object>,Object>(); finalHashMap<ThreadLocal<Object>,Object>threadLocal2Value= newHashMap<ThreadLocal<Object>,Object>(); for(Map.Entry<ThreadLocal<Object>,TtlCopier<Object>>entry:threadLocalHolder.entrySet()){ finalThreadLocal<Object>threadLocal=entry.getKey(); threadLocal2Value.put(threadLocal,threadLocalClearMark); } returnreplay(newSnapshot(ttl2Value,threadLocal2Value)); } /** *还原 */ publicstaticvoidrestore(@NonNullObjectbackup){ finalSnapshotbackupSnapshot=(Snapshot)backup; restoreTtlValues(backupSnapshot.ttl2Value); restoreThreadLocalValues(backupSnapshot.threadLocal2Value); } privatestaticvoidrestoreTtlValues(@NonNullHashMap<TransmittableThreadLocal<Object>,Object>backup){ //扩展点,调用TTL的afterExecute doExecuteCallback(false); for(finalIterator<TransmittableThreadLocal<Object>>iterator=holder.get().keySet().iterator();iterator.hasNext();){ TransmittableThreadLocal<Object>threadLocal=iterator.next(); if(!backup.containsKey(threadLocal)){ iterator.remove(); threadLocal.superRemove(); } } //将本地变量恢复成备份版本 setTtlValuesTo(backup); } privatestaticvoidsetTtlValuesTo(@NonNullHashMap<TransmittableThrephpadLocal<Object>,Object>ttlValues){ for(Map.Entry<TransmittableThreadLocal<Object>,Object>entry:ttlValues.entrySet()){ TransmittableThreadLocal<Object>threadLocal=entry.getKey(); threadLocal.set(entry.getValue()); } } privatestaticvoidrestoreThreadLocalValues(@NonNullHashMap<ThreadLocal<Objectwww.devze.com>,Object>backup){ for(Map.Entry<ThreadLocal<Object>,编程客栈Object>entry:backup.entrySet()){ finalThreadLocal<Object>threadLocal=entry.getKey(); threadLocal.set(entry.getValue()); } } /** *快照类,保存TTL和TL */ privatestaticclassSnapshot{ finalHashMap<TransmittableThreadLocal<Objsject>,Object>ttl2Value; finalHashMap<ThreadLocal<Object>,Object>threadLocal2Value; privateSnapshot(HashMap<TransmittableThreadLocal<Object>,Object>ttl2Value, HashMap<ThreadLocal<Object>,Object>threadLocal2Value){ this.ttl2Value=ttl2Value; this.threadLocal2Value=threadLocal2Value; } }
进入TtlCallable#call()
方法。
@Override publicVcall()throwsException{ Objectcaptured=capturedRef.get(); if(captured==null||releaseTtlValueReferenceAfterCall&& !capturedRef.compareAndSet(captured,null)){ thrownewIllegalStateException("TTLvaluereferenceisreleasedaftercall!"); } //调用replay方法将捕获到的当前线程的本地变量,传递给线程池线程的本地变量, //并且获取到线程池线程覆盖之前的本地变量副本。 Objectbackup=replay(captured); try{ //线程方法调用 returncallable.call(); }finally{ //使用副本进行恢复。 restore(backup); } }
到这基本上线程池方式传递本地变量的核心代码已经大概看完了。总的来说在创建TtlCallable
对象是,调用capture()
方法捕获调用方的本地线程变量,在call()
执行时,将捕获编程客栈到的线程变量,替换到线程池所对应获取到的线程的本地变量中,并且在执行完成之后,将其本地变量恢复到调用之前。
总结
本文介绍了使用阿里开源的TransmittableThreadLocal 优雅的实现父子线程的数据传递,应用场景很多,企业中应用也比较广泛。
精彩评论