多线程加快搜索速度这一认知是经受住实践考验的。博弈树搜索的并行搜索方式有很多种,例如叶子并行,根并行,树分裂等算法。笔者给出一种实现起来比较简单的根并行算法。
在是实现时需要注意两点,第一,怎么安全的剪枝;第二,如何进行线程间的通信。对于AB剪枝有三点发现可以指导我们设计多线程的并行算法:
这样一来,就可以知道一个节点搜索完成后,如何更新博弈树所有节点的AB值,如何剪枝。通信方式使用的全局变量+读写锁控制的,全局变量保存所有节点状态的AB值。当搜索开始,从根节点沿着搜索路径开始更新沿路的所有节点AB值,然后从全局变量中读取该节点的AB值。搜索完成后,更新父亲节点AB值。
struct parallelNABSearchNode{
int alpha, beta;
parallelNABSearchNode() : alpha(-INT_MAX), beta(INT_MAX){}
parallelNABSearchNode(int aalpha, int abeta) : alpha(aalpha), beta(abeta){}
QString str();
//返回值:true已经更新,false表示没更新
bool getAlphaBeta(int &aalpha, int &abeta);
bool updateLeaf2RootAlphaBeta(int score);
//返回值:true已经更新,false表示没更新
bool updateRoot2LeafAlphaBeta(int aalpha, int abeta);
};
//并行化搜索技术
static QReadWriteLock parallelSearchTableLock;
static QHash<quint64, parallelNABSearchNode> parallelSearchTable;
函数实现三个方法,一个getAlphaBeta(int &aalpha, int &abeta)
是从全局变量中获取AB值,一个updateLeaf2RootAlphaBeta
是从更新该节点的父亲的AB值,还有一个updateRoot2LeafAlphaBeta
是更新儿子节点的AB值。
bool parallelNABSearchNode::getAlphaBeta(int &aalpha, int &abeta){
if(!globalParam::utilGameSetting.IsOpenParallelSearch) return false;
if(aalpha >= alpha && beta >= abeta) return false;
if(aalpha < alpha){
aalpha = alpha;
}
if(beta < abeta){
abeta = beta;
}
return true;
}
bool parallelNABSearchNode::updateLeaf2RootAlphaBeta(int score){
if(!globalParam::utilGameSetting.IsOpenParallelSearch) return false;
if(score > alpha){
alpha = score;
return true;
}
return false;
}
bool parallelNABSearchNode::updateRoot2LeafAlphaBeta(int aalpha, int abeta){
if(!globalParam::utilGameSetting.IsOpenParallelSearch) return false;
if(alpha >= aalpha && abeta >= beta) return false;
if(alpha < aalpha){
alpha = aalpha;
}
if(abeta < beta){
beta = abeta;
}
return true;
}
现在已经实现了线程间通信的工具,只需要在搜索时调用这些利器就可以了,总体的实现思路和常规负极大搜索如出一撤。为了能后续兼容树分裂的算法,这里给出了并行化搜索指定深度的接口。
//fail-soft negMax Alpha-Beta pruning search
int GameAI::NABParallelSearch(int depth, int alpha, int beta, bool maximizingPlayer, quint8 searchSpaceType)
{
int score = -INT_MAX;
QWriteLocker writeLock(&globalParam::parallelSearchTableLock);
// 更新根节点->当前节点搜索路径上AB值
for(int pid = 0;pid < parallelSsearchHistoryPlayersHash.size() - 1; ++pid){
//表项不存在会自动调用默认构造函数
parallelNABSearchNode *curNode = &globalParam::parallelSearchTable[parallelSsearchHistoryPlayersHash[pid]];
parallelNABSearchNode *sontNode = &globalParam::parallelSearchTable[parallelSsearchHistoryPlayersHash[pid + 1]];
//更新下一层的AB值
sontNode->updateRoot2LeafAlphaBeta(- curNode->beta, - curNode->alpha);
}
// 获取当前AB值
globalParam::parallelSearchTable[zobristSearchHash.hash()].getAlphaBeta(alpha, beta);
// // 更新AB值后可能引发剪枝
// if(alpha >= beta){ // AB剪枝
// aiCalInfo.cutTreeTimesCurrentTurn ++;
// return beta;
// }
writeLock.unlock();
//探查置换表中值
if(zobristSearchHash.getNABTranspositionTable(score, depth, alpha, beta)) {
return score;
}
// ??或 分数过大过小
// if (qAbs(score) > globalParam::utilGameSetting.MaxScore){
// //保存置换表
// return score;
// }
int evalPlayer = globalParam::AIPlayer;
MPlayerType searchPlayer = maximizingPlayer ? evalPlayer : UtilReservePlayer(evalPlayer);
// 达到搜索深度
if (depth == 0 || checkSearchBoardWiner() != PLAYER_NONE){
//保存置换表
score = evaluateBoard(evalPlayer);//负极大搜索中评估必须searchPlayer
if(!maximizingPlayer) score *= -1;
// //VCF
// QList<MPoint> vcf, vcfpath;
// if(VCXSearch(globalParam::utilGameSetting.MaxVctSearchDepth, maximizingPlayer, VCT_SEARCH, vcf, vcfpath)){
// qDebug() << "NABsearch : find vct";
// if(maximizingPlayer) return globalParam::utilGameSetting.MaxScore;
// else return -globalParam::utilGameSetting.MaxScore;
// }
return score;
}
// 着法生成
QVector<MPoint> searchPoints;
getSortedSearchSpace(searchPoints, evalPlayer, searchPlayer, searchSpaceType);
int scoreBest = -INT_MAX;
int hashf = hashfUperBound;
MPoint moveBest(InvalidMPoint);
quint16 savedSearchBoardPatternDirection[boardSize][boardSize];
for (const auto &curPoint : searchPoints) {
if (!searchBoardHasPiece(curPoint)) {
setSearchBoard(curPoint, searchPlayer, savedSearchBoardPatternDirection);// searchPlayer落子
score = -NABParallelSearch(depth - 1, -beta, -alpha, !maximizingPlayer,searchSpaceType);
setSearchBoard(curPoint, PLAYER_NONE, savedSearchBoardPatternDirection);// 撤销落子
if (score > scoreBest) {
scoreBest = score;
moveBest = curPoint;
if (score >= beta) {
hashf = hashfLowerBound;
appendSearchKillerTable(curPoint, depth, hashf);
aiCalInfo.cutTreeTimesCurrentTurn ++;
break; // Alpha-Beta 剪枝
}
if (score > alpha) {
alpha = score;
hashf = hashfExact;
//更新当前层的AB值
writeLock.relock();
parallelNABSearchNode *curNode = &globalParam::parallelSearchTable[zobristSearchHash.hash()];
curNode->alpha = scoreBest;
writeLock.unlock();
}
}
}
}
// writeLock.relock();
// //更新当前层的AB值
// parallelNABSearchNode *curNode = &globalParam::parallelSearchTable[zobristSearchHash.hash()];
// curNode->alpha = scoreBest;
// writeLock.unlock();
writeLock.relock();
//更新上一层的AB值:只有当前所有节点搜索完成后,得到的值才是可靠的,才能用来更新父亲节点的AB值
if(parallelSsearchHistoryPlayersHash.size() >= 2){
const quint64& fatherHash = parallelSsearchHistoryPlayersHash[parallelSsearchHistoryPlayersHash.size()-2];
parallelNABSearchNode *fatherNode = &globalParam::parallelSearchTable[fatherHash];
fatherNode->updateLeaf2RootAlphaBeta(-scoreBest);
}
writeLock.unlock();
//更新历史表
appendSearchHistoryTable(moveBest, depth, hashf);
// 更新置换表
zobristSearchHash.appendNABTranspositionTable(depth, scoreBest, hashf, moveBest, UtilReservePlayer(searchPlayer));
return scoreBest;
}
这里实现的并行化搜索效果并不出众,只能说是有一定效果。在深度为6搜索情况下,线程数为4的并行化搜索能加速2~3倍。这一点也是可以理解的,因为负极大搜索的节点如果排序较好,搜索量主要集中在PV路径的搜索上。简单的分裂根节点能提升的速度是可预见,只有动态的分裂树,把算力平摊到PV路径搜索,加速PV路径产生能提高博弈树搜索的瓶颈。
这里实现并行化搜索还存在一些值得思考的问题,如何能提高搜索的稳定性,在发生截断返回时,仍能正确的搜索到PV路径,而不是会因为提前的不安全的剪枝与PV路径失之交臂。后面也希望有时间继续研究下如何高效的分裂树,而不是盲目的根分裂。