zm
2020-05-18 a18bfacbf56b401f6e0fdae8710fbca4df8cff77
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package com.codingapi.tm.netty.handler;
 
/**
 * Created by lorne on 2017/6/29.
 */
 
import com.alibaba.fastjson.JSONObject;
import com.codingapi.tm.framework.utils.SocketManager;
import com.codingapi.tm.framework.utils.SocketUtils;
import com.codingapi.tm.manager.ModelInfoManager;
import com.codingapi.tm.netty.service.IActionService;
import com.codingapi.tm.netty.service.NettyService;
import com.codingapi.tm.netty.service.impl.ActionATGServiceImpl;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.concurrent.Executor;
 
/**
 * Handles a server-side channel.
 */
 
@ChannelHandler.Sharable
public class TxCoreServerHandler extends ChannelInboundHandlerAdapter { // (1)
 
    private NettyService nettyService;
 
 
    private Logger logger = LoggerFactory.getLogger(TxCoreServerHandler.class);
 
 
    private Executor threadPool;
 
 
    public TxCoreServerHandler(Executor threadPool,NettyService nettyService) {
        this.threadPool = threadPool;
        this.nettyService = nettyService;
    }
 
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        final String json = SocketUtils.getJson(msg);
        logger.debug("request->"+json);
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                service(json,ctx);
            }
        });
    }
 
    private void service(String json,ChannelHandlerContext ctx){
        if (StringUtils.isNotEmpty(json)) {
            JSONObject jsonObject = JSONObject.parseObject(json);
            String action = jsonObject.getString("a");
            String key = jsonObject.getString("k");
            JSONObject params = JSONObject.parseObject(jsonObject.getString("p"));
            String channelAddress = ctx.channel().remoteAddress().toString();
 
            IActionService actionService =  nettyService.getActionService(action);
 
            String res = actionService.execute(channelAddress,key,params);
 
            JSONObject resObj = new JSONObject();
            resObj.put("k", key);
            resObj.put("d", res);
            if(actionService instanceof ActionATGServiceImpl){
                logger.info("返回:"+ resObj.toString());
            }
            SocketUtils.sendMsg(ctx,resObj.toString());
 
        }
    }
 
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
 
        //是否到达最大上线连接数
        if (SocketManager.getInstance().isAllowConnection()) {
            SocketManager.getInstance().addClient(ctx.channel());
        } else {
            ctx.close();
        }
        super.channelRegistered(ctx);
    }
 
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
 
        SocketManager.getInstance().removeClient(ctx.channel());
        String modelName = ctx.channel().remoteAddress().toString();
        SocketManager.getInstance().outLine(modelName);
 
        ModelInfoManager.getInstance().removeModelInfo(modelName);
        super.channelUnregistered(ctx);
    }
 
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        //ctx.close();
    }
 
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        //心跳配置
        if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                ctx.close();
            }
        }
    }
 
}