SEMS-on-device-server/scanner.js

151 lines
4.5 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import * as fs from "node:fs";
import * as path from "node:path";
import { Buffer } from "node:buffer";
import { unpack, pack } from "msgpackr";
import * as pako from "pako";
function get_latest_file_path(raw_spectral_data_dir) {
let files = fs.readdirSync(raw_spectral_data_dir);
files = files.sort();
let latest_name = files.pop();
// console.log(latest_name,files.length)
return path.resolve(raw_spectral_data_dir, latest_name);
}
function formatTimestamp(timestamp) {
// 将时间转换为 UTC+8 时区
const offset = 8 * 60 * 60 * 1000; // 8 小时的毫秒数
const beijingTime = new Date(timestamp + offset);
const date = new Date(beijingTime);
const year = date.getFullYear();
const month = String(date.getMonth() + 1).padStart(2, "0"); // 月份从0开始所以要加1
const day = String(date.getDate()).padStart(2, "0");
const hours = String(date.getHours()).padStart(2, "0");
const minutes = String(date.getMinutes()).padStart(2, "0");
const seconds = String(date.getSeconds()).padStart(2, "0");
const milliseconds = String(date.getMilliseconds()).padStart(3, "0");
return `${year}-${month}-${day} ${hours}:${minutes}:${seconds}.${milliseconds}`;
}
function wait(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
async function main() {
try {
// process.send("child process started");
const raw_spectral_data_dir = "/data";
let inferenceFlag = false;
process.on("message", (msg) => {
console.log("[scanner][主进程消息]: ", msg);
if (msg == "start") {
inferenceFlag = true;
}
if (msg == "stop") {
inferenceFlag = false;
}
// 发送消息给主进程
// process.send({ message: 'Hello from child' });
});
console.log("[scanner][初始化]源数据路径为: ", raw_spectral_data_dir);
let last_data_file = null;
let latest_data_file = null;
let fd_csv = null;
let fd_bin = null;
while (true) {
if (!inferenceFlag) {
await wait(1000);
continue;
}
latest_data_file = get_latest_file_path(raw_spectral_data_dir);
if (latest_data_file != last_data_file) {
console.log("[scanner][数据]数据文件切换为: ", latest_data_file);
fd_csv = fs.openSync(latest_data_file);
fd_bin = fs.openSync(
path.format({
dir: path.dirname(latest_data_file),
name: path.basename(latest_data_file, ".csv"),
ext: "bin",
})
);
last_data_file = latest_data_file;
}
//获取目前的文件末端指针
let last_pointer = fs.statSync(latest_data_file).size;
while (true) {
const stat = fs.statSync(latest_data_file);
//不断循环确认有新的一行数据产生,读取相应的数据
if (stat.size > last_pointer) {
let buffer = Buffer.alloc(stat.size - last_pointer);
fs.readSync(
fd_csv,
buffer,
0,
stat.size - last_pointer,
last_pointer
);
let info = buffer.toString().split(",");
let timestamp = Number(info[0]);
let start_pointer = Number(info[1]);
let length = Number(info[2]);
let spectral_buffer = Buffer.alloc(length);
const bytesRead = fs.readSync(
fd_bin,
spectral_buffer,
0,
length,
start_pointer
);
//bytesRead读取了多少字节数全部直接回传给推理服务
console.log(
"[scanner][输入]",
formatTimestamp(timestamp),
", 输入数据字节数: ",
bytesRead
);
let upload_data = { spectral_data_bin: spectral_buffer };
let upload_data_compressed = pako.gzip(pack(upload_data));
let response = await fetch("http://sems-model-inference:22111/post", {
method: "post",
body: upload_data_compressed,
});
let response_data_compressed = await response.arrayBuffer();
let response_data = unpack(pako.ungzip(response_data_compressed));
process.send(response_data);
console.log(
"[scanner][输出]",
formatTimestamp(timestamp),
", 结果: ",
response_data
);
break;
}
}
}
} catch (err) {
console.error(err.message);
}
}
main().then(() => {
// process.send("child process end");
});