您好,登錄后才能下訂單哦!
本篇文章將介紹JT808協議的解析思路。
另請大神繞路,不喜勿噴!
先寫個大致的思路,有疑問可以聯系本人,聯系方式:
email: hylexus@163.com
數據類型 | 描述及要求 |
---|---|
BYTE | 無符號單字節整型(字節, 8 位) |
WORD | 無符號雙字節整型(字, 16 位) |
DWORD | 無符號四字節整型(雙字, 32 位) |
BYTE[n] | n 字節 |
BCD[n] | 8421 碼, n 字節 |
STRING | GBK 編碼,若無數據,置空 |
標識位 | 消息頭 | 消息體 | 校驗碼 | 標識位 |
---|---|---|---|---|
1byte(0x7e) | 16byte | n byte | 1byte | 1byte(0x7e) |
消息ID(0-1) 消息體屬性(2-3) 終端手機號(4-9) 消息流水號(10-11) 消息包封裝項(12-15)
byte[0-1] 消息ID word(16)
byte[2-3] 消息體屬性 word(16)
bit[0-9] 消息體長度
bit[10-12] 數據加密方式
此三位都為 0,表示消息體不加密
第 10 位為 1,表示消息體經過 RSA 算法加密
其它保留
bit[13] 分包
1:消息體為長消息,進行分包發送處理,具體分包信息由消息包封裝項決定
0:則消息頭中無消息包封裝項字段
bit[14-15] 保留
byte[4-9] 終端手機號或設備ID bcd[6]
根據安裝后終端自身的手機號轉換
手機號不足12 位,則在前面補 0
byte[10-11] 消息流水號 word(16)
按發送順序從 0 開始循環累加
byte[12-15] 消息包封裝項
byte[0-1] 消息包總數(word(16))
該消息分包後的總包數
byte[2-3] 包序號(word(16))
從 1 開始
如果消息體屬性中相關標識位確定消息分包處理,則該項有內容
否則無該項
整個消息體結構中最復雜的就是消息頭了。
以下是對整個消息體抽象出來的一個java實體類。
import java.nio.channels.Channel;

/**
 * One decoded JT808 message: its header, the raw body bytes, the check code and the
 * channel of the client that sent it.
 */
public class PackageData {

    /** Message header: 12 bytes, or 16 bytes when the sub-package item is present. */
    protected MsgHeader msgHeader;

    /** Raw bytes of the message body. */
    protected byte[] msgBodyBytes;

    /** Check code (1 byte on the wire). */
    protected int checkSum;

    /**
     * Channel of the sending client, kept so that messages can be pushed back to it.
     * NOTE(review): this is java.nio.channels.Channel although the server code is built on
     * Netty; confirm whether io.netty.channel.Channel was intended.
     */
    protected Channel channel;

    public MsgHeader getMsgHeader() {
        return msgHeader;
    }

    public void setMsgHeader(MsgHeader msgHeader) {
        this.msgHeader = msgHeader;
    }

    public byte[] getMsgBodyBytes() {
        return msgBodyBytes;
    }

    public void setMsgBodyBytes(byte[] msgBodyBytes) {
        this.msgBodyBytes = msgBodyBytes;
    }

    public int getCheckSum() {
        return checkSum;
    }

    public void setCheckSum(int checkSum) {
        this.checkSum = checkSum;
    }

    public Channel getChannel() {
        return channel;
    }

    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    /** Decoded fields of the JT808 message header. */
    public static class MsgHeader {

        /** Message ID, header bytes 0-1. */
        protected int msgId;

        // ---- message body properties, header bytes 2-3 ----

        /** Raw 16-bit value of the body-properties word. */
        protected int msgBodyPropsField;

        /** Message body length, bits 0-9 of the properties word. */
        protected int msgBodyLength;

        /** Encryption type, bits 10-12 of the properties word. */
        protected int encryptionType;

        /** Bit 13 of the properties word; {@code true} means the sub-package item is present. */
        protected boolean hasSubPackage;

