问题描述
service A 发送事件A1 提交成功之后 在afterCommit调用service B 发送事件B1 在afterCommit中未调用
环境
jdk8
spring-tx:4.3.29.RELEASE
源码调用链
TransactionInterceptor.invoke–>
TransactionAspectSupport.invokeWithinTransaction–>commitTransactionAfterReturning–>commitTransactionAfterReturning–>
AbstractPlatformTransactionManager.commit–>processCommit–>triggerAfterCommit
private void triggerAfterCommit(DefaultTransactionStatus status) {
if (status.isNewSynchronization()) {
if (status.isDebug()) {
logger.trace("Triggering afterCommit synchronization");
}
TransactionSynchronizationUtils.triggerAfterCommit();
}
}
/**
* Return if a new transaction synchronization has been opened
* for this transaction.
*/
public boolean isNewSynchronization() {
return this.newSynchronization;
}
对于此场景 service B 发送事件B1后,status.isNewSynchronization() 为false 故不用再次调用
DefaultTransactionStatus属性newSynchronization的生成逻辑源码调用链
TransactionInterceptor.invoke–>
TransactionAspectSupport.invokeWithinTransaction–>createTransactionIfNecessary–>
AbstractPlatformTransactionManager.getTransaction
@Override
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
Object transaction = doGetTransaction();
// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) {
// Use defaults if no transaction definition given.
definition = new DefaultTransactionDefinition();
}
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException ex) {
resume(null, suspendedResources);
throw ex;
}
catch (Error err) {
resume(null, suspendedResources);
throw err;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
如果transaction存在,则return handleExistingTransaction(definition, transaction, debugEnabled);
不存在,则新创建,如果新创建则newSynchronization为true