-
1 Normal message reception flow
- 1.1 SubscriberImpl::take
- 1.2 BaseSubscriber
::takeChunk - 1.3 SubscriberPortUser::tryGetChunk
- 1.4 ChunkReceiver::tryGet
- 1.5 ChunkQueuePopper::tryPop
- 2 Callback Function Receiving Mechanism
- 3 Summary
Recently, a few of you have wanted me to continue writing the remainder of the ICEORYX Source Code Reader, so I'll continue writing it anyway, for your information.
This article mainly introduces the normal message receiving process, from the process itself, this part of the code logic is relatively simple. In order to enrich the content, we have done a more in-depth interpretation of the code, combined with the content of the previous articles, analyzed from the code point of view.
1 Normal message reception flow
The normal message receiving process is the process where the receiver calls the receive function to get the description of the message location from the shared message queue, and then reads the data from the shared memory based on this description. In the following, we iceoryx to the application layer to provide the function to receive messages to begin to introduce, step by step in-depth.
1.1 SubscriberImpl::take
Duties:
The application layer receives data through the SubscriberImpl::take function. Logically, this function gets the data through the more low-level methods of theChunkHeader
pointer (we have already introduced this data structure in iceoryx source code reading (II)), which in turn obtains the user load data, from which the structure used by the user to describe the message is obtained.
Template Parameters:
- T: Message body type.
- H: Message header type.
- BaseSubscriberType: Base class types, don't understand why iceoryx often defines base classes as paradigms as well, paradigms are used too much.
Into the Senate:
not have
Return Value:
The return value is of typecxx::expected<Sample<const T, const H>, ChunkReceiveResult>
, contains two parts:
-
The return value type for the normal case, i.e:
Sample<const T, const H>
This type is a wrapper class for user load data to make it easier to read the data in it. -
The return value type for the error case, i.e., ChunkReceiveResult, is defined as follows:
enum class ChunkReceiveResult
{
TOO_MANY_CHUNKS_HELD_IN_PARALLEL,
NO_CHUNK_AVAILABLE
};
The cause of the error is stated.
Here is the code listing for this function:
template <typename T, typename H, typename BaseSubscriberType>
inline cxx::expected<Sample<const T, const H>, ChunkReceiveResult>
SubscriberImpl<T, H, BaseSubscriberType>::take() noexcept
{
auto result = BaseSubscriberType::takeChunk();
if (result.has_error())
{
return cxx::error<ChunkReceiveResult>(result.get_error());
}
auto userPayloadPtr = static_cast<const T*>(()->userPayload());
auto samplePtr = iox::unique_ptr<const T>(userPayloadPtr, [this](const T* userPayload) {
auto* chunkHeader = iox::mepoo::ChunkHeader::fromUserPayload(userPayload);
this->port().releaseChunk(chunkHeader);
});
return cxx::success<Sample<const T, const H>>(std::move(samplePtr));
}
Segment-by-segment code analysis:
-
LINE 05 ~ LINE 09: Call the takeChunk method of the base class BaseSubscriberType, i.e.: BaseSubscriber, to get a pointer to the ChunkHeader, for the ChunkHeader, refer to theiceoryx source code reading (II) section 4。
-
LINE 10 ~ LINE 15: Based on the pointer to the ChunkHeader, get the pointer to the user data and construct the application layer-friendly
Sample<const T, const H>
instance and return.
1.2 BaseSubscriber<port_t>::takeChunk
Duties:
template classBaseSubscriber<port_t>
which contains a file of typeport_t
membersm_port
This is the port data structure. iceoryx encapsulates the queue data structure in its ports, as described in theiceoryx source code reading (IV) section 1. For the subscriber class, theport_t
in effectSubscriberPortUser
。
Into the Senate:
not have
Return Value: cxx::expected<const mepoo::ChunkHeader*, ChunkReceiveResult>
-
The normal return value type is
const mepoo::ChunkHeader*
that is, pointing to theChunkHeader
The pointer to the -
The return value in case of error is of type
ChunkReceiveResult
The reason for the acquisition failure is explained in the following way.
Overall code analysis:
template <typename port_t>
inline cxx::expected<const mepoo::ChunkHeader*, ChunkReceiveResult> BaseSubscriber<port_t>::takeChunk() noexcept
{
return m_port.tryGetChunk();
}
This function just calls them_port
(This is an enumeration type)port_t
The specialization type isSubscriberPortUser
(used for emphasis)SubscriberPortUser::tryGetChunk
Methods.
1.3 SubscriberPortUser::tryGetChunk
The implementation code for this function is as follows:
cxx::expected<const mepoo::ChunkHeader*, ChunkReceiveResult> SubscriberPortUser::tryGetChunk() noexcept
{
return m_chunkReceiver.tryGet();
}
Again, this is a direct call to them_chunkReceiver
Its type isChunkReceiver<SubscriberPortData::ChunkReceiverData_t>
(used form a nominal expression)tryGet
method, where the paradigm parameterSubscriberPortData::ChunkReceiverData_t
By tracing back toSubscriberPortData
, located at iceoryx_posh/include/iceoryx_posh/internal/popo/ports/subscriber_port_data.hpp:64, was found to beiox::popo::SubscriberChunkReceiverData_t
Type:
using ChunkReceiverData_t = iox::popo::SubscriberChunkReceiverData_t;
(indicates contrast)SubscriberChunkReceiverData_t
Another type alias, defined in iceoryx_posh/include/iceoryx_posh/internal/popo/ports/pub_sub_port_types.hpp::
using SubscriberChunkReceiverData_t =
ChunkReceiverData<MAX_CHUNKS_HELD_PER_SUBSCRIBER_SIMULTANEOUSLY, SubscriberChunkQueueData_t>;
ChunkReceiverData
class is also a paradigm class whose base class is the second paradigm parameter, which in the above code is theSubscriberChunkQueueData_t
, is a type alias and the actual type is as follows:
using SubscriberChunkQueueData_t = ChunkQueueData<DefaultChunkQueueConfig, ThreadSafePolicy>;
this suggests thatChunkReceiverData
Paradigm classes inherit fromChunkQueueData
Paradigm class, this class is the communication queue data organization that we have in theThe iceoryx Source Code Reader (IV) - Shared Memory Communication (II)Introduced.
ChunkReceiverData
The only member defined in the class (excluding the base class)m_chunksInUse
The details are as follows:
static constexpr uint32_t MAX_CHUNKS_IN_USE = MaxChunksHeldSimultaneously + 1U;
UsedChunkList<MAX_CHUNKS_IN_USE> m_chunksInUse;
This member is used to cache incomingSharedChunk
, for reasons that will be explained in the next subsection. It should be noted thatUsedChunkList
Although the type nameList
, but it is essentially an array, see the data members in iceoryx_posh/include/iceoryx_posh/internal/popo/used_chunk_list.hpp:79-82 for details:
uint32_t m_usedListHead{INVALID_INDEX};
uint32_t m_freeListHead{0u};
uint32_t m_listIndices[Capacity];
DataElement_t m_listData[Capacity];
Here's a brief analysisChunkReceiver
the inheritance structure of theChunkReceiver<SubscriberPortData::ChunkReceiverData_t>
inherited fromChunkQueuePopper<typename ChunkReceiverDataType::ChunkQueueData_t>
As the name suggests, this class fetches elements from the communication queue and is the first class to be used on theChunkQueueData
of encapsulation. The reason is as follows: here the paradigm typeChunkReceiverData_t
, ie:ChunkReceiverData
A type alias is defined in the
using ChunkQueueData_t = ChunkQueueDataType;
Here.ChunkQueueDataType
It's actually the aboveChunkQueueData
The message queue can be accessed through them_queue
. Below, we presentChunkReceiver<ChunkReceiverDataType>::tryGet
specific implementations to further deepen the understanding of the above data structures.
1.4 ChunkReceiver::tryGet
ChunkReceiver<ChunkReceiverDataType>
is the paradigm class and the paradigm parameter
Duties:
Calls a member method of the base class to take out an element of the receive queue, the function returns a type ofSharedChunk
We're in theThe iceoryx source code reading (II) - Shared Memory ManagementIt's already covered in the (which seems a bit roughly presented now :-).
Into the Senate:
not have
Return Value: cxx::expected<const mepoo::ChunkHeader*, ChunkReceiveResult>
-
The normal return value type is
const mepoo::ChunkHeader*
that is, pointing to theChunkHeader
The pointer to the -
The return value in case of error is of type
ChunkReceiveResult
The reason for the acquisition failure is explained in the following way.
Segment-by-segment code analysis:
template <typename ChunkReceiverDataType>
inline cxx::expected<const mepoo::ChunkHeader*, ChunkReceiveResult>
ChunkReceiver<ChunkReceiverDataType>::tryGet() noexcept
{
auto popRet = this->tryPop();
if (popRet.has_value())
{
auto sharedChunk = *popRet;
// if the application holds too many chunks, don't provide more
if (getMembers()->m_chunksInUse.insert(sharedChunk))
{
return cxx::success<const mepoo::ChunkHeader*>(
const_cast<const mepoo::ChunkHeader*>(()));
}
else
{
// release the chunk
sharedChunk = nullptr;
return cxx::error<ChunkReceiveResult>(ChunkReceiveResult::TOO_MANY_CHUNKS_HELD_IN_PARALLEL);
}
}
return cxx::error<ChunkReceiveResult>(ChunkReceiveResult::NO_CHUNK_AVAILABLE);
}
LINE 05 ~ LINE 05: Calling the base class (i.e.: the one introduced in the previous subsection)ChunkQueuePopper<typename ChunkReceiverDataType::ChunkQueueData_t>
) member functiontryPop
through (a gap)ChunkQueueData
pops a message element from the maintained message queue with a return value ofSharedChunk
The next subsection details the implementation of this function.
LINE 07 ~ LINE 23: Successfully fetch the message from the message queue and store it in the cache array for the purpose of (1) subsequent retrieval of the message according to theChunkHeader *
pointer to get the correspondingSharedChunk
; (2) If not stored, leaving this function, theSharedChunk
The instance is dereferenced, meaning that the shared memory is freed. One might ask why not just return theSharedChunk
What about it? It's actually possible, but there still needs to be a place to cache theSharedChunk
instance until this shared memory is no longer needed. Caching it the first time you get it is a better way to do this.
LINE 24 ~ LINE 24:If the cache array is full, release theSharedChunk
i.e.: freed shared memory and returned an errorChunkReceiveResult::TOO_MANY_CHUNKS_HELD_IN_PARALLEL
。
1.5 ChunkQueuePopper::tryPop
Duties:
From the message queuem_queue
(Stored in shared memory, see:iceoryx source code reading (IV) - shared memory communication (II)) gets the message description data and transforms it into theSharedChunk
The instance returns.
Into the Senate:
not have
Return Value: cxx::optional<mepoo::SharedChunk>
No additional information such as the cause of the error is needed here, so unlike the previous function, expect is not used as the return type.
-
The normal return value type is
mepoo::SharedChunk
。 -
Returns a special value in case of error
nullopt_t
。
Segment-by-segment code analysis:
template <typename ChunkQueueDataType>
inline cxx::optional<mepoo::SharedChunk> ChunkQueuePopper<ChunkQueueDataType>::tryPop() noexcept
{
auto retVal = getMembers()->m_queue.pop();
// check if queue had an element that was poped and return if so
if (retVal.has_value())
{
auto chunk = ().releaseToSharedChunk();
auto receivedChunkHeaderVersion = ()->chunkHeaderVersion();
if (receivedChunkHeaderVersion != mepoo::ChunkHeader::CHUNK_HEADER_VERSION)
{
LogError() << "Received chunk with CHUNK_HEADER_VERSION '" << receivedChunkHeaderVersion
<< "' but expected '" << mepoo::ChunkHeader::CHUNK_HEADER_VERSION << "'! Dropping chunk!";
errorHandler(PoshError::POPO__CHUNK_QUEUE_POPPER_CHUNK_WITH_INCOMPATIBLE_CHUNK_HEADER_VERSION,
ErrorLevel::SEVERE);
return cxx::nullopt_t();
}
return cxx::make_optional<mepoo::SharedChunk>(chunk);
}
else
{
return cxx::nullopt_t();
}
}
LINE 04 ~ LINE 04: Message Queue from Shared Memorym_queue
GettingShmSafeUnmanagedChunk
Element (please refer toThe iceoryx Source Code Reader (III) - Shared Memory Communication (I))。
LINE 07 ~ LINE 21: Get Success
pass (a bill or inspection etc)ShmSafeUnmanagedChunk
member method of thereleaseToSharedChunk
Find the corresponding shared memory region, and create theSharedChunk
instance and return. Note that LINE 11 to LINE 19 do version checking, and if it does not match the version on the receiving end, again the fetch is considered to have failed. The version number is an integer that identifies an incompatible modification. iceory has the following explanation:
/// @brief From the 1.0 release onward, this must be incremented for each incompatible change, .
/// - data width of members changes
/// - members are rearranged
/// - semantic meaning of a member changes
static constexpr uint8_t CHUNK_HEADER_VERSION{1U};
LINE 22 ~ LINE 25: Failed to get
Direct return of special valuescxx::nullopt_t()
Here it implicitly calls theoptional
of the following constructor:
optional(const nullopt_t) noexcept;
2 Callback Function Receiving Mechanism
If the receiver starts first, the shared memory queue is empty and an error is returned.ChunkReceiveResult::NO_CHUNK_AVAILABLE
, if the receiving side keeps polling, it will waste performance. Is it possible to implement asynchronous listening and processing of messages with a notification mechanism on the sending side? The answer is yes, and we will provide an in-depth introduction to the notification logic on the sending side and the waiting logic on the receiving side of the message in "iceoryx Source Code Reading (IX) - Waiting and Notification Mechanisms".
3 Summary
This article mainly introduces the normal message receiving process, for asynchronous callback message receiving process will be introduced in a subsequent article.