        /** Reserved bits 14-15 of the properties word, kept as text. */
        protected String reservedBit;

        /** Terminal phone number, header bytes 4-9 (BCD). */
        protected String terminalPhone;

        /** Message serial number, header bytes 10-11. */
        protected int flowId;

        // ---- sub-package item, header bytes 12-15 (only when hasSubPackage) ----

        /** Raw value of the four sub-package bytes. */
        protected int packageInfoField;

        /** Total number of sub-packages (16-bit word). */
        protected long totalSubPackage;

        /** 1-based sequence number of this sub-package (16-bit word). */
        protected long subPackageSeq;

        public int getMsgId() {
            return msgId;
        }

        public void setMsgId(int msgId) {
            this.msgId = msgId;
        }

        public int getMsgBodyPropsField() {
            return msgBodyPropsField;
        }

        public void setMsgBodyPropsField(int msgBodyPropsField) {
            this.msgBodyPropsField = msgBodyPropsField;
        }

        public int getMsgBodyLength() {
            return msgBodyLength;
        }

        public void setMsgBodyLength(int msgBodyLength) {
            this.msgBodyLength = msgBodyLength;
        }

        public int getEncryptionType() {
            return encryptionType;
        }

        public void setEncryptionType(int encryptionType) {
            this.encryptionType = encryptionType;
        }

        public boolean isHasSubPackage() {
            return hasSubPackage;
        }

        public void setHasSubPackage(boolean hasSubPackage) {
            this.hasSubPackage = hasSubPackage;
        }

        public String getReservedBit() {
            return reservedBit;
        }

        public void setReservedBit(String reservedBit) {
            this.reservedBit = reservedBit;
        }

        public String getTerminalPhone() {
            return terminalPhone;
        }

        public void setTerminalPhone(String terminalPhone) {
            this.terminalPhone = terminalPhone;
        }

        public int getFlowId() {
            return flowId;
        }

        public void setFlowId(int flowId) {
            this.flowId = flowId;
        }

        public int getPackageInfoField() {
            return packageInfoField;
        }

        public void setPackageInfoField(int packageInfoField) {
            this.packageInfoField = packageInfoField;
        }

        public long getTotalSubPackage() {
            return totalSubPackage;
        }

        public void setTotalSubPackage(long totalSubPackage) {
            this.totalSubPackage = totalSubPackage;
        }

        public long getSubPackageSeq() {
            return subPackageSeq;
        }

