146 lines
6.8 KiB
Java
146 lines
6.8 KiB
Java
package com.sipai.netty;
|
||
|
||
import com.alibaba.fastjson.JSONObject;
|
||
import com.sipai.entity.scada.MPoint;
|
||
import com.sipai.entity.scada.MPointHistory;
|
||
import com.sipai.entity.work.ElectricityMeter;
|
||
import com.sipai.service.scada.MPointHistoryService;
|
||
import com.sipai.service.scada.MPointService;
|
||
import com.sipai.service.work.ElectricityMeterService;
|
||
import com.sipai.tools.CommUtil;
|
||
import com.sipai.tools.SpringContextUtil;
|
||
import io.netty.buffer.ByteBuf;
|
||
import io.netty.buffer.Unpooled;
|
||
import io.netty.channel.ChannelHandlerContext;
|
||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||
import io.netty.util.CharsetUtil;
|
||
|
||
import java.io.IOException;
|
||
import java.math.BigDecimal;
|
||
import java.util.Map.Entry;
|
||
|
||
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
|
||
|
||
@Override
|
||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||
ByteBuf buf = (ByteBuf) msg;
|
||
//String jsonStr = JSONObject.toJSONString(buf);
|
||
System.out.println("Netty服务端接收信息(" +CommUtil.nowDate()+"):"+ buf.toString(CharsetUtil.UTF_8));
|
||
JSONObject jsonObject = null;
|
||
try {
|
||
//当数据不是json格式时,捕获异常,设为空
|
||
jsonObject = JSONObject.parseObject(buf.toString(CharsetUtil.UTF_8));
|
||
} catch (Exception e) {
|
||
jsonObject = null;
|
||
}
|
||
if(null != jsonObject){
|
||
ElectricityMeterService electricityMeterService = (ElectricityMeterService) SpringContextUtil.getBean("electricityMeterService");
|
||
//获取电表ID
|
||
String ID = (String) jsonObject.get("ID");
|
||
ElectricityMeter electricityMeter = electricityMeterService.selectById(ID);
|
||
if(electricityMeter!=null){
|
||
String bizId = electricityMeter.getBizid();
|
||
if(null != bizId && !"".equals(bizId)){
|
||
MPoint mPointEntity = null;
|
||
MPointHistory mPointHistory = null;
|
||
BigDecimal parmvalue = new BigDecimal("0");
|
||
String measuredt = "";
|
||
int num = 0;
|
||
int numHistory = 0;
|
||
MPointService mPointService = (MPointService) SpringContextUtil.getBean("mPointService");
|
||
MPointHistoryService mPointHistoryService = (MPointHistoryService) SpringContextUtil.getBean("mPointHistoryService");
|
||
String prefix = "";
|
||
String suffix = "";
|
||
if(electricityMeter.getPrefix()!=null && !"".equals(electricityMeter.getPrefix())){
|
||
prefix = electricityMeter.getPrefix()+"_";
|
||
}
|
||
if(electricityMeter.getSuffix()!=null && !"".equals(electricityMeter.getSuffix())){
|
||
suffix = "_"+electricityMeter.getSuffix();
|
||
}
|
||
for (Entry<String, Object> entry : jsonObject.entrySet()) {
|
||
if(!"ID".equals(entry.getKey())){
|
||
if(isNumber(entry.getValue().toString())){
|
||
//点位: 前缀_key_后缀
|
||
mPointEntity = mPointService.selectById(bizId, prefix+entry.getKey()+suffix);
|
||
if(mPointEntity!=null){
|
||
parmvalue = new BigDecimal(entry.getValue().toString());
|
||
measuredt = CommUtil.nowDate();
|
||
mPointEntity.setParmvalue(parmvalue);
|
||
mPointEntity.setMeasuredt(measuredt);
|
||
int res = mPointService.update(bizId, mPointEntity);
|
||
if(res>0){
|
||
num++;
|
||
mPointHistory=new MPointHistory();
|
||
mPointHistory.setParmvalue(parmvalue);
|
||
mPointHistory.setMeasuredt(measuredt);
|
||
mPointHistory.setTbName("TB_MP_"+mPointEntity.getMpointcode());
|
||
res = mPointHistoryService.save(bizId, mPointHistory);
|
||
if(res>0){
|
||
numHistory++;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
System.out.println("Netty服务端处理信息(" +CommUtil.nowDate()+"):成功更新主表"+ num+"条数据,成功更新子表"+ numHistory+"个;");
|
||
}
|
||
}
|
||
}
|
||
ByteBuf res = Unpooled.wrappedBuffer(new String("收到!信息如下, 请确认:" + buf.toString(CharsetUtil.UTF_8)).getBytes());
|
||
/**
|
||
* 给客户端回复消息
|
||
*/
|
||
ctx.writeAndFlush(res);
|
||
}
|
||
|
||
/**
|
||
* 连接成功后,自动执行该方法
|
||
* @param ctx
|
||
* @throws Exception
|
||
*/
|
||
@Override
|
||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||
System.out.println("服务器首次处理!");
|
||
ByteBuf res = Unpooled.wrappedBuffer(new String("true").getBytes());
|
||
ctx.writeAndFlush(res);
|
||
}
|
||
|
||
@Override
|
||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||
// 处理远程主机强制关闭连接的异常
|
||
if (cause instanceof IOException) {
|
||
String message = cause.getMessage();
|
||
if (message != null && (message.contains("远程主机强迫关闭了一个现有的连接")
|
||
|| message.contains("Connection reset")
|
||
|| message.contains("远程主机强迫关闭")
|
||
|| message.contains("An existing connection was forcibly closed"))) {
|
||
System.out.println("[" + java.time.LocalDateTime.now() + "] 客户端 [" + ctx.channel().remoteAddress() + "] 已断开连接");
|
||
ctx.close();
|
||
return;
|
||
}
|
||
}
|
||
/**
|
||
* 异常捕获 - 只打印简要信息
|
||
*/
|
||
System.err.println("[" + java.time.LocalDateTime.now() + "] 服务器处理异常:" + cause.getClass().getSimpleName() + " - " + cause.getMessage());
|
||
cause.printStackTrace();
|
||
ctx.close();
|
||
}
|
||
|
||
/**
|
||
* 当连接断开时调用
|
||
* @param ctx
|
||
* @throws Exception
|
||
*/
|
||
@Override
|
||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||
System.out.println("[" + java.time.LocalDateTime.now() + "] 客户端 [" + ctx.channel().remoteAddress() + "] 连接断开");
|
||
super.channelInactive(ctx);
|
||
}
|
||
public static boolean isNumber(String str){
|
||
String reg = "^[0-9]+(.[0-9]+)?$";
|
||
return str.matches(reg);
|
||
}
|
||
}
|