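Alimei's Introduction
1. Background
Who should read this
Takeaway
What is AsyncLocalStorage? When is it typically used, and how do you use it?
Before the AsyncLocalStorage API existed, how was async-scoped storage handled, and roughly how did it work?
How did Async Local Storage, in the broad sense, evolve step by step? (the collected history)
How does AsyncLocalStorage relate to AsyncContext, the recent TC39 proposal led by Alibaba?
How are similar facilities used in other languages?
How does Node implement AsyncHook?
2. Straight to the point: what is AsyncLocalStorage
A motivating example
Third-party service logs, etc.
Option 1: a global variable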
// Raw Node.js HTTP server
// generateTraceId, cookieValidator and reportError are assumed to be defined elsewhere
const http = require('http');
let globalTraceId; // global variable
// 0. Request handler
function handleRequest(req, res) {
  // Generate a unique traceId; every incoming request overwrites globalTraceId
  globalTraceId = generateTraceId();
  // Check whether the user's cookie is valid
  cookieValidator().then((result) => {
    // Validation succeeded: return the content the user asked for
    res.writeHead(200, { 'Content-Type': 'text/plain' });
    res.write('Congrats! Your damn cookie is the best one!');
    res.end();
  }).catch((err) => {
    // Report the traceId together with the error to the error-monitoring system
    reportError(err, globalTraceId);
    // Write status code 500, the error message, etc.
    // ...
  });
}
// 1. Create the server
const server = http.createServer((req, res) => {
  handleRequest(req, res);
});
// 2. Bind the server to port 3000 and keep accepting incoming connections
server.listen(3000, () => {
  console.log('Server listening on port 3000');
});
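However, Node.js runs JavaScript on a single main thread, so a global variable like globalTraceId is shared by every in-flight request. While the first request is waiting on its asynchronous cookie check, the main stack is empty, so the second request is pulled in from the backlog and overwrites globalTraceId. If the first request's check then fails, it reports the error with the second request's traceId.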
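The overwrite can be reproduced without an HTTP server. The following is a minimal simulation, not the article's server: failingCheck and the two hard-coded "requests" are hypothetical stand-ins for cookieValidator and real traffic.

```javascript
// Hypothetical simulation: two overlapping "requests" share one global variable.
let globalTraceId;

// Stand-in for an async cookie check that always fails after delayMs.
function failingCheck(delayMs) {
  return new Promise((_, reject) => {
    setTimeout(() => reject(new Error('bad cookie')), delayMs);
  });
}

const reported = [];

function handleRequest(traceId, delayMs) {
  globalTraceId = traceId; // overwritten by every new request
  return failingCheck(delayMs).catch(() => {
    // By the time this runs, globalTraceId may belong to another request.
    reported.push({ expected: traceId, actual: globalTraceId });
  });
}

// Request 1 starts first but finishes last.
const done = Promise.all([
  handleRequest('trace-1', 20),
  handleRequest('trace-2', 10),
]).then(() => reported);

done.then((entries) => console.log(entries));
```

The entry for trace-1 is reported with trace-2 as its actual value, which is exactly the mix-up described above.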
Option 2: pass the parameter through explicitly
const http = require('http');
function handleRequest(req, res) {
  const traceId = req.headers['x-trace-id'] || generateTraceId();
  // Attach traceId to the req object so it travels along with the request
  req.traceId = traceId;
  // Same cookie check as above
  cookieValidator().then((result) => {
    // Validation succeeded: return the content the user asked for
    // ...
  }).catch((err) => {
    // Report the traceId
    reportError(err, req.traceId);
    // Write status code 500, the error message, etc.
    // ...
  });
}
function cookieValidator() {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      // do something, then call resolve(...) or reject(...)
      // ...
    }, 1000);
  });
}
// Server creation and listening are omitted from here on
// ...
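As you can see, traceId rides along on the req object all the way down. This works because each callback is a closure: when the asynchronous call is made, the callback keeps references to the variables in scope at that point (here req), and when the operation completes it continues with those same references. Every request therefore sees its own req.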
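A minimal sketch of the same idea in isolation, assuming a hypothetical failingCheck helper and hand-built req objects in place of a real server:

```javascript
// Hypothetical simulation: each "request" carries its own traceId on its req object.
function failingCheck(delayMs) {
  return new Promise((_, reject) => {
    setTimeout(() => reject(new Error('bad cookie')), delayMs);
  });
}

const reported = [];

function handleRequest(req, delayMs) {
  req.traceId = req.headers['x-trace-id'];
  return failingCheck(delayMs).catch(() => {
    // The callback closes over req, so it reads this request's own traceId.
    reported.push(req.traceId);
  });
}

const done = Promise.all([
  handleRequest({ headers: { 'x-trace-id': 'trace-1' } }, 20),
  handleRequest({ headers: { 'x-trace-id': 'trace-2' } }, 10),
]).then(() => reported);

done.then((ids) => console.log(ids));
```

Both IDs are reported correctly even though the requests overlap. The cost is that every function on the path has to receive req (or the traceId) as a parameter.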
// Via express
const express = require('express');
const { v4: uuidv4 } = require('uuid');
const { reportError } = require('./error-logging');
const app = express();
// Middleware
app.use((req, res, next) => {
const traceId = uuidv4(); // generate a new UUID for the trace ID
req.traceId = traceId; // attach the trace ID to the request object
next();
});
// Routes
app.get('/', async (req, res, next) => {
const traceId = req.traceId;
try {
// call an asynchronous function and pass along the trace ID
const result = await someAsyncFunction(traceId);
// do something with the result
res.send(result);
} catch (error) {
// log the error and trace ID to the error logging system
reportError(error, { traceId });
next(error);
}
});
// Listen on a port
// ...
Koa.js
const Koa = require('koa');
const { v4: uuidv4 } = require('uuid');
const { reportError } = require('./error-logging');
const app = new Koa();
// Middleware A
app.use(async (ctx, next) => {
const traceId = uuidv4(); // generate a new UUID for the trace ID
ctx.state.traceId = traceId; // store the trace ID in the Koa context object
try {
await next();
} catch (error) {
// log the error and trace ID to the error logging system
reportError(error, { traceId });
throw error;
}
});
// Middleware B: traceId is passed along through ctx
app.use(async (ctx) => {
const traceId = ctx.state.traceId;
// call an asynchronous function and pass along the trace ID
const result = await someAsyncFunction(traceId);
// do something with the result
ctx.body = result;
});
// Listen on a port
// ...
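The code above is almost identical to the Express version: the traceId is stored on the ctx object, which is passed all the way through, and that is how the parameter travels.
Nest (NestJS) is a framework for building efficient, scalable Node.js server-side applications. It uses progressive JavaScript, is built with and fully supports TypeScript (while still allowing developers to write pure JavaScript), and combines elements of OOP (object-oriented programming), FP (functional programming), and FRP (functional reactive programming).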
// Uses the nestjs-cls library
// npm i nestjs-cls
import { Injectable, Module } from '@nestjs/common';
import { ClsModule, ClsService } from 'nestjs-cls';
// Declare the ClsModule when the application module is initialized
@Module({
  imports: [
    // Register the ClsModule,
    ClsModule.forRoot({
      middleware: {
        // automatically mount the
        // ClsMiddleware for all routes
        mount: true,
        // and use the setup method to
        // provide default store values.
        setup: (cls, req) => {
          // Store the traceId through CLS
          cls.set('traceId', req.headers['x-trace-id'] || generateTraceId());
        },
      },
    }),
  ],
  providers: [CatService],
  controllers: [CatController],
})
export class AppModule {}
// Inject the ClsService into a service and call it directly
@Injectable()
export class CatService {
  constructor(
    // We can inject the provided ClsService instance,
    private readonly cls: ClsService,
    private readonly catRepository: CatRepository,
  ) {}
  getCatForUser() {
    // and use the "get" method to retrieve any stored value.
    const userId = this.cls.get('traceId'); // read the traceId
    return this.catRepository.getForUser(userId);
  }
}
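From the code above, the visible difference between Nest and the libraries shown earlier is that it registers things through dependency injection and makes heavy use of decorators.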
The nestjs-cls package provides several DX improvements over using plain AsyncLocalStorage (CLS is an abbreviation of the term continuation-local storage).
Option 3: today's star, AsyncLocalStorage
Official documentation: This class creates stores that stay coherent through asynchronous operations. While you can create your own implementation on top of the node:async_hooks module, AsyncLocalStorage should be preferred as it is a performant and memory safe implementation that involves significant optimizations that are non-obvious to implement. The following example uses AsyncLocalStorage to build a simple logger that assigns IDs to incoming HTTP requests and includes them in messages logged within each request. Documentation: https://nodejs.org/api/async_context.html#class-asynclocalstorage
// How to use AsyncLocalStorage in Node.js
import http from 'node:http';
import { AsyncLocalStorage } from 'node:async_hooks';
const asyncLocalStorage = new AsyncLocalStorage();
function logWithId(msg) {
const traceId = asyncLocalStorage.getStore();
console.log(`${traceId}:`, msg);
}
let traceId = 0;
http.createServer((req, res) => {
// The key API call
asyncLocalStorage.run(traceId++, () => {
logWithId('start');
// Imagine any chain of async operations here
setImmediate(() => {
logWithId('finish');
res.end();
});
});
}).listen(8080);
http.get('http://localhost:8080');
http.get('http://localhost:8080');
// Prints:
// 0: start
// 1: start
// 0: finish
// 1: finish
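How this code works: each request calls asyncLocalStorage.run(traceId++, callback), which makes that ID the store for the callback and for every asynchronous operation started inside it. logWithId reads the ID back with getStore() without receiving it as a parameter, so the logs of the two concurrent requests each keep their own ID.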
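As a further sketch of the same API, the store can be an object and the reader can sit several async calls deep. The helper names (sleep, queryDatabase, handleRequest) are hypothetical and stand in for real application code.

```javascript
const { AsyncLocalStorage } = require('node:async_hooks');

const als = new AsyncLocalStorage();
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// A nested helper reads the store without receiving any parameter.
async function queryDatabase() {
  await sleep(5);
  return `query ran for ${als.getStore().traceId}`;
}

function handleRequest(traceId) {
  // Everything started inside run(), sync or async, sees this store.
  return als.run({ traceId }, async () => {
    await sleep(traceId === 'trace-1' ? 20 : 10);
    return queryDatabase();
  });
}

// The two "requests" overlap in time but never see each other's store.
const done = Promise.all([handleRequest('trace-1'), handleRequest('trace-2')]);
done.then((lines) => console.log(lines));
```

Outside of run(), als.getStore() returns undefined, so code that may run without a request context should handle that case.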
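Summary
3. Collected history: Async Local Storage before Node.js 14
Forgetting history means betrayal (just kidding), so let's go through the collected history.
2013: CLS bursts onto the scene (1.1k stars)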
Continuation-local storage works like thread-local storage in threaded programming, but is based on chains of Node-style callbacks instead of threads. The standard Node convention of functions calling functions is very similar to something called "continuation-passing style" in functional programming, and the name comes from the way this module allows you to set and get values that are scoped to the lifetime of these chains of function calls.
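CLS works like thread-local storage (TLS) in multithreaded programming, except that it is based on chains of callback functions rather than threads.
The "C" in the name stands for Continuation, borrowed from "continuation-passing style" in functional programming; the aim is to keep data alive across a chain of function calls.
The values you set and get are scoped to the lifetime of those chains of asynchronous function calls.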
const express = require('express');
const cls = require('continuation-local-storage'); // require('cls-hooked') also works, as discussed later
const app = express();
// Create a new namespace for the traceId
const traceNamespace = cls.createNamespace('trace');
// Middleware to set the traceId for each request
app.use((req, res, next) => {
  traceNamespace.run(() => {
    // Generate a new traceId for this request
    traceNamespace.set('traceId', generateTraceId());
    next();
  });
});
// Route to get the traceId for the current request
app.get('/traceId', async (req, res) => {
  try {
    const cookie = await cookieValidator();
    // Check whether validation succeeded, etc.
    // ...
  } catch (err) {
    // Report the traceId
    reportError(err, traceNamespace.get('traceId'));
  }
  res.send(`Trace ID: ${traceNamespace.get('traceId')}`);
});
Every call to namespace.run(callback) creates a new context. Syntactically, run wraps a callback, and inside that callback we can access our continuation-local storage. We will see this xxx.run(callback, ...) shape several more times below.
// load polyfill if native support is unavailable
if (!process.addAsyncListener) require('async-listener');
// createNamespace simply calls the internal create function
function create(name) {
assert.ok(name, "namespace must be given a name!");
var namespace = new Namespace(name); // create the new namespace
namespace.id = process.addAsyncListener({
create : function () { return namespace.active; },
before : function (context, storage) { if (storage) namespace.enter(storage); },
after : function (context, storage) { if (storage) namespace.exit(storage); },
error : function (storage) { if (storage) namespace.exit(storage); }
});
process.namespaces[name] = namespace;
return namespace;
}
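Inside create, we construct a new Namespace to manage everything. The namespace registers listeners for the lifecycle events exposed by the native API, and those events drive the changes to our store. namespace.enter(storage) pushes the current context onto the stack; it runs in the before hook, that is, after the asynchronous operation has completed and just before its callback starts executing. In the after hook, the matching pop is performed by namespace.exit(storage).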
// CLS implementation
// This is the class behind the global store
function Namespace(name) {
this.name = name;
// changed in 2.7: no default context
this.active = null;
this._set = [];
this.id = null;
}
// The run method
Namespace.prototype.run = function (fn) {
var context = this.createContext();
this.enter(context);
try {
fn(context);
return context;
}
catch (exception) {
if (exception) {
exception[ERROR_SYMBOL] = context;
}
throw exception;
}
finally {
this.exit(context);
}
};
// Push the current active context onto the stack, then make the new context this.active
Namespace.prototype.enter = function (context) {
assert.ok(context, "context must be provided for entering");
this._set.push(this.active);
this.active = context;
};
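The this._set above is the stack mentioned earlier. Each call to run creates a context and makes it this.active, while the previous context (the old this.active) is pushed onto the this._set stack, where it waits to be popped and made active again.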
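The enter/exit bookkeeping can be exercised on its own. This is a simplified, synchronous-only sketch and not the library's code: the class name is hypothetical, exit here just pops, and createContext is assumed to derive the new context from the active one through the prototype chain.

```javascript
// Simplified, synchronous-only sketch of the stack described above.
class NamespaceSketch {
  constructor() {
    this.active = null; // context currently in effect
    this._set = [];     // stack of previously active contexts
  }
  // Assumption: a new context inherits from the active one.
  createContext() {
    return Object.create(this.active);
  }
  enter(context) {
    this._set.push(this.active); // remember the old context
    this.active = context;       // the new context becomes active
  }
  exit() {
    this.active = this._set.pop(); // restore the previous context
  }
  run(fn) {
    const context = this.createContext();
    this.enter(context);
    try {
      fn(context);
      return context;
    } finally {
      this.exit();
    }
  }
  set(key, value) {
    this.active[key] = value;
  }
  get(key) {
    return this.active ? this.active[key] : undefined;
  }
}

const ns = new NamespaceSketch();
const seen = [];
ns.run(() => {
  ns.set('traceId', 'outer');
  ns.run(() => {
    seen.push(ns.get('traceId')); // inherited from the outer context
    ns.set('traceId', 'inner');
    seen.push(ns.get('traceId'));
  });
  seen.push(ns.get('traceId'));   // outer context is active again
});
seen.push(ns.get('traceId'));     // no active context any more
console.log(seen);
```

What the real library adds on top is the asynchronous part: calling enter and exit from the before and after hooks so that the right context is active when a callback runs later.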
2017: async_hooks
The async_hooks API was released in Node.js 8.x in 2017
const asyncHooks = require('async_hooks');
const asyncHook = asyncHooks.createHook({
init: (asyncId, type, triggerAsyncId, resource) => {},
before: asyncId => {},
after: asyncId => {},
destroy: asyncId => {},
promiseResolve: asyncId => {},
})
asyncHook.enable();
// init() is called during object construction. The resource may not have
// completed construction when this callback runs. Therefore, all fields of the
// resource referenced by "asyncId" may not have been populated.
function init(asyncId, type, triggerAsyncId, resource) { }
// before() is called just before the resource's callback is called. It can be
// called 0-N times for handles (such as TCPWrap), and will be called exactly 1
// time for requests (such as FSReqCallback).
function before(asyncId) { }
// after() is called just after the resource's callback has finished.
function after(asyncId) { }
// destroy() is called when the resource is destroyed.
function destroy(asyncId) { }
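To see these callbacks fire, the following sketch records the events around a single setTimeout. It is only an illustration; events are collected in an array because calling console.log inside a hook starts asynchronous work of its own and would re-enter the hook.

```javascript
const async_hooks = require('node:async_hooks');

// Collect events in memory instead of logging from inside the hooks.
const events = [];

const hook = async_hooks.createHook({
  init(asyncId, type, triggerAsyncId) {
    events.push({ phase: 'init', asyncId, type, triggerAsyncId });
  },
  before(asyncId) {
    events.push({ phase: 'before', asyncId });
  },
  after(asyncId) {
    events.push({ phase: 'after', asyncId });
  },
});
hook.enable();

const done = new Promise((resolve) => {
  // Creating the timer triggers init; running its callback triggers before.
  setTimeout(() => {
    hook.disable();
    resolve(events);
  }, 10);
});

done.then((all) => {
  const timerInit = all.find((e) => e.phase === 'init' && e.type === 'Timeout');
  console.log('timer init event:', timerInit);
});
```

The asyncId seen in init is the same one later passed to before, which is what lets a library associate per-operation data with a callback.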
2017: cls-hooked
This is a fork of CLS using AsyncWrap OR async_hooks instead of async-listener. When running Nodejs version < 8, this module uses AsyncWrap which is an unsupported Nodejs API, so please consider the risk before using it. When running Nodejs version >= 8.2.1, this module uses the newer async_hooks API which is considered Experimental by Nodejs.
Stability: 1 - Experimental. Please migrate away from this API, if you can. We do not recommend using the createHook, AsyncHook, and executionAsyncResource APIs as they have usability issues, safety risks, and performance implications. Async context tracking use cases are better served by the stable AsyncLocalStorage API.
Stability: 3 - Legacy
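The general idea of building context propagation on async_hooks can be sketched as follows. This is a simplified illustration in the spirit of such libraries, not the code of cls-hooked; runWithContext, getContext and the Map-based bookkeeping are assumptions made for the example.

```javascript
const { createHook, executionAsyncId, AsyncResource } = require('node:async_hooks');

// asyncId -> context object
const contexts = new Map();

createHook({
  // A new async resource inherits the context of the resource that triggered it.
  init(asyncId, type, triggerAsyncId) {
    if (contexts.has(triggerAsyncId)) {
      contexts.set(asyncId, contexts.get(triggerAsyncId));
    }
  },
  // Drop the entry once the resource is destroyed.
  destroy(asyncId) {
    contexts.delete(asyncId);
  },
}).enable();

// Hypothetical helper: run fn inside a fresh async scope that owns `context`.
function runWithContext(context, fn) {
  const resource = new AsyncResource('ContextSketch');
  return resource.runInAsyncScope(() => {
    contexts.set(executionAsyncId(), context);
    return fn();
  });
}

function getContext() {
  return contexts.get(executionAsyncId());
}

const seen = [];
const done = new Promise((resolve) => {
  runWithContext({ traceId: 'trace-1' }, () => {
    setTimeout(() => seen.push(getContext().traceId), 20);
  });
  runWithContext({ traceId: 'trace-2' }, () => {
    setTimeout(() => seen.push(getContext().traceId), 10);
  });
  setTimeout(() => resolve(seen), 40);
});

done.then((ids) => console.log(ids));
```

Each timer callback finds the context of the scope that created it, even though the two scopes overlap in time. Getting this right for every resource type and keeping the map from leaking is the hard part, which is why the documentation quoted above steers users toward AsyncLocalStorage.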
2020: AsyncLocalStorage (ALS) finally arrives after a long wait
AsyncLocalStorage was added in Node.js v13.10.0 and backported to v12.17.0, both released in 2020.
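Summary
Before the AsyncLocalStorage API existed, how was async-scoped storage handled, and roughly how did it work?
How did Async Local Storage, in the broad sense, evolve step by step? (the collected history)
4. Different branches, same root: how ALS relates to the recent TC39 proposal AsyncContext
The Async Context proposal, led by Alibaba's TC39 delegates, became a TC39 Stage 1 proposal at the TC39 meeting in early February 2023. Its goal is to define a way to pass data through asynchronous tasks in JavaScript.
TC39 proposal: AsyncContext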
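class AsyncContext<T> {
  // Snapshots the values of all AsyncContext instances in the current execution context and returns a function.
  // When that function runs, the snapshot is restored as the global AsyncContext state of the execution context.
  static wrap<R>(fn: (...args: any[]) => R): (...args: any[]) => R;
  // Runs fn immediately, with value set as the current value of this
  // AsyncContext instance for the duration of fn. Asynchronous operations
  // started inside fn snapshot this value (equivalent to wrap).
  run<R>(value: T, fn: () => R): R;
  // Returns the current value of this AsyncContext instance.
  get(): T;
}
class AsyncLocalStorage<T> {
  constructor();
  // Runs callback immediately, with the async-local value set for the duration of callback.
  run<R>(store: T, callback: (...args: any[]) => R, ...args: any[]): R;
  // Returns the current async-local value.
  getStore(): T;
}
class AsyncResource {
  // Snapshots the global async-local state of the current execution context.
  constructor();
  // Runs fn immediately, restoring the snapshot as the global async-local state of the execution context for the duration of fn.
  runInAsyncScope<R>(fn: (...args: any[]) => R, thisArg, ...args: any[]): R;
}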
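AsyncLocalStorage is a Node API; it is not a standard, only an API provided by one runtime.
AsyncContext is an ECMAScript standard (if it is accepted); once accepted it becomes part of the specification, and the actual implementation is provided by each runtime together with its JS engine.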
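To make the correspondence concrete, the AsyncContext shape listed above can be approximated on top of Node's existing APIs. This is only a sketch under stated assumptions: the class name AsyncContextSketch is made up, it is not a polyfill of the proposal, and it relies on AsyncLocalStorage and the static AsyncResource.bind() from node:async_hooks.

```javascript
const { AsyncLocalStorage, AsyncResource } = require('node:async_hooks');

// Hypothetical approximation of the proposed interface, for illustration only.
class AsyncContextSketch {
  #storage = new AsyncLocalStorage();

  // run(value, fn): fn and the async work it starts see `value`.
  run(value, fn) {
    return this.#storage.run(value, fn);
  }

  // get(): current value of this instance, or undefined outside run().
  get() {
    return this.#storage.getStore();
  }

  // wrap(fn): capture the current async context and restore it when fn is called.
  static wrap(fn) {
    return AsyncResource.bind(fn);
  }
}

const ctx = new AsyncContextSketch();
let wrapped;
ctx.run('A', () => {
  wrapped = AsyncContextSketch.wrap(() => ctx.get());
});

// Called outside run(), yet it sees the value captured at wrap() time.
console.log(wrapped(), ctx.get());
```

The mapping is run to AsyncLocalStorage.run, get to getStore, and wrap to the snapshot that AsyncResource takes at construction, which matches the three declarations shown above.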
5. Lessons from elsewhere: how other languages manage per-thread context
Java
public class TraceIdFilter implements Filter {
private static final String TRACE_ID_HEADER = "X-Trace-Id";
private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest) request;
String traceId = httpRequest.getHeader(TRACE_ID_HEADER);
if (traceId == null || traceId.isEmpty()) {
traceId = generateTraceId();
}
TRACE_ID.set(traceId);
try {
chain.doFilter(request, response);
} finally {
TRACE_ID.remove();
}
}
public static String getTraceId() {
return TRACE_ID.get();
}
private String generateTraceId() {
return UUID.randomUUID().toString();
}
// Other methods for initializing and destroying the filter...
}
C++
#include <functional>
#include <iostream>
#include <thread>
// Each thread gets its own copy of this variable
thread_local int my_thread_local;
void my_thread_function() {
  my_thread_local = static_cast<int>(std::hash<std::thread::id>()(std::this_thread::get_id()));
  std::cout << "My thread-local value is " << my_thread_local << std::endl;
}
int main() {
  std::thread t1(my_thread_function);
  std::thread t2(my_thread_function);
  t1.join();
  t2.join();
  return 0;
}
Python
import threading
my_thread_local = threading.local()
def my_thread_function():
my_thread_local.value = threading.get_ident()
print(f"My thread-local value is {my_thread_local.value}")
t1 = threading.Thread(target=my_thread_function)
t2 = threading.Thread(target=my_thread_function)
t1.start()
t2.start()
t1.join()
t2.join()
6. One step further: how AsyncLocalStorage is implemented
Because some details depend heavily on the Node version, note that all documentation and code below are taken from Node v16.x LTS (Long-term Support).
I Wonder Where the API Comes From
Let's trace the AsyncLocalStorage API's call chain step by step.
JavaScript Zone
// location: lib/async_hooks.js
// 1. Where the stores actually live
const storageList = [];
const storageHook = createHook({
init(asyncId, type, triggerAsyncId, resource) {
const currentResource = executionAsyncResource();
// Value of currentResource is always a non null object
for (let i = 0; i < storageList.length; ++i) {
storageList[i]._propagate(resource, currentResource);
}
}
});
function createHook(fns) {
return new AsyncHook(fns);
}
// 2. Implementation of the ALS class
class AsyncLocalStorage {
constructor() {
this.kResourceStore = Symbol('kResourceStore');
this.enabled = false;
}
_enable() {
if (!this.enabled) {
this.enabled = true;
ArrayPrototypePush(storageList, this);
storageHook.enable();
}
}
run(store, callback, ...args) {
// Avoid creation of an AsyncResource if store is already active
if (ObjectIs(store, this.getStore())) {
return ReflectApply(callback, null, args);
}
this._enable();
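// Hand-over between the old and the new store
const resource = executionAsyncResource(); // the currently executing resource
const oldStore = resource[this.kResourceStore]; // the store previously attached to it
resource[this.kResourceStore] = store; // attach the new store; this is where the traceId lives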
try {
return ReflectApply(callback, null, args);
} finally {
resource[this.kResourceStore] = oldStore; // once the callback has finished, put oldStore back
}
}
getStore() {
if (this.enabled) {
const resource = executionAsyncResource();
return resource[this.kResourceStore];
}
}
}
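For readability, the non-essential parts of the code above have been removed.
Read it alongside the API call shown after this list:
this._enable() activates the hook.
executionAsyncResource() returns the async resource that is currently executing (Node creates a corresponding async resource for each asynchronous operation).
The store we pass in is then saved on that resource under the kResourceStore key (the store is our traceId, and kResourceStore is just a Symbol).
Only then does our callback run, via ReflectApply(callback, null, args). ReflectApply can be read as the equivalent of Function.prototype.apply() in JS.
From then on, inside run, the resource returned by executionAsyncResource() carries our traceId under kResourceStore.
Finally, getStore() reads that traceId back. Perfect!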
import { AsyncLocalStorage } from 'node:async_hooks';
const asyncLocalStorage = new AsyncLocalStorage();
let traceId = 0;
asyncLocalStorage.run(traceId++, () => {
console.log(asyncLocalStorage.getStore())
setImmediate(() => {
console.log(asyncLocalStorage.getStore())
})
});
asyncLocalStorage.run('test', () => {})
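Overall, this means that synchronous code inside the callback passed to ALS.run() can always read the store. An asynchronous operation, however, creates a new AsyncResource each time, and that new resource does not carry the store by itself. This is the job of the init hook shown earlier: it calls _propagate to copy the store from the current resource onto the newly created one.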
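The two halves, storing on the current resource and propagating in init, can be tried out in a stripped-down form. This is a sketch of the mechanism and not Node's implementation: it supports a single store, the functions run and getStore are standalone, and kStore is a hypothetical stand-in for kResourceStore.

```javascript
const { createHook, executionAsyncResource } = require('node:async_hooks');

// Hypothetical key, playing the role of kResourceStore.
const kStore = Symbol('kStore');

// init hook: copy the store from the currently executing resource
// onto the async resource that is being created.
createHook({
  init(asyncId, type, triggerAsyncId, resource) {
    const current = executionAsyncResource();
    if (current && current[kStore] !== undefined) {
      resource[kStore] = current[kStore];
    }
  },
}).enable();

function run(store, callback) {
  const resource = executionAsyncResource();
  const oldStore = resource[kStore];
  resource[kStore] = store;      // visible to synchronous code in callback
  try {
    return callback();
  } finally {
    resource[kStore] = oldStore; // hand the old store back
  }
}

function getStore() {
  return executionAsyncResource()[kStore];
}

const seen = [];
const done = new Promise((resolve) => {
  run('trace-1', () => {
    seen.push(getStore());       // synchronous read
    setTimeout(() => {
      seen.push(getStore());     // asynchronous read, via the copied store
      resolve(seen);
    }, 10);
  });
});

done.then((values) => console.log(values));
```

Removing the init hook from this sketch makes the read inside setTimeout come back as undefined, which is the situation the paragraph above describes.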
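In addition, run itself has a stack-like structure, except that the stack is realized through something like recursive calls: each run keeps the old store in a local variable and restores it when the callback finishes. This is what provides the nesting ability. The commit log for this code also confirms our guess, and the screenshot of it contains a small easter egg: Reviewed-By: Zijian Liu. This kind of recursion is also a bit like LeetCode's classic backtracking problem 51. N-Queens, which is a DFS traversal of a tree. Written recursively, DFS looks like the code above; written iteratively, it uses a Stack.
7. One more step: how AsyncHook is implemented in Node Core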
Intro and Guess
// location: lib/async_hooks.js
class AsyncHook {
constructor({
init,
before,
after,
destroy,
promiseResolve
}) {
this[init_symbol] = init;
this[before_symbol] = before;
this[after_symbol] = after;
this[destroy_symbol] = destroy;
this[promise_resolve_symbol] = promiseResolve;
}
enable() {
// The set of callbacks for a hook should be the same regardless of whether
// enable()/disable() are run during their execution. The following
// references are reassigned to the tmp arrays if a hook is currently being
// processed.
const {
0: hooks_array,
1: hook_fields
} = getHookArrays();
// Each hook is only allowed to be added once.
if (ArrayPrototypeIncludes(hooks_array, this))
return this;
const prev_kTotals = hook_fields[kTotals];
// createHook() has already enforced that the callbacks are all functions,
// so here simply increment the count of whether each callbacks exists or
// not.
hook_fields[kTotals] = hook_fields[kInit] += +!!this[init_symbol];
hook_fields[kTotals] += hook_fields[kBefore] += +!!this[before_symbol];
hook_fields[kTotals] += hook_fields[kAfter] += +!!this[after_symbol];
hook_fields[kTotals] += hook_fields[kDestroy] += +!!this[destroy_symbol];
hook_fields[kTotals] +=
hook_fields[kPromiseResolve] += +!!this[promise_resolve_symbol];
ArrayPrototypePush(hooks_array, this);
if (prev_kTotals === 0 && hook_fields[kTotals] > 0) {
enableHooks();
}
updatePromiseHookMode();
return this;
}
}
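Fortunately, the constructor is nothing suspicious. As expected, it stores the callback for each hook stage, and enable() then activates them. So where does the enableHooks() called near the end of enable() come from? From lib/internal/async_hooks.js. (Yes, this is where we learn that the APIs in the lib folder call into an internal implementation layer under lib/internal; put simply, one more layer of abstraction.)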
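The counting lines in enable() rely on the +!!value idiom, which yields 1 when a callback exists and 0 otherwise. A miniature version of that bookkeeping, with hypothetical names and only two hook kinds, looks like this:

```javascript
// Hypothetical miniature of the counters updated in enable().
const kInit = 0;
const kBefore = 1;
const kTotals = 2;
const hookFields = new Uint32Array(3);

function register(hook) {
  // +!!value is 1 when the callback exists and 0 otherwise.
  // The first line assigns, so the total is rebuilt from the per-kind counters.
  hookFields[kTotals] = hookFields[kInit] += +!!hook.init;
  hookFields[kTotals] += hookFields[kBefore] += +!!hook.before;
}

register({ init() {} });
register({ init() {}, before() {} });

console.log(Array.from(hookFields)); // [2, 1, 3]
```

Keeping a running total is what allows the check on prev_kTotals in enable(): the native side only needs to be switched on when the total goes from zero to non-zero.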
// location: lib/internal/async_hooks.js
const async_wrap = internalBinding('async_wrap');
const { setCallbackTrampoline } = async_wrap;
function enableHooks() {
async_hook_fields[kCheck] += 1;
setCallbackTrampoline(callbackTrampoline);
}
// d.ts file
declare function InternalBinding(binding: 'blob'): {
createBlob(sources: Array<Uint8Array | InternalBlobBinding.BlobHandle>, length: number): InternalBlobBinding.BlobHandle;
FixedSizeBlobCopyJob: typeof InternalBlobBinding.FixedSizeBlobCopyJob;
getDataObject(id: string): [handle: InternalBlobBinding.BlobHandle | undefined, length: number, type: string] | undefined;
storeDataObject(id: string, handle: InternalBlobBinding.BlobHandle, size: number, type: string): void;
revokeDataObject(id: string): void;
};
// location: lib/internal/bootstrap/loader.js
// This file creates the internal module & binding loaders used by built-in
// modules. In contrast, user land modules are loaded using
// lib/internal/modules/cjs/loader.js (CommonJS Modules) or
// lib/internal/modules/esm/* (ES Modules).
// C++ binding loaders:
// - internalBinding(): the private internal C++ binding loader, inaccessible
// from user land unless through `require('internal/test/binding')`.
// These C++ bindings are created using NODE_MODULE_CONTEXT_AWARE_INTERNAL()
// and have their nm_flags set to NM_F_INTERNAL.
// This file is compiled as if it's wrapped in a function with arguments
// passed by node::RunBootstrapping()
/* global process, getLinkedBinding, getInternalBinding, primordials */
// Set up internalBinding() in the closure.
/**
* @type {InternalBinding}
*/
let internalBinding;
{
const bindingObj = ObjectCreate(null);
// eslint-disable-next-line no-global-assign
internalBinding = function internalBinding(module) {
let mod = bindingObj[module];
if (typeof mod !== 'object') {
mod = bindingObj[module] = getInternalBinding(module);
ArrayPrototypePush(moduleLoadList, `Internal Binding ${module}`);
}
return mod;
};
}
These are the simplified comments. Put simply, this file is loaded as a loader, and a loader naturally has to load something. So what does it load?
// This file is compiled as if it's wrapped in a function with arguments // passed by node::RunBootstrapping()
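The internalBinding function shown earlier is essentially a cache in front of a native lookup. The sketch below reproduces that shape in plain JavaScript; getInternalBinding here is a hypothetical stand-in that returns a dummy object, since the real one is supplied by C++ during bootstrap.

```javascript
// Hypothetical stand-in for the C++-provided getInternalBinding().
let nativeCalls = 0;
function getInternalBinding(name) {
  nativeCalls += 1;
  return { name };
}

const moduleLoadList = [];
const bindingObj = Object.create(null);

// Same shape as the loader above: look in the cache first, load once otherwise.
function internalBinding(name) {
  let mod = bindingObj[name];
  if (typeof mod !== 'object') {
    mod = bindingObj[name] = getInternalBinding(name);
    moduleLoadList.push(`Internal Binding ${name}`);
  }
  return mod;
}

const first = internalBinding('async_wrap');
const second = internalBinding('async_wrap');
console.log(first === second, nativeCalls, moduleLoadList);
```

The second call returns the cached object without touching the native side, and the load list records each binding once, in load order.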
// location: src/async_wrap.cc
// At the end of this file
// The macro is defined in node_binding.h
// #define NODE_MODULE_CONTEXT_AWARE_INTERNAL(modname, regfunc)
NODE_MODULE_CONTEXT_AWARE_INTERNAL(async_wrap, node::AsyncWrap::Initialize);
NODE_MODULE_EXTERNAL_REFERENCE(async_wrap, node::AsyncWrap::RegisterExternalReferences);
NODE_MODULE_CONTEXT_AWARE_INTERNAL-> NODE_MODULE_CONTEXT_AWARE_CPP -> ...
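There are a large number of handles in here: objects used to process I/O events, responsible for allocating and releasing the underlying I/O resources and for handling I/O event notifications and callbacks.
This is most likely the libuv API being called, used to submit asynchronous requests and to receive their callbacks. Having the internals of two libraries call each other directly would not be good practice, so each side is wrapped behind an API that faces the other. The async_wrap <---> libuv relationship can therefore be pictured as follows.
AsyncWrap is the base class and provides the basic APIs for talking to the JS layer. Its derived classes interact with libuv through uv_handle_t, and libuv notifies those subclasses so that they run the corresponding async hook.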
How Is AsyncHook.Init() Invoked by Node Core
To answer the guess above, I can simply walk through how AsyncHook's init method gets called by Node Core.
// location: lib/internal/async_hooks.js
// Properties in active_hooks are used to keep track of the set of hooks being
// executed in case another hook is enabled/disabled. The new set of hooks is
// then restored once the active set of hooks is finished executing.
const active_hooks = {
// Array of all AsyncHooks that will be iterated whenever an async event
// fires. Using var instead of (preferably const) in order to assign
// active_hooks.tmp_array if a hook is enabled/disabled during hook
// execution.
array: [],
// Use a counter to track nested calls of async hook callbacks and make sure
// the active_hooks.array isn't altered mid execution.
call_depth: 0,
// Use to temporarily store and updated active_hooks.array if the user
// enables or disables a hook while hooks are being processed. If a hook is
// enabled() or disabled() during hook execution then the current set of
// active hooks is duplicated and set equal to active_hooks.tmp_array. Any
// subsequent changes are on the duplicated array. When all hooks have
// completed executing active_hooks.tmp_array is assigned to
// active_hooks.array.
tmp_array: null,
// Keep track of the field counts held in active_hooks.tmp_array. Because the
// async_hook_fields can't be reassigned, store each uint32 in an array that
// is written back to async_hook_fields when active_hooks.array is restored.
tmp_fields: null
};
module.exports = {
executionAsyncId,
triggerAsyncId,
// Private API
getHookArrays,
symbols: {
async_id_symbol, trigger_async_id_symbol,
init_symbol, before_symbol, after_symbol, destroy_symbol,
promise_resolve_symbol, owner_symbol
},
// ..
executionAsyncResource,
// Internal Embedder API
// ...
nativeHooks: {
init: emitInitNative, // <====== look here
before: emitBeforeNative,
after: emitAfterNative,
destroy: emitDestroyNative,
promise_resolve: emitPromiseResolveNative
},
};
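Meanwhile, one of the functions exported from lib/internal/async_hooks.js has a rather suspicious name: emitInitNative. Native, Native: a name with Native in it is just too conspicuous. Let's look at the implementation: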
// location: lib/internal/async_hooks.js
// Emit From Native //
// Used by C++ to call all init() callbacks. Because some state can be setup
// from C++ there's no need to perform all the same operations as in
// emitInitScript.
function emitInitNative(asyncId, type, triggerAsyncId, resource) {
active_hooks.call_depth += 1;
resource = lookupPublicResource(resource);
// Use a single try/catch for all hooks to avoid setting up one per iteration.
try {
// Using var here instead of let because "for (var ...)" is faster than let.
// Refs: https://github.com/nodejs/node/pull/30380#issuecomment-552948364
// eslint-disable-next-line no-var
for (var i = 0; i < active_hooks.array.length; i++) {
if (typeof active_hooks.array[i][init_symbol] === 'function') {
active_hooks.array[i][init_symbol](
asyncId, type, triggerAsyncId,
resource
);
}
}
} catch (e) {
fatalError(e);
} finally {
active_hooks.call_depth -= 1;
}
// Hooks can only be restored if there have been no recursive hook calls.
// Also the active hooks do not need to be restored if enable()/disable()
// weren't called during hook execution, in which case active_hooks.tmp_array
// will be null.
if (active_hooks.call_depth === 0 && active_hooks.tmp_array !== null) {
restoreActiveHooks();
}
}
// location: lib/internal/bootstrap.js
const { nativeHooks } = require('internal/async_hooks');
internalBinding('async_wrap').setupHooks(nativeHooks);
// location: src/async_wrap.cc
void AsyncWrap::Initialize(Local<Object> target,
Local<Value> unused,
Local<Context> context,
void* priv) {
Environment* env = Environment::GetCurrent(context);
Isolate* isolate = env->isolate();
HandleScope scope(isolate);
env->SetMethod(target, "setupHooks", SetupHooks);
env->SetMethod(target, "setCallbackTrampoline", SetCallbackTrampoline);
env->SetMethod(target, "pushAsyncContext", PushAsyncContext);
env->SetMethod(target, "popAsyncContext", PopAsyncContext);
env->SetMethod(target, "executionAsyncResource", ExecutionAsyncResource);
env->SetMethod(target, "clearAsyncIdStack", ClearAsyncIdStack);
env->SetMethod(target, "queueDestroyAsyncId", QueueDestroyAsyncId);
env->SetMethod(target, "setPromiseHooks", SetPromiseHooks);
env->SetMethod(target, "registerDestroyHook", RegisterDestroyHook);
PropertyAttribute ReadOnlyDontDelete =
static_cast<PropertyAttribute>(ReadOnly | DontDelete);
// ...
}
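First, a few basic concepts and data types:
Isolate: used in `Isolate* isolate = env->isolate();` and defined in v8.h. An Isolate is an independent instance of the V8 engine. It is a standalone JavaScript runtime that runs on its own thread and has its own heap, garbage collector, and execution contexts. A process can create several Isolates, and each provides a separate runtime environment that runs JavaScript independently.
Context: used in the `Local<Context> context` parameter and defined in v8.h. A Context represents an execution context inside an Isolate. It holds the state of that execution context, including variables, functions, and other data. A Context is created within an Isolate, and a single Isolate can hold several Contexts, each able to execute JavaScript independently. The familiar vm.createContext() also creates a new Context instance.
Local: used in the `Local<Object> target` parameter and defined in v8.h. In the V8 engine (Node.js's JavaScript engine), it is the local handle used to refer to a JavaScript object.
The original description reads: An object reference managed by the v8 garbage collector. All objects returned from v8 have to be tracked by the garbage collector so that it knows that the objects are still alive.
It can be thought of as something like a pointer, except that the memory address it points to may change as GC (garbage collection) moves objects; the handle makes sure we always reach the value we need, and it also lets V8 track whether the referenced object can be cleaned up.
A Local handle is a lightweight object reference with a limited lifetime in V8's memory management. Once the HandleScope that owns the handle is gone, the object it referenced may be reclaimed by V8's garbage collector. Local<Context> is therefore a local handle to a V8 Context. Besides Local, there are other handle types such as MaybeLocal and Eternal.
The `HandleScope scope(isolate);` statement is likewise there to manage handle lifetimes.
Environment: used in `Environment* env = Environment::GetCurrent(context);` and defined in src/env.h. At Node.js's C++ level, the Environment class is a core component that manages and maintains the context and resources of a Node.js application. It acts as a bridge that lets the JavaScript layer interact with the underlying C++ implementation, and it wraps many resources related to the JavaScript runtime. Some of its main responsibilities are:
Managing the V8 Isolate instance: an Isolate is the V8 object that represents an independent JavaScript runtime environment. An Environment instance is associated with one specific Isolate, and together they make up the runtime environment of a Node.js application.
Memory management: the Environment class manages memory-related resources such as object handles and buffers. By creating V8 HandleScope and EscapableHandleScope instances, it makes sure V8 can manage the lifetime of JavaScript objects correctly.
Interoperating with the JavaScript layer: the Environment class offers a set of methods that let the JavaScript layer interact with the C++ implementation, including setting properties and methods on JavaScript objects and invoking callbacks.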
// location: src/async_wrap.cc
static void SetupHooks(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
CHECK(args[0]->IsObject());
// All of init, before, after, destroy, and promise_resolve are supplied by
// async_hooks internally, so this should only ever be called once. At which
// time all the functions should be set. Detect this by checking if
// init !IsEmpty().
CHECK(env->async_hooks_init_function().IsEmpty());
Local<Object> fn_obj = args[0].As<Object>();
#define SET_HOOK_FN(name) \
do { \
Local<Value> v = \
fn_obj->Get(env->context(), \
FIXED_ONE_BYTE_STRING(env->isolate(), #name)) \
.ToLocalChecked(); \
CHECK(v->IsFunction()); \
env->set_async_hooks_##name##_function(v.As<Function>()); \
} while (0)
SET_HOOK_FN(init);
SET_HOOK_FN(before);
SET_HOOK_FN(after);
SET_HOOK_FN(destroy);
SET_HOOK_FN(promise_resolve);
#undef SET_HOOK_FN
}
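The first step here is to obtain the Environment* pointer. The function then checks that args[0] is an Object and that async_hooks_init_function is still empty, which guarantees that the hooks are only initialized once.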
// location: src/async_wrap.cc
void AsyncWrap::EmitAsyncInit(Environment* env,
Local<Object> object,
Local<String> type,
double async_id,
double trigger_async_id) {
CHECK(!object.IsEmpty());
CHECK(!type.IsEmpty());
AsyncHooks* async_hooks = env->async_hooks();
// Nothing to execute, so can continue normally.
if (async_hooks->fields()[AsyncHooks::kInit] == 0) {
return;
}
HandleScope scope(env->isolate());
Local<Function> init_fn = env->async_hooks_init_function();
Local<Value> argv[] = {
Number::New(env->isolate(), async_id),
type,
Number::New(env->isolate(), trigger_async_id),
object,
};
TryCatchScope try_catch(env, TryCatchScope::CatchMode::kFatal);
USE(init_fn->Call(env->context(), object, arraysize(argv), argv));
}
// location: v8.h
/**
* A JavaScript function object (ECMA-262, 15.3).
*/
class V8_EXPORT Function : public Object {}
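Set the function: env->set_async_hooks_##name##_function(). Get the corresponding function: env->async_hooks_init_function().
In `Local<Function> init_fn = env->async_hooks_init_function();` we retrieve the init registered earlier. It is a Local handle: Local<Function> is a handle that points to a JS function. Finally, the init_fn->Call() at the end of EmitAsyncInit invokes the JS function.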
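The register-once, call-later handshake between SetupHooks and EmitAsyncInit can be modeled in a few lines of JavaScript. This is only an analogy of the C++ flow above: env, setupHooks and emitAsyncInit are hypothetical names, and the kInit counter stands in for the async_hooks fields checked in EmitAsyncInit.

```javascript
// Hypothetical JS model of the C++ side; names only loosely mirror the C++ code.
const env = {
  initFn: null,         // plays the role of async_hooks_init_function
  fields: { kInit: 0 }, // number of enabled hooks that provide init()
};

// Like SetupHooks: may only run once, and the argument must carry functions.
function setupHooks(nativeHooks) {
  if (env.initFn !== null) {
    throw new Error('setupHooks must only be called once');
  }
  if (typeof nativeHooks.init !== 'function') {
    throw new TypeError('init must be a function');
  }
  env.initFn = nativeHooks.init;
}

// Like EmitAsyncInit: skip the call entirely when no init hook is enabled.
function emitAsyncInit(resource, type, asyncId, triggerAsyncId) {
  if (env.fields.kInit === 0) {
    return;
  }
  env.initFn.call(resource, asyncId, type, triggerAsyncId, resource);
}

const calls = [];
setupHooks({ init: (asyncId, type) => calls.push(`${type}#${asyncId}`) });

emitAsyncInit({}, 'TCPWRAP', 1, 0); // skipped: no init hook enabled yet
env.fields.kInit = 1;               // what enabling a hook with init() leads to
emitAsyncInit({}, 'TCPWRAP', 2, 1); // forwarded to the registered JS function

console.log(calls); // ['TCPWRAP#2']
```

The early return mirrors why async hooks cost little when nobody has enabled one: the native side checks a counter before crossing into JavaScript.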
Sum Up: High-Level Overview Flowchart
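API: easy to understand; three important APIs are exposed.
Node Core - Native JS Module:
The three APIs above come from three classes in async_hooks.js: AsyncLocalStorage, AsyncResource, and AsyncHook.
AsyncHook is responsible for registering the callback functions for the four stages.
Here, internalBinding('async_wrap') is used to obtain the C++-level AsyncWrap.
Node Core - C++ Binding:
async_wrap.cc defines the key base class AsyncWrap, which inherits from BaseObject.
It is exposed to the JS layer through the NODE_MODULE_CONTEXT_AWARE_INTERNAL macro.
AsyncWrap is only a base class. UDPWrap, TCPWrap, FSEventWrap and others inherit from it directly or indirectly, and it gives each Wrap the methods that trigger the hook callbacks.
For example: TCPWrap -> ConnectionWrap -> LibuvStreamWrap -> HandleWrap -> AsyncWrap
libuv functions are called inside the concrete Wraps.
For example, when a TCP request is sent, new TCPWrap is executed and the connection is initiated through uv_tcp_connect() (a function from libuv);
once the connection succeeds, a handle (uv_tcp_t) keeps access to libuv. During this process the handle type is converted: uv_tcp_t -> uv_stream_t
When the response arrives, the TCP handle triggers uv__stream_io(), which runs uv__read() and eventually notifies TCPWrap or one of its parent classes to execute the callback.
The src/api folder provides some APIs for third-party addons. There, AsyncResource is a wrapper built on AsyncWrap. AsyncWrap fires the before and after async events through AsyncWrap::MakeCallback, which calls InternalMakeCallback from within CallbackScope.
Deps:
libuv: responsible for asynchronous I/O and network callbacks
V8: responsible for Promise and async/await
Both ultimately notify the JS-level AsyncHook through AsyncWrap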
Add-On
Here are some related findings and easter eggs that turned up while collecting material, shared for everyone.
eliminate extra lifecycle event
// location: lib/internal/bootstrap/loader.js
// This file creates the internal module & binding loaders used by built-in
// modules. In contrast, user land modules are loaded using
// lib/internal/modules/cjs/loader.js (CommonJS Modules) or
// lib/internal/modules/esm/* (ES Modules).
//
// This file is compiled and run by node.cc before bootstrap/node.js
// was called, therefore the loaders are bootstrapped before we start to
// actually bootstrap Node.js. It creates the following objects:
//
// C++ binding loaders:
// - process.binding(): the legacy C++ binding loader, accessible from user land
// because it is an object attached to the global process object.
// These C++ bindings are created using NODE_BUILTIN_MODULE_CONTEXT_AWARE()
// and have their nm_flags set to NM_F_BUILTIN. We do not make any guarantees
// about the stability of these bindings, but still have to take care of
// compatibility issues caused by them from time to time.
// - process._linkedBinding(): intended to be used by embedders to add
// additional C++ bindings in their applications. These C++ bindings
// can be created using NODE_MODULE_CONTEXT_AWARE_CPP() with the flag
// NM_F_LINKED.
// - internalBinding(): the private internal C++ binding loader, inaccessible
// from user land unless through `require('internal/test/binding')`.
// These C++ bindings are created using NODE_MODULE_CONTEXT_AWARE_INTERNAL()
// and have their nm_flags set to NM_F_INTERNAL.
//
// Internal JavaScript module loader:
// - NativeModule: a minimal module system used to load the JavaScript core
// modules found in lib/**/*.js and deps/**/*.js. All core modules are
// compiled into the node binary via node_javascript.cc generated by js2c.py,
// so they can be loaded faster without the cost of I/O. This class makes the
// lib/internal/*, deps/internal/* modules and internalBinding() available by
// default to core modules, and lets the core modules require itself via
// require('internal/bootstrap/loaders') even when this file is not written in
// CommonJS style.
//
// Other objects:
// - process.moduleLoadList: an array recording the bindings and the modules
// loaded in the process and the order in which they are loaded.
'use strict';
// This file is compiled as if it's wrapped in a function with arguments
// passed by node::RunBootstrapping()
/* global process, getLinkedBinding, getInternalBinding, primordials */
The Workflow of Node.js Startup
Image source: https://leezhenghui.github.io/node.js/2018/11/11/demystify-node.js-modularity.html
8. Last of all: a summary
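This article is reposted from the WeChat official account "阿里开发者" (Alibaba Developer). If it infringes any rights, please contact the administrator to have it removed.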