iceoryx source code reading (V) - shared memory communication (III)

2024-10-22 00:37:42

Contents
  • 1 Normal message reception flow
    • 1.1 SubscriberImpl::take
    • 1.2 BaseSubscriber::takeChunk
    • 1.3 SubscriberPortUser::tryGetChunk
    • 1.4 ChunkReceiver::tryGet
    • 1.5 ChunkQueuePopper::tryPop
  • 2 Callback Function Receiving Mechanism
  • 3 Summary

Recently, several readers have asked me to continue the iceoryx source code reading series, so here is the next part, for your reference.

This article covers the normal message reception flow. As a process, this part of the code is fairly simple, so to make the article more substantial we read the code in more depth and tie it back to the previous articles.

1 Normal message reception flow

In the normal reception flow, the receiver calls a receive function, obtains from the shared message queue a description of where the message is located, and then reads the data from shared memory based on that description. Below, we start from the receive function that iceoryx exposes to the application layer and work our way down step by step.

1.1 SubscriberImpl::take

Responsibilities:
The application layer receives data through SubscriberImpl::take. This function obtains a ChunkHeader pointer through lower-level methods (this data structure was introduced in iceoryx source code reading (II)), gets the user payload from it, and from that builds the structure through which the user accesses the message.

Template Parameters:

  • T: Message body type.
  • H: Message header type.
  • BaseSubscriberType: The base class type. (I do not quite understand why iceoryx so often makes base classes template parameters as well; templates seem overused.)

Input parameters:
None

Return Value:

The return value is of type cxx::expected<Sample<const T, const H>, ChunkReceiveResult>, which has two parts:

  • The value type for the normal case, Sample<const T, const H>. This type wraps the user payload to make the data in it easier to read.

  • The return value type for the error case, i.e., ChunkReceiveResult, is defined as follows:

enum class ChunkReceiveResult
{
    TOO_MANY_CHUNKS_HELD_IN_PARALLEL,
    NO_CHUNK_AVAILABLE
};

It states the cause of the error.

Here is the code listing for this function:

template <typename T, typename H, typename BaseSubscriberType>
inline cxx::expected<Sample<const T, const H>, ChunkReceiveResult>
SubscriberImpl<T, H, BaseSubscriberType>::take() noexcept
{
    auto result = BaseSubscriberType::takeChunk();
    if (result.has_error())
    {
        return cxx::error<ChunkReceiveResult>(result.get_error());
    }
    auto userPayloadPtr = static_cast<const T*>(result.value()->userPayload());
    auto samplePtr = iox::unique_ptr<const T>(userPayloadPtr, [this](const T* userPayload) {
        auto* chunkHeader = iox::mepoo::ChunkHeader::fromUserPayload(userPayload);
        this->port().releaseChunk(chunkHeader);
    });
    return cxx::success<Sample<const T, const H>>(std::move(samplePtr));
}

Segment-by-segment code analysis:

  • Lines 5 to 9: Call the takeChunk method of the base class BaseSubscriberType, i.e. BaseSubscriber, to get a pointer to the ChunkHeader. For ChunkHeader, see iceoryx source code reading (II), section 4.

  • Lines 10 to 15: From the ChunkHeader pointer, get the pointer to the user data, construct the application-friendly Sample<const T, const H> instance, and return it.
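
The two ideas in this function, an either-value-or-error return type and a smart pointer whose deleter hands the chunk back, can be sketched without iceoryx. The following is a minimal model under stated assumptions: ToyChunkHeader, ToySample, ToySubscriber and demoTake are hypothetical names, std::variant stands in for cxx::expected, and std::unique_ptr with a std::function deleter stands in for iox::unique_ptr.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <variant>

// Hypothetical stand-ins for illustration; none of these are iceoryx types.
enum class ToyReceiveError { TooManyChunksHeld, NoChunkAvailable };

struct ToyChunkHeader {
    int payload{0};        // stands in for the user payload behind the header
    bool released{false};  // set once the chunk has been handed back
};

// Owning handle to the payload; its deleter hands the chunk back.
using ToySample = std::unique_ptr<const int, std::function<void(const int*)>>;

class ToySubscriber {
  public:
    explicit ToySubscriber(ToyChunkHeader* pending) : m_pending(pending) {}

    // Same shape as take(): either a sample or an error code.
    std::variant<ToySample, ToyReceiveError> take() {
        if (m_pending == nullptr) {
            return ToyReceiveError::NoChunkAvailable;
        }
        ToyChunkHeader* header = m_pending;
        m_pending = nullptr;
        return ToySample(&header->payload,
                         [header](const int*) { header->released = true; });
    }

  private:
    ToyChunkHeader* m_pending;  // at most one queued chunk in this toy model
};

// Returns true if the chunk is released once the sample has gone out of scope
// and a second take() then reports that nothing is available.
inline bool demoTake() {
    ToyChunkHeader header;
    header.payload = 42;
    ToySubscriber subscriber(&header);
    {
        auto result = subscriber.take();
        assert(std::holds_alternative<ToySample>(result));
        assert(*std::get<ToySample>(result) == 42);
        assert(!header.released);  // still held by the sample
    }  // sample destroyed here, so the deleter runs
    auto empty = subscriber.take();
    return header.released && std::holds_alternative<ToyReceiveError>(empty)
           && std::get<ToyReceiveError>(empty) == ToyReceiveError::NoChunkAvailable;
}
```

The point of the deleter is that the application never has to release a chunk explicitly: letting the sample go out of scope is enough.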

1.2 BaseSubscriber<port_t>::takeChunk

Responsibilities:

The class template BaseSubscriber<port_t> contains a member m_port of type port_t, which is the port data structure. iceoryx wraps the queue data structure inside its ports, as described in iceoryx source code reading (IV), section 1. For the subscriber class, port_t is in effect SubscriberPortUser.

Input parameters:

None

Return Value: cxx::expected<const mepoo::ChunkHeader*, ChunkReceiveResult>

  • The normal return value type is const mepoo::ChunkHeader*, i.e. a pointer to the ChunkHeader.

  • The return value type in case of error is ChunkReceiveResult, which states why the fetch failed.

Overall code analysis:

template <typename port_t>
inline cxx::expected<const mepoo::ChunkHeader*, ChunkReceiveResult> BaseSubscriber<port_t>::takeChunk() noexcept
{
    return m_port.tryGetChunk();
}

This function simply calls the tryGetChunk method of m_port. The type of m_port is the template parameter port_t, which for subscribers is SubscriberPortUser, so the call resolves to SubscriberPortUser::tryGetChunk.

1.3 SubscriberPortUser::tryGetChunk

The implementation code for this function is as follows:

cxx::expected<const mepoo::ChunkHeader*, ChunkReceiveResult> SubscriberPortUser::tryGetChunk() noexcept
{
    return m_chunkReceiver.tryGet();
}

Again, this directly calls the tryGet method of m_chunkReceiver, whose type is ChunkReceiver<SubscriberPortData::ChunkReceiverData_t>. Tracing the template argument SubscriberPortData::ChunkReceiverData_t back to SubscriberPortData, at iceoryx_posh/include/iceoryx_posh/internal/popo/ports/subscriber_port_data.hpp:64, shows that it is iox::popo::SubscriberChunkReceiverData_t:

using ChunkReceiverData_t = iox::popo::SubscriberChunkReceiverData_t;

SubscriberChunkReceiverData_t is in turn another type alias, defined in iceoryx_posh/include/iceoryx_posh/internal/popo/ports/pub_sub_port_types.hpp:

using SubscriberChunkReceiverData_t =
    ChunkReceiverData<MAX_CHUNKS_HELD_PER_SUBSCRIBER_SIMULTANEOUSLY, SubscriberChunkQueueData_t>;

ChunkReceiverData is also a class template, and its base class is its second template parameter. In the code above that parameter is SubscriberChunkQueueData_t, itself a type alias whose actual type is:

using SubscriberChunkQueueData_t = ChunkQueueData<DefaultChunkQueueConfig, ThreadSafePolicy>;

So the ChunkReceiverData class template inherits from the ChunkQueueData class template, which is the communication queue data structure introduced in iceoryx source code reading (IV) - shared memory communication (II).

Apart from what it inherits from its base class, ChunkReceiverData defines a single member, m_chunksInUse:

static constexpr uint32_t MAX_CHUNKS_IN_USE = MaxChunksHeldSimultaneously + 1U;
UsedChunkList<MAX_CHUNKS_IN_USE> m_chunksInUse;

This member caches the received SharedChunk instances, for reasons explained in the next subsection. Note that although UsedChunkList has List in its name, it is essentially array-based; see its data members at iceoryx_posh/include/iceoryx_posh/internal/popo/used_chunk_list.hpp:79-82:

uint32_t m_usedListHead{INVALID_INDEX};
uint32_t m_freeListHead{0u};
uint32_t m_listIndices[Capacity];
DataElement_t m_listData[Capacity];
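
To make the "list in name, array in storage" point concrete, here is a minimal sketch of how two list heads plus an index array can thread a used list and a free list through fixed-size storage. It is a simplified model and not the iceoryx implementation: ToyUsedList, its int payload, and the choice of Capacity as the invalid index are assumptions made for illustration.

```cpp
#include <array>
#include <cassert>
#include <cstdint>

// Hypothetical, simplified model; not the iceoryx UsedChunkList implementation.
template <std::uint32_t Capacity>
class ToyUsedList {
    static_assert(Capacity > 0U, "Capacity must be positive");

  public:
    ToyUsedList() {
        // Chain all slots into the free list: 0 -> 1 -> ... -> INVALID_INDEX.
        for (std::uint32_t i = 0U; i < Capacity; ++i) {
            m_next[i] = i + 1U;
        }
    }

    // Takes a slot from the free list and links it at the head of the used list.
    bool insert(int value) {
        if (m_freeHead == INVALID_INDEX) {
            return false;  // no free slot left
        }
        const std::uint32_t slot = m_freeHead;
        m_freeHead = m_next[slot];
        m_data[slot] = value;
        m_next[slot] = m_usedHead;
        m_usedHead = slot;
        return true;
    }

    // Unlinks the first slot holding value and gives it back to the free list.
    bool remove(int value) {
        std::uint32_t prev = INVALID_INDEX;
        std::uint32_t cur = m_usedHead;
        while (cur != INVALID_INDEX) {
            if (m_data[cur] == value) {
                if (prev == INVALID_INDEX) {
                    m_usedHead = m_next[cur];
                } else {
                    m_next[prev] = m_next[cur];
                }
                m_next[cur] = m_freeHead;
                m_freeHead = cur;
                return true;
            }
            prev = cur;
            cur = m_next[cur];
        }
        return false;
    }

  private:
    static constexpr std::uint32_t INVALID_INDEX = Capacity;
    std::uint32_t m_usedHead{INVALID_INDEX};
    std::uint32_t m_freeHead{0U};
    std::array<std::uint32_t, Capacity> m_next{};  // per-slot "next" index
    std::array<int, Capacity> m_data{};            // per-slot payload
};

// Returns true if a full list rejects inserts and a removed slot can be reused.
inline bool demoUsedList() {
    ToyUsedList<2> list;
    const bool first = list.insert(10);
    const bool second = list.insert(20);
    const bool rejectedWhenFull = !list.insert(30);
    const bool removed = list.remove(10);
    const bool reusedSlot = list.insert(30);
    assert(first && second);
    return rejectedWhenFull && removed && reusedSlot;
}
```

Because the storage is a fixed array and the links are indices rather than pointers, a structure of this kind needs no dynamic allocation.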

Next, a brief look at the inheritance structure of ChunkReceiver. ChunkReceiver<SubscriberPortData::ChunkReceiverData_t> inherits from ChunkQueuePopper<typename ChunkReceiverDataType::ChunkQueueData_t>. As the name suggests, that class pops elements from the communication queue and is a wrapper around ChunkQueueData. This follows because the template argument ChunkReceiverData_t, i.e. ChunkReceiverData, defines the following type alias:

using ChunkQueueData_t = ChunkQueueDataType;

Here ChunkQueueDataType is the ChunkQueueData described above, and the message queue is reached through its m_queue member. Below, we walk through the implementation of ChunkReceiver<ChunkReceiverDataType>::tryGet to deepen the understanding of these data structures.
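
The alias and inheritance relationships described in this subsection can be checked at compile time with a small stand-alone model. This is only a sketch: ToyQueueData, ToyReceiverData, ToyQueuePopper, ToyReceiver and the int-valued m_queue are hypothetical names that mirror the shape of the relationships and nothing else.

```cpp
#include <cassert>
#include <cstdint>
#include <type_traits>

// Hypothetical stand-ins that only mirror the relationships described above.
struct ToyQueueData {
    int m_queue{0};  // placeholder for the real message queue
};

// The data class derives from its second template parameter and re-exports it as an alias.
template <std::uint32_t MaxChunksHeldSimultaneously, typename ChunkQueueDataType>
struct ToyReceiverData : public ChunkQueueDataType {
    using ChunkQueueData_t = ChunkQueueDataType;
    static constexpr std::uint32_t MAX_CHUNKS_IN_USE = MaxChunksHeldSimultaneously + 1U;
};

// The popper only knows about the queue data.
template <typename ChunkQueueDataType>
class ToyQueuePopper {
  public:
    explicit ToyQueuePopper(ChunkQueueDataType* data) : m_data(data) {}
    int peekQueue() const { return m_data->m_queue; }

  private:
    ChunkQueueDataType* m_data;
};

// The receiver derives from the popper, selecting the base through the alias.
template <typename ChunkReceiverDataType>
class ToyReceiver : public ToyQueuePopper<typename ChunkReceiverDataType::ChunkQueueData_t> {
  public:
    using Base = ToyQueuePopper<typename ChunkReceiverDataType::ChunkQueueData_t>;
    explicit ToyReceiver(ChunkReceiverDataType* data) : Base(data) {}
};

using ToyData = ToyReceiverData<4U, ToyQueueData>;

static_assert(std::is_base_of<ToyQueueData, ToyData>::value,
              "receiver data inherits from queue data");
static_assert(std::is_base_of<ToyQueuePopper<ToyQueueData>, ToyReceiver<ToyData>>::value,
              "receiver inherits from the popper of the aliased queue data type");
static_assert(ToyData::MAX_CHUNKS_IN_USE == 5U, "one extra slot on top of the limit");

// Reads the base-class queue member through the receiver.
inline int demoInheritance() {
    ToyData data;
    data.m_queue = 7;
    ToyReceiver<ToyData> receiver(&data);
    assert(receiver.peekQueue() == 7);
    return receiver.peekQueue();
}
```

Since the data class derives from the queue data class, a pointer to the receiver data converts implicitly to a pointer to the queue data, which is what lets the base popper operate on the same object.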

1.4 ChunkReceiver::tryGet

ChunkReceiver<ChunkReceiverDataType> is a class template whose template parameter is ChunkReceiverDataType.

Responsibilities:

Calls a member function of the base class to take one element out of the receive queue. The element is of type SharedChunk, which was covered in iceoryx source code reading (II) - shared memory management (that introduction now seems a bit rough :-).

Input parameters:

None

Return Value: cxx::expected<const mepoo::ChunkHeader*, ChunkReceiveResult>

  • The normal return value type is const mepoo::ChunkHeader*, i.e. a pointer to the ChunkHeader.

  • The return value type in case of error is ChunkReceiveResult, which states why the fetch failed.

Segment-by-segment code analysis:

template <typename ChunkReceiverDataType>
inline cxx::expected<const mepoo::ChunkHeader*, ChunkReceiveResult>
ChunkReceiver<ChunkReceiverDataType>::tryGet() noexcept
{
    auto popRet = this->tryPop();

    if (popRet.has_value())
    {
        auto sharedChunk = *popRet;

        // if the application holds too many chunks, don't provide more
        if (getMembers()->m_chunksInUse.insert(sharedChunk))
        {
            return cxx::success<const mepoo::ChunkHeader*>(
                const_cast<const mepoo::ChunkHeader*>(sharedChunk.getChunkHeader()));
        }
        else
        {
            // release the chunk
            sharedChunk = nullptr;
            return cxx::error<ChunkReceiveResult>(ChunkReceiveResult::TOO_MANY_CHUNKS_HELD_IN_PARALLEL);
        }
    }
    return cxx::error<ChunkReceiveResult>(ChunkReceiveResult::NO_CHUNK_AVAILABLE);
}

Line 5: Calls tryPop, a member function of the base class ChunkQueuePopper<typename ChunkReceiverDataType::ChunkQueueData_t> introduced in the previous subsection, to pop a message element from the message queue held in ChunkQueueData. The result holds a SharedChunk. The next subsection details the implementation of this function.

Lines 7 to 16: A message was fetched from the queue and is stored in the cache array. There are two reasons: (1) later, the corresponding SharedChunk can be looked up from the ChunkHeader* pointer; (2) if it were not stored, the SharedChunk instance would be destroyed on leaving this function, dropping its reference, which means the shared memory would be freed. One might ask why the function does not simply return the SharedChunk. That is possible, but something would still have to cache the SharedChunk instance until the shared memory is no longer needed, and caching it at the moment it is fetched is the better approach.

Lines 17 to 22: If the cache array is full, the SharedChunk is released, which frees the shared memory, and the error ChunkReceiveResult::TOO_MANY_CHUNKS_HELD_IN_PARALLEL is returned.

Line 24: The queue was empty, so ChunkReceiveResult::NO_CHUNK_AVAILABLE is returned.
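
The three outcomes of tryGet can be reproduced with a small reference-counted model. In this sketch std::shared_ptr<int> plays the role of SharedChunk, a std::vector plays the role of the cache array, and ToyReceiver, ToyReceiveResult, push and release are hypothetical names; it is meant to illustrate the ownership reasoning above, not to reproduce iceoryx behavior in detail.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <utility>
#include <vector>

enum class ToyReceiveResult { Ok, TooManyChunksHeld, NoChunkAvailable };

// Hypothetical model: shared_ptr plays the role of SharedChunk.
class ToyReceiver {
  public:
    explicit ToyReceiver(std::size_t maxHeld) : m_maxHeld(maxHeld) {}

    void push(std::shared_ptr<int> chunk) { m_queue.push_back(std::move(chunk)); }

    // On success, out points at the payload and the owner stays cached in m_inUse.
    ToyReceiveResult tryGet(const int*& out) {
        if (m_queue.empty()) {
            return ToyReceiveResult::NoChunkAvailable;
        }
        std::shared_ptr<int> chunk = std::move(m_queue.front());
        m_queue.pop_front();
        if (m_inUse.size() >= m_maxHeld) {
            // chunk is destroyed on return; if it was the last owner, the memory is freed
            return ToyReceiveResult::TooManyChunksHeld;
        }
        out = chunk.get();
        m_inUse.push_back(std::move(chunk));
        return ToyReceiveResult::Ok;
    }

    // Finds the cached owner from the raw pointer and drops it.
    bool release(const int* raw) {
        for (auto it = m_inUse.begin(); it != m_inUse.end(); ++it) {
            if (it->get() == raw) {
                m_inUse.erase(it);
                return true;
            }
        }
        return false;
    }

  private:
    std::size_t m_maxHeld;
    std::deque<std::shared_ptr<int>> m_queue;   // stands in for the message queue
    std::vector<std::shared_ptr<int>> m_inUse;  // stands in for the cache array
};

// Returns true if all three outcomes occur in the expected order.
inline bool demoTryGet() {
    ToyReceiver receiver(1U);
    auto first = std::make_shared<int>(1);
    auto second = std::make_shared<int>(2);
    std::weak_ptr<int> watchFirst = first;
    std::weak_ptr<int> watchSecond = second;
    receiver.push(std::move(first));
    receiver.push(std::move(second));

    const int* payload = nullptr;
    const bool gotFirst = receiver.tryGet(payload) == ToyReceiveResult::Ok;
    assert(payload != nullptr);
    const bool cached = gotFirst && *payload == 1 && !watchFirst.expired();
    const bool tooMany = receiver.tryGet(payload) == ToyReceiveResult::TooManyChunksHeld;
    const bool secondFreed = watchSecond.expired();  // dropped because the cache was full
    const bool queueEmpty = receiver.tryGet(payload) == ToyReceiveResult::NoChunkAvailable;
    const bool released = receiver.release(payload) && watchFirst.expired();
    return cached && tooMany && secondFreed && queueEmpty && released;
}
```

Note that in this model, as in the listing above, a chunk that arrives while the cache is full is popped and then dropped, so that message is lost to the receiver.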

1.5 ChunkQueuePopper::tryPop

Responsibilities:
Gets the message description data from the message queue m_queue (stored in shared memory, see iceoryx source code reading (IV) - shared memory communication (II)), converts it into a SharedChunk instance, and returns it.

Input parameters:

None

Return Value: cxx::optional<mepoo::SharedChunk>

No additional information such as an error cause is needed here, so unlike the previous functions, cxx::expected is not used as the return type.

  • The normal return value type is mepoo::SharedChunk.

  • On failure, the special value nullopt_t is returned.

Segment-by-segment code analysis:

template <typename ChunkQueueDataType>
inline cxx::optional<mepoo::SharedChunk> ChunkQueuePopper<ChunkQueueDataType>::tryPop() noexcept
{
    auto retVal = getMembers()->m_queue.pop();

    // check if queue had an element that was poped and return if so
    if (retVal.has_value())
    {
        auto chunk = retVal.value().releaseToSharedChunk();

        auto receivedChunkHeaderVersion = chunk.getChunkHeader()->chunkHeaderVersion();
        if (receivedChunkHeaderVersion != mepoo::ChunkHeader::CHUNK_HEADER_VERSION)
        {
            LogError() << "Received chunk with CHUNK_HEADER_VERSION '" << receivedChunkHeaderVersion
                       << "' but expected '" << mepoo::ChunkHeader::CHUNK_HEADER_VERSION << "'! Dropping chunk!";
            errorHandler(PoshError::POPO__CHUNK_QUEUE_POPPER_CHUNK_WITH_INCOMPATIBLE_CHUNK_HEADER_VERSION,
                         ErrorLevel::SEVERE);
            return cxx::nullopt_t();
        }
        return cxx::make_optional<mepoo::SharedChunk>(chunk);
    }
    else
    {
        return cxx::nullopt_t();
    }
}

Line 4: Pops a ShmSafeUnmanagedChunk element from the message queue m_queue in shared memory (see iceoryx source code reading (III) - shared memory communication (I)).

Lines 7 to 21: Pop succeeded

The releaseToSharedChunk member function of ShmSafeUnmanagedChunk locates the corresponding shared memory region and creates the SharedChunk instance that is returned. Note that lines 11 to 19 perform a version check: if the chunk header version does not match the version on the receiving end, the pop is likewise treated as failed. The version number is an integer that is incremented on incompatible changes. iceoryx explains it as follows:

/// @brief From the 1.0 release onward, this must be incremented for each incompatible change, e.g.
///            - data width of members changes
///            - members are rearranged
///            - semantic meaning of a member changes
static constexpr uint8_t CHUNK_HEADER_VERSION{1U};

Lines 22 to 25: Pop failed

The special value cxx::nullopt_t() is returned directly. This implicitly invokes the following constructor of optional:

optional(const nullopt_t) noexcept;
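
The same pop-then-validate pattern can be sketched with the standard library. In the following, std::optional and std::nullopt stand in for cxx::optional and cxx::nullopt_t, and ToyChunk, EXPECTED_VERSION, toyTryPop and demoTryPop are hypothetical names. Error logging and the error handler call are left out.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <optional>

// Hypothetical chunk carrying only a header version and a payload.
struct ToyChunk {
    std::uint8_t headerVersion;
    int payload;
};

constexpr std::uint8_t EXPECTED_VERSION{1U};

// Pops one element; an empty queue and a version mismatch both yield an empty optional.
inline std::optional<ToyChunk> toyTryPop(std::deque<ToyChunk>& queue) {
    if (queue.empty()) {
        return std::nullopt;  // converts implicitly to an empty optional
    }
    const ToyChunk chunk = queue.front();
    queue.pop_front();
    if (chunk.headerVersion != EXPECTED_VERSION) {
        return std::nullopt;  // incompatible chunk is dropped
    }
    return chunk;
}

// Returns true if a valid chunk is delivered and a mismatching one is dropped.
inline bool demoTryPop() {
    std::deque<ToyChunk> queue{{1U, 10}, {2U, 20}};
    const auto good = toyTryPop(queue);
    const auto mismatch = toyTryPop(queue);
    const auto empty = toyTryPop(queue);
    assert(queue.empty());  // the mismatching chunk was still removed from the queue
    return good.has_value() && good->payload == 10 && !mismatch.has_value() && !empty.has_value();
}
```

From the caller's point of view the two failure cases are indistinguishable here, which matches the remark above that no error cause needs to be reported at this level.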

2 Callback Function Receiving Mechanism

If the receiver starts first, the shared memory queue is empty and the error ChunkReceiveResult::NO_CHUNK_AVAILABLE is returned. If the receiver simply keeps polling, it wastes CPU time. Can messages instead be listened for and processed asynchronously, driven by a notification from the sending side? Yes. The notification logic on the sending side and the waiting logic on the receiving side will be covered in depth in "iceoryx Source Code Reading (IX) - Waiting and Notification Mechanisms".

3 Summary

This article covered the normal message reception flow. The asynchronous, callback-based reception flow will be covered in a later article.