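[Fast DDS Source Code Analysis] Payload Pool

Published: January 15, 2024

Preface

In this article I analyze the source code of the Payload Pool in Fast DDS. It is an important resource-management mechanism in Fast DDS, used to manage and reuse the storage that holds data.

In real-time communication, the efficiency of data transfer and processing is critical to system performance. The Payload Pool gives Fast DDS a strategy for managing how payload memory is allocated and released. By allocating a number of data buffers in advance and keeping them in the Payload Pool, Fast DDS can reuse those buffers while transferring data and avoid frequent memory allocation and deallocation, which improves performance and efficiency.

This article looks at the implementation details and key functions of the various Payload Pools in Fast DDS and explains how they work, to help readers use and tune the Payload Pool in practice.

By studying the source, readers will better understand the data management and resource optimization mechanisms inside Fast DDS, and learn how to configure and use the Payload Pool sensibly in real-time communication applications for more efficient and reliable data transfer.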

Fast DDS provides the following Payload Pools:
[Figure: PayloadPool]

TopicPayloadPool

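TopicPayloadPool holds two vectors, free_payloads_ and all_payloads_, both storing PayloadNode* elements. The figure below shows the structure of a PayloadNode:

[Figure: PayloadNode]
When we need a variable-length buffer, we would typically add a char* field to NodeInfo, create the NodeInfo, and then allocate memory for the buffer, which takes two allocations. The implementation here instead puts a one-byte array at the end of NodeInfo and allocates buffer size + offsetof(NodeInfo, data) bytes in one go. A single allocation helps reduce memory fragmentation.

Every PayloadNode pointer that is created is stored in all_payloads_, and data_index in NodeInfo is its position in all_payloads_. data_size is the size of the actual data, and ref_counter is the reference count. free_payloads_ holds the PayloadNodes that have been created but are not in use; a node is removed from free_payloads_ when it is handed out.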
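
The single-allocation layout can be sketched roughly as follows. This is only an illustration, not the Fast DDS code: NodeHeader, payload(), make_node() and free_node() are hypothetical names, and the header fields simply mirror the ones described above.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdlib>

// Hypothetical header. The one-byte array only marks where the payload starts.
struct NodeHeader
{
    std::uint32_t ref_counter;
    std::uint32_t data_size;
    std::uint32_t data_index;
    unsigned char data[1];
};

// Address of the first payload byte, computed from the header address.
inline unsigned char* payload(NodeHeader* node)
{
    return reinterpret_cast<unsigned char*>(node) + offsetof(NodeHeader, data);
}

// One malloc covers the header and `size` payload bytes.
inline NodeHeader* make_node(std::uint32_t size)
{
    assert(size > 0);
    void* mem = std::malloc(offsetof(NodeHeader, data) + size);
    if (mem == nullptr)
    {
        return nullptr;
    }
    NodeHeader* node = static_cast<NodeHeader*>(mem);
    node->ref_counter = 0;
    node->data_size = size;
    node->data_index = 0;
    return node;
}

// The whole node, header and payload, is released with one free.
inline void free_node(NodeHeader* node)
{
    std::free(node);
}
```

With this layout, make_node(64) returns one block in which payload(node) points to 64 usable bytes right behind the header.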

TopicPayloadPool has several important methods, which we go through one by one.

allocate

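The size parameter is the size of the data in the PayloadNode. If the pool has reached its upper limit, a null pointer is returned; otherwise do_allocate() creates a PayloadNode and adds it to all_payloads_. The PayloadNode constructor creates the NodeInfo used to store the data.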

TopicPayloadPool::PayloadNode* TopicPayloadPool::allocate(
        uint32_t size)
{
    if (all_payloads_.size() >= max_pool_size_)
    {
        logWarning(RTPS_HISTORY, "Maximum number of allowed reserved payloads reached");
        return nullptr;
    }

    return do_allocate(size);
}

TopicPayloadPool::PayloadNode* TopicPayloadPool::do_allocate(
        uint32_t size)
{
    PayloadNode* payload = new (std::nothrow) PayloadNode(size);

    if (payload != nullptr)
    {
        payload->data_index(static_cast<uint32_t>(all_payloads_.size()));
        all_payloads_.push_back(payload);
    }
    else
    {
        logWarning(RTPS_HISTORY, "Failure to create a new payload ");
    }

    return payload;
}

do_get_payload

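If free_payloads_ is empty, allocate() is called to allocate a new node; otherwise the last node in free_payloads_ is taken. If the pool is resizeable and the requested size is larger than the node's current size, the node's memory is reallocated to size. Note that the reallocation uses realloc.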


bool TopicPayloadPool::do_get_payload(
        uint32_t size,
        CacheChange_t& cache_change,
        bool resizeable)
{
    PayloadNode* payload = nullptr;

    std::unique_lock<std::mutex> lock(mutex_);
    if (free_payloads_.empty())
    {
        payload = allocate(size); //Allocates a single payload
        if (payload == nullptr)
        {
            lock.unlock();
            cache_change.serializedPayload.data = nullptr;
            cache_change.serializedPayload.max_size = 0;
            cache_change.payload_owner(nullptr);
            return false;
        }
    }
    else
    {
        payload = free_payloads_.back();
        free_payloads_.pop_back();
    }

    // Resize if needed
    if (resizeable && size > payload->data_size())
    {
        if (!payload->resize(size))
        {
            // Failed to resize, but we can still keep it for later.
            free_payloads_.push_back(payload);
            lock.unlock();
            logError(RTPS_HISTORY, "Failed to resize the payload");

            cache_change.serializedPayload.data = nullptr;
            cache_change.serializedPayload.max_size = 0;
            cache_change.payload_owner(nullptr);
            return false;
        }
    }

    lock.unlock();
    payload->reference();
    cache_change.serializedPayload.data = payload->data();
    cache_change.serializedPayload.max_size = payload->data_size();
    cache_change.payload_owner(this);

    return true;
}
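
The control flow of do_get_payload() can be condensed into a small stand-alone sketch. It rests on several assumptions: TinyPool and Buffer are hypothetical names, std::vector::resize stands in for the realloc-based resize, and the mutex and reference counting of the real pool are left out.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

struct Buffer
{
    std::vector<unsigned char> bytes;
};

class TinyPool
{
public:

    explicit TinyPool(std::size_t max_buffers)
        : max_buffers_(max_buffers)
    {
        assert(max_buffers > 0);
    }

    // Reuse a free buffer if there is one, otherwise allocate while below the limit.
    Buffer* get(std::uint32_t size, bool resizeable)
    {
        Buffer* buf = nullptr;
        if (free_.empty())
        {
            if (all_.size() >= max_buffers_)
            {
                return nullptr; // pool limit reached
            }
            all_.push_back(std::make_unique<Buffer>());
            buf = all_.back().get();
            buf->bytes.resize(size);
        }
        else
        {
            buf = free_.back();
            free_.pop_back();
        }

        // Grow only when the pool allows it and the request is larger.
        if (resizeable && size > buf->bytes.size())
        {
            buf->bytes.resize(size);
        }
        return buf;
    }

    // Hand a buffer back so that a later get() can reuse it.
    void put_back(Buffer* buf)
    {
        free_.push_back(buf);
    }

private:

    std::size_t max_buffers_;
    std::vector<std::unique_ptr<Buffer>> all_;
    std::vector<Buffer*> free_;
};
```

A buffer returned through put_back() is the first candidate for the next get(), and it only grows when resizeable is true.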

get_payload

get_payload(SerializedPayload_t& data, IPayloadPool*& data_owner, CacheChange_t& cache_change)

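  • data_owner is not null
    • data_owner is this pool
      The reference count is incremented by 1.
    • data_owner is a different pool
      A payload is taken from this PayloadPool and the incoming data is copied into it.
  • data_owner is null
    A payload is taken from this PayloadPool and the incoming data is copied into it. The payload is handed to both data and cache_change, and data_owner is set to this.
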
bool TopicPayloadPool::get_payload(
        uint32_t size,
        CacheChange_t& cache_change)
{
    return do_get_payload(size, cache_change, false);
}


bool TopicPayloadPool::get_payload(
        SerializedPayload_t& data,
        IPayloadPool*& data_owner,
        CacheChange_t& cache_change)
{
    assert(cache_change.writerGUID != GUID_t::unknown());
    assert(cache_change.sequenceNumber != SequenceNumber_t::unknown());

    if (data_owner == this)
    {
        PayloadNode::reference(data.data);

        cache_change.serializedPayload.data = data.data;
        cache_change.serializedPayload.length = data.length;
        cache_change.serializedPayload.max_size = PayloadNode::data_size(data.data);
        cache_change.payload_owner(this);
        return true;
    }
    else
    {
        if (get_payload(data.length, cache_change))
        {
            if (!cache_change.serializedPayload.copy(&data, true))
            {
                release_payload(cache_change);
                return false;
            }

            if (data_owner == nullptr)
            {
                data_owner = this;
                data.data = cache_change.serializedPayload.data;
                PayloadNode::reference(data.data);
            }

            return true;
        }
    }

    return false;
}

reserve

Grows the pool directly to min_num_payloads nodes. Because the nodes are newly created they are free, so besides being stored in all_payloads_ they are also pushed into free_payloads_.

void TopicPayloadPool::reserve (
        uint32_t min_num_payloads,
        uint32_t size)
{
    assert (min_num_payloads <= max_pool_size_);

    for (size_t i = all_payloads_.size(); i < min_num_payloads; ++i)
    {
        PayloadNode* payload = do_allocate(size);

        if (payload != nullptr)
        {
            free_payloads_.push_back(payload);
        }
    }
}

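reserve_history() raises the pool's maximum size and release_history() lowers it. reserve_history() does not increase the number of PayloadNodes actually in the pool, but release_history() calls shrink(): if the maximum drops below the current all_payloads_.size(), the surplus PayloadNodes in free_payloads_ are deleted and the same PayloadNodes are removed from all_payloads_.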

shrink

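To reduce frequent allocations and fragmentation, the payload lists use vector rather than list. Erasing from a vector, however, shifts every element after the erased position forward, which should be avoided here. free_payloads_ is always popped from the back, so it does not have this problem. But the last element of free_payloads_ usually sits somewhere in the middle of all_payloads_, and erasing it there directly would trigger the shift. So the last element of all_payloads_ is copied into that position, its data_index is updated, and then the last element is popped.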

bool TopicPayloadPool::shrink (
        uint32_t max_num_payloads)
{
    assert(payload_pool_allocated_size() - payload_pool_available_size() <= max_num_payloads);

    while (max_num_payloads < all_payloads_.size())
    {
        PayloadNode* payload = free_payloads_.back();
        free_payloads_.pop_back();

        // Find data in allPayloads, remove element, then delete it
        all_payloads_.at(payload->data_index()) = all_payloads_.back();
        all_payloads_.back()->data_index(payload->data_index());
        all_payloads_.pop_back();
        delete payload;
    }

    return true;
}
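
The removal technique used in shrink() is often called swap-and-pop. A minimal stand-alone version, with a hypothetical Item type whose index field plays the role of data_index, might look like this:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical element that remembers its own position in the vector.
struct Item
{
    std::size_t index;
    int value;
};

// Removes items[pos] in O(1): the last element fills the hole, its stored
// index is corrected, and the vector shrinks by one. The caller still owns
// the removed element and is responsible for deleting it.
inline void swap_and_pop(std::vector<Item*>& items, std::size_t pos)
{
    assert(pos < items.size());
    items[pos] = items.back();
    items[pos]->index = pos;
    items.pop_back();
}
```

The order of the elements is not preserved, which is acceptable here because nodes are always located through their stored index.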

DynamicTopicPayloadPool

get_payload

Note that the third argument of do_get_payload(), resizeable, is true, which means this pool can hand out payloads of any size.

    bool get_payload(
            uint32_t size,
            CacheChange_t& cache_change) override
    {
        return (size > 0u) && do_get_payload(size, cache_change, true);
    }

release_payload

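When the reference count drops to zero after the decrement (dereference() returns true), that is, when nobody else is using the payload, the node is removed from all_payloads_ and deleted. This means payloads do not stay in the pool permanently: they are created and destroyed dynamically, but they can still be shared by several users.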

    bool release_payload(
            CacheChange_t& cache_change) override
    {
        assert(cache_change.payload_owner() == this);

        {
            if (PayloadNode::dereference(cache_change.serializedPayload.data))
            {
                //First remove it from all_payloads
                std::unique_lock<std::mutex> lock(mutex_);
                uint32_t data_index = PayloadNode::data_index(cache_change.serializedPayload.data);
                PayloadNode* payload = all_payloads_.at(data_index);
                all_payloads_.at(data_index) = all_payloads_.back();
                all_payloads_.back()->data_index(data_index);
                all_payloads_.pop_back();
                lock.unlock();

                // Now delete the data
                delete(payload);
            }
        }

        cache_change.serializedPayload.length = 0;
        cache_change.serializedPayload.pos = 0;
        cache_change.serializedPayload.max_size = 0;
        cache_change.serializedPayload.data = nullptr;
        cache_change.payload_owner(nullptr);

        return true;
    }
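
A typical way to detect "the last user has released" with an atomic counter is sketched below. It assumes the counter is a std::atomic integer; RefCounted is a hypothetical type, not the PayloadNode code. Note that fetch_sub returns the value before the decrement, so the comparison is against 1.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

struct RefCounted
{
    std::atomic<std::uint32_t> refs{0};

    // Registers one more user.
    void reference()
    {
        refs.fetch_add(1, std::memory_order_relaxed);
    }

    // Returns true only for the caller that released the last reference.
    // fetch_sub yields the previous value, so 1 means the count is now 0.
    bool dereference()
    {
        assert(refs.load() > 0);
        return refs.fetch_sub(1, std::memory_order_acq_rel) == 1;
    }

};
```

Only the caller that sees true goes on to remove and delete the node, so the node is never destroyed twice.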
    

release_history

The difference from the parent class is that shrink() is not called after update_maximum_size(), so the actual size of the pool is not reduced.

    bool release_history(
            const PoolConfig& config,
            bool /*is_reader*/) override
    {
        assert(config.memory_policy == memory_policy());

        std::lock_guard<std::mutex> lock(mutex_);
        update_maximum_size(config, false);

        return true;
    }

DynamicReusableTopicPayloadPool

get_payload

Same as DynamicTopicPayloadPool: a payload is only requested when size is greater than 0.

    bool get_payload(
            uint32_t size,
            CacheChange_t& cache_change) override
    {
        return (size > 0u) && do_get_payload(size, cache_change, true);
    }

No other functions are overridden, so release_payload behaves as in the parent class and returns the node to free_payloads_. That is what Reusable means.

PreallocatedTopicPayloadPool

payload_size is passed to the constructor, and every payload has the same size.

    explicit PreallocatedTopicPayloadPool(
            uint32_t payload_size)
        : payload_size_(payload_size)
        , minimum_pool_size_(0)
    {
        assert(payload_size_ > 0);
    }

get_payload

The third argument passed here is false, meaning the payload cannot be resized.

    bool get_payload(
            uint32_t /* size */,
            CacheChange_t& cache_change) override
    {
        return do_get_payload(payload_size_, cache_change, false);
    }

reserve_history()

Calls reserve() to create the payload nodes up front, until the pool holds minimum_pool_size_ of them.

    bool reserve_history(
            const PoolConfig& config,
            bool is_reader) override
    {
        if (!TopicPayloadPool::reserve_history(config, is_reader))
        {
            return false;
        }

        std::lock_guard<std::mutex> lock(mutex_);
        minimum_pool_size_ += config.initial_size;
        reserve(minimum_pool_size_, payload_size_);
        return true;
    }

release_history

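Updates its own minimum_pool_size_ and then calls the parent class's release_history(). The override frees nothing itself; any release is left to the parent's shrink(), which only deletes free nodes when the new maximum is below all_payloads_.size().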

    bool release_history(
            const PoolConfig& config,
            bool is_reader) override
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            minimum_pool_size_ -= config.initial_size;
        }

        return TopicPayloadPool::release_history(config, is_reader);
    }

PreallocatedReallocTopicPayloadPool

Similar to PreallocatedTopicPayloadPool: the constructor initializes the minimum payload size. The Realloc in the class name means the payload size can grow, which shows up in get_payload().

    explicit PreallocatedReallocTopicPayloadPool(
            uint32_t payload_size)
        : min_payload_size_(payload_size)
        , minimum_pool_size_(0)
    {
        assert(min_payload_size_ > 0);
    }

get_payload

The larger of the requested size and the min_payload_size_ set at construction is used, and do_get_payload() is called with resizeable set to true, so the payload can grow.

    bool get_payload(
            uint32_t size,
            CacheChange_t& cache_change) override
    {
        return do_get_payload(std::max(size, min_payload_size_), cache_change, true);
    }

reserve_history

Calls reserve() to preallocate minimum_pool_size_ payload nodes.

    bool reserve_history(
            const PoolConfig& config,
            bool is_reader) override
    {
        if (!TopicPayloadPool::reserve_history(config, is_reader))
        {
            return false;
        }

        std::lock_guard<std::mutex> lock(mutex_);
        minimum_pool_size_ += config.initial_size;
        reserve(minimum_pool_size_, min_payload_size_);
        return true;
    }

release_history

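Apart from maintaining its own minimum_pool_size_, it calls TopicPayloadPool::release_history() directly. It frees no memory itself; any release is left to the parent's shrink().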


    bool release_history(
            const PoolConfig& config,
            bool is_reader) override
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            minimum_pool_size_ -= config.initial_size;
        }

        return TopicPayloadPool::release_history(config, is_reader);
    }

BaseImpl

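This pool looks very simple and hardly seems to be a memory pool at all. The memory reuse actually happens in the SerializedPayload_t of CacheChange_t, which has the two members uint32_t length and octet* data. Reuse is achieved by using realloc when acquiring and, on release, by setting length to 0 without deleting data. The default implementation therefore reuses the data memory.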


class BaseImpl : public IPayloadPool
{
    bool get_payload(
            uint32_t size,
            CacheChange_t& cache_change) override
    {
        cache_change.serializedPayload.reserve(size);
        cache_change.payload_owner(this);
        return true;
    }

    bool get_payload(
            SerializedPayload_t& data,
            IPayloadPool*& /* data_owner */,
            CacheChange_t& cache_change) override
    {
        assert(cache_change.writerGUID != GUID_t::unknown());
        assert(cache_change.sequenceNumber != SequenceNumber_t::unknown());

        if (cache_change.serializedPayload.copy(&data, false))
        {
            cache_change.payload_owner(this);
            return true;
        }
        return false;
    }

    bool release_payload(
            CacheChange_t& cache_change) override
    {
        assert(cache_change.payload_owner() == this);

        cache_change.serializedPayload.length = 0;
        cache_change.serializedPayload.pos = 0;
        cache_change.payload_owner(nullptr);

        return true;
    }

};

template <MemoryManagementPolicy_t policy_>
class Impl : public BaseImpl
{
};
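
The reuse idea behind BaseImpl can be shown with a small buffer type. This is a sketch under assumptions: ReusableBuffer is a hypothetical stand-in for SerializedPayload_t, with a grow-only reserve() based on realloc and a release() that only resets length.

```cpp
#include <cassert>
#include <cstdint>
#include <cstdlib>

struct ReusableBuffer
{
    std::uint32_t length = 0;
    std::uint32_t max_size = 0;
    unsigned char* data = nullptr;

    ReusableBuffer() = default;
    ReusableBuffer(const ReusableBuffer&) = delete;
    ReusableBuffer& operator =(const ReusableBuffer&) = delete;

    ~ReusableBuffer()
    {
        std::free(data);
    }

    // Grows the storage when needed; an existing block that is large enough is kept.
    bool reserve(std::uint32_t new_size)
    {
        if (new_size <= max_size)
        {
            return true;
        }
        void* mem = std::realloc(data, new_size);
        if (mem == nullptr)
        {
            return false;
        }
        data = static_cast<unsigned char*>(mem);
        max_size = new_size;
        return true;
    }

    // "Release" keeps the memory and only marks the buffer as empty.
    void release()
    {
        assert(length <= max_size);
        length = 0;
    }

};
```

After release() the memory is still attached to the buffer, so the next reserve() with a size that already fits costs nothing.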

Impl<DYNAMIC_REUSABLE_MEMORY_MODE>

template <>
class Impl<DYNAMIC_REUSABLE_MEMORY_MODE> : public BaseImpl
{
};

Impl<DYNAMIC_RESERVE_MEMORY_MODE>

release_payload(): calls cache_change.serializedPayload.empty(), which frees the memory.

template <>
class Impl<DYNAMIC_RESERVE_MEMORY_MODE> : public BaseImpl
{
public:

    bool release_payload(
            CacheChange_t& cache_change) override
    {
        assert(cache_change.payload_owner() == this);

        cache_change.serializedPayload.empty();
        cache_change.payload_owner(nullptr);

        return true;
    }

};

Impl<PREALLOCATED_MEMORY_MODE>

In preallocated mode payload_size_ is fixed at construction. get_payload() ignores the size passed in and uses the fixed payload_size_ instead.


template <>
class Impl<PREALLOCATED_MEMORY_MODE> : public BaseImpl
{
public:

    explicit Impl(
            uint32_t payload_size)
        : payload_size_(payload_size)
    {
        assert(payload_size_ > 0);
    }

    bool get_payload(
            uint32_t /* size */,
            CacheChange_t& cache_change) override
    {
        cache_change.serializedPayload.reserve(payload_size_);
        cache_change.payload_owner(this);
        return true;
    }

    bool get_payload(
            SerializedPayload_t& data,
            IPayloadPool*& /* data_owner */,
            CacheChange_t& cache_change) override
    {
        assert(cache_change.writerGUID != GUID_t::unknown());
        assert(cache_change.sequenceNumber != SequenceNumber_t::unknown());

        cache_change.serializedPayload.reserve(payload_size_);
        if (cache_change.serializedPayload.copy(&data, true))
        {
            cache_change.payload_owner(this);
            return true;
        }

        return false;
    }

private:

    uint32_t payload_size_;
};

Impl<PREALLOCATED_WITH_REALLOC_MEMORY_MODE>

Similar to Impl<PREALLOCATED_MEMORY_MODE>; the with_realloc part shows up in cache_change.serializedPayload.reserve(std::max(size, min_payload_size_));

template <>
class Impl<PREALLOCATED_WITH_REALLOC_MEMORY_MODE> : public BaseImpl
{
public:

    explicit Impl(
            uint32_t payload_size)
        : min_payload_size_(payload_size)
    {
        assert(min_payload_size_ > 0);
    }

    bool get_payload(
            uint32_t size,
            CacheChange_t& cache_change) override
    {
        cache_change.serializedPayload.reserve(std::max(size, min_payload_size_));
        cache_change.payload_owner(this);
        return true;
    }

private:

    uint32_t min_payload_size_;
};

DataSharingPayloadPool

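The data sharing pool allocates one whole block of shared memory and divides it into three parts: the first part stores the user data, the second stores the offsets of the PayloadNodes within the segment, and the third stores the PoolDescriptor.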

PayloadNode

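Stores the user data. The shared memory contains multiple contiguous PayloadNodes. They are 8-byte aligned to reduce the number of memory accesses (memory alignment is worth reading up on if needed).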

 class alignas (8) PayloadNode{...}

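It holds some metadata, with the user data following right after the metadata. This is similar to the one-byte array mentioned earlier, except that here there is no array member name, so the data is accessed as the PayloadNode pointer plus data_offset.
[Figure: DataSharingPayloadPool]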
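
Reaching the data through a pointer plus an offset can be illustrated as follows. This is only a sketch: Meta, init_node(), data_of(), meta_from_data() and the rounding in node_size() are hypothetical stand-ins, not the actual PayloadNode layout. meta_from_data() shows the kind of pointer arithmetic a helper such as get_from_data() has to perform.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <new>

// Hypothetical metadata header; the real PayloadNode has more fields.
struct alignas(8) Meta
{
    std::uint32_t status;
    std::uint32_t data_length;
    std::uint64_t sequence_number;
};

// The user data starts right after the metadata.
constexpr std::size_t data_offset = sizeof(Meta);

// Bytes per node, rounded up so that the next node is 8-byte aligned too.
inline std::size_t node_size(std::size_t max_data_size)
{
    return (data_offset + max_data_size + 7) & ~static_cast<std::size_t>(7);
}

// Constructs the metadata in place at the start of a node.
inline Meta* init_node(void* mem)
{
    assert(reinterpret_cast<std::uintptr_t>(mem) % alignof(Meta) == 0);
    return new (mem) Meta{};
}

// From the node to its data: pointer plus offset.
inline unsigned char* data_of(Meta* node)
{
    return reinterpret_cast<unsigned char*>(node) + data_offset;
}

// From the data back to the node: pointer minus offset.
inline Meta* meta_from_data(unsigned char* data)
{
    return reinterpret_cast<Meta*>(data - data_offset);
}
```

Because every node has the same node_size(), the i-th node of a contiguous pool starts at pool_begin + i * node_size(max_data_size).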

Segment::Offset

Stores the position of one PayloadNode in the shared memory.

using Offset = std::uint32_t;

PoolDescriptor

    struct alignas (8) PoolDescriptor
    {
        uint32_t history_size;          //< Number of payloads in the history
        uint64_t notified_begin;        //< The index of the oldest history entry already notified (ready to read)
        uint64_t notified_end;          //< The index of the history entry that will be notified next
        uint32_t liveliness_sequence;   //< The ID of the last liveliness assertion sent by the writer
    };

WriterPool

WriterPool has a member FixedSizeQueue<PayloadNode*> free_payloads_ that stores pointers to unused PayloadNodes. Its underlying implementation resembles a ring queue.
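
For readers unfamiliar with ring queues, here is a minimal fixed-capacity version. RingQueue is a hypothetical class written for illustration; it is not the FixedSizeQueue from Fast DDS and only borrows the method names used in the code below.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

template <typename T>
class RingQueue
{
public:

    explicit RingQueue(std::size_t capacity)
        : slots_(capacity)
    {
        assert(capacity > 0);
    }

    bool empty() const
    {
        return count_ == 0;
    }

    bool full() const
    {
        return count_ == slots_.size();
    }

    // Appends at the logical end; the physical index wraps around.
    bool push_back(const T& value)
    {
        if (full())
        {
            return false;
        }
        slots_[(head_ + count_) % slots_.size()] = value;
        ++count_;
        return true;
    }

    const T& front() const
    {
        assert(!empty());
        return slots_[head_];
    }

    // Drops the oldest element by moving the head forward.
    void pop_front()
    {
        assert(!empty());
        head_ = (head_ + 1) % slots_.size();
        --count_;
    }

private:

    std::vector<T> slots_;
    std::size_t head_ = 0;
    std::size_t count_ = 0;
};
```

The storage is allocated once in the constructor, so pushing and popping never allocate.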

get_payload

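Basically the same as TopicPayloadPool. The only difference is that, because shared memory is used, payloads cannot be resized, so once free_payloads_ is exhausted no more payloads can be obtained.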


    bool get_payload(
            uint32_t /*size*/,
            CacheChange_t& cache_change) override
    {
        if (free_payloads_.empty())
        {
            return false;
        }

        PayloadNode* payload = free_payloads_.front();
        free_payloads_.pop_front();
        // Reset all the metadata to signal the reader that the payload is dirty
        payload->reset();

        cache_change.serializedPayload.data = payload->data();
        cache_change.serializedPayload.max_size = max_data_size_;
        cache_change.payload_owner(this);

        return true;
    }

    bool get_payload(
            SerializedPayload_t& data,
            IPayloadPool*& data_owner,
            CacheChange_t& cache_change) override
    {
        assert(cache_change.writerGUID != GUID_t::unknown());
        assert(cache_change.sequenceNumber != SequenceNumber_t::unknown());

        if (data_owner == this)
        {
            cache_change.serializedPayload.data = data.data;
            cache_change.serializedPayload.length = data.length;
            cache_change.serializedPayload.max_size = data.length;
            cache_change.payload_owner(this);
            return true;
        }
        else
        {
            if (get_payload(data.length, cache_change))
            {
                if (!cache_change.serializedPayload.copy(&data, true))
                {
                    release_payload(cache_change);
                    return false;
                }

                if (data_owner == nullptr)
                {
                    data_owner = this;
                    data.data = cache_change.serializedPayload.data;
                }

                return true;
            }
        }

        return false;
    }

    

release_payload

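On release, if the payload has already been removed from the history, notified_begin in PoolDescriptor is advanced to the first entry that has not been removed, and the removed payloads are returned to free_payloads_. Otherwise the payload is returned to free_payloads_ directly.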

    bool release_payload(
            CacheChange_t& cache_change) override
    {
        assert(cache_change.payload_owner() == this);

        // Payloads are reset on the `get` operation, the `release` leaves the data to give more chances to the reader
        PayloadNode* payload = PayloadNode::get_from_data(cache_change.serializedPayload.data);
        if (payload->has_been_removed())
        {
            advance_till_first_non_removed();
        }
        else
        {
            free_payloads_.push_back(payload);
        }
        logInfo(DATASHARING_PAYLOADPOOL, "Change released with SN " << cache_change.sequenceNumber);

        return DataSharingPayloadPool::release_payload(cache_change);
    }

init_shared_segment

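The shared memory is created here. segment_id_ is simply the writer's GUID, and segment_name_ is "fast_datasharing_" + writer_guid.guidPrefix + "" + writer_guid.entityId. By default a file named after segment_name_ is created under /dev/shm. If the requested shared memory size does not fit in a uint32_t (about 4 GB), the overflow check fails and the segment is not created.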


    template <typename T>
    bool init_shared_segment(
            const RTPSWriter* writer,
            const std::string& shared_dir)
    {
        writer_ = writer;
        segment_id_ = writer_->getGuid();
        segment_name_ = generate_segment_name(shared_dir, segment_id_);
        std::unique_ptr<T> local_segment;
        size_t payload_size;
        uint64_t estimated_size_for_payloads_pool;
        uint64_t estimated_size_for_history;
        uint32_t size_for_payloads_pool;

        try
        {
            // We need to reserve the whole segment at once, and the underlying classes use uint32_t as size type.
            // In order to avoid overflows, we will calculate using uint64 and check the casting
            bool overflow = false;
            size_t per_allocation_extra_size = T::compute_per_allocation_extra_size(
                alignof(PayloadNode), DataSharingPayloadPool::domain_name());
            payload_size = DataSharingPayloadPool::node_size(max_data_size_);

            estimated_size_for_payloads_pool = pool_size_ * payload_size;
            overflow |= (estimated_size_for_payloads_pool != static_cast<uint32_t>(estimated_size_for_payloads_pool));
            size_for_payloads_pool = static_cast<uint32_t>(estimated_size_for_payloads_pool);

            //Reserve one extra to avoid pointer overlapping
            estimated_size_for_history = (pool_size_ + 1) * sizeof(Segment::Offset);
            overflow |= (estimated_size_for_history != static_cast<uint32_t>(estimated_size_for_history));
            uint32_t size_for_history = static_cast<uint32_t>(estimated_size_for_history);

            uint32_t descriptor_size = static_cast<uint32_t>(sizeof(PoolDescriptor));
            uint64_t estimated_segment_size = size_for_payloads_pool + per_allocation_extra_size +
                    size_for_history + per_allocation_extra_size +
                    descriptor_size + per_allocation_extra_size;
            overflow |= (estimated_segment_size != static_cast<uint32_t>(estimated_segment_size));
            uint32_t segment_size = static_cast<uint32_t>(estimated_segment_size);

            if (overflow)
            {
                logError(DATASHARING_PAYLOADPOOL, "Failed to create segment " << segment_name_
                                                                              << ": Segment size is too large: " << estimated_size_for_payloads_pool
                                                                              << " (max is " << std::numeric_limits<uint32_t>::max() << ")."
                                                                              << " Please reduce the maximum size of the history");
                return false;
            }

            //Open the segment
            T::remove(segment_name_);

            local_segment.reset(
                new T(boost::interprocess::create_only,
                segment_name_,
                segment_size + T::EXTRA_SEGMENT_SIZE));
        }
        catch (const std::exception& e)
        {
            logError(DATASHARING_PAYLOADPOOL, "Failed to create segment " << segment_name_
                                                                          << ": " << e.what());
            return false;
        }

        try
        {
            // Alloc the memory for the pool
            // Cannot use 'construct' because we need to reserve extra space for the data,
            // which is not considered in sizeof(PayloadNode).
            payloads_pool_ = static_cast<octet*>(local_segment->get().allocate(size_for_payloads_pool));

            // Initialize each node in the pool
            free_payloads_.init(pool_size_);
            octet* payload = payloads_pool_;
            for (uint32_t i = 0; i < pool_size_; ++i)
            {
                new (payload) PayloadNode();

                // All payloads are free
                free_payloads_.push_back(reinterpret_cast<PayloadNode*>(payload));

                payload += (ptrdiff_t)payload_size;
            }

            //Alloc the memory for the history
            history_ = local_segment->get().template construct<Segment::Offset>(history_chunk_name())[pool_size_ + 1]();

            //Alloc the memory for the descriptor
            descriptor_ = local_segment->get().template construct<PoolDescriptor>(descriptor_chunk_name())();

            // Initialize the data in the descriptor
            descriptor_->history_size = pool_size_ + 1;
            descriptor_->notified_begin = 0u;
            descriptor_->notified_end = 0u;
            descriptor_->liveliness_sequence = 0u;

            free_history_size_ = pool_size_;
        }
        catch (std::exception& e)
        {
            T::remove(segment_name_);

            logError(DATASHARING_PAYLOADPOOL, "Failed to initialize segment " << segment_name_
                                                                              << ": " << e.what());
            return false;
        }

        segment_ = std::move(local_segment);
        is_initialized_ = true;
        return true;
    }
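
The overflow check in init_shared_segment() follows a simple pattern: compute in 64 bits, then verify that casting to 32 bits loses nothing. A reduced version of that pattern, with the hypothetical helper checked_segment_size() and only one extra term instead of the three used above, could look like this:

```cpp
#include <cassert>
#include <cstdint>

// Computes pool_size * node_size + extra in 64 bits and reports whether the
// result still fits in a uint32_t. `out` is only written on success.
inline bool checked_segment_size(
        std::uint32_t pool_size,
        std::uint32_t node_size,
        std::uint32_t extra,
        std::uint32_t& out)
{
    assert(node_size > 0);
    std::uint64_t total = static_cast<std::uint64_t>(pool_size) * node_size + extra;
    if (total != static_cast<std::uint32_t>(total))
    {
        return false; // would be truncated by the cast
    }
    out = static_cast<std::uint32_t>(total);
    return true;
}
```

The cast to uint64_t has to happen before the multiplication; otherwise the product itself would already wrap in 32 bits and the check could never fire.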

add_to_shared_history

After the change is added, history_ records the exact position (offset) of the PayloadNode.


    /**
     * Fills the metadata of the shared payload from the cache change information
     * and adds the payload's offset to the shared history
     */
    void add_to_shared_history(
            const CacheChange_t* cache_change)
    {
        assert(cache_change);
        assert(cache_change->serializedPayload.data);
        assert(cache_change->payload_owner() == this);
        assert(free_history_size_ > 0);

        // Fill the payload metadata with the change info
        PayloadNode* node = PayloadNode::get_from_data(cache_change->serializedPayload.data);
        node->status(ALIVE);
        node->data_length(cache_change->serializedPayload.length);
        node->source_timestamp(cache_change->sourceTimestamp);
        node->writer_GUID(cache_change->writerGUID);
        node->instance_handle(cache_change->instanceHandle);
        if (cache_change->write_params.related_sample_identity() != SampleIdentity::unknown())
        {
            node->related_sample_identity(cache_change->write_params.related_sample_identity());
        }

        // Set the sequence number last, it signals the data is ready
        node->sequence_number(cache_change->sequenceNumber);

        // Add it to the history
        history_[static_cast<uint32_t>(descriptor_->notified_end)] = segment_->get_offset_from_address(node);
        logInfo(DATASHARING_PAYLOADPOOL, "Change added to shared history"
                << " with SN " << cache_change->sequenceNumber);
        advance(descriptor_->notified_end);
        --free_history_size_;
    }

remove_from_shared_history

Only marks the payload as removed.


    /**
     * Removes the payload's offset from the shared history
     *
     * Payloads don't need to be removed from the history in the same order
     * they were added, but a payload will not be available through @ref get_payload until all
     * payloads preceding it have been removed from the shared history.
     */
    void remove_from_shared_history(
            const CacheChange_t* cache_change)
    {
        assert(cache_change);
        assert(cache_change->serializedPayload.data);
        assert(cache_change->payload_owner() == this);
        assert(descriptor_->notified_end != descriptor_->notified_begin);
        assert(free_history_size_ < descriptor_->history_size);

        logInfo(DATASHARING_PAYLOADPOOL, "Change removed from shared history"
                << " with SN " << cache_change->sequenceNumber);

        PayloadNode* payload = PayloadNode::get_from_data(cache_change->serializedPayload.data);
        payload->has_been_removed(true);
    }

advance_till_first_non_removed

Returns the removed payloads to free_payloads_ and updates notified_begin.


    void advance_till_first_non_removed()
    {
        while (descriptor_->notified_begin != descriptor_->notified_end)
        {
            auto offset = history_[static_cast<uint32_t>(descriptor_->notified_begin)];
            auto payload = static_cast<PayloadNode*>(segment_->get_address_from_offset(offset));
            if (!payload->has_been_removed())
            {
                break;
            }

            payload->has_been_removed(false);
            free_payloads_.push_back(payload);
            advance(descriptor_->notified_begin);
            ++free_history_size_;
        }
    }
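
The interplay between marking entries as removed and reclaiming them in order can be reproduced with a small model. The sketch below assumes a circular history that keeps one slot empty so that begin == end always means "empty"; Slot and History are hypothetical types, and reclaim() plays the role of advance_till_first_non_removed().

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

struct Slot
{
    bool removed = false;
};

struct History
{
    std::vector<Slot*> ring;        // circular list of entries
    std::size_t begin = 0;          // oldest entry still in the history
    std::size_t end = 0;            // position of the next entry to add
    std::vector<Slot*> free_list;   // entries that can be handed out again

    explicit History(std::size_t capacity)
        : ring(capacity, nullptr)
    {
        assert(capacity > 1);
    }

    void add(Slot* slot)
    {
        // One slot stays unused, otherwise a full ring would look empty.
        assert((end + 1) % ring.size() != begin);
        ring[end] = slot;
        end = (end + 1) % ring.size();
    }

    // Frees removed entries from the front and stops at the first live one.
    void reclaim()
    {
        while (begin != end && ring[begin]->removed)
        {
            ring[begin]->removed = false;
            free_list.push_back(ring[begin]);
            begin = (begin + 1) % ring.size();
        }
    }

};
```

An entry removed out of order stays in the ring until every older entry has been removed as well, which matches the remark in the comment of remove_from_shared_history().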

ReaderPool

get_payload

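Only WriterPool can return new payloads. When data_owner == this, data is assigned to cache_change directly. Otherwise the owner must be an intra-process writer; intra-process calls are synchronous, so overwrites need not be considered and read_from_shared_history() is called directly.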


    bool get_payload(
            uint32_t /*size*/,
            CacheChange_t& /*cache_change*/) override
    {
        // Only WriterPool can return new payloads
        return false;
    }

    bool get_payload(
            SerializedPayload_t& data,
            IPayloadPool*& data_owner,
            CacheChange_t& cache_change) override
    {
        if (data_owner == this)
        {
            cache_change.serializedPayload.data = data.data;
            cache_change.serializedPayload.length = data.length;
            cache_change.serializedPayload.max_size = data.length;
            cache_change.payload_owner(this);
            return true;
        }

        // If owner is not this, then it must be an intraprocess datasharing writer
        assert(nullptr != dynamic_cast<DataSharingPayloadPool*>(data_owner));
        PayloadNode* payload = PayloadNode::get_from_data(data.data);

        // No need to check validity, on intraprocess there is no override of payloads
        read_from_shared_history(cache_change, payload);
        return true;
    }

read_from_shared_history

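The sequence_number is recorded before reading and checked again after reading to see whether it has changed. Because this is shared memory, the writer may rewrite the data while we are reading.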


    bool read_from_shared_history(
            CacheChange_t& cache_change,
            PayloadNode* payload)
    {
        // The sequence number can be unknown already, but we defer the check to the end
        cache_change.sequenceNumber = payload->sequence_number();

        cache_change.serializedPayload.data = payload->data();
        cache_change.serializedPayload.max_size = payload->data_length();
        cache_change.serializedPayload.length = payload->data_length();

        cache_change.kind = static_cast<ChangeKind_t>(payload->status());
        cache_change.writerGUID = payload->writer_GUID();
        cache_change.instanceHandle = payload->instance_handle();
        cache_change.sourceTimestamp = payload->source_timestamp();
        cache_change.write_params.sample_identity(payload->related_sample_identity());

        SequenceNumber_t check = payload->sequence_number();
        if (check == c_SequenceNumber_Unknown || check != cache_change.sequenceNumber)
        {
            // data override while processing
            return false;
        }

        cache_change.payload_owner(this);
        return true;
    }
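
The "read, then validate" idea can be isolated in a simplified model. This sketch makes several assumptions that are not taken from Fast DDS: SharedCell, publish() and try_read() are hypothetical, 0 stands for the unknown sequence number, sequence numbers never repeat, and all atomics use the default sequentially consistent ordering to keep the reasoning simple.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

struct SharedCell
{
    std::atomic<std::uint64_t> sequence{0};   // 0 means "unknown / being written"
    std::atomic<std::uint32_t> value{0};
};

// Writer side: invalidate, write the data, publish the sequence number last.
inline void publish(SharedCell& cell, std::uint64_t seq, std::uint32_t value)
{
    assert(seq != 0);
    cell.sequence.store(0);
    cell.value.store(value);
    cell.sequence.store(seq);
}

// Reader side: the copy is valid only if the sequence number was known and
// identical before and after reading the data.
inline bool try_read(const SharedCell& cell, std::uint64_t& seq, std::uint32_t& value)
{
    seq = cell.sequence.load();
    value = cell.value.load();
    std::uint64_t check = cell.sequence.load();
    return check != 0 && check == seq;
}
```

When try_read() returns false the reader simply discards what it copied, in the same way that get_next_unread_payload() skips a payload that was overwritten during the read.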

release_payload

Simply forwards to the base class release_payload().

    bool release_payload(
            CacheChange_t& cache_change) override
    {
        assert(cache_change.payload_owner() == this);

        return DataSharingPayloadPool::release_payload(cache_change);
    }

init_shared_segment

The segment_id_ and segment_name_ used here are the same as in WriterPool, but the Segment is created with boost::interprocess::open_read_only, so the reader attaches to the same shared memory block as the WriterPool.


    template <typename T>
    bool init_shared_segment(
            const GUID_t& writer_guid,
            const std::string& shared_dir)
    {
        segment_id_ = writer_guid;
        segment_name_ = generate_segment_name(shared_dir, writer_guid);

        std::unique_ptr<T> local_segment;
        // Open the segment
        try
        {
            local_segment = std::unique_ptr<T>(
                new T(boost::interprocess::open_read_only,
                segment_name_.c_str()));
        }
        catch (const std::exception& e)
        {
            logError(HISTORY_DATASHARING_PAYLOADPOOL, "Failed to open segment " << segment_name_
                                                                                << ": " << e.what());
            return false;
        }

        // Get the pool description
        descriptor_ = local_segment->get().template find<PoolDescriptor>(descriptor_chunk_name()).first;
        if (!descriptor_)
        {
            local_segment.reset();

            logError(HISTORY_DATASHARING_PAYLOADPOOL, "Failed to open payload pool descriptor " << segment_name_);
            return false;
        }

        // Get the history
        history_ = local_segment->get().template find<Segment::Offset>(history_chunk_name()).first;
        if (!history_)
        {
            local_segment.reset();

            logError(HISTORY_DATASHARING_PAYLOADPOOL, "Failed to open payload history " << segment_name_);
            return false;
        }

        // Set the reading pointer
        next_payload_ = begin();
        segment_ = std::move(local_segment);
        if (is_volatile_)
        {
            CacheChange_t ch;
            SequenceNumber_t last_sequence = c_SequenceNumber_Unknown;
            get_next_unread_payload(ch, last_sequence);
            while (ch.sequenceNumber != SequenceNumber_t::unknown())
            {
                advance(next_payload_);
                get_next_unread_payload(ch, last_sequence);
            }
            assert(next_payload_ == end());
        }

        return true;
    }

get_next_unread_payload

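The function first calls ensure_reading_reference_is_in_bounds() to find a position from which data can still be read, then calls read_from_shared_history() to read it. If the data is overwritten while read_from_shared_history() is running, that call returns false, and the function then tries to read from the next position.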


    void get_next_unread_payload(
            CacheChange_t& cache_change,
            SequenceNumber_t& last_sequence_number,
            uint64_t until)
    {
        last_sequence_number = last_sn_;

        while (next_payload_ < until)
        {
            // First ensure we are not too far behind
            // This may move the next_payload_ past the until value
            if (!ensure_reading_reference_is_in_bounds() && next_payload_ >= until)
            {
                break;
            }

            // history_[next_payload_] contains the offset to the payload
            PayloadNode* payload = static_cast<PayloadNode*>(
                segment_->get_address_from_offset(history_[static_cast<uint32_t>(next_payload_)]));
            if (!read_from_shared_history(cache_change, payload))
            {
                // Overriden while retrieving. Discard and continue
                advance(next_payload_);
                logWarning(RTPS_READER, "Dirty data detected on datasharing writer " << writer());
                continue;
            }

            if (last_sn_ != c_SequenceNumber_Unknown && last_sn_ >= cache_change.sequenceNumber)
            {
                // Sequence number went backwards, it was most probably overriden.
                continue;
            }

            if (!ensure_reading_reference_is_in_bounds())
            {
                // We may have been taken over and read a payload that is too far forward. Discard and continue
                continue;
            }

            last_sn_ = cache_change.sequenceNumber;

            return;
        }

        // Reset the data (may cause errors later on)
        cache_change.sequenceNumber = c_SequenceNumber_Unknown;
        cache_change.serializedPayload.data = nullptr;
        cache_change.payload_owner(nullptr);
    }
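
The control flow of the loop above can be sketched with a simplified single-process model. This is an illustration under stated assumptions, not the Fast DDS code: `Entry`, `ReaderModel` and `get_next_unread` are hypothetical names, the index is a plain position without a loop counter, the bounds check is left out, and a `dirty` flag stands in for read_from_shared_history() failing. One deliberate difference: this model skips a stale entry by advancing directly, whereas the original relies on ensure_reading_reference_is_in_bounds() to move next_payload_.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical history entry.
struct Entry
{
    uint64_t sequence;  // 0 stands in for c_SequenceNumber_Unknown
    bool dirty;         // simulates read_from_shared_history() returning false
};

struct ReaderModel
{
    std::vector<Entry> history;
    std::size_t next = 0;   // simplified next_payload_: plain index, no loop counter
    uint64_t last_sn = 0;   // simplified last_sn_

    // Returns the sequence number of the next readable entry before `until`,
    // or 0 if there is none. Like the original, it does not advance past the
    // entry it returns; the caller does that.
    uint64_t get_next_unread(
            std::size_t until)
    {
        while (next < until && next < history.size())
        {
            const Entry& entry = history[next];
            if (entry.dirty || entry.sequence == 0)
            {
                ++next;     // overwritten while reading: discard and continue
                continue;
            }
            if (last_sn != 0 && last_sn >= entry.sequence)
            {
                ++next;     // assumption of this model: skip stale entries directly
                continue;
            }
            last_sn = entry.sequence;
            return entry.sequence;
        }
        return 0;
    }

};
```

A caller would read one entry, advance `next` itself, and call `get_next_unread` again, which is the same pattern the volatile branch of init_shared_segment follows with advance(next_payload_).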

ensure_reading_reference_is_in_bounds

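The low four bytes of notified_begin are the index into history_. The index is incremented by one for each new notification, and once it exceeds pool_size_ it carries over into the high four bytes.

[Figure: positions of the writer and the reader in the circular history]

Consider the situation in the figure above. The reader can still just read one value that has not been overwritten, and the writer, on its 10th lap, is about to catch up with it. Once the writer laps the reader, data gets overwritten. If the reader is at high = 9, low = 4 at that moment, it can no longer read the value it wants, because the writer has already overwritten it. If the reader is at high = 8, low = 5, it has fallen too far behind and cannot read that value either. The latter case is the condition next_payload_high + 1 < notified_end_high in the code. The former case is next_payload_high < notified_end_high && static_cast<uint32_t>(next_payload_) <= static_cast<uint32_t>(notified_end). In both cases next_payload_ is moved to the position of the oldest data that can currently be read: a reader at high = 9, low = 4 and a reader at high = 8, low = 5 are both moved to the position reader high = 9, low = 5 shown in the figure.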

    bool ensure_reading_reference_is_in_bounds()
    {
        auto notified_end = end();
        auto notified_end_high = notified_end >> 32;
        auto next_payload_high = next_payload_ >> 32;
        if (next_payload_high + 1 < notified_end_high ||
                (next_payload_high < notified_end_high &&
                static_cast<uint32_t>(next_payload_) <= static_cast<uint32_t>(notified_end)))
        {
            logWarning(RTPS_READER, "Writer " << writer() << " overtook reader in datasharing pool."
                                              << " Some changes will be missing.");

            // lower part is the index, upper part is the loop counter
            next_payload_ = ((notified_end_high - 1) << 32) + static_cast<uint32_t>(notified_end);
            advance(next_payload_);
            return false;
        }
        return true;
    }
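
The two example positions discussed above can be checked with a small standalone sketch that applies the same condition to plain 64-bit values. The names `kHistorySize`, `make_index`, `advance_index` and `ensure_in_bounds` are hypothetical. The wrap rule in `advance_index` is an assumption based on the description above (the low half wraps to 0 and the high half is incremented once the index reaches the history length), and the history length of 10 is chosen only for the example.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical history length, chosen only for this example.
constexpr uint32_t kHistorySize = 10;

// Packs the loop counter into the high 32 bits and the slot into the low 32 bits.
constexpr uint64_t make_index(
        uint32_t loop,
        uint32_t slot)
{
    return (static_cast<uint64_t>(loop) << 32) | slot;
}

// Assumed wrap rule: go to the next slot; past the last slot, reset the slot
// to 0 and increment the loop counter.
inline void advance_index(
        uint64_t& index)
{
    if (static_cast<uint32_t>(++index) >= kHistorySize)
    {
        index = ((index >> 32) + 1) << 32;
    }
}

// Same condition as ensure_reading_reference_is_in_bounds(), on plain values.
// Returns false and repositions next_payload if the writer overtook the reader.
inline bool ensure_in_bounds(
        uint64_t& next_payload,
        uint64_t notified_end)
{
    const uint64_t end_high = notified_end >> 32;
    const uint64_t next_high = next_payload >> 32;
    if (next_high + 1 < end_high ||
            (next_high < end_high &&
            static_cast<uint32_t>(next_payload) <= static_cast<uint32_t>(notified_end)))
    {
        // One lap behind the writer, at the writer's slot, then one step forward
        next_payload = ((end_high - 1) << 32) + static_cast<uint32_t>(notified_end);
        advance_index(next_payload);
        return false;
    }
    return true;
}
```

With the writer's end at high = 10, low = 4, a reader at high = 9, low = 4 and a reader at high = 8, low = 5 are both repositioned to high = 9, low = 5, while a reader already at high = 9, low = 5 is left untouched.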

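Summary

That covers the implementation of the memory pools in Fast DDS. The table below summarizes, for each pool, whether it preallocates memory, whether get_payload grows the pool when no free payload is available, whether a payload that is too small is grown with realloc, and whether release_payload actually frees the memory.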

Pool | Preallocates memory | Pool size growth | Payload size growth | Frees memory on release
DataSharingPayloadPool
PreallocatedReallocTopicPayloadPool
PreallocatedTopicPayloadPool
DynamicReusableTopicPayloadPool
DynamicTopicPayloadPool
Source: https://blog.csdn.net/nuaaty/article/details/135565451