您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關實現deno通信的方法,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
通信方式
deno執行代碼和node相似,包含同步和異步的方式, 異步方式通過Promise.then實現。
Typescript/Javascript調用rust
在上一節中講到deno的啟動時會初始化v8 isolate實例,在初始化的過程中,會將c++的函數綁定到v8 isolate的實例上,在v8執行Javascript代碼時,可以像調用Javascript函數一樣調用這些綁定的函數。具體的綁定實現如下:
void InitializeContext(v8::Isolate* isolate, v8::Local<v8::Context> context) { v8::HandleScope handle_scope(isolate); v8::Context::Scope context_scope(context); auto global = context->Global(); auto deno_val = v8::Object::New(isolate); CHECK(global->Set(context, deno::v8_str("libdeno"), deno_val).FromJust()); auto print_tmpl = v8::FunctionTemplate::New(isolate, Print); auto print_val = print_tmpl->GetFunction(context).ToLocalChecked(); CHECK(deno_val->Set(context, deno::v8_str("print"), print_val).FromJust()); auto recv_tmpl = v8::FunctionTemplate::New(isolate, Recv); auto recv_val = recv_tmpl->GetFunction(context).ToLocalChecked(); CHECK(deno_val->Set(context, deno::v8_str("recv"), recv_val).FromJust()); auto send_tmpl = v8::FunctionTemplate::New(isolate, Send); auto send_val = send_tmpl->GetFunction(context).ToLocalChecked(); CHECK(deno_val->Set(context, deno::v8_str("send"), send_val).FromJust()); auto eval_context_tmpl = v8::FunctionTemplate::New(isolate, EvalContext); auto eval_context_val = eval_context_tmpl->GetFunction(context).ToLocalChecked(); CHECK(deno_val->Set(context, deno::v8_str("evalContext"), eval_context_val) .FromJust()); auto error_to_json_tmpl = v8::FunctionTemplate::New(isolate, ErrorToJSON); auto error_to_json_val = error_to_json_tmpl->GetFunction(context).ToLocalChecked(); CHECK(deno_val->Set(context, deno::v8_str("errorToJSON"), error_to_json_val) .FromJust()); CHECK(deno_val->SetAccessor(context, deno::v8_str("shared"), Shared) .FromJust()); }
在完成綁定之后,在Typescript中可以通過如下代碼實現c++方法和Typescript方法的映射
libdeno.ts
interface Libdeno { recv(cb: MessageCallback): void; send(control: ArrayBufferView, data?: ArrayBufferView): null | Uint8Array; print(x: string, isErr?: boolean): void; shared: ArrayBuffer; /** Evaluate provided code in the current context. * It differs from eval(...) in that it does not create a new context. * Returns an array: [output, errInfo]. * If an error occurs, `output` becomes null and `errInfo` is non-null. */ // eslint-disable-next-line @typescript-eslint/no-explicit-any evalContext(code: string): [any, EvalErrorInfo | null]; errorToJSON: (e: Error) => string; } export const libdeno = window.libdeno as Libdeno;
在執行Typescript代碼時,只需要引入libdeno,就直接調用c++方法,例如:
import { libdeno } from "./libdeno"; function sendInternal( builder: flatbuffers.Builder, innerType: msg.Any, inner: flatbuffers.Offset, data: undefined | ArrayBufferView, sync = true ): [number, null | Uint8Array] { const cmdId = nextCmdId++; msg.Base.startBase(builder); msg.Base.addInner(builder, inner); msg.Base.addInnerType(builder, innerType); msg.Base.addSync(builder, sync); msg.Base.addCmdId(builder, cmdId); builder.finish(msg.Base.endBase(builder)); const res = libdeno.send(builder.asUint8Array(), data); builder.inUse = false; return [cmdId, res]; }
調用libdeno.send方法可以將數據傳給c++,然后通過c++去調用rust代碼實現具體的工程操作。
同步
在Typescript中只需要設置sendInternal方法的sync參數為true即可,在rust中會根據sync參數去判斷是執行同步或者異步操作,如果sync為true,libdeono.send方法會返回執行的結果,rust和typescript之間傳遞數據需要將數據序列化,這里序列化操作使用的是flatbuffer庫。
const [cmdId, resBuf] = sendInternal(builder, innerType, inner, data, true);
異步實現
同理,實現異步方式,只需要設置sync參數為false即可,但是異步操作和同步相比,多了回掉方法,在執行異步通信時,libdeno.send方法會返回一個唯一的cmdId標志這次調用操作。同時在異步通信完成后,會創建一個promise對象,將cmdId作為key,promise作為value,加入map中。代碼如下:
const [cmdId, resBuf] = sendInternal(builder, innerType, inner, data, false); util.assert(resBuf == null); const promise = util.createResolvable<msg.Base>(); promiseTable.set(cmdId, promise); return promise;
當在Typescript中調用libdeno.send方法時,調用了C++文件binding.cc中的Send方法,該方法是在deno初始化時綁定到v8 isolate上去的。在Send方法中去調用了ops.rs文件中的dispatch方法,該方法實現了消息到函數的映射。每個類型的消息對應了一種函數,例如讀文件消息對應了讀文件的函數。
pub fn dispatch( isolate: &Isolate, control: libdeno::deno_buf, data: libdeno::deno_buf, ) -> (bool, Box<Op>) { let base = msg::get_root_as_base(&control); let is_sync = base.sync(); let inner_type = base.inner_type(); let cmd_id = base.cmd_id(); let op: Box<Op> = if inner_type == msg::Any::SetTimeout { // SetTimeout is an exceptional op: the global timeout field is part of the // Isolate state (not the IsolateState state) and it must be updated on the // main thread. assert_eq!(is_sync, true); op_set_timeout(isolate, &base, data) } else { // Handle regular ops. let op_creator: OpCreator = match inner_type { msg::Any::Accept => op_accept, msg::Any::Chdir => op_chdir, msg::Any::Chmod => op_chmod, msg::Any::Close => op_close, msg::Any::FetchModuleMetaData => op_fetch_module_meta_data, msg::Any::CopyFile => op_copy_file, msg::Any::Cwd => op_cwd, msg::Any::Dial => op_dial, msg::Any::Environ => op_env, msg::Any::Exit => op_exit, msg::Any::Fetch => op_fetch, msg::Any::FormatError => op_format_error, msg::Any::Listen => op_listen, msg::Any::MakeTempDir => op_make_temp_dir, msg::Any::Metrics => op_metrics, msg::Any::Mkdir => op_mkdir, msg::Any::Open => op_open, msg::Any::ReadDir => op_read_dir, msg::Any::ReadFile => op_read_file, msg::Any::Readlink => op_read_link, msg::Any::Read => op_read, msg::Any::Remove => op_remove, msg::Any::Rename => op_rename, msg::Any::ReplReadline => op_repl_readline, msg::Any::ReplStart => op_repl_start, msg::Any::Resources => op_resources, msg::Any::Run => op_run, msg::Any::RunStatus => op_run_status, msg::Any::SetEnv => op_set_env, msg::Any::Shutdown => op_shutdown, msg::Any::Start => op_start, msg::Any::Stat => op_stat, msg::Any::Symlink => op_symlink, msg::Any::Truncate => op_truncate, msg::Any::WorkerGetMessage => op_worker_get_message, msg::Any::WorkerPostMessage => op_worker_post_message, msg::Any::Write => op_write, msg::Any::WriteFile => op_write_file, msg::Any::Now => op_now, msg::Any::IsTTY => op_is_tty, msg::Any::Seek => op_seek, msg::Any::Permissions => op_permissions, msg::Any::PermissionRevoke => op_revoke_permission, _ => panic!(format!( "Unhandled message {}", msg::enum_name_any(inner_type) )), }; op_creator(&isolate, &base, data) }; // ...省略多余的代碼 }
在每個類型的函數中會根據在Typescript中調用libdeo.send方法時傳入的sync參數值去判斷同步執行還是異步執行。
let (is_sync, op) = dispatch(isolate, control_buf, zero_copy_buf);
同步執行
在執行dispatch方法后,會返回is_sync的變量,如果is_sync為true,表示該方法是同步執行的,op表示返回的結果。rust代碼會調用c++文件api.cc中的deno_respond方法,將執行結果同步回去,deno_respond方法中根據current_args_的值去判斷是否為同步消息,如果current_args_存在值,則直接返回結果。
異步執行
在deno中,執行異步操作是通過rust的Tokio模塊來實現的,在調用dispatch方法后,如果是異步操作,is_sync的值為false,op不再是執行結果,而是一個執行函數。通過tokio模塊派生一個線程程異步去執行該函數。
let task = op .and_then(move |buf| { let sender = tx; // tx is moved to new thread sender.send((zero_copy_id, buf)).expect("tx.send error"); Ok(()) }).map_err(|_| ()); tokio::spawn(task);
在deno初始化時,會創建一個管道,代碼如下:
let (tx, rx) = mpsc::channel::<(usize, Buf)>();
管道可以實現不同線程之間的通信,由于異步操作是創建了一個新的線程去執行的,所以子線程無法直接和主線程之間通信,需要通過管道的機制去實現。在異步代碼執行完成后,調用tx.send方法將執行結果加入管道里面,event loop會每次從管道里面去讀取結果返回回去。
由于異步操作依賴事件循環,所以先解釋一下deno中的事件循環,其實事件循環很簡單,就是一段循環執行的代碼,當達到條件后,事件循環會結束執行,deno中主要的事件循環代碼實現如下:
pub fn event_loop(&self) -> Result<(), JSError> { // Main thread event loop. while !self.is_idle() { match recv_deadline(&self.rx, self.get_timeout_due()) { Ok((zero_copy_id, buf)) => self.complete_op(zero_copy_id, buf), Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(), Err(e) => panic!("recv_deadline() failed: {:?}", e), } self.check_promise_errors(); if let Some(err) = self.last_exception() { return Err(err); } } // Check on done self.check_promise_errors(); if let Some(err) = self.last_exception() { return Err(err); } Ok(()) }
self.is_idle方法用來判斷是否所有的異步操作都執行完畢,當所有的異步操作都執行完畢后,停止事件循環,is_idle方法代碼如下:
fn is_idle(&self) -> bool { self.ntasks.get() == 0 && self.get_timeout_due().is_none() }
當產生一次異步方法調用時,會調用下面的方法,使ntasks內部的值加1,
fn ntasks_increment(&self) { assert!(self.ntasks.get() >= 0); self.ntasks.set(self.ntasks.get() + 1); }
在event loop循環中,每次從管道中去取值,這里event loop充消費者,執行異步方法的子線程充當生產者。如果在一次事件循環中,獲取到了一次執行結果,那么會調用ntasks_decrement方法,使ntasks內部的值減1,當ntasks的值為0的時候,事件循環會退出執行。在每次循環中,將管道中取得的值作為參數,調用complete_op方法,將結果返回回去。
在初始化v8實例時,綁定的c++方法中有一個Recv方法,該方法的作用時暴露一個Typescript的函數給rust,在deno的io.ts文件的start方法中執行libdeno.recv(handleAsyncMsgFromRust),將handleAsyncMsgFromRust函數通過c++方法暴露給rust。具體實現如下:
export function start(source?: string): msg.StartRes { libdeno.recv(handleAsyncMsgFromRust); // First we send an empty `Start` message to let the privileged side know we // are ready. The response should be a `StartRes` message containing the CLI // args and other info. const startResMsg = sendStart(); util.setLogDebug(startResMsg.debugFlag(), source); setGlobals(startResMsg.pid(), startResMsg.noColor(), startResMsg.execPath()!); return startResMsg; }
當異步操作執行完成后,可以在rust中直接調用handleAsyncMsgFromRust方法,將結果返回給Typescript。先看一下handleAsyncMsgFromRust方法的實現細節:
export function handleAsyncMsgFromRust(ui8: Uint8Array): void { // If a the buffer is empty, recv() on the native side timed out and we // did not receive a message. if (ui8 && ui8.length) { const bb = new flatbuffers.ByteBuffer(ui8); const base = msg.Base.getRootAsBase(bb); const cmdId = base.cmdId(); const promise = promiseTable.get(cmdId); util.assert(promise != null, `Expecting promise in table. ${cmdId}`); promiseTable.delete(cmdId); const err = errors.maybeError(base); if (err != null) { promise!.reject(err); } else { promise!.resolve(base); } } // Fire timers that have become runnable. fireTimers(); }
從代碼handleAsyncMsgFromRust方法的實現中可以知道,首先通過flatbuffer反序列化返回的結果,然后獲取返回結果的cmdId,根據cmdId獲取之前創建的promise對象,然后調用promise.resolve方法觸發promise.then中的代碼執行。
關于實現deno通信的方法就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。