import * as fs from "node:fs";
import * as path from "node:path";
import { Buffer } from "node:buffer";
import { unpack, pack } from "msgpackr";
import * as pako from "pako";

// Delay while inference is switched off or no data file exists yet.
const IDLE_WAIT_MS = 1000;
// Delay between size checks while waiting for the next index record.
const POLL_WAIT_MS = 10;
// Delay before retrying after a failed iteration.
const RETRY_WAIT_MS = 1000;
// How often the directory is re-listed while waiting, to notice file rotation.
const RESCAN_INTERVAL_MS = 1000;
const INFERENCE_URL = "http://sems-model-inference:22111/post";

/**
 * Returns the absolute path of the newest CSV index file in a directory.
 *
 * "Newest" means last in lexicographic order of the file name, as in the
 * original implementation. Only `.csv` entries are considered, so a sibling
 * `.bin` file (or any unrelated file) can never be picked by mistake.
 *
 * @param {string} raw_spectral_data_dir - Directory to scan.
 * @returns {string|null} Absolute path of the newest CSV file, or `null`
 *   when the directory contains no CSV file.
 * @throws {Error} If the directory cannot be read.
 */
function get_latest_file_path(raw_spectral_data_dir) {
  const csv_names = fs
    .readdirSync(raw_spectral_data_dir)
    .filter((name) => path.extname(name) === ".csv")
    .sort();
  if (csv_names.length === 0) {
    return null;
  }
  return path.resolve(raw_spectral_data_dir, csv_names[csv_names.length - 1]);
}

/**
 * Formats an epoch-millisecond timestamp as `YYYY-MM-DD HH:mm:ss.SSS` in
 * UTC+8 (Beijing time).
 *
 * The instant is shifted by eight hours and then read through the UTC
 * getters, so the result does not depend on the host's local time zone.
 *
 * @param {number} timestamp - Milliseconds since the Unix epoch.
 * @returns {string} The formatted date-time string.
 */
function formatTimestamp(timestamp) {
  const beijing_offset_ms = 8 * 60 * 60 * 1000;
  const shifted = new Date(timestamp + beijing_offset_ms);
  const pad = (value, width = 2) => String(value).padStart(width, "0");

  const year = shifted.getUTCFullYear();
  const month = pad(shifted.getUTCMonth() + 1); // months are zero-based
  const day = pad(shifted.getUTCDate());
  const hours = pad(shifted.getUTCHours());
  const minutes = pad(shifted.getUTCMinutes());
  const seconds = pad(shifted.getUTCSeconds());
  const milliseconds = pad(shifted.getUTCMilliseconds(), 3);
  return `${year}-${month}-${day} ${hours}:${minutes}:${seconds}.${milliseconds}`;
}

/**
 * Resolves after the given delay.
 *
 * @param {number} ms - Delay in milliseconds.
 * @returns {Promise<void>}
 */
