From 5f3f06f676b6b32389847278fd7dda9938e85be4 Mon Sep 17 00:00:00 2001 From: Zhanpeng Yang Date: Thu, 26 Sep 2024 18:21:32 +0800 Subject: [PATCH] [features]api/healthcheck and /api upload [fixed] timezone nodemon --- CHANGELOG.md | 16 +++ Dockerfile | 4 +- nodemon.json | 4 +- run.ps1 | 6 +- scanner.js | 214 +++++++++++++++++++---------- server.js | 41 +++--- src/app/api/healthcheck/route.js | 14 ++ src/app/api/upload/route.js | 192 ++++++++++++++++++++++++++ src/app/upload/page.js | 226 +++++++++++++++++++++++++++++++ 9 files changed, 614 insertions(+), 103 deletions(-) create mode 100644 src/app/api/healthcheck/route.js create mode 100644 src/app/api/upload/route.js create mode 100644 src/app/upload/page.js diff --git a/CHANGELOG.md b/CHANGELOG.md index 145f20d..72384ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,20 @@ # 更新日志 + +## v0.0.1.20240926_alpha +### ⭐️Features +- 当建立websocket连接时,才持续读取最新光谱数据并进行推理,连接断开则停止 +- 新建接口`api/healthcheck`便于后续监视程序运行情况 +- [暂未完成]新建接口`api/upload`根据前端输入数据挑选光谱数据,并进行推理计算,并将原始数据与误差等信息回传到云服务器。 +### 🐞Fixed +- 修改容器内时区为Asia/Shanghai,即在DockerFile中添加环境变量TZ=Asia/Shanghai。否则将导致本地时间与 容器内时间不一致 +- 修复windows下修改文件,容器内的nodemon无法监测改动,即添加nodemon.json中 "legacyWatch": true。并将build等文件夹列入不监视对象 + + +### 🚀Refactored +- 直接运行`./run.ps1`即默认启动开发环境 +- 在scanner中新增一些日志 + + ## v0.0.1.20240925_alpha ### 🚀Refactored - 修改next.config.mjs内的output为standalone模式,便于后面生产部署 diff --git a/Dockerfile b/Dockerfile index 40aa080..ad21716 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,7 @@ FROM node:20.17.0-alpine3.19 +ENV TZ=Asia/Shanghai + WORKDIR /env COPY package.json ./ RUN npm config set registry https://registry.npmmirror.com &&\ @@ -10,7 +12,7 @@ RUN npm config set registry https://registry.npmmirror.com &&\ WORKDIR /app # 指定容器创建时的默认命令。(可以被覆盖) CMD ln -snf /env/node_modules /app &&\ - npm run start + npm run start diff --git a/nodemon.json b/nodemon.json index 96e1682..8fec51c 100644 --- a/nodemon.json +++ b/nodemon.json @@ -2,7 +2,8 @@ "restartable": "rs", "ignore": [ ".git", - "node_modules/**/node_modules" + "node_modules", + "build" ], "verbose": true, "exec": "node server.js", @@ -12,5 +13,6 @@ "env": { "NODE_ENV": "development" }, + "legacyWatch": true, "ext": "js,json" } \ No newline at end of file diff --git a/run.ps1 b/run.ps1 index d953a2f..7285e70 100644 --- a/run.ps1 +++ b/run.ps1 @@ -5,8 +5,8 @@ If($cmd -eq "build_docker"){ } -If($cmd -eq "dev"){ - docker run --rm -p "22110:22110" -v "C:\SEMS-development\SEMS-on-device-server:/app" -v "C:\tmp:/data" --name sems-on-device-server sems-on-device-server:latest sh -c "ln -snf /env/node_modules /app && npm run dev" +If($cmd -eq "dev" -or [String]::IsNullOrEmpty($cmd) ){ + docker run --rm -p "22110:22110" -v "C:\SEMS-development\SEMS-on-device-server:/app" -v "C:\tmp:/data" --name sems-on-device-server --link sems-model-inference sems-on-device-server:latest sh -c "ln -snf /env/node_modules /app && npm run dev" } @@ -42,7 +42,7 @@ If($cmd -eq "release"){ #复制static文件夹 Copy-Item -Path .next/static -Destination .next/standalone/.next -Recurse -Force #把.next/standalone文件夹下的所有文件打包,但是除了这个目录下的node_modules文件,因为他是Linux系统下的软连接,会报错。 - Get-ChildItem -Path ".next/standalone/" -Recurse | Where-Object { $_.Name -ne "node_modules" } | Compress-Archive -DestinationPath "./build/SEMS-on-device-server-$version.zip" -Force + Get-ChildItem -Path ".next/standalone/"| Where-Object { $_.Name -ne "node_modules" } | Compress-Archive -DestinationPath "./build/SEMS-on-device-server-$version.zip" -Force Write-Host "Released /build/SEMS-on-device-server-$version.zip" diff --git a/scanner.js b/scanner.js index c921cfc..fcdef33 100644 --- a/scanner.js +++ b/scanner.js @@ -1,86 +1,150 @@ -import * as fs from "node:fs" -import * as path from "node:path" -import { Buffer } from 'node:buffer'; -import { unpack, pack } from 'msgpackr'; -import * as pako from 'pako'; +import * as fs from "node:fs"; +import * as path from "node:path"; +import { Buffer } from "node:buffer"; +import { unpack, pack } from "msgpackr"; +import * as pako from "pako"; +function get_latest_file_path(raw_spectral_data_dir) { + let files = fs.readdirSync(raw_spectral_data_dir); + files = files.sort(); + let latest_name = files.pop(); + // console.log(latest_name,files.length) + return path.resolve(raw_spectral_data_dir, latest_name); +} +function formatTimestamp(timestamp) { + // 将时间转换为 UTC+8 时区 + const offset = 8 * 60 * 60 * 1000; // 8 小时的毫秒数 + const beijingTime = new Date(timestamp + offset); -function get_latest_file_path(raw_spectral_data_dir){ - - let files=fs.readdirSync(raw_spectral_data_dir) - files=files.sort() - let latest_name=files.pop() - // console.log(latest_name,files.length) - return path.resolve(raw_spectral_data_dir,latest_name) + const date = new Date(beijingTime); + const year = date.getFullYear(); + const month = String(date.getMonth() + 1).padStart(2, "0"); // 月份从0开始,所以要加1 + const day = String(date.getDate()).padStart(2, "0"); + const hours = String(date.getHours()).padStart(2, "0"); + const minutes = String(date.getMinutes()).padStart(2, "0"); + const seconds = String(date.getSeconds()).padStart(2, "0"); + const milliseconds = String(date.getMilliseconds()).padStart(3, "0"); + + return `${year}-${month}-${day} ${hours}:${minutes}:${seconds}.${milliseconds}`; } - -async function main(){ - try { - // process.send("child process started"); - const raw_spectral_data_dir="/data" - - let last_data_file=null - let latest_data_file=null - let fd_csv=null - let fd_bin=null - while(true){ - latest_data_file=get_latest_file_path(raw_spectral_data_dir) - - if (latest_data_file!=last_data_file){ - - fd_csv=fs.openSync(latest_data_file) - - fd_bin=fs.openSync(path.format({ - dir: path.dirname(latest_data_file), - name: path.basename(latest_data_file,".csv"), - ext: 'bin', - })) - last_data_file=latest_data_file - } - - let last_pointer=fs.statSync(latest_data_file).size - - while(true){ - const stat=fs.statSync(latest_data_file) - if(stat.size>last_pointer){ - let buffer=Buffer.alloc(stat.size-last_pointer) - fs.readSync(fd_csv,buffer,0,stat.size-last_pointer,last_pointer) - let info=buffer.toString().split(",") - let timeStamp=Number(info[0]) - let start_pointer=Number(info[1]) - let length=Number(info[2]) - let spectral_buffer=Buffer.alloc(length) - const spectral_data=fs.readSync(fd_bin,spectral_buffer,0,length,start_pointer) - - - let upload_data={"spectral_data_bin":spectral_buffer } - let upload_data_compressed = pako.gzip(pack(upload_data)) - - let response=await fetch("http://sems-model-inference:22111/post", { - method: "post", - body: upload_data_compressed - }) - let response_data_compressed= await response.arrayBuffer() - let response_data=unpack(pako.ungzip(response_data_compressed)) - // console.log(response_data) - - process.send(response_data); - - break - } - } - +function wait(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); } - }catch (err) { - console.error(err.message); + +async function main() { + try { + // process.send("child process started"); + const raw_spectral_data_dir = "/data"; + + let inferenceFlag = false; + + process.on("message", (msg) => { + console.log("[scanner][主进程消息]: ", msg); + if (msg == "start") { + inferenceFlag = true; + } + if (msg == "stop") { + inferenceFlag = false; } - + // 发送消息给主进程 + // process.send({ message: 'Hello from child' }); + }); + console.log("[scanner][初始化]源数据路径为: ", raw_spectral_data_dir); + + let last_data_file = null; + let latest_data_file = null; + let fd_csv = null; + let fd_bin = null; + while (true) { + if (!inferenceFlag) { + await wait(1000); + continue; + } + + latest_data_file = get_latest_file_path(raw_spectral_data_dir); + + if (latest_data_file != last_data_file) { + console.log("[scanner][数据]数据文件切换为: ", latest_data_file); + + fd_csv = fs.openSync(latest_data_file); + + fd_bin = fs.openSync( + path.format({ + dir: path.dirname(latest_data_file), + name: path.basename(latest_data_file, ".csv"), + ext: "bin", + }) + ); + last_data_file = latest_data_file; + } + + //获取目前的文件末端指针 + let last_pointer = fs.statSync(latest_data_file).size; + + while (true) { + const stat = fs.statSync(latest_data_file); + //不断循环确认有新的一行数据产生,读取相应的数据 + if (stat.size > last_pointer) { + let buffer = Buffer.alloc(stat.size - last_pointer); + fs.readSync( + fd_csv, + buffer, + 0, + stat.size - last_pointer, + last_pointer + ); + let info = buffer.toString().split(","); + let timestamp = Number(info[0]); + let start_pointer = Number(info[1]); + let length = Number(info[2]); + let spectral_buffer = Buffer.alloc(length); + const bytesRead = fs.readSync( + fd_bin, + spectral_buffer, + 0, + length, + start_pointer + ); + //bytesRead读取了多少字节数,全部直接回传给推理服务 + + console.log( + "[scanner][输入]", + formatTimestamp(timestamp), + ", 输入数据字节数: ", + bytesRead + ); + + let upload_data = { spectral_data_bin: spectral_buffer }; + let upload_data_compressed = pako.gzip(pack(upload_data)); + + let response = await fetch("http://sems-model-inference:22111/post", { + method: "post", + body: upload_data_compressed, + }); + let response_data_compressed = await response.arrayBuffer(); + let response_data = unpack(pako.ungzip(response_data_compressed)); + + process.send(response_data); + console.log( + "[scanner][输出]", + formatTimestamp(timestamp), + ", 结果: ", + response_data + ); + + break; + } + } + } + } catch (err) { + console.error(err.message); + } } -main().then(()=>{ - // process.send("child process end"); -}) +main().then(() => { + // process.send("child process end"); +}); diff --git a/server.js b/server.js index e8a0764..189b45a 100644 --- a/server.js +++ b/server.js @@ -1,7 +1,7 @@ import { createServer } from "node:http"; import next from "next"; import { Server } from "socket.io"; -import {fork} from "node:child_process" +import { fork } from "node:child_process"; const dev = process.env.NODE_ENV !== "production"; const hostname = "0.0.0.0"; @@ -14,20 +14,21 @@ app.prepare().then(() => { const httpServer = createServer(handler); const io = new Server(httpServer); - + const forked = fork("./scanner.js"); io.on("connection", (socket) => { - console.log("connected") - socket.emit("msg","connected") + console.log("[server][WebSocket]已连接"); + socket.emit("msg", "connected"); - io.emit("msg","io.emit"); + // io.emit("msg", "io.emit"); + forked.send("start"); + + socket.on("disconnect", () => { + console.log("[server][WebSocket]已断开连接"); + forked.send("stop"); + }); }); - - - - - // socket_global.emit("msg","socket_global") httpServer @@ -36,19 +37,13 @@ app.prepare().then(() => { process.exit(1); }) .listen(port, () => { - console.log(`> Ready on http://${hostname}:${port}`); + console.log(`[server][服务器]已启用于 http://${hostname}:${port}`); }); + forked.on("message", (msg) => { + console.log("[server][子进程]]收到数据", msg); + io.emit("msg", msg); + }); - const forked = fork("./scanner.js") - - forked.on("message", msg => { - console.log("received",msg) - io.emit("msg",msg); - - }) - - forked.send({ hello: "world" }) - - -}); \ No newline at end of file + // +}); diff --git a/src/app/api/healthcheck/route.js b/src/app/api/healthcheck/route.js new file mode 100644 index 0000000..f2d020f --- /dev/null +++ b/src/app/api/healthcheck/route.js @@ -0,0 +1,14 @@ +// app/api/hello/route.js +import { NextRequest, NextResponse } from "next/server"; + +export async function GET(request) { + return NextResponse.json({ message: "I'm OK!" }, { status: 200 }); +} + +export async function POST(request) { + const data = await request.json(); + return NextResponse.json( + { message: "I'm OK!", receivedData: data }, + { status: 200 } + ); +} diff --git a/src/app/api/upload/route.js b/src/app/api/upload/route.js new file mode 100644 index 0000000..5c3b451 --- /dev/null +++ b/src/app/api/upload/route.js @@ -0,0 +1,192 @@ +import fs from "fs"; +import path from "path"; +import readline from "readline"; +import { NextRequest, NextResponse } from "next/server"; + +// // 获取目录下的所有CSV文件 +// function getCsvFiles(dir) { +// // 读取目录中的所有文件,并过滤出扩展名为.csv的文件 +// return fs.readdirSync(dir).filter(file => path.extname(file) === '.csv'); +// } + +// // 读取CSV文件并获取所需数据 +// function readCsvFile(filePath) { +// return new Promise((resolve, reject) => { +// const readStream = fs.createReadStream(filePath); // 创建文件读取流 +// const rl = readline.createInterface({ +// input: readStream, // 将读取流作为输入 +// crlfDelay: Infinity // 处理所有类型的换行符 +// }); + +// let firstRowFirstColumn = null; // 用于存储第一行第一个数据 +// let lastRowFirstColumn = null; // 用于存储最后一行第一个数据 +// let isFirstRow = true; // 标记是否为第一行 + +// // 逐行读取文件 +// rl.on('line', (line) => { +// const columns = line.split(','); // 将每行按逗号分隔成数组 +// if (isFirstRow) { +// firstRowFirstColumn = columns[0]; // 获取第一行第一个数据 +// isFirstRow = false; // 更新标记 +// } +// lastRowFirstColumn = columns[0]; // 更新最后一行第一个数据 +// }); + +// // 文件读取完成时触发 +// rl.on('close', () => { +// resolve({ file: path.basename(filePath), firstRowFirstColumn, lastRowFirstColumn }); // 返回结果 +// }); + +// // 读取过程中发生错误时触发 +// rl.on('error', (error) => reject(error)); // 处理错误 +// }); +// } + +// async function GetMetadata(rawSpectralDataFolder) { +// const csvFiles = getCsvFiles(rawSpectralDataFolder); +// const promises = csvFiles.map((file) => +// readCsvFile(path.join(rawSpectralDataFolder, file)) +// ); +// const results = await Promise.all(promises); +// console.log(results); +// return results; +// } + +const RAW_SPECTRAL_DATA_FOLDER = "/data"; + +//此函数将从RAW_SPECTRAL_DATA_FOLDER文件夹中,获取从startTimestamp到endTimestamp的所有光谱数据 +async function readIntervalSpectralData(startTimestamp, endTimestamp) { + const files = fs + .readdirSync(RAW_SPECTRAL_DATA_FOLDER) + .filter((file) => file.endsWith(".csv")) + .sort((a, b) => a.localeCompare(b)); //获取文件夹下的所有csv文件且按照升序排序 + + let startReadPointer = null; //保存从bin文件中读取光谱数据的开始指针 + let lengthRead = null; //记录读取多少光谱数据 + let flagFindStart = false; //是否找到开始点 + let flagFindEnd = false; //是否找到结束点 + let spectralDataBuffer = null; //保存所有二进制光谱数据 + let spectralDataTimestamps = []; //保存所有二进制光谱数据 + + for (const file of files) { + //扫描所有csv文件 + if (flagFindEnd) { + //如果结束点都找完了,就不再找了 + break; + } + + if (flagFindStart) { + //如果已经找到了开始点,代表是上个文件找到了开始点,但是还没找到结束点,读取了上个文件从开始点到文件末尾的所有数据,所以把指针值为零,从这个文件开头开始读取 + startReadPointer = 0; + lengthRead = 0; + } + + const csvPath = path.join(RAW_SPECTRAL_DATA_FOLDER, file); + const csvStream = fs.createReadStream(csvPath); + const csvReadline = readline.createInterface({ + input: csvStream, + crlfDelay: Infinity, //接受所有换行符 + }); + + let lastLineTimestamp = NaN; //保存上一行的时间戳 + for await (const line of csvReadline) { + //扫描每一行 + const columns = line.split(","); + const timestamp = parseInt(columns[0], 10); + + //如果开始时间戳在这一行与上一行之间,代表这一行是开始点 + if (startTimestamp >= lastLineTimestamp && startTimestamp <= timestamp) { + startReadPointer = parseInt(columns[1], 10); + flagFindStart = true; + } + + //如果结束时间戳在这一行与上一行之间,代表这上一行是结束点,就退出扫描这个文件 + if (endTimestamp >= lastLineTimestamp && endTimestamp <= timestamp) { + flagFindEnd = true; + break; + } + + if (flagFindStart && !flagFindEnd && timestamp <= endTimestamp) { + //已经找到开始,但没找到结束,且当前时间点小于结束时间点,就记录读取这一行对应的光谱 + lengthRead += parseInt(columns[2], 10); + spectralDataTimestamps.push(timestamp); + } + lastLineTimestamp = timestamp; + } + + //如果已经找到了开始点,且length>0, 则读取光谱数据 + if (flagFindStart && lengthRead > 0) { + const binPath = path.join( + RAW_SPECTRAL_DATA_FOLDER, + file.slice(0, -3) + "bin" + ); + const binFd = fs.openSync(binPath, "r"); + const buffer = Buffer.alloc(lengthRead); + const bytesRead = fs.readSync( + binFd, + buffer, + 0, + lengthRead, + startReadPointer + ); + + //为了实现开始到结束点跨越两个文件的功能。 + if (spectralDataBuffer === null) { + spectralDataBuffer = Buffer.alloc(buffer.length); + buffer.copy(spectralDataBuffer); + } else { + spectralDataBuffer = Buffer.concat([spectralDataBuffer, buffer]); + } + console.log( + `[server][api/upload]从${file}文件中读取了${ + buffer.length / 224 / 512 / 2 + }帧光谱数据` + ); + } + } + console.log( + `[server][api/upload]共读取${ + spectralDataBuffer.length / 224 / 512 / 2 + }帧光谱数据,${ + spectralDataTimestamps.length + }个时间戳(按照每帧维度224*512算)` + ); + + if ( + flagFindEnd && + spectralDataBuffer.length / 224 / 512 / 2 == spectralDataTimestamps.length + ) { + return { + spectralDataTimestamps, + spectralDataBuffer, + }; + } else { + return null; + } +} + +export async function GET(request) { + let startTimestamp = new Date("2024-09-26T14:48:00").getTime(); + let endTimestamp = new Date("2024-09-26T14:52:00").getTime(); + + const result = await readIntervalSpectralData(startTimestamp, endTimestamp); + + let response = {}; + if (result == null) { + response = { messgae: "未找到此区间的光谱数据" }; + } else { + response = { + messgae: `找到${result.spectralDataTimestamps.length}帧光谱数据`, + }; + } + + return NextResponse.json(response, { status: 200 }); +} + +export async function POST(request) { + const data = await request.json(); + return NextResponse.json( + { message: "I'm OK!", receivedData: data }, + { status: 200 } + ); +} diff --git a/src/app/upload/page.js b/src/app/upload/page.js new file mode 100644 index 0000000..e126370 --- /dev/null +++ b/src/app/upload/page.js @@ -0,0 +1,226 @@ +"use client"; + +import { useEffect, useState } from "react"; +import { socket } from "../../lib/socket"; + +export default function Upload() { + const [isConnected, setIsConnected] = useState(false); + const [transport, setTransport] = useState("N/A"); + + const [msg, setMsg] = useState("N/A"); + + useEffect(() => { + if (socket.connected) { + onConnect(); + } + + function onConnect() { + setIsConnected(true); + setTransport(socket.io.engine.transport.name); + + console.log("connected"); + + socket.io.engine.on("upgrade", (transport) => { + setTransport(transport.name); + }); + } + + function onDisconnect() { + setIsConnected(false); + setTransport("N/A"); + } + + socket.on("connect", onConnect); + socket.on("disconnect", onDisconnect); + + socket.on("msg", (msg) => { + console.log("Received", msg); + console.log(msg); + setMsg(`Temp:${msg.temp},C:${msg.C}`); + // socket.emit("hello",`Cilent: ${msg}`) + }); + + return () => { + socket.off("connect", onConnect); + socket.off("disconnect", onDisconnect); + }; + }, []); + + const [furnaceNumber, setFurnaceNumber] = useState(98213234); + + const handleFurnaceNumberChange = (event) => { + setFurnaceNumber(event.target.value); + }; + + const getCurrentDateTime = () => { + let now = new Date(); + // 将时间转换为 UTC+8 时区 + // const offset = 8 * 60 * 60 * 1000; // 8 小时的毫秒数 + // now = new Date(now.getTime() + offset); + const year = now.getFullYear(); + const month = String(now.getMonth() + 1).padStart(2, "0"); + const day = String(now.getDate()).padStart(2, "0"); + const hours = String(now.getHours()).padStart(2, "0"); + const minutes = String(now.getMinutes()).padStart(2, "0"); + const seconds = String(now.getSeconds()).padStart(2, "0"); + return `${year}-${month}-${day}T${hours}:${minutes}:${seconds}`; + }; + + const [measureStartDatetime, setMeasureStartDatetime] = useState( + "2024-09-26T14:40:00" + ); + + const [measureEndDatetime, setMeasureEndDatetime] = useState( + "2024-09-26T14:29:00" + ); + + const handleMeasureEndDatetimeChange = (event) => { + setMeasureEndDatetime(event.target.value); + }; + + const handleMeasureStartDatetimeChange = (event) => { + setMeasureStartDatetime(event.target.value); + setMeasureEndDatetime(event.target.value); + }; + + const [elementContent, setElementContent] = useState({ + Temperature: 0, + Mn: 0, + S: 0, + Ni: 0, + Mo: 0, + Cr: 0, + }); + + const elementContentLabels = { + Temperature: "温度(℃)", + Mn: "锰含量(Mn)", + S: "硫含量(S)", + Ni: "镍含量(Ni)", + Mo: "钼含量(Mo)", + Cr: "铬含量(Cr)", + }; + + const handElementContentleChange = (key, event) => { + setElementContent({ + ...elementContent, + [key]: event.target.value, + }); + }; + + const [selecteMeasureType, setSelecteMeasureType] = useState("TSC"); + + const handleSelecteMeasureTypeChange = (event) => { + setSelecteMeasureType(event.target.value); + }; + + const [remark, setRemark] = useState(""); + + const handleRemarkChange = (event) => { + setRemark(event.target.value); + }; + + const [uploadData, setUploadData] = useState({}); + + useEffect(() => { + setUploadData({ + furnaceNumber: furnaceNumber, + measureStartDate: measureStartDatetime, + measureEndDate: measureEndDatetime, + elementContent: elementContent, + selecteMeasureType: selecteMeasureType, + remark: remark, + }); + }, [ + furnaceNumber, + measureStartDatetime, + elementContent, + measureEndDatetime, + selecteMeasureType, + remark, + ]); + + return ( +
+
+

Status: {isConnected ? "connected" : "disconnected"}

+

Transport: {transport}

+

Transport: {msg}

+
+ +
+ + +
+ +
+ + +
+ +
+ + +
+ +
+ {Object.keys(elementContent).map((key) => ( +
+ + handElementContentleChange(key, event)} + step="0.01" + /> +
+ ))} +
+ +
+ + +
+ +
+ + +

待上传数据

+
+          {JSON.stringify(uploadData, null, 2)}
+        
+
+
+ ); +}