        public void setSubPackageSeq(long subPackageSeq) {
            this.subPackageSeq = subPackageSeq;
        }
    }
}
2.2.1 消息轉換器
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cn.hylexus.jt808.util.BCD8421Operater; import cn.hylexus.jt808.util.BitOperator; import cn.hylexus.jt808.vo.PackageData; import cn.hylexus.jt808.vo.PackageData.MsgHeader; public class MsgDecoder { private static final Logger log = LoggerFactory.getLogger(MsgDecoder.class); private BitOperator bitOperator; private BCD8421Operater bcd8421Operater; public MsgDecoder() { this.bitOperator = new BitOperator(); this.bcd8421Operater = new BCD8421Operater(); } //字節數組到消息體實體類 public PackageData queueElement2PackageData(byte[] data) { PackageData ret = new PackageData(); // 1. 16byte 或 12byte 消息頭 MsgHeader msgHeader = this.parseMsgHeaderFromBytes(data); ret.setMsgHeader(msgHeader); int msgBodyByteStartIndex = 12; // 2. 消息體 // 有子包信息,消息體起始字節后移四個字節:消息包總數(word(16))+包序號(word(16)) if (msgHeader.isHasSubPackage()) { msgBodyByteStartIndex = 16; } byte[] tmp = new byte[msgHeader.getMsgBodyLength()]; System.arraycopy(data, msgBodyByteStartIndex, tmp, 0, tmp.length); ret.setMsgBodyBytes(tmp); // 3. 去掉分隔符之后,最后一位就是校驗碼 // int checkSumInPkg = // this.bitOperator.oneByteToInteger(data[data.length - 1]); int checkSumInPkg = data[data.length - 1]; int calculatedCheckSum = this.bitOperator.getCheckSum4JT808(data, 0, data.length - 1); ret.setCheckSum(checkSumInPkg); if (checkSumInPkg != calculatedCheckSum) { log.warn("檢驗碼不一致,msgid:{},pkg:{},calculated:{}", msgHeader.getMsgId(), checkSumInPkg, calculatedCheckSum); } return ret; } private MsgHeader parseMsgHeaderFromBytes(byte[] data) { MsgHeader msgHeader = new MsgHeader(); // 1. 消息ID word(16) // byte[] tmp = new byte[2]; // System.arraycopy(data, 0, tmp, 0, 2); // msgHeader.setMsgId(this.bitOperator.twoBytesToInteger(tmp)); msgHeader.setMsgId(this.parseIntFromBytes(data, 0, 2)); // 2. 
消息體屬性 word(16)=================> // System.arraycopy(data, 2, tmp, 0, 2); // int msgBodyProps = this.bitOperator.twoBytesToInteger(tmp); int msgBodyProps = this.parseIntFromBytes(data, 2, 2); msgHeader.setMsgBodyPropsField(msgBodyProps); // [ 0-9 ] 0000,0011,1111,1111(3FF)(消息體長度) msgHeader.setMsgBodyLength(msgBodyProps & 0x1ff); // [10-12] 0001,1100,0000,0000(1C00)(加密類型) msgHeader.setEncryptionType((msgBodyProps & 0xe00) >> 10); // [ 13_ ] 0010,0000,0000,0000(2000)(是否有子包) msgHeader.setHasSubPackage(((msgBodyProps & 0x2000) >> 13) == 1); // [14-15] 1100,0000,0000,0000(C000)(保留位) msgHeader.setReservedBit(((msgBodyProps & 0xc000) >> 14) + ""); // 消息體屬性 word(16)<================= // 3. 終端手機號 bcd[6] // tmp = new byte[6]; // System.arraycopy(data, 4, tmp, 0, 6); // msgHeader.setTerminalPhone(this.bcd8421Operater.bcd2String(tmp)); msgHeader.setTerminalPhone(this.parseBcdStringFromBytes(data, 4, 6)); // 4. 消息流水號 word(16) 按發送順序從 0 開始循環累加 // tmp = new byte[2]; // System.arraycopy(data, 10, tmp, 0, 2); // msgHeader.setFlowId(this.bitOperator.twoBytesToInteger(tmp)); msgHeader.setFlowId(this.parseIntFromBytes(data, 10, 2)); // 5. 
消息包封裝項 // 有子包信息 if (msgHeader.isHasSubPackage()) { // 消息包封裝項字段 msgHeader.setPackageInfoField(this.parseIntFromBytes(data, 12, 4)); // byte[0-1] 消息包總數(word(16)) // tmp = new byte[2]; // System.arraycopy(data, 12, tmp, 0, 2); // msgHeader.setTotalSubPackage(this.bitOperator.twoBytesToInteger(tmp)); msgHeader.setTotalSubPackage(this.parseIntFromBytes(data, 12, 2)); // byte[2-3] 包序號(word(16)) 從 1 開始 // tmp = new byte[2]; // System.arraycopy(data, 14, tmp, 0, 2); // msgHeader.setSubPackageSeq(this.bitOperator.twoBytesToInteger(tmp)); msgHeader.setSubPackageSeq(this.parseIntFromBytes(data, 12, 2)); } return msgHeader; } protected String parseStringFromBytes(byte[] data, int startIndex, int lenth) { return this.parseStringFromBytes(data, startIndex, lenth, null); } private String parseStringFromBytes(byte[] data, int startIndex, int lenth, String defaultVal) { try { byte[] tmp = new byte[lenth]; System.arraycopy(data, startIndex, tmp, 0, lenth); return new String(tmp, "UTF-8"); } catch (Exception e) { log.error("解析字符串出錯:{}", e.getMessage()); e.printStackTrace(); return defaultVal; } } private String parseBcdStringFromBytes(byte[] data, int startIndex, int lenth) { return this.parseBcdStringFromBytes(data, startIndex, lenth, null); } private String parseBcdStringFromBytes(byte[] data, int startIndex, int lenth, String defaultVal) { try { byte[] tmp = new byte[lenth]; System.arraycopy(data, startIndex, tmp, 0, lenth); return this.bcd8421Operater.bcd2String(tmp); } catch (Exception e) { log.error("解析BCD(8421碼)出錯:{}", e.getMessage()); e.printStackTrace(); return defaultVal; } } private int parseIntFromBytes(byte[] data, int startIndex, int length) { return this.parseIntFromBytes(data, startIndex, length, 0); } private int parseIntFromBytes(byte[] data, int startIndex, int length, int defaultVal) { try { // 字節數大于4,從起始索引開始向后處理4個字節,其余超出部分丟棄 final int len = length > 4 ? 
4 : length; byte[] tmp = new byte[len]; System.arraycopy(data, startIndex, tmp, 0, len); return bitOperator.byteToInteger(tmp); } catch (Exception e) { log.error("解析整數出錯:{}", e.getMessage()); e.printStackTrace(); return defaultVal; } } }
2.2.2 用到的工具類
2.2.2.1 BCD操作工具類
package cn.hylexus.jt808.util; public class BCD8421Operater { /** * BCD字節數組===>String * * @param bytes * @return 十進制字符串 */ public String bcd2String(byte[] bytes) { StringBuilder temp = new StringBuilder(bytes.length * 2); for (int i = 0; i < bytes.length; i++) { // 高四位 temp.append((bytes[i] & 0xf0) >>> 4); // 低四位 temp.append(bytes[i] & 0x0f); } return temp.toString().substring(0, 1).equalsIgnoreCase("0") ? temp.toString().substring(1) : temp.toString(); } /** * 字符串==>BCD字節數組 * * @param str * @return BCD字節數組 */ public byte[] string2Bcd(String str) { // 奇數,前補零 if ((str.length() & 0x1) == 1) { str = "0" + str; } byte ret[] = new byte[str.length() / 2]; byte bs[] = str.getBytes(); for (int i = 0; i < ret.length; i++) { byte high = ascII2Bcd(bs[2 * i]); byte low = ascII2Bcd(bs[2 * i + 1]); // TODO 只遮罩BCD低四位? ret[i] = (byte) ((high << 4) | low); } return ret; } private byte ascII2Bcd(byte asc) { if ((asc >= '0') && (asc <= '9')) return (byte) (asc - '0'); else if ((asc >= 'A') && (asc <= 'F')) return (byte) (asc - 'A' + 10); else if ((asc >= 'a') && (asc <= 'f')) return (byte) (asc - 'a' + 10); else return (byte) (asc - 48); } }
2.2.2.2 位操作工具類
package cn.hylexus.jt808.util; import java.util.Arrays; import java.util.List; public class BitOperator { /** * 把一個整形該為byte * * @param value * @return * @throws Exception */ public byte integerTo1Byte(int value) { return (byte) (value & 0xFF); } /** * 把一個整形該為1位的byte數組 * * @param value * @return * @throws Exception */ public byte[] integerTo1Bytes(int value) { byte[] result = new byte[1]; result[0] = (byte) (value & 0xFF); return result; } /** * 把一個整形改為2位的byte數組 * * @param value * @return * @throws Exception */ public byte[] integerTo2Bytes(int value) { byte[] result = new byte[2]; result[0] = (byte) ((value >>> 8) & 0xFF); result[1] = (byte) (value & 0xFF); return result; } /** * 把一個整形改為3位的byte數組 * * @param value * @return * @throws Exception */ public byte[] integerTo3Bytes(int value) { byte[] result = new byte[3]; result[0] = (byte) ((value >>> 16) & 0xFF); result[1] = (byte) ((value >>> 8) & 0xFF); result[2] = (byte) (value & 0xFF); return result; } /** * 把一個整形改為4位的byte數組 * * @param value * @return * @throws Exception */ public byte[] integerTo4Bytes(int value){ byte[] result = new byte[4]; result[0] = (byte) ((value >>> 24) & 0xFF); result[1] = (byte) ((value >>> 16) & 0xFF); result[2] = (byte) ((value >>> 8) & 0xFF); result[3] = (byte) (value & 0xFF); return result; } /** * 把byte[]轉化位整形,通常為指令用 * * @param value * @return * @throws Exception */ public int byteToInteger(byte[] value) { int result; if (value.length == 1) { result = oneByteToInteger(value[0]); } else if (value.length == 2) { result = twoBytesToInteger(value); } else if (value.length == 3) { result = threeBytesToInteger(value); } else if (value.length == 4) { result = fourBytesToInteger(value); } else { result = fourBytesToInteger(value); } return result; } /** * 把一個byte轉化位整形,通常為指令用 * * @param value * @return * @throws Exception */ public int oneByteToInteger(byte value) { return (int) value & 0xFF; } /** * 把一個2位的數組轉化位整形 * * @param value * @return * @throws Exception */ public int 
twoBytesToInteger(byte[] value) { // if (value.length < 2) { // throw new Exception("Byte array too short!"); // } int temp0 = value[0] & 0xFF; int temp1 = value[1] & 0xFF; return ((temp0 << 8) + temp1); } /** * 把一個3位的數組轉化位整形 * * @param value * @return * @throws Exception */ public int threeBytesToInteger(byte[] value) { int temp0 = value[0] & 0xFF; int temp1 = value[1] & 0xFF; int temp2 = value[2] & 0xFF; return ((temp0 << 16) + (temp1 << 8) + temp2); } /** * 把一個4位的數組轉化位整形,通常為指令用 * * @param value * @return * @throws Exception */ public int fourBytesToInteger(byte[] value) { // if (value.length < 4) { // throw new Exception("Byte array too short!"); // } int temp0 = value[0] & 0xFF; int temp1 = value[1] & 0xFF; int temp2 = value[2] & 0xFF; int temp3 = value[3] & 0xFF; return ((temp0 << 24) + (temp1 << 16) + (temp2 << 8) + temp3); } /** * 把一個4位的數組轉化位整形 * * @param value * @return * @throws Exception */ public long fourBytesToLong(byte[] value) throws Exception { // if (value.length < 4) { // throw new Exception("Byte array too short!"); // } int temp0 = value[0] & 0xFF; int temp1 = value[1] & 0xFF; int temp2 = value[2] & 0xFF; int temp3 = value[3] & 0xFF; return (((long) temp0 << 24) + (temp1 << 16) + (temp2 << 8) + temp3); } /** * 把一個數組轉化長整形 * * @param value * @return * @throws Exception */ public long bytes2Long(byte[] value) { long result = 0; int len = value.length; int temp; for (int i = 0; i < len; i++) { temp = (len - 1 - i) * 8; if (temp == 0) { result += (value[i] & 0x0ff); } else { result += (value[i] & 0x0ff) << temp; } } return result; } /** * 把一個長整形改為byte數組 * * @param value * @return * @throws Exception */ public byte[] longToBytes(long value){ return longToBytes(value, 8); } /** * 把一個長整形改為byte數組 * * @param value * @return * @throws Exception */ public byte[] longToBytes(long value, int len) { byte[] result = new byte[len]; int temp; for (int i = 0; i < len; i++) { temp = (len - 1 - i) * 8; if (temp == 0) { result[i] += (value & 0x0ff); } else { 
result[i] += (value >>> temp) & 0x0ff; } } return result; } /** * 得到一個消息ID * * @return * @throws Exception */ public byte[] generateTransactionID() throws Exception { byte[] id = new byte[16]; System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 0, 2); System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 2, 2); System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 4, 2); System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 6, 2); System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 8, 2); System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 10, 2); System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 12, 2); System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 14, 2); return id; } /** * 把IP拆分位int數組 * * @param ip * @return * @throws Exception */ public int[] getIntIPValue(String ip) throws Exception { String[] sip = ip.split("[.]"); // if (sip.length != 4) { // throw new Exception("error IPAddress"); // } int[] intIP = { Integer.parseInt(sip[0]), Integer.parseInt(sip[1]), Integer.parseInt(sip[2]), Integer.parseInt(sip[3]) }; return intIP; } /** * 把byte類型IP地址轉化位字符串 * * @param address * @return * @throws Exception */ public String getStringIPValue(byte[] address) throws Exception { int first = this.oneByteToInteger(address[0]); int second = this.oneByteToInteger(address[1]); int third = this.oneByteToInteger(address[2]); int fourth = this.oneByteToInteger(address[3]); return first + "." + second + "." + third + "." + fourth; } /** * 合并字節數組 * * @param first * @param rest * @return */ public byte[] concatAll(byte[] first, byte[]... 
rest) { int totalLength = first.length; for (byte[] array : rest) { if (array != null) { totalLength += array.length; } } byte[] result = Arrays.copyOf(first, totalLength); int offset = first.length; for (byte[] array : rest) { if (array != null) { System.arraycopy(array, 0, result, offset, array.length); offset += array.length; } } return result; } /** * 合并字節數組 * * @param rest * @return */ public byte[] concatAll(List<byte[]> rest) { int totalLength = 0; for (byte[] array : rest) { if (array != null) { totalLength += array.length; } } byte[] result = new byte[totalLength]; int offset = 0; for (byte[] array : rest) { if (array != null) { System.arraycopy(array, 0, result, offset, array.length); offset += array.length; } } return result; } public float byte2Float(byte[] bs) { return Float.intBitsToFloat( (((bs[3] & 0xFF) << 24) + ((bs[2] & 0xFF) << 16) + ((bs[1] & 0xFF) << 8) + (bs[0] & 0xFF))); } public float byteBE2Float(byte[] bytes) { int l; l = bytes[0]; l &= 0xff; l |= ((long) bytes[1] << 8); l &= 0xffff; l |= ((long) bytes[2] << 16); l &= 0xffffff; l |= ((long) bytes[3] << 24); return Float.intBitsToFloat(l); } public int getCheckSum4JT808(byte[] bs, int start, int end) { if (start < 0 || end > bs.length) throw new ArrayIndexOutOfBoundsException("getCheckSum4JT808 error : index out of bounds(start=" + start + ",end=" + end + ",bytes length=" + bs.length + ")"); int cs = 0; for (int i = start; i < end; i++) { cs ^= bs[i]; } return cs; } public int getBitRange(int number, int start, int end) { if (start < 0) throw new IndexOutOfBoundsException("min index is 0,but start = " + start); if (end >= Integer.SIZE) throw new IndexOutOfBoundsException("max index is " + (Integer.SIZE - 1) + ",but end = " + end); return (number << Integer.SIZE - (end + 1)) >>> Integer.SIZE - (end - start + 1); } public int getBitAt(int number, int index) { if (index < 0) throw new IndexOutOfBoundsException("min index is 0,but " + index); if (index >= Integer.SIZE) throw new 
IndexOutOfBoundsException("max index is " + (Integer.SIZE - 1) + ",but " + index); return ((1 << index) & number) >> index; } public int getBitAtS(int number, int index) { String s = Integer.toBinaryString(number); return Integer.parseInt(s.charAt(index) + ""); } @Deprecated public int getBitRangeS(int number, int start, int end) { String s = Integer.toBinaryString(number); StringBuilder sb = new StringBuilder(s); while (sb.length() < Integer.SIZE) { sb.insert(0, "0"); } String tmp = sb.reverse().substring(start, end + 1); sb = new StringBuilder(tmp); return Integer.parseInt(sb.reverse().toString(), 2); } }
2.3.1 netty處理器鏈
import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cn.kkbc.tpms.tcp.service.TCPServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.Future; public class TCPServer2 { private Logger log = LoggerFactory.getLogger(getClass()); private volatile boolean isRunning = false; private EventLoopGroup bossGroup = null; private EventLoopGroup workerGroup = null; private int port; public TCPServer2() { } public TCPServer2(int port) { this(); this.port = port; } private void bind() throws Exception { this.bossGroup = new NioEventLoopGroup(); this.workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup)// .channel(NioServerSocketChannel.class) // .childHandler(new ChannelInitializer<SocketChannel>() { // @Override public void initChannel(SocketChannel ch) throws Exception { //超過15分鐘未收到客戶端消息則自動斷開客戶端連接 ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(15, 0, 0, TimeUnit.MINUTES)); //ch.pipeline().addLast(new Decoder4LoggingOnly()); // 1024表示單條消息的最大長度,解碼器在查找分隔符的時候,達到該長度還沒找到的話會拋異常 ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(new byte[] { 0x7e }), Unpooled.copiedBuffer(new byte[] { 0x7e, 0x7e }))); ch.pipeline().addLast(new TCPServerHandler()); } }).option(ChannelOption.SO_BACKLOG, 128) // .childOption(ChannelOption.SO_KEEPALIVE, true); this.log.info("TCP服務啟動完畢,port={}", this.port); ChannelFuture channelFuture = 
serverBootstrap.bind(port).sync(); channelFuture.channel().closeFuture().sync(); } public synchronized void startServer() { if (this.isRunning) { throw new IllegalStateException(this.getName() + " is already started ."); } this.isRunning = true; new Thread(() -> { try { this.bind(); } catch (Exception e) { this.log.info("TCP服務啟動出錯:{}", e.getMessage()); e.printStackTrace(); } }, this.getName()).start(); } public synchronized void stopServer() { if (!this.isRunning) { throw new IllegalStateException(this.getName() + " is not yet started ."); } this.isRunning = false; try { Future<?> future = this.workerGroup.shutdownGracefully().await(); if (!future.isSuccess()) { log.error("workerGroup 無法正常停止:{}", future.cause()); } future = this.bossGroup.shutdownGracefully().await(); if (!future.isSuccess()) { log.error("bossGroup 無法正常停止:{}", future.cause()); } } catch (InterruptedException e) { e.printStackTrace(); } this.log.info("TCP服務已經停止..."); } private String getName() { return "TCP-Server"; } public static void main(String[] args) throws Exception { TCPServer2 server = new TCPServer2(20048); server.startServer(); // Thread.sleep(3000); // server.stopServer(); } }
2.3.2 netty針對于JT808的消息處理器
package cn.hylexus.jt808.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cn.hylexus.jt808.server.SessionManager; import cn.hylexus.jt808.service.codec.MsgDecoder; import cn.hylexus.jt808.vo.PackageData; import cn.hylexus.jt808.vo.Session; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.ReferenceCountUtil; public class TCPServerHandler extends ChannelInboundHandlerAdapter { // (1) private final Logger logger = LoggerFactory.getLogger(getClass()); // 一個維護客戶端連接的類 private final SessionManager sessionManager; private MsgDecoder decoder = new MsgDecoder(); public TCPServerHandler() { this.sessionManager = SessionManager.getInstance(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException { // (2) try { ByteBuf buf = (ByteBuf) msg; if (buf.readableBytes() <= 0) { // ReferenceCountUtil.safeRelease(msg); return; } byte[] bs = new byte[buf.readableBytes()]; buf.readBytes(bs); PackageData jt808Msg = this.decoder.queueElement2PackageData(bs); // 處理客戶端消息 this.processClientMsg(jt808Msg); } finally { release(msg); } } private void processClientMsg(PackageData jt808Msg) { // TODO 更加消息ID的不同,分別實現自己的業務邏輯 if (jt808Msg.getMsgHeader().getMsgId() == 0x900) { // TODO ... } else if (jt808Msg.getMsgHeader().getMsgId() == 0x9001) { // TODO ... } // else if(){} // else if(){} // else if(){} // else if(){} // ... 
else { logger.error("位置消息,消息ID={}", jt808Msg.getMsgHeader().getMsgId()); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4) logger.error("發生異常:{}", cause.getMessage()); cause.printStackTrace(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Session session = Session.buildSession(ctx.channel()); sessionManager.put(session.getId(), session); logger.debug("終端連接:{}", session); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { final String sessionId = ctx.channel().id().asLongText(); Session session = sessionManager.findBySessionId(sessionId); this.sessionManager.removeBySessionId(sessionId); logger.debug("終端斷開連接:{}", session); ctx.channel().close(); // ctx.close(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { Session session = this.sessionManager.removeBySessionId(Session.buildId(ctx.channel())); logger.error("服務器主動斷開連接:{}", session); ctx.close(); } } } private void release(Object msg) { try { ReferenceCountUtil.release(msg); } catch (Exception e) { e.printStackTrace(); } } }
2.3.3 用到的其他類
package cn.hylexus.jt808.server; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; import java.util.stream.Collectors; import cn.hylexus.jt808.vo.Session; public class SessionManager { private static volatile SessionManager instance = null; // netty生成的sessionID和Session的對應關系 private Map<String, Session> sessionIdMap; // 終端手機號和netty生成的sessionID的對應關系 private Map<String, String> phoneMap; public static SessionManager getInstance() { if (instance == null) { synchronized (SessionManager.class) { if (instance == null) { instance = new SessionManager(); } } } return instance; } public SessionManager() { this.sessionIdMap = new ConcurrentHashMap<>(); this.phoneMap = new ConcurrentHashMap<>(); } public boolean containsKey(String sessionId) { return sessionIdMap.containsKey(sessionId); } public boolean containsSession(Session session) { return sessionIdMap.containsValue(session); } public Session findBySessionId(String id) { return sessionIdMap.get(id); } public Session findByTerminalPhone(String phone) { String sessionId = this.phoneMap.get(phone); if (sessionId == null) return null; return this.findBySessionId(sessionId); } public synchronized Session put(String key, Session value) { if (value.getTerminalPhone() != null && !"".equals(value.getTerminalPhone().trim())) { this.phoneMap.put(value.getTerminalPhone(), value.getId()); } return sessionIdMap.put(key, value); } public synchronized Session removeBySessionId(String sessionId) { if (sessionId == null) return null; Session session = sessionIdMap.remove(sessionId); if (session == null) return null; if (session.getTerminalPhone() != null) this.phoneMap.remove(session.getTerminalPhone()); return session; } public Set<String> keySet() { return sessionIdMap.keySet(); } public void forEach(BiConsumer<? super String, ? 
super Session> action) { sessionIdMap.forEach(action); } public Set<Entry<String, Session>> entrySet() { return sessionIdMap.entrySet(); } public List<Session> toList() { return this.sessionIdMap.entrySet().stream().map(e -> e.getValue()).collect(Collectors.toList()); } }
請移步: https://github.com/hylexus/jt-808-protocol
另請不要吝嗇,在GitHub給個star讓小可裝裝逼…………^_^
注
急急忙忙寫的博客,先寫個大致的思路,有疑問可以聯系本人,聯系方式:
email: hylexus@163.com
到此這篇關于java解析JT808協議的實現代碼的文章就介紹到這了,更多相關java解析JT808內容請搜索億速云以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持億速云!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。