function wait(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
 * Opens a CSV index file and the `.bin` file that shares its base name.
 *
 * @param {string} csv_path - Path of the CSV index file.
 * @returns {{fd_csv: number, fd_bin: number}} The two open file descriptors.
 * @throws {Error} If either file cannot be opened; nothing is left open then.
 */
function open_data_files(csv_path) {
  const bin_path = path.format({
    dir: path.dirname(csv_path),
    name: path.basename(csv_path, ".csv"),
    // Explicit leading dot: Node versions before 19 do not add it themselves.
    ext: ".bin",
  });
  const fd_csv = fs.openSync(csv_path);
  try {
    return { fd_csv, fd_bin: fs.openSync(bin_path) };
  } catch (err) {
    fs.closeSync(fd_csv);
    throw err;
  }
}

/**
 * Closes a file descriptor, logging (not throwing) on failure.
 *
 * @param {number|null} fd - Descriptor to close; `null` is ignored.
 */
function close_quietly(fd) {
  if (fd === null) {
    return;
  }
  try {
    fs.closeSync(fd);
  } catch (err) {
    console.error("[scanner][error]", err);
  }
}

/**
 * Parses the newest index record out of freshly appended CSV text.
 *
 * Each record is `timestamp,start,length`. When several lines were appended
 * since the last read, only the last non-empty one is used.
 * NOTE(review): this assumes the writer appends whole lines; a line caught
 * mid-write is either rejected here or read with a truncated last field.
 *
 * @param {string} text - Newly appended CSV text.
 * @returns {{timestamp: number, start_pointer: number, length: number}|null}
 *   The parsed record, or `null` when no valid record is present.
 */
function parse_latest_record(text) {
  const lines = text.split(/\r?\n/).filter((line) => line.trim() !== "");
  if (lines.length === 0) {
    return null;
  }
  const [timestamp, start_pointer, length] = lines[lines.length - 1]
    .split(",")
    .map(Number);
  const is_offset = (value) => Number.isInteger(value) && value >= 0;
  if (!Number.isFinite(timestamp) || !is_offset(start_pointer) || !is_offset(length)) {
    return null;
  }
  return { timestamp, start_pointer, length };
}

/**
 * Scanner loop, meant to run as a child process.
 *
 * While enabled by a "start" message from the parent (and until "stop"), it
 * tails the newest CSV index file under `/data`. For each new record it reads
 * the referenced bytes from the matching `.bin` file, posts them
 * (msgpack + gzip) to the inference service, and forwards the decoded
 * response to the parent through `process.send`.
 *
 * Every wait yields to the event loop, so parent messages are handled while
 * the scanner is waiting for data. A failed iteration is logged and retried
 * instead of ending the loop.
 *
 * @returns {Promise<void>} Never resolves under normal operation.
 */
async function main() {
  const raw_spectral_data_dir = "/data";
  let inferenceFlag = false;

  process.on("message", (msg) => {
    console.log("[scanner][主进程消息]: ", msg);
    if (msg === "start") {
      inferenceFlag = true;
    } else if (msg === "stop") {
      inferenceFlag = false;
    }
  });
  console.log("[scanner][初始化]源数据路径为: ", raw_spectral_data_dir);

  let current_data_file = null;
  let fd_csv = null;
  let fd_bin = null;
  let last_pointer = 0;
  // When true, the read position jumps to the current end of the index file,
  // so only records written from now on are processed.
  let resync_to_end = true;
  let next_rescan_at = 0;

  while (true) {
    try {
      if (!inferenceFlag) {
        resync_to_end = true;
        next_rescan_at = 0;
        await wait(IDLE_WAIT_MS);
        continue;
      }

      if (Date.now() >= next_rescan_at) {
        const latest_data_file = get_latest_file_path(raw_spectral_data_dir);
        if (latest_data_file === null) {
          await wait(IDLE_WAIT_MS);
          continue;
        }
        next_rescan_at = Date.now() + RESCAN_INTERVAL_MS;

        if (latest_data_file !== current_data_file) {
          // Open the new pair first so a failure leaves the old pair usable.
          const opened = open_data_files(latest_data_file);
          close_quietly(fd_csv);
          close_quietly(fd_bin);
          ({ fd_csv, fd_bin } = opened);
          current_data_file = latest_data_file;
          console.log("[scanner][数据]数据文件切换为: ", latest_data_file);
          if (!resync_to_end) {
            // The file appeared while we were already watching, so all of
            // its content is new.
            last_pointer = 0;
          }
        }
      }

      const size = fs.fstatSync(fd_csv).size;
      if (resync_to_end) {
        last_pointer = size;
        resync_to_end = false;
      }
      if (size <= last_pointer) {
        await wait(POLL_WAIT_MS);
        continue;
      }

      const index_chunk = Buffer.alloc(size - last_pointer);
      const index_bytes = fs.readSync(
        fd_csv,
        index_chunk,
        0,
        index_chunk.length,
        last_pointer
      );

      // As in the original flow: after a record is taken, re-check the
      // directory and continue from the end of the index file, skipping
      // anything written while the inference request was in flight.
      // NOTE(review): confirm that dropping those records is intended.
      resync_to_end = true;
      next_rescan_at = 0;

      const record = parse_latest_record(index_chunk.toString("utf8", 0, index_bytes));
      if (record === null) {
        console.error("[scanner][error]", "skipping malformed index record");
        continue;
      }
      const { timestamp, start_pointer, length } = record;

      const spectral_buffer = Buffer.alloc(length);
      const bytesRead = fs.readSync(fd_bin, spectral_buffer, 0, length, start_pointer);
      // bytesRead is only reported; the whole buffer is sent to the service.
      console.log(
        "[scanner][输入]",
        formatTimestamp(timestamp),
        ", 输入数据字节数: ",
        bytesRead
      );

      const upload_data = { spectral_data_bin: spectral_buffer };
      const upload_data_compressed = pako.gzip(pack(upload_data));
      const response = await fetch(INFERENCE_URL, {
        method: "post",
        body: upload_data_compressed,
      });
      if (!response.ok) {
        throw new Error(`inference service responded with HTTP ${response.status}`);
      }
      const response_data_compressed = await response.arrayBuffer();
      const response_data = unpack(pako.ungzip(response_data_compressed));

      // process.send only exists when this process was spawned with IPC.
      if (typeof process.send === "function") {
        process.send(response_data);
      }
      console.log(
        "[scanner][输出]",
        formatTimestamp(timestamp),
        ", 结果: ",
        response_data
      );
    } catch (err) {
      console.error("[scanner][error]", err);
      resync_to_end = true;
      next_rescan_at = 0;
      await wait(RETRY_WAIT_MS);
    }
  }
}

main().catch((err) => {
  console.error("[scanner][error]", err);
});