package com.codingapi.tx.aop.service.impl; import com.codingapi.tx.Constants; import com.codingapi.tx.aop.bean.TxTransactionInfo; import com.codingapi.tx.model.TxGroup; import com.lorne.core.framework.exception.ServiceException; import com.lorne.core.framework.utils.KidUtils; import com.codingapi.tx.aop.bean.TxTransactionLocal; import com.codingapi.tx.datasource.ILCNTransactionControl; import com.codingapi.tx.framework.task.TaskGroupManager; import com.codingapi.tx.framework.task.TxTask; import com.codingapi.tx.netty.service.MQTxManagerService; import com.codingapi.tx.aop.service.TransactionServer; import org.aspectj.lang.ProceedingJoinPoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.concurrent.TimeUnit; /** * 分布式事务启动参与事务中的业务处理 * Created by lorne on 2017/6/8. */ @Service(value = "txRunningTransactionServer") public class TxRunningTransactionServerImpl implements TransactionServer { @Autowired private MQTxManagerService txManagerService; @Autowired private ILCNTransactionControl transactionControl; private Logger logger = LoggerFactory.getLogger(TxRunningTransactionServerImpl.class); @Override public Object execute(final ProceedingJoinPoint point, final TxTransactionInfo info) throws Throwable { String kid = KidUtils.generateShortUuid(); String txGroupId = info.getTxGroupId(); logger.debug("--->begin running transaction,groupId:" + txGroupId); long t1 = System.currentTimeMillis(); boolean isHasIsGroup = transactionControl.hasGroup(txGroupId); TxTransactionLocal txTransactionLocal = new TxTransactionLocal(); txTransactionLocal.setGroupId(txGroupId); txTransactionLocal.setHasStart(false); txTransactionLocal.setKid(kid); txTransactionLocal.setHasIsGroup(isHasIsGroup); txTransactionLocal.setMaxTimeOut(Constants.txServer.getCompensateMaxWaitTime()); TxTransactionLocal.setCurrent(txTransactionLocal); try { Object res = point.proceed(); //写操作 处理 if(!txTransactionLocal.isReadOnly()) { String methodStr = info.getInvocation().getMethodStr(); TxGroup resTxGroup = txManagerService.addTransactionGroup(txGroupId, kid, isHasIsGroup, methodStr); //已经进入过该模块的,不再执行此方法 if(!isHasIsGroup) { String type = txTransactionLocal.getType(); TxTask waitTask = TaskGroupManager.getInstance().getTask(kid, type); //lcn 连接已经开始等待时. while (waitTask != null && !waitTask.isAwait()) { TimeUnit.MILLISECONDS.sleep(1); } if (resTxGroup == null) { //通知业务回滚事务 if (waitTask != null) { //修改事务组状态异常 waitTask.setState(-1); waitTask.signalTask(); throw new ServiceException("update TxGroup error, groupId:" + txGroupId); } } } } return res; } catch (Throwable e) { throw e; } finally { TxTransactionLocal.setCurrent(null); long t2 = System.currentTimeMillis(); logger.debug("<---end running transaction,groupId:" + txGroupId+",execute time:"+(t2-t1)); } } }