package publicway;
|
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.TimeUnit;
|
|
import publicway.ProtocolParser01.ParseResult;
|
|
public class SerialProtocolParser {
|
|
// 协议常量定义
|
private static final byte[] START_MARKER = {(byte) 0xDD, (byte) 0xCC};
|
private static final int MIN_PACKET_LENGTH = 9; // 最小包长度: 起始标记2 + 数据长度2 + 主机地址1 + 卡槽地址1 + 功能码1 + CRC2
|
// 功能码定义
|
private static final byte FUNCTION_01 = 0x01; // 查询数据
|
private static final byte FUNCTION_51 = 0x51; // 开门控制
|
private static final byte FUNCTION_52 = 0x52; // LED亮度控制
|
private static final byte FUNCTION_80 = (byte) 0x80; // 工卡升级使能
|
private static final byte FUNCTION_81 = (byte) 0x81; // 工作卡升级数据包
|
private static final byte FUNCTION_82 = (byte) 0x82; // 单板升级使能
|
private static final byte FUNCTION_83 = (byte) 0x83; // 单板升级数据包
|
|
// 数据缓冲区,用于处理粘包
|
private byte[] dataBuffer = new byte[1024];
|
private int bufferPosition = 0;
|
|
// 数据接收队列
|
private BlockingQueue<byte[]> dataQueue = new ArrayBlockingQueue<>(100);
|
|
// 批量处理队列
|
private final BlockingQueue<byte[]> batchQueue = new ArrayBlockingQueue<>(1000);
|
private final ScheduledExecutorService batchExecutor =
|
Executors.newSingleThreadScheduledExecutor();
|
|
// 线程控制
|
private volatile boolean isRunning = false;
|
private Thread processorThread;
|
|
// 重用StringBuilder减少对象创建
|
private final StringBuilder hexBuilder = new StringBuilder(256);
|
|
/**
|
* 启动解析器
|
*/
|
public void start() {
|
if (isRunning) {
|
System.out.println("串口协议解析器已经在运行中");
|
return;
|
}
|
|
isRunning = true;
|
|
// 启动批量处理(每50ms处理一次)
|
batchExecutor.scheduleAtFixedRate(this::batchProcess, 50, 50, TimeUnit.MILLISECONDS);
|
|
processorThread = new Thread(this::processPackets, "Serial-Protocol-Parser");
|
processorThread.setDaemon(true);
|
processorThread.start();
|
|
System.out.println("串口协议解析器已启动");
|
}
|
|
/**
|
* 停止解析器
|
*/
|
public void stop() {
|
if (!isRunning) {
|
return;
|
}
|
|
isRunning = false;
|
|
// 关闭批量处理器
|
batchExecutor.shutdown();
|
try {
|
if (!batchExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
|
batchExecutor.shutdownNow();
|
}
|
} catch (InterruptedException e) {
|
batchExecutor.shutdownNow();
|
Thread.currentThread().interrupt();
|
}
|
|
if (processorThread != null) {
|
processorThread.interrupt();
|
try {
|
processorThread.join(1000);
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
}
|
processorThread = null;
|
}
|
|
// 清空队列和缓冲区
|
dataQueue.clear();
|
batchQueue.clear();
|
bufferPosition = 0;
|
|
System.out.println("串口协议解析器已停止");
|
}
|
|
/**
|
* 检查解析器是否在运行
|
*/
|
public boolean isRunning() {
|
return isRunning;
|
}
|
|
/**
|
* 接收串口原始数据 - 使用批量处理
|
*/
|
public void receiveData(byte[] rawData) {
|
if (!isRunning) {
|
System.out.println("警告: 串口协议解析器未启动,忽略接收的数据");
|
return;
|
}
|
|
if (rawData == null || rawData.length == 0) {
|
return;
|
}
|
|
// 将数据添加到批量队列
|
if (!batchQueue.offer(rawData)) {
|
System.err.println("批量队列已满,丢弃数据");
|
}
|
}
|
|
/**
|
* 批量处理数据
|
*/
|
private void batchProcess() {
|
if (batchQueue.isEmpty()) {
|
return;
|
}
|
|
// 批量处理数据
|
java.util.List<byte[]> batch = new java.util.ArrayList<>(100);
|
batchQueue.drainTo(batch, 100);
|
|
for (byte[] rawData : batch) {
|
// 将数据添加到缓冲区
|
if (bufferPosition + rawData.length > dataBuffer.length) {
|
// 缓冲区不足时,清理并重新开始
|
System.arraycopy(dataBuffer, bufferPosition - rawData.length, dataBuffer, 0, rawData.length);
|
bufferPosition = rawData.length;
|
} else {
|
System.arraycopy(rawData, 0, dataBuffer, bufferPosition, rawData.length);
|
bufferPosition += rawData.length;
|
}
|
|
// 处理缓冲区中的数据
|
processBuffer();
|
}
|
|
// 定期检查内存
|
checkMemory();
|
}
|
|
/**
|
* 内存监控
|
*/
|
private void checkMemory() {
|
Runtime runtime = Runtime.getRuntime();
|
long usedMem = runtime.totalMemory() - runtime.freeMemory();
|
long maxMem = runtime.maxMemory();
|
|
if (usedMem > maxMem * 0.8) {
|
System.out.println("内存使用率超过80%,当前使用: " + (usedMem / 1024 / 1024) + "MB");
|
}
|
}
|
|
/**
|
* 处理缓冲区中的数据,解析完整数据包
|
*/
|
private void processBuffer() {
|
while (bufferPosition >= MIN_PACKET_LENGTH) {
|
// 查找起始标记
|
int startIndex = findStartMarker();
|
if (startIndex == -1) {
|
// 没有找到起始标记,清空无效数据
|
bufferPosition = 0;
|
return;
|
}
|
|
// 检查是否有足够的数据读取数据长度
|
if (startIndex + 4 > bufferPosition) {
|
// 数据不足,等待更多数据
|
compactBuffer(startIndex);
|
return;
|
}
|
|
// 读取数据长度 (大端序)
|
int dataLength = ((dataBuffer[startIndex + 2] & 0xFF) << 8) | (dataBuffer[startIndex + 3] & 0xFF);
|
int totalPacketLength = 2 + 2 + dataLength + 2; // 起始标记2 + 数据长度2 + 数据内容 + CRC2
|
|
// 检查是否收到完整数据包
|
if (startIndex + totalPacketLength > bufferPosition) {
|
// 数据包不完整,等待更多数据
|
compactBuffer(startIndex);
|
return;
|
}
|
|
// 提取完整数据包
|
byte[] packet = new byte[totalPacketLength];
|
System.arraycopy(dataBuffer, startIndex, packet, 0, totalPacketLength);
|
|
// 将数据包放入队列供解析
|
try {
|
if (!dataQueue.offer(packet)) {
|
System.err.println("数据队列已满,丢弃数据包");
|
}
|
} catch (Exception e) {
|
System.err.println("放入数据队列时发生异常: " + e.getMessage());
|
}
|
|
// 移动缓冲区位置
|
int remaining = bufferPosition - (startIndex + totalPacketLength);
|
if (remaining > 0) {
|
System.arraycopy(dataBuffer, startIndex + totalPacketLength, dataBuffer, 0, remaining);
|
}
|
bufferPosition = remaining;
|
}
|
}
|
|
/**
|
* 查找起始标记位置
|
*/
|
private int findStartMarker() {
|
for (int i = 0; i <= bufferPosition - START_MARKER.length; i++) {
|
if (dataBuffer[i] == START_MARKER[0] && dataBuffer[i + 1] == START_MARKER[1]) {
|
return i;
|
}
|
}
|
return -1;
|
}
|
|
/**
|
* 压缩缓冲区,将有效数据移到开头
|
*/
|
private void compactBuffer(int startIndex) {
|
if (startIndex > 0) {
|
System.arraycopy(dataBuffer, startIndex, dataBuffer, 0, bufferPosition - startIndex);
|
bufferPosition -= startIndex;
|
}
|
}
|
|
/**
|
* 处理数据包的主方法
|
*/
|
private void processPackets() {
|
System.out.println("串口数据包处理线程开始运行");
|
|
while (isRunning && !Thread.currentThread().isInterrupted()) {
|
try {
|
byte[] packet = dataQueue.take(); // 阻塞直到有数据
|
parsePacket(packet);
|
} catch (InterruptedException e) {
|
System.out.println("串口数据包处理线程被中断");
|
Thread.currentThread().interrupt();
|
break;
|
} catch (Exception e) {
|
System.err.println("处理数据包时发生异常: " + e.getMessage());
|
e.printStackTrace();
|
}
|
}
|
|
System.out.println("串口数据包处理线程结束运行");
|
}
|
|
/**
|
* 解析数据包并根据功能码调用相应方法
|
*/
|
private void parsePacket(byte[] packet) {
|
try {
|
// 解析基本字段
|
byte hostAddress = packet[4]; // 主机地址
|
byte slotAddress = packet[5]; // 卡槽地址
|
byte functionCode = packet[6]; // 功能码
|
|
// 数据长度 (从协议中读取)
|
int dataLength = ((packet[2] & 0xFF) << 8) | (packet[3] & 0xFF);
|
|
// 返回值数据
|
int returnValueLength = dataLength - 3; // N-3 (减去主机地址、卡槽地址、功能码)
|
byte[] returnValue = null;
|
if (returnValueLength > 0) {
|
returnValue = new byte[returnValueLength];
|
System.arraycopy(packet, 7, returnValue, 0, returnValueLength);
|
}
|
|
// 根据功能码调用不同的解析方法
|
switch (functionCode) {
|
case FUNCTION_01:
|
if (returnValue != null) {
|
// 使用优化的字节数组解析方法,避免字符串转换
|
ParseResult rst = ProtocolParser01.parseDDCC01Data(packet);
|
rst.fuzhi();
|
rst.toString();
|
}
|
break;
|
case FUNCTION_51:
|
// 调用 ProtocolParser51 处理数据
|
String hexPacket = bytesToHex(packet);
|
int result = ProtocolParser51.parse(hexPacket);
|
if (result == 1) {
|
System.out.println("功能码 0x51 - 开门控制成功");
|
} else {
|
System.out.println("功能码 0x51 - 开门控制失败或报文不合法");
|
}
|
break;
|
case FUNCTION_52:
|
System.out.println("功能码 0x52 - LED亮度控制");
|
break;
|
case FUNCTION_80:
|
System.out.println("功能码 0x80 - 工卡升级使能");
|
break;
|
case FUNCTION_81:
|
System.out.println("功能码 0x81 - 工作卡升级数据包");
|
break;
|
case FUNCTION_82:
|
System.out.println("功能码 0x82 - 单板升级使能");
|
break;
|
case FUNCTION_83:
|
System.out.println("功能码 0x83 - 单板升级数据包");
|
break;
|
default:
|
System.err.println("未知功能码: 0x" + Integer.toHexString(functionCode & 0xFF));
|
break;
|
}
|
|
} catch (Exception e) {
|
System.err.println("解析数据包时发生错误: " + e.getMessage());
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 优化的字节数组转十六进制字符串方法
|
*/
|
private String bytesToHex(byte[] bytes) {
|
hexBuilder.setLength(0); // 清空重用
|
for (byte b : bytes) {
|
hexBuilder.append(String.format("%02X", b));
|
}
|
return hexBuilder.toString();
|
}
|
|
public static byte[] hexStringToBytes(String hexString) {
|
if (hexString == null || hexString.trim().isEmpty()) {
|
return new byte[0];
|
}
|
|
// 移除所有空格
|
String cleanedHex = hexString.replaceAll("\\s", "");
|
|
// 检查长度是否为偶数
|
if (cleanedHex.length() % 2 != 0) {
|
throw new IllegalArgumentException("HEX字符串长度必须为偶数: " + cleanedHex);
|
}
|
|
byte[] result = new byte[cleanedHex.length() / 2];
|
|
for (int i = 0; i < cleanedHex.length(); i += 2) {
|
String byteStr = cleanedHex.substring(i, i + 2);
|
try {
|
result[i / 2] = (byte) Integer.parseInt(byteStr, 16);
|
} catch (NumberFormatException e) {
|
throw new IllegalArgumentException("无效的HEX字符: " + byteStr, e);
|
}
|
}
|
|
return result;
|
}
|
|
/**
|
* 获取解析器状态信息
|
*/
|
public String getStatusInfo() {
|
return String.format("串口解析器状态: %s, 队列大小: %d/%d, 批量队列: %d/%d",
|
isRunning ? "运行中" : "已停止",
|
dataQueue.size(),
|
dataQueue.remainingCapacity() + dataQueue.size(),
|
batchQueue.size(),
|
batchQueue.remainingCapacity() + batchQueue.size());
|
}
|
|
/**
|
* 设置原始数据最大打印长度
|
*/
|
public void setMaxRawDataPrintLength(int length) {
|
// 实现根据需要调整
|
}
|
}
|