In this article I analyze the source code of the Payload Pool in Fast DDS. The Payload Pool is an important resource-management mechanism in Fast DDS, used to manage and reuse the storage space for data efficiently.
In real-time communication, the efficiency of data transmission and processing is critical to system performance. The Payload Pool gives Fast DDS an optimization strategy for managing the allocation and release of payload memory. By pre-allocating a number of data buffers and keeping them in the pool, Fast DDS can reuse those buffers efficiently during data transmission, avoiding frequent memory allocations and deallocations and thereby significantly improving performance.
This article explores the implementation details and key features of the various Payload Pools in Fast DDS, and explains how they work so that readers can use and tune them better in practice.
By studying the Payload Pool source code, readers will gain a better understanding of the internal data-management and resource-optimization mechanisms of Fast DDS, and learn how to configure and use Payload Pools appropriately in real-time applications to achieve more efficient and reliable data transmission.
Fast DDS provides the following payload pools: TopicPayloadPool and its subclasses (DynamicTopicPayloadPool, DynamicReusableTopicPayloadPool, PreallocatedTopicPayloadPool, PreallocatedReallocTopicPayloadPool), the default pool selected by MemoryManagementPolicy_t (the BaseImpl / Impl<policy_> classes), and the DataSharingPayloadPool used for data sharing over shared memory (WriterPool and ReaderPool).
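All of these pools implement the IPayloadPool interface. Judging from the overrides quoted throughout this article, the interface is roughly the following sketch (simplified; see the Fast DDS headers for the authoritative declaration):

```cpp
// Rough sketch of IPayloadPool, inferred from the overrides shown below.
class IPayloadPool
{
public:
    virtual ~IPayloadPool() = default;

    // Get a payload of at least `size` bytes and bind it to the cache change
    virtual bool get_payload(
            uint32_t size,
            CacheChange_t& cache_change) = 0;

    // Bind an existing serialized payload (possibly owned by another pool)
    // to the cache change, copying or referencing it as the pool sees fit
    virtual bool get_payload(
            SerializedPayload_t& data,
            IPayloadPool*& data_owner,
            CacheChange_t& cache_change) = 0;

    // Return the payload bound to the cache change back to the pool
    virtual bool release_payload(
            CacheChange_t& cache_change) = 0;
};
```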
TopicPayloadPool holds two vectors, free_payloads_ and all_payloads_, both storing PayloadNode* pointers. The figure below shows the structure of PayloadNode:
When we need a variable-length buffer, the usual approach is to add a char* field to NodeInfo and then allocate a separate block for the buffer after creating the NodeInfo, which takes two allocations. The implementation here instead places a one-byte array at the end of NodeInfo and allocates buffer size + offsetof(NodeInfo, data) in a single call, so everything is done in one allocation, which helps reduce memory fragmentation.
Pointers to all created PayloadNodes are stored in all_payloads_; the data_index in NodeInfo is the node's position inside all_payloads_. data_size is the actual size of the data, and ref_counter is the reference count. free_payloads_ stores the PayloadNodes that have been created but are not yet in use; once a node is handed out it is removed from free_payloads_.
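The following is a minimal, self-contained sketch of this node layout and of the single-allocation / realloc idea. The field names follow the description above, but this is illustrative code, not the actual Fast DDS implementation:

```cpp
#include <cstdint>
#include <cstdlib>
#include <cstddef>

// Sketch of the node layout described above: metadata plus a one-byte array
// that marks where the variable-length buffer starts.
struct NodeInfo
{
    uint32_t data_index;    // position of this node inside all_payloads_
    uint32_t data_size;     // actual size of the buffer that follows
    uint32_t ref_counter;   // how many owners currently reference the buffer
    uint8_t  data[1];       // first byte of the buffer; the rest follows in the same block
};

// One allocation covers both the metadata and the buffer.
NodeInfo* create_node(uint32_t buffer_size)
{
    NodeInfo* info = static_cast<NodeInfo*>(
            std::malloc(buffer_size + offsetof(NodeInfo, data)));
    if (info != nullptr)
    {
        info->data_index = 0;
        info->data_size = buffer_size;
        info->ref_counter = 0;
    }
    return info;
}

// Because metadata and buffer live in one block, growing the buffer is a
// single realloc of the whole block (the same idea used by the resizeable
// pools described later).
NodeInfo* resize_node(NodeInfo* info, uint32_t new_size)
{
    NodeInfo* resized = static_cast<NodeInfo*>(
            std::realloc(info, new_size + offsetof(NodeInfo, data)));
    if (resized != nullptr)
    {
        resized->data_size = new_size;
    }
    return resized;
}
```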
TopicPayloadPool has several important methods; let us walk through them one by one.
allocate(): the size parameter is the size of the data inside the PayloadNode. If the pool has reached its upper limit, a null pointer is returned; otherwise do_allocate() creates a PayloadNode and appends it to all_payloads_. The PayloadNode constructor creates the NodeInfo that stores the data.
TopicPayloadPool::PayloadNode* TopicPayloadPool::allocate(
uint32_t size)
{
if (all_payloads_.size() >= max_pool_size_)
{
logWarning(RTPS_HISTORY, "Maximum number of allowed reserved payloads reached");
return nullptr;
}
return do_allocate(size);
}
TopicPayloadPool::PayloadNode* TopicPayloadPool::do_allocate(
uint32_t size)
{
PayloadNode* payload = new (std::nothrow) PayloadNode(size);
if (payload != nullptr)
{
payload->data_index(static_cast<uint32_t>(all_payloads_.size()));
all_payloads_.push_back(payload);
}
else
{
logWarning(RTPS_HISTORY, "Failure to create a new payload ");
}
return payload;
}
do_get_payload(): if free_payloads_ is empty, allocate() is called to allocate a new node; otherwise the last element of free_payloads_ is taken. If the pool is resizeable and the requested size is larger, the payload memory is reallocated to the new size. Note that this reallocation uses realloc (see the layout sketch above).
bool TopicPayloadPool::do_get_payload(
uint32_t size,
CacheChange_t& cache_change,
bool resizeable)
{
PayloadNode* payload = nullptr;
std::unique_lock<std::mutex> lock(mutex_);
if (free_payloads_.empty())
{
payload = allocate(size); //Allocates a single payload
if (payload == nullptr)
{
lock.unlock();
cache_change.serializedPayload.data = nullptr;
cache_change.serializedPayload.max_size = 0;
cache_change.payload_owner(nullptr);
return false;
}
}
else
{
payload = free_payloads_.back();
free_payloads_.pop_back();
}
// Resize if needed
if (resizeable && size > payload->data_size())
{
if (!payload->resize(size))
{
// Failed to resize, but we can still keep it for later.
free_payloads_.push_back(payload);
lock.unlock();
logError(RTPS_HISTORY, "Failed to resize the payload");
cache_change.serializedPayload.data = nullptr;
cache_change.serializedPayload.max_size = 0;
cache_change.payload_owner(nullptr);
return false;
}
}
lock.unlock();
payload->reference();
cache_change.serializedPayload.data = payload->data();
cache_change.serializedPayload.max_size = payload->data_size();
cache_change.payload_owner(this);
return true;
}
The get_payload() overloads: the simple overload just forwards to do_get_payload() with resizeable = false. The overload get_payload(SerializedPayload_t& data, IPayloadPool*& data_owner, CacheChange_t& cache_change) is used when the data comes from elsewhere: if the payload already belongs to this pool it is simply referenced again (the reference count is increased) and reused; otherwise a new payload is obtained and the data is copied into it, and if the original data had no owner this pool takes ownership.
bool TopicPayloadPool::get_payload(
uint32_t size,
CacheChange_t& cache_change)
{
return do_get_payload(size, cache_change, false);
}
bool TopicPayloadPool::get_payload(
SerializedPayload_t& data,
IPayloadPool*& data_owner,
CacheChange_t& cache_change)
{
assert(cache_change.writerGUID != GUID_t::unknown());
assert(cache_change.sequenceNumber != SequenceNumber_t::unknown());
if (data_owner == this)
{
PayloadNode::reference(data.data);
cache_change.serializedPayload.data = data.data;
cache_change.serializedPayload.length = data.length;
cache_change.serializedPayload.max_size = PayloadNode::data_size(data.data);
cache_change.payload_owner(this);
return true;
}
else
{
if (get_payload(data.length, cache_change))
{
if (!cache_change.serializedPayload.copy(&data, true))
{
release_payload(cache_change);
return false;
}
if (data_owner == nullptr)
{
data_owner = this;
data.data = cache_change.serializedPayload.data;
PayloadNode::reference(data.data);
}
return true;
}
}
return false;
}
reserve(): the pool is grown directly to min_num_payloads nodes. Because these nodes are newly created they are all free, so besides being stored in all_payloads_ they are also pushed into free_payloads_.
void TopicPayloadPool::reserve (
uint32_t min_num_payloads,
uint32_t size)
{
assert (min_num_payloads <= max_pool_size_);
for (size_t i = all_payloads_.size(); i < min_num_payloads; ++i)
{
PayloadNode* payload = do_allocate(size);
if (payload != nullptr)
{
free_payloads_.push_back(payload);
}
}
}
reserve_history() raises the maximum size of the pool and release_history() lowers it. reserve_history() does not increase the actual number of PayloadNodes in the pool, but release_history() calls shrink(): if the maximum drops below the current all_payloads_.size(), the surplus PayloadNodes in free_payloads_ are deleted and the same PayloadNodes are also removed from all_payloads_. A hedged sketch of the base-class implementations follows.
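Based on the description above and on the DynamicTopicPayloadPool::release_history() override quoted later, the base-class methods plausibly look like this (a sketch only, not the verbatim Fast DDS code; the meaning of the boolean passed to update_maximum_size() is an assumption):

```cpp
bool TopicPayloadPool::reserve_history(
        const PoolConfig& config,
        bool /*is_reader*/)
{
    assert(config.memory_policy == memory_policy());

    std::lock_guard<std::mutex> lock(mutex_);
    // Only raises max_pool_size_; no PayloadNode is created here
    update_maximum_size(config, true);
    return true;
}

bool TopicPayloadPool::release_history(
        const PoolConfig& config,
        bool /*is_reader*/)
{
    assert(config.memory_policy == memory_policy());

    std::lock_guard<std::mutex> lock(mutex_);
    // Lowers max_pool_size_ and then frees the surplus free payloads
    update_maximum_size(config, false);
    return shrink(max_pool_size_);
}
```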
To reduce frequent allocations and fragmentation, the payload lists are not implemented with list. However, erasing from the middle of a vector shifts every element behind the erased position forward, which we want to avoid here. free_payloads_ is consumed from the back, so it does not hit this problem; but the last element of free_payloads_ usually sits somewhere in the middle of all_payloads_, and erasing it from all_payloads_ directly would cause exactly that shifting. The trick used here is swap-and-pop: the last element of all_payloads_ is copied into the slot being vacated, and then the last element is popped.
bool TopicPayloadPool::shrink (
uint32_t max_num_payloads)
{
assert(payload_pool_allocated_size() - payload_pool_available_size() <= max_num_payloads);
while (max_num_payloads < all_payloads_.size())
{
PayloadNode* payload = free_payloads_.back();
free_payloads_.pop_back();
// Find data in allPayloads, remove element, then delete it
all_payloads_.at(payload->data_index()) = all_payloads_.back();
all_payloads_.back()->data_index(payload->data_index());
all_payloads_.pop_back();
delete payload;
}
return true;
}
DynamicTopicPayloadPool::get_payload(): note that the third argument resizeable passed to do_get_payload() is true, which means this pool can provide payloads of any size.
bool get_payload(
uint32_t size,
CacheChange_t& cache_change) override
{
return (size > 0u) && do_get_payload(size, cache_change, true);
}
release_payload(): when the reference count drops to zero after decrementing, i.e. nobody else is using this payload, it is removed from all_payloads_ and deleted. In other words, payloads do not stay in this pool permanently; they are created and destroyed dynamically, but they can still be shared by multiple owners while alive.
bool release_payload(
CacheChange_t& cache_change) override
{
assert(cache_change.payload_owner() == this);
{
if (PayloadNode::dereference(cache_change.serializedPayload.data))
{
//First remove it from all_payloads
std::unique_lock<std::mutex> lock(mutex_);
uint32_t data_index = PayloadNode::data_index(cache_change.serializedPayload.data);
PayloadNode* payload = all_payloads_.at(data_index);
all_payloads_.at(data_index) = all_payloads_.back();
all_payloads_.back()->data_index(data_index);
all_payloads_.pop_back();
lock.unlock();
// Now delete the data
delete(payload);
}
}
cache_change.serializedPayload.length = 0;
cache_change.serializedPayload.pos = 0;
cache_change.serializedPayload.max_size = 0;
cache_change.serializedPayload.data = nullptr;
cache_change.payload_owner(nullptr);
return true;
}
release_history(): the difference from the parent class is that shrink() is not called after update_maximum_size(), so the actual size of the pool is not reduced.
bool release_history(
const PoolConfig& config,
bool /*is_reader*/) override
{
assert(config.memory_policy == memory_policy());
std::lock_guard<std::mutex> lock(mutex_);
update_maximum_size(config, false);
return true;
}
DynamicReusableTopicPayloadPool::get_payload() is identical to DynamicTopicPayloadPool's: a payload is only requested when size is greater than 0.
bool get_payload(
uint32_t size,
CacheChange_t& cache_change) override
{
return (size > 0u) && do_get_payload(size, cache_change, true);
}
No other function is overridden, so release_payload() behaves like the parent class and returns the payload to free_payloads_; that is what makes it Reusable.
PreallocatedTopicPayloadPool: payload_size is passed in at construction time, and every payload has the same size.
explicit PreallocatedTopicPayloadPool(
uint32_t payload_size)
: payload_size_(payload_size)
, minimum_pool_size_(0)
{
assert(payload_size_ > 0);
}
Here the third argument passed is false, i.e. the payload cannot be resized.
bool get_payload(
uint32_t /* size */,
CacheChange_t& cache_change) override
{
return do_get_payload(payload_size_, cache_change, false);
}
reserve_history() calls reserve() to create all the payload nodes up front.
bool reserve_history(
const PoolConfig& config,
bool is_reader) override
{
if (!TopicPayloadPool::reserve_history(config, is_reader))
{
return false;
}
std::lock_guard<std::mutex> lock(mutex_);
minimum_pool_size_ += config.initial_size;
reserve(minimum_pool_size_, payload_size_);
return true;
}
release_history() calls the parent class release_history() and does not actually free any memory.
bool release_history(
const PoolConfig& config,
bool is_reader) override
{
{
std::lock_guard<std::mutex> lock(mutex_);
minimum_pool_size_ -= config.initial_size;
}
return TopicPayloadPool::release_history(config, is_reader);
}
PreallocatedReallocTopicPayloadPool is similar to PreallocatedTopicPayloadPool; the constructor initializes the minimum payload size. The "Realloc" in the name means the payload size can grow, which shows up in get_payload().
explicit PreallocatedReallocTopicPayloadPool(
uint32_t payload_size)
: min_payload_size_(payload_size)
, minimum_pool_size_(0)
{
assert(min_payload_size_ > 0);
}
get_payload() uses the maximum of the requested size and the min_payload_size_ set at construction, and calls do_get_payload() with resizeable = true, so the payload size can be extended.
bool get_payload(
uint32_t size,
CacheChange_t& cache_change) override
{
return do_get_payload(std::max(size, min_payload_size_), cache_change, true);
}
reserve_history() calls reserve() to pre-allocate minimum_pool_size_ payload nodes.
bool reserve_history(
const PoolConfig& config,
bool is_reader) override
{
if (!TopicPayloadPool::reserve_history(config, is_reader))
{
return false;
}
std::lock_guard<std::mutex> lock(mutex_);
minimum_pool_size_ += config.initial_size;
reserve(minimum_pool_size_, min_payload_size_);
return true;
}
release_history() just maintains its own minimum_pool_size_ and then calls TopicPayloadPool::release_history(); it does not actually free any memory.
bool release_history(
const PoolConfig& config,
bool is_reader) override
{
{
std::lock_guard<std::mutex> lock(mutex_);
minimum_pool_size_ -= config.initial_size;
}
return TopicPayloadPool::release_history(config, is_reader);
}
This default pool (the BaseImpl and Impl<policy_> classes below) looks very simple and hardly seems like a memory pool at all. The memory reuse actually happens inside the SerializedPayload_t of the CacheChange_t, whose key members include uint32_t length and octet* data. Reuse works by calling realloc when memory is requested and, on release, just setting length to 0 without deleting data. The default implementation therefore reuses the data buffer.
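To make the reuse concrete, here is a small, self-contained sketch of the idea, using a hypothetical Payload type that mimics the behavior described above (it is not the actual SerializedPayload_t code):

```cpp
#include <cstdint>
#include <cstdlib>

// Hypothetical payload type illustrating realloc-based buffer reuse.
struct Payload
{
    uint8_t* data = nullptr;     // the reusable buffer
    uint32_t length = 0;         // bytes currently in use
    uint32_t max_size = 0;       // bytes currently allocated

    // Grow the buffer only when needed; an existing buffer is kept and reused.
    bool reserve(uint32_t new_size)
    {
        if (new_size <= max_size)
        {
            return true;
        }
        uint8_t* new_data = static_cast<uint8_t*>(std::realloc(data, new_size));
        if (new_data == nullptr)
        {
            return false;
        }
        data = new_data;
        max_size = new_size;
        return true;
    }

    // "Releasing" in the reusable modes just resets length; the buffer stays.
    void release_keep_buffer()
    {
        length = 0;
    }

    // The DYNAMIC_RESERVE mode instead frees the buffer on release.
    void empty()
    {
        std::free(data);
        data = nullptr;
        length = 0;
        max_size = 0;
    }
};
```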
class BaseImpl : public IPayloadPool
{
bool get_payload(
uint32_t size,
CacheChange_t& cache_change) override
{
cache_change.serializedPayload.reserve(size);
cache_change.payload_owner(this);
return true;
}
bool get_payload(
SerializedPayload_t& data,
IPayloadPool*& /* data_owner */,
CacheChange_t& cache_change) override
{
assert(cache_change.writerGUID != GUID_t::unknown());
assert(cache_change.sequenceNumber != SequenceNumber_t::unknown());
if (cache_change.serializedPayload.copy(&data, false))
{
cache_change.payload_owner(this);
return true;
}
return false;
}
bool release_payload(
CacheChange_t& cache_change) override
{
assert(cache_change.payload_owner() == this);
cache_change.serializedPayload.length = 0;
cache_change.serializedPayload.pos = 0;
cache_change.payload_owner(nullptr);
return true;
}
};
template <MemoryManagementPolicy_t policy_>
class Impl : public BaseImpl
{
};
template <>
class Impl<DYNAMIC_REUSABLE_MEMORY_MODE> : public BaseImpl
{
};
Impl<DYNAMIC_RESERVE_MEMORY_MODE>::release_payload(): it calls cache_change.serializedPayload.empty(), which frees the memory.
template <>
class Impl<DYNAMIC_RESERVE_MEMORY_MODE> : public BaseImpl
{
public:
bool release_payload(
CacheChange_t& cache_change) override
{
assert(cache_change.payload_owner() == this);
cache_change.serializedPayload.empty();
cache_change.payload_owner(nullptr);
return true;
}
};
In the preallocated mode, payload_size_ is fixed at construction time; get_payload() ignores the requested size and always uses the fixed payload_size_.
template <>
class Impl<PREALLOCATED_MEMORY_MODE> : public BaseImpl
{
public:
explicit Impl(
uint32_t payload_size)
: payload_size_(payload_size)
{
assert(payload_size_ > 0);
}
bool get_payload(
uint32_t /* size */,
CacheChange_t& cache_change) override
{
cache_change.serializedPayload.reserve(payload_size_);
cache_change.payload_owner(this);
return true;
}
bool get_payload(
SerializedPayload_t& data,
IPayloadPool*& /* data_owner */,
CacheChange_t& cache_change) override
{
assert(cache_change.writerGUID != GUID_t::unknown());
assert(cache_change.sequenceNumber != SequenceNumber_t::unknown());
cache_change.serializedPayload.reserve(payload_size_);
if (cache_change.serializedPayload.copy(&data, true))
{
cache_change.payload_owner(this);
return true;
}
return false;
}
private:
uint32_t payload_size_;
};
Similar to Impl<PREALLOCATED_MEMORY_MODE>; the with_realloc part shows up in cache_change.serializedPayload.reserve(std::max(size, min_payload_size_));.
template <>
class Impl<PREALLOCATED_WITH_REALLOC_MEMORY_MODE> : public BaseImpl
{
public:
explicit Impl(
uint32_t payload_size)
: min_payload_size_(payload_size)
{
assert(min_payload_size_ > 0);
}
bool get_payload(
uint32_t size,
CacheChange_t& cache_change) override
{
cache_change.serializedPayload.reserve(std::max(size, min_payload_size_));
cache_change.payload_owner(this);
return true;
}
private:
uint32_t min_payload_size_;
};
The data-sharing pool allocates one large block of shared memory and divides it into three parts: the first stores the user data (the PayloadNodes), the second stores the offsets of the PayloadNodes within the segment (the history), and the third stores the PoolDescriptor.
PayloadNode stores the user data; the shared memory contains multiple consecutive PayloadNodes. The structure is 8-byte aligned to reduce the number of memory accesses (memory alignment is worth studying if you are interested).
class alignas (8) PayloadNode{...}
It contains some metadata, and the user data follows right after the metadata. This is similar to the one-byte array mentioned earlier, except that there is no named array member here; the data has to be accessed through the PayloadNode pointer plus data_offset, as sketched below.
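A minimal, self-contained sketch of that pointer arithmetic, with illustrative field names rather than the real Fast DDS PayloadNode members:

```cpp
#include <cstdint>

// Illustrative node: metadata only; the user data lives right behind it in
// the same shared-memory block.
struct alignas(8) Node
{
    uint32_t data_length = 0;   // illustrative metadata field
    uint32_t status = 0;        // illustrative metadata field

    // The user data starts right after the metadata; here the "data_offset"
    // mentioned above is simply sizeof(Node).
    uint8_t* data()
    {
        return reinterpret_cast<uint8_t*>(this + 1);
    }

    // Inverse operation, like PayloadNode::get_from_data() used later on:
    // recover the node from a pointer to its user data.
    static Node* get_from_data(uint8_t* payload_data)
    {
        return reinterpret_cast<Node*>(payload_data) - 1;
    }
};
```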
Offset stores the location of a PayloadNode inside the shared memory segment.
using Offset = std::uint32_t;
struct alignas (8) PoolDescriptor
{
uint32_t history_size; //< Number of payloads in the history
uint64_t notified_begin; //< The index of the oldest history entry already notified (ready to read)
uint64_t notified_end; //< The index of the history entry that will be notified next
uint32_t liveliness_sequence; //< The ID of the last liveliness assertion sent by the writer
};
WriterPool has a member FixedSizeQueue<PayloadNode*> free_payloads_ that stores the pointers to unused PayloadNodes; its underlying implementation resembles a ring (circular) queue, roughly like the sketch below.
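A minimal, self-contained sketch of such a fixed-capacity ring queue, only to illustrate the idea (it is not the actual FixedSizeQueue implementation):

```cpp
#include <cstddef>
#include <vector>

// Illustrative fixed-capacity ring queue of pointers.
template <typename T>
class RingQueue
{
public:
    void init(size_t capacity)
    {
        buffer_.assign(capacity + 1, nullptr); // one spare slot to tell full from empty
        head_ = tail_ = 0;
    }

    bool empty() const { return head_ == tail_; }

    // Callers are expected to check empty() first, as the pool does.
    T* front() const { return buffer_[head_]; }

    void pop_front() { head_ = (head_ + 1) % buffer_.size(); }

    // The pool never pushes more than `capacity` elements, so no full check here.
    void push_back(T* value)
    {
        buffer_[tail_] = value;
        tail_ = (tail_ + 1) % buffer_.size();
    }

private:
    std::vector<T*> buffer_;
    size_t head_ = 0;
    size_t tail_ = 0;
};
```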
WriterPool::get_payload() is basically the same as TopicPayloadPool's; the only difference is that, because shared memory is used, the payload cannot be resized, so once free_payloads_ is exhausted no more payloads can be obtained.
bool get_payload(
uint32_t /*size*/,
CacheChange_t& cache_change) override
{
if (free_payloads_.empty())
{
return false;
}
PayloadNode* payload = free_payloads_.front();
free_payloads_.pop_front();
// Reset all the metadata to signal the reader that the payload is dirty
payload->reset();
cache_change.serializedPayload.data = payload->data();
cache_change.serializedPayload.max_size = max_data_size_;
cache_change.payload_owner(this);
return true;
}
bool get_payload(
SerializedPayload_t& data,
IPayloadPool*& data_owner,
CacheChange_t& cache_change) override
{
assert(cache_change.writerGUID != GUID_t::unknown());
assert(cache_change.sequenceNumber != SequenceNumber_t::unknown());
if (data_owner == this)
{
cache_change.serializedPayload.data = data.data;
cache_change.serializedPayload.length = data.length;
cache_change.serializedPayload.max_size = data.length;
cache_change.payload_owner(this);
return true;
}
else
{
if (get_payload(data.length, cache_change))
{
if (!cache_change.serializedPayload.copy(&data, true))
{
release_payload(cache_change);
return false;
}
if (data_owner == nullptr)
{
data_owner = this;
data.data = cache_change.serializedPayload.data;
}
return true;
}
}
return false;
}
On release, if the payload has already been removed from the shared history, notified_begin in the PoolDescriptor is advanced to the first position that has not been removed, and the removed payloads along the way are returned to free_payloads_; otherwise the payload is returned to free_payloads_ directly.
bool release_payload(
CacheChange_t& cache_change) override
{
assert(cache_change.payload_owner() == this);
// Payloads are reset on the `get` operation, the `release` leaves the data to give more chances to the reader
PayloadNode* payload = PayloadNode::get_from_data(cache_change.serializedPayload.data);
if (payload->has_been_removed())
{
advance_till_first_non_removed();
}
else
{
free_payloads_.push_back(payload);
}
logInfo(DATASHARING_PAYLOADPOOL, "Change released with SN " << cache_change.sequenceNumber);
return DataSharingPayloadPool::release_payload(cache_change);
}
The shared memory is created here. segment_id_ is simply the writer's GUID, and segment_name_ is "fast_datasharing_" followed by writer_guid.guidPrefix and writer_guid.entityId; by default a file named after segment_name_ is created under /dev/shm. If the requested shared memory exceeds 4 GB, the uint32_t size computation overflows and creation fails.
template <typename T>
bool init_shared_segment(
const RTPSWriter* writer,
const std::string& shared_dir)
{
writer_ = writer;
segment_id_ = writer_->getGuid();
segment_name_ = generate_segment_name(shared_dir, segment_id_);
std::unique_ptr<T> local_segment;
size_t payload_size;
uint64_t estimated_size_for_payloads_pool;
uint64_t estimated_size_for_history;
uint32_t size_for_payloads_pool;
try
{
// We need to reserve the whole segment at once, and the underlying classes use uint32_t as size type.
// In order to avoid overflows, we will calculate using uint64 and check the casting
bool overflow = false;
size_t per_allocation_extra_size = T::compute_per_allocation_extra_size(
alignof(PayloadNode), DataSharingPayloadPool::domain_name());
payload_size = DataSharingPayloadPool::node_size(max_data_size_);
estimated_size_for_payloads_pool = pool_size_ * payload_size;
overflow |= (estimated_size_for_payloads_pool != static_cast<uint32_t>(estimated_size_for_payloads_pool));
size_for_payloads_pool = static_cast<uint32_t>(estimated_size_for_payloads_pool);
//Reserve one extra to avoid pointer overlapping
estimated_size_for_history = (pool_size_ + 1) * sizeof(Segment::Offset);
overflow |= (estimated_size_for_history != static_cast<uint32_t>(estimated_size_for_history));
uint32_t size_for_history = static_cast<uint32_t>(estimated_size_for_history);
uint32_t descriptor_size = static_cast<uint32_t>(sizeof(PoolDescriptor));
uint64_t estimated_segment_size = size_for_payloads_pool + per_allocation_extra_size +
size_for_history + per_allocation_extra_size +
descriptor_size + per_allocation_extra_size;
overflow |= (estimated_segment_size != static_cast<uint32_t>(estimated_segment_size));
uint32_t segment_size = static_cast<uint32_t>(estimated_segment_size);
if (overflow)
{
logError(DATASHARING_PAYLOADPOOL, "Failed to create segment " << segment_name_
<< ": Segment size is too large: " << estimated_size_for_payloads_pool
<< " (max is " << std::numeric_limits<uint32_t>::max() << ")."
<< " Please reduce the maximum size of the history");
return false;
}
//Open the segment
T::remove(segment_name_);
local_segment.reset(
new T(boost::interprocess::create_only,
segment_name_,
segment_size + T::EXTRA_SEGMENT_SIZE));
}
catch (const std::exception& e)
{
logError(DATASHARING_PAYLOADPOOL, "Failed to create segment " << segment_name_
<< ": " << e.what());
return false;
}
try
{
// Alloc the memory for the pool
// Cannot use 'construct' because we need to reserve extra space for the data,
// which is not considered in sizeof(PayloadNode).
payloads_pool_ = static_cast<octet*>(local_segment->get().allocate(size_for_payloads_pool));
// Initialize each node in the pool
free_payloads_.init(pool_size_);
octet* payload = payloads_pool_;
for (uint32_t i = 0; i < pool_size_; ++i)
{
new (payload) PayloadNode();
// All payloads are free
free_payloads_.push_back(reinterpret_cast<PayloadNode*>(payload));
payload += (ptrdiff_t)payload_size;
}
//Alloc the memory for the history
history_ = local_segment->get().template construct<Segment::Offset>(history_chunk_name())[pool_size_ + 1]();
//Alloc the memory for the descriptor
descriptor_ = local_segment->get().template construct<PoolDescriptor>(descriptor_chunk_name())();
// Initialize the data in the descriptor
descriptor_->history_size = pool_size_ + 1;
descriptor_->notified_begin = 0u;
descriptor_->notified_end = 0u;
descriptor_->liveliness_sequence = 0u;
free_history_size_ = pool_size_;
}
catch (std::exception& e)
{
T::remove(segment_name_);
logError(DATASHARING_PAYLOADPOOL, "Failed to initialize segment " << segment_name_
<< ": " << e.what());
return false;
}
segment_ = std::move(local_segment);
is_initialized_ = true;
return true;
}
add_to_shared_history(): after the change is added, history_ is updated with the exact position (offset) of its PayloadNode.
/**
* Fills the metadata of the shared payload from the cache change information
* and adds the payload's offset to the shared history
*/
void add_to_shared_history(
const CacheChange_t* cache_change)
{
assert(cache_change);
assert(cache_change->serializedPayload.data);
assert(cache_change->payload_owner() == this);
assert(free_history_size_ > 0);
// Fill the payload metadata with the change info
PayloadNode* node = PayloadNode::get_from_data(cache_change->serializedPayload.data);
node->status(ALIVE);
node->data_length(cache_change->serializedPayload.length);
node->source_timestamp(cache_change->sourceTimestamp);
node->writer_GUID(cache_change->writerGUID);
node->instance_handle(cache_change->instanceHandle);
if (cache_change->write_params.related_sample_identity() != SampleIdentity::unknown())
{
node->related_sample_identity(cache_change->write_params.related_sample_identity());
}
// Set the sequence number last, it signals the data is ready
node->sequence_number(cache_change->sequenceNumber);
// Add it to the history
history_[static_cast<uint32_t>(descriptor_->notified_end)] = segment_->get_offset_from_address(node);
logInfo(DATASHARING_PAYLOADPOOL, "Change added to shared history"
<< " with SN " << cache_change->sequenceNumber);
advance(descriptor_->notified_end);
--free_history_size_;
}
remove_from_shared_history() only marks the payload as removed.
/**
* Removes the payload's offset from the shared history
*
* Payloads don't need to be removed from the history in the same order
* they where added, but a payload will not be available through @ref get_payload until all
* payloads preceding it have been removed from the shared history.
*/
void remove_from_shared_history(
const CacheChange_t* cache_change)
{
assert(cache_change);
assert(cache_change->serializedPayload.data);
assert(cache_change->payload_owner() == this);
assert(descriptor_->notified_end != descriptor_->notified_begin);
assert(free_history_size_ < descriptor_->history_size);
logInfo(DATASHARING_PAYLOADPOOL, "Change removed from shared history"
<< " with SN " << cache_change->sequenceNumber);
PayloadNode* payload = PayloadNode::get_from_data(cache_change->serializedPayload.data);
payload->has_been_removed(true);
}
advance_till_first_non_removed() returns the removed payloads to free_payloads_ and updates notified_begin.
void advance_till_first_non_removed()
{
while (descriptor_->notified_begin != descriptor_->notified_end)
{
auto offset = history_[static_cast<uint32_t>(descriptor_->notified_begin)];
auto payload = static_cast<PayloadNode*>(segment_->get_address_from_offset(offset));
if (!payload->has_been_removed())
{
break;
}
payload->has_been_removed(false);
free_payloads_.push_back(payload);
advance(descriptor_->notified_begin);
++free_history_size_;
}
}
Only the WriterPool can hand out new payloads. When data_owner == this, data is assigned to the cache_change directly; otherwise the owner must be a data-sharing writer inside the same process, and since intra-process calls are synchronous there is no risk of the payload being overwritten, so read_from_shared_history() is called directly.
bool get_payload(
uint32_t /*size*/,
CacheChange_t& /*cache_change*/) override
{
// Only WriterPool can return new payloads
return false;
}
bool get_payload(
SerializedPayload_t& data,
IPayloadPool*& data_owner,
CacheChange_t& cache_change) override
{
if (data_owner == this)
{
cache_change.serializedPayload.data = data.data;
cache_change.serializedPayload.length = data.length;
cache_change.serializedPayload.max_size = data.length;
cache_change.payload_owner(this);
return true;
}
// If owner is not this, then it must be an intraprocess datasharing writer
assert(nullptr != dynamic_cast<DataSharingPayloadPool*>(data_owner));
PayloadNode* payload = PayloadNode::get_from_data(data.data);
// No need to check validity, on intraprocess there is no override of payloads
read_from_shared_history(cache_change, payload);
return true;
}
read_from_shared_history(): the sequence number is recorded before reading and checked again afterwards to detect changes, because the memory is shared and the writer may rewrite the data while we are reading it.
bool read_from_shared_history(
CacheChange_t& cache_change,
PayloadNode* payload)
{
// The sequence number can be unknown already, but we defer the check to the end
cache_change.sequenceNumber = payload->sequence_number();
cache_change.serializedPayload.data = payload->data();
cache_change.serializedPayload.max_size = payload->data_length();
cache_change.serializedPayload.length = payload->data_length();
cache_change.kind = static_cast<ChangeKind_t>(payload->status());
cache_change.writerGUID = payload->writer_GUID();
cache_change.instanceHandle = payload->instance_handle();
cache_change.sourceTimestamp = payload->source_timestamp();
cache_change.write_params.sample_identity(payload->related_sample_identity());
SequenceNumber_t check = payload->sequence_number();
if (check == c_SequenceNumber_Unknown || check != cache_change.sequenceNumber)
{
// data override while processing
return false;
}
cache_change.payload_owner(this);
return true;
}
release_payload() simply releases the change.
bool release_payload(
CacheChange_t& cache_change) override
{
assert(cache_change.payload_owner() == this);
return DataSharingPayloadPool::release_payload(cache_change);
}
The segment_id_ and segment_name_ used here are the same as in WriterPool, but the segment is opened with boost::interprocess::open_read_only, so the ReaderPool maps the same block of shared memory as the WriterPool.
template <typename T>
bool init_shared_segment(
const GUID_t& writer_guid,
const std::string& shared_dir)
{
segment_id_ = writer_guid;
segment_name_ = generate_segment_name(shared_dir, writer_guid);
std::unique_ptr<T> local_segment;
// Open the segment
try
{
local_segment = std::unique_ptr<T>(
new T(boost::interprocess::open_read_only,
segment_name_.c_str()));
}
catch (const std::exception& e)
{
logError(HISTORY_DATASHARING_PAYLOADPOOL, "Failed to open segment " << segment_name_
<< ": " << e.what());
return false;
}
// Get the pool description
descriptor_ = local_segment->get().template find<PoolDescriptor>(descriptor_chunk_name()).first;
if (!descriptor_)
{
local_segment.reset();
logError(HISTORY_DATASHARING_PAYLOADPOOL, "Failed to open payload pool descriptor " << segment_name_);
return false;
}
// Get the history
history_ = local_segment->get().template find<Segment::Offset>(history_chunk_name()).first;
if (!history_)
{
local_segment.reset();
logError(HISTORY_DATASHARING_PAYLOADPOOL, "Failed to open payload history " << segment_name_);
return false;
}
// Set the reading pointer
next_payload_ = begin();
segment_ = std::move(local_segment);
if (is_volatile_)
{
CacheChange_t ch;
SequenceNumber_t last_sequence = c_SequenceNumber_Unknown;
get_next_unread_payload(ch, last_sequence);
while (ch.sequenceNumber != SequenceNumber_t::unknown())
{
advance(next_payload_);
get_next_unread_payload(ch, last_sequence);
}
assert(next_payload_ == end());
}
return true;
}
get_next_unread_payload() first uses ensure_reading_reference_is_in_bounds() to find a position from which data can still be read, and then reads it with read_from_shared_history(). If the data is overwritten while read_from_shared_history() is running it returns failure, and the next position is tried.
void get_next_unread_payload(
CacheChange_t& cache_change,
SequenceNumber_t& last_sequence_number,
uint64_t until)
{
last_sequence_number = last_sn_;
while (next_payload_ < until)
{
// First ensure we are not too far behind
// This may move the next_payload_ past the until value
if (!ensure_reading_reference_is_in_bounds() && next_payload_ >= until)
{
break;
}
// history_[next_payload_] contains the offset to the payload
PayloadNode* payload = static_cast<PayloadNode*>(
segment_->get_address_from_offset(history_[static_cast<uint32_t>(next_payload_)]));
if (!read_from_shared_history(cache_change, payload))
{
// Overriden while retrieving. Discard and continue
advance(next_payload_);
logWarning(RTPS_READER, "Dirty data detected on datasharing writer " << writer());
continue;
}
if (last_sn_ != c_SequenceNumber_Unknown && last_sn_ >= cache_change.sequenceNumber)
{
// Sequence number went backwards, it was most probably overriden.
continue;
}
if (!ensure_reading_reference_is_in_bounds())
{
// We may have been taken over and read a payload that is too far forward. Discard and continue
continue;
}
last_sn_ = cache_change.sequenceNumber;
return;
}
// Reset the data (may cause errors later on)
cache_change.sequenceNumber = c_SequenceNumber_Unknown;
cache_change.serializedPayload.data = nullptr;
cache_change.payload_owner(nullptr);
}
The lower 32 bits of notified_begin are the index into history_; the value is incremented for every new notification, and once it passes pool_size_ it wraps back to 0 and carries into the upper 32 bits, which act as a loop counter. A hedged sketch of such an advance step follows.
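A small, self-contained sketch of what the advance() helper used in the code above plausibly does under this encoding (a sketch, not the verbatim Fast DDS helper, which takes the history size from the pool descriptor):

```cpp
#include <cstdint>

// Lower 32 bits: slot index inside history_; upper 32 bits: loop counter.
void advance(uint64_t& index, uint32_t history_size)
{
    uint32_t slot = static_cast<uint32_t>(index) + 1;
    uint64_t loops = index >> 32;
    if (slot >= history_size)
    {
        // Wrap around to the first slot and carry into the loop counter
        slot = 0;
        ++loops;
    }
    index = (loops << 32) | slot;
}
```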
Consider the situation in the figure above: the reader can still just read one value that has not been overwritten, while the writer, on its 10th lap, is about to catch up with the reader; lapping it would overwrite the data. If at that point the reader were at high = 9, low = 4, it could no longer read the value it wants, because the writer has already overwritten it. If the reader were at high = 8, low = 5, it has fallen even further behind and certainly cannot read it. The latter case is what the code checks with next_payload_high + 1 < notified_end_high, and the former with next_payload_high < notified_end_high && static_cast<uint32_t>(next_payload_) <= static_cast<uint32_t>(notified_end). In either case next_payload_ must be moved to the oldest data that can still be read: both reader high = 9, low = 4 and reader high = 8, low = 5 are updated to the reader high = 9, low = 5 position in the figure above.
bool ensure_reading_reference_is_in_bounds()
{
auto notified_end = end();
auto notified_end_high = notified_end >> 32;
auto next_payload_high = next_payload_ >> 32;
if (next_payload_high + 1 < notified_end_high ||
(next_payload_high < notified_end_high &&
static_cast<uint32_t>(next_payload_) <= static_cast<uint32_t>(notified_end)))
{
logWarning(RTPS_READER, "Writer " << writer() << " overtook reader in datasharing pool."
<< " Some changes will be missing.");
// lower part is the index, upper part is the loop counter
next_payload_ = ((notified_end_high - 1) << 32) + static_cast<uint32_t>(notified_end);
advance(next_payload_);
return false;
}
return true;
}
That covers the memory pool implementations in Fast DDS. The table below summarizes, for each pool, whether it pre-allocates memory, whether the pool size grows when get_payload() finds no free payload, whether the payload is reallocated (realloc) when its size is not large enough, and whether release_payload() actually frees the memory.
Pool | Pre-allocates memory | Pool size grows | Payload size grows | Freed on release |
---|---|---|---|---|
DataSharingPayloadPool | Yes | No | No | No |
PreallocatedReallocTopicPayloadPool | Yes | No | Yes | No |
PreallocatedTopicPayloadPool | Yes | No | No | No |
DynamicReusableTopicPayloadPool | No | Yes | Yes | No |
DynamicTopicPayloadPool | No | Yes | Yes | Yes |