zm
2020-05-18 a18bfacbf56b401f6e0fdae8710fbca4df8cff77
commit | author | age
a18bfa 1 package com.codingapi.tx.aop.service.impl;
Z 2
3 import com.codingapi.tx.Constants;
4 import com.codingapi.tx.aop.bean.TxTransactionInfo;
5 import com.codingapi.tx.model.TxGroup;
6 import com.lorne.core.framework.exception.ServiceException;
7 import com.lorne.core.framework.utils.KidUtils;
8 import com.codingapi.tx.aop.bean.TxTransactionLocal;
9 import com.codingapi.tx.datasource.ILCNTransactionControl;
10 import com.codingapi.tx.framework.task.TaskGroupManager;
11 import com.codingapi.tx.framework.task.TxTask;
12 import com.codingapi.tx.netty.service.MQTxManagerService;
13 import com.codingapi.tx.aop.service.TransactionServer;
14 import org.aspectj.lang.ProceedingJoinPoint;
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
17 import org.springframework.beans.factory.annotation.Autowired;
18 import org.springframework.stereotype.Service;
19
20 import java.util.concurrent.TimeUnit;
21
22 /**
23  * 分布式事务启动参与事务中的业务处理
24  * Created by lorne on 2017/6/8.
25  */
26 @Service(value = "txRunningTransactionServer")
27 public class TxRunningTransactionServerImpl implements TransactionServer {
28
29
30     @Autowired
31     private MQTxManagerService txManagerService;
32
33
34     @Autowired
35     private ILCNTransactionControl transactionControl;
36
37
38     private Logger logger = LoggerFactory.getLogger(TxRunningTransactionServerImpl.class);
39
40     @Override
41     public Object execute(final ProceedingJoinPoint point, final TxTransactionInfo info) throws Throwable {
42
43         String kid = KidUtils.generateShortUuid();
44         String txGroupId = info.getTxGroupId();
45         logger.debug("--->begin running transaction,groupId:" + txGroupId);
46         long t1 = System.currentTimeMillis();
47
48         boolean isHasIsGroup =  transactionControl.hasGroup(txGroupId);
49
50
51         TxTransactionLocal txTransactionLocal = new TxTransactionLocal();
52         txTransactionLocal.setGroupId(txGroupId);
53         txTransactionLocal.setHasStart(false);
54         txTransactionLocal.setKid(kid);
55         txTransactionLocal.setHasIsGroup(isHasIsGroup);
56         txTransactionLocal.setMaxTimeOut(Constants.txServer.getCompensateMaxWaitTime());
57         TxTransactionLocal.setCurrent(txTransactionLocal);
58
59
60         try {
61
62             Object res = point.proceed();
63
64             //写操作 处理
65             if(!txTransactionLocal.isReadOnly()) {
66
67                 String methodStr = info.getInvocation().getMethodStr();
68
69                 TxGroup resTxGroup = txManagerService.addTransactionGroup(txGroupId, kid, isHasIsGroup, methodStr);
70
71                 //已经进入过该模块的,不再执行此方法
72                 if(!isHasIsGroup) {
73                     String type = txTransactionLocal.getType();
74
75                     TxTask waitTask = TaskGroupManager.getInstance().getTask(kid, type);
76
77                     //lcn 连接已经开始等待时.
78                     while (waitTask != null && !waitTask.isAwait()) {
79                         TimeUnit.MILLISECONDS.sleep(1);
80                     }
81
82                     if (resTxGroup == null) {
83
84                         //通知业务回滚事务
85                         if (waitTask != null) {
86                             //修改事务组状态异常
87                             waitTask.setState(-1);
88                             waitTask.signalTask();
89                             throw new ServiceException("update TxGroup error, groupId:" + txGroupId);
90                         }
91                     }
92                 }
93             }
94
95             return res;
96         } catch (Throwable e) {
97             throw e;
98         } finally {
99             TxTransactionLocal.setCurrent(null);
100             long t2 = System.currentTimeMillis();
101             logger.debug("<---end running transaction,groupId:" + txGroupId+",execute time:"+(t2-t1));
102
103         }
104     }
105
106 }