Shared Memory

Anyone have tips or resources on how to use shared memory?

I have a system with multiple standalone components that need to share data. I want to use shared memory instead of doing something else like multicast due to latency concerns.

For example, a strategy component needs to share data with a risk management component, which in turn needs to share data with an outbound FIX gateway component, etc.
 
If you like my code for this, give us a shout :)
Not sure what the best library is these days; maybe this is a good starting point? The principles are standard.

Any chance you've ever tried to construct a deque in shared memory? I am running into this error but am not sure how to resolve it. Based on all my research so far, it seems as though I have this set up correctly, but obviously something is wrong given the error: boost::interprocess_exception::library_error

C++:
#include <cstdint>

#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/deque.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>


struct SharedData
{
    double bid;
    double ask;
    std::uint32_t size;

    SharedData(double bid_, double ask_, std::uint32_t size_) : bid{bid_}, ask{ask_}, size{size_}
    {

    }
};

void publish(double bid, double ask, std::uint32_t size)
{
    // Open or create the shared memory segment
    boost::interprocess::managed_shared_memory shm(
        boost::interprocess::open_or_create, "SharedMemory", 1024 * 1024 * 64);

    // Define an allocator for SharedData objects
    typedef boost::interprocess::allocator<SharedData,
        boost::interprocess::managed_shared_memory::segment_manager> SharedDataAllocator;

    // Define a deque type to store SharedData objects
    typedef boost::interprocess::deque<SharedData, SharedDataAllocator> SharedDataDeque;

    // Construct deque in shared memory
    SharedDataAllocator alloc(shm.get_segment_manager());

    // ERROR IS THROWN HERE
    // boost::interprocess_exception::library_error
    SharedDataDeque* dataQueue = shm.construct<SharedDataDeque>("SharedDataQueue")(alloc);
}
 
I can't remember about deque .. let me look up my stuff (of > 10 years ago). I have 2 chapters in my Boost II book on deque and message_queue.

Actually, your code looks good. I used a) find_or_construct, b) shm ... create only.(?)
Code:
We now turn our attention to showing how to use shared memory in the Boost C++ Interprocess library. More details can be found in Demming and Duffy (2012). Shared memory is the most efficient way to exchange data between processes. In general, the operating system maps a block of physical memory (memory segment) into the address space of several processes. The resulting shared memory segment is created in one process and then opened in another process. Using a shared memory segment takes place as follows:
1. The shared memory is created or opened.
2. A region of the shared memory is mapped in the process region so it can be accessed as if it were normal memory.
The shared memory segment remains in memory even when the process that created it terminates. Thus, the lifetime of shared memory can be longer than the lifetime of the processes that use it. The shared memory segment must be explicitly deleted by the last process.
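To make the idea of one block of memory being visible to several processes concrete, here is a minimal POSIX-only sketch. It is not the Boost API discussed in this excerpt: it assumes a Linux or similar system, it uses an anonymous shared mapping inherited across fork() rather than a named segment that an unrelated process could open, and the helper name round_trip_through_shared_page is hypothetical.

```cpp
#include <sys/mman.h>
#include <sys/wait.h>
#include <unistd.h>

#include <new>
#include <stdexcept>

// Hypothetical helper: the child process writes `value` into a shared page
// and the parent reads it back after the child has exited.
double round_trip_through_shared_page(double value)
{
    // Map anonymous memory that stays shared across fork().
    void* page = mmap(nullptr, sizeof(double), PROT_READ | PROT_WRITE,
                      MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (page == MAP_FAILED)
    {
        throw std::runtime_error("mmap failed");
    }

    // Construct a double in the mapped region (mmap returns page-aligned memory).
    double* slot = new (page) double(0.0);

    const pid_t pid = fork();
    if (pid < 0)
    {
        munmap(page, sizeof(double));
        throw std::runtime_error("fork failed");
    }

    if (pid == 0)
    {
        // Child: write into the shared page and leave without running
        // the parent's cleanup code.
        *slot = value;
        _exit(0);
    }

    // Parent: wait for the child, then read what it wrote.
    int status = 0;
    waitpid(pid, &status, 0);
    const double result = *slot;

    munmap(page, sizeof(double));
    return result;
}
```

Calling round_trip_through_shared_page(3.1415) in the parent should return the value written by the child. A named segment differs mainly in that it is looked up by name and, as described above, outlives the processes until it is explicitly removed.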
The disadvantage of mapped regions is that they are difficult to use because they are accessed at byte level. Furthermore, the memory management in the mapped region is done manually, making it difficult to manage complex objects. In order to resolve these problems we create managed memory segments that manage IPC memory. We can manage objects similar to how operators new and delete work. In short, managed memory segments address the following issues:
- Dynamic allocation of portions of a memory segment.
- Construction of C++ objects in memory segments.
- Objects can have a name.
- Searching for named objects.
We take an example that creates and uses shared memory called SharedMemory101 in Boost C++ Interprocess. The first example creates shared memory holding two doubles. We show the self-documenting code as follows:

#include <iostream>
#include <boost/interprocess/managed_shared_memory.hpp>
#include "Point.hpp" // well-known user-defined type

using namespace boost::interprocess;

void SharedMemory101()
{ // Initial code to show features

    std::cout << "Shared memory 101 " << '\n';

    // Clean shared memory
    shared_memory_object::remove("SharedMemory101");

    // Create the managed shared memory, size 1024 bytes
    managed_shared_memory shm(open_or_create, "SharedMemory101", 1024);

    // Running statistics
    std::cout << "* Segment size: " << shm.get_size() << '\n';
    std::cout << "* Free memory: " << shm.get_free_memory() << '\n';

    // Initialise the shared memory
    double* d = shm.construct<double>("Double")(3.1415);
    double* d2 = shm.construct<double>("Double2")(2.71);

    // Get value from memory, use find() to return a pair type
    auto p = shm.find<double>("Double");
    auto p2 = shm.find<double>("Double2");

    if (p.first)
    {
        std::cout << *p.first << '\n'; // 3.1415
        std::cout << p.second << '\n'; // 1 number
    }

    if (p2.first)
    {
        std::cout << *p2.first << '\n'; // 2.71
        std::cout << p2.second << '\n'; // 1 number
    }
        
    // Remove the shared memory
    if (shared_memory_object::remove("SharedMemory101") == true)
    {
        std::cout << "Shared memory 101 removed." << '\n';
    }
    else
    {
        std::cout << "Error while removing shared memory 101." << '\n';
    }
}

We now turn our attention to using shared memory called SharedMemory102 with user-defined types. The sample code is similar to the above code:
void SharedMemory102()
{ // Code to show arrays of user-defined types

    std::cout << "Shared memory 102 " << '\n';

    try
    {
        // Clean shared memory
        shared_memory_object::remove("SharedMemory102");

        // Create the managed shared memory, size 1024 bytes
        managed_shared_memory shm(open_or_create, "SharedMemory102", 1024);

        // Initialise the shared memory
        Point* pt = shm.construct<Point>("Point")(1.0, 2.0);

        // Get value from memory, use find() to return a pair type
        auto p = shm.find<Point>("Point");

        if (p.first)
        {
            std::cout << *p.first << ", " << '\n'; // (1.0, 2.0)
            std::cout << p.second << '\n'; // sz numbers
        }
    }
    catch (boost::interprocess::interprocess_exception& e)
    {
        std::cout << "Error IPC " << e.what() << '\n';
    }

    // Remove the shared memory
    if (shared_memory_object::remove("SharedMemory102") == true)
    {
        std::cout << "Shared memory 102 removed." << '\n';
    }
    else
    {
        std::cout << "Error while removing shared memory 102." << '\n';
    }
}


A discussion of IPC in Python is outside the scope of this book. We hope that the above code samples will give some insights into Interprocess Communication (IPC), which represents the plumbing infrastructure in a number of system and distributed patterns for many kinds of software systems.
 
V1: with my own synchronised queue, runs OK

Code:
// Introduction to the Boost C++ Libraries - Volume II - Advanced Libraries
//
// Chapter 12.3 - Boost Interprocess: Process Synchronisation - Condition Variable
//
// (C) Datasim Education BV  2011


#include "SynchronisedQueue.hpp"

#include <iostream>
#include <sstream>

#include <boost/thread.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/containers/string.hpp>

using namespace boost::interprocess;

// Regular strings can't be put in IPC memory so we use a boost::interprocess::basic_string.
// The interprocess string needs an allocator for chars.
typedef allocator<char, managed_shared_memory::segment_manager> CharAllocator;
typedef basic_string<char, std::char_traits<char>, CharAllocator> shm_string;

// Typedef for a managed shared memory allocator used by the synchronised queue of shm_strings.
typedef allocator<shm_string, managed_shared_memory::segment_manager> StringAllocator;
typedef SynchronisedQueue<shm_string, StringAllocator> StringQueue;

// Producer using queue in shared memory.
void Producer()
{
    // Create or open the managed shared memory.
    managed_shared_memory segment(open_or_create, "MySharedMemory", 1024);

    // Create an allocator with the segment manager to use.
    // We create an allocator for voids. Since the allocator has a templated converting constructor,
    // we can pass this one to both the string and queue classes.
    allocator<void, managed_shared_memory::segment_manager> alloc(segment.get_segment_manager());

    // Create or find the string queue object (is already synchronised).
    StringQueue* queue=segment.find_or_construct<StringQueue>("MyQueue")(alloc);

    // Producer started.
    std::cout<<"Producer started. Putting data in the queue."<<std::endl;

    // Put strings in the queue.
    int data=0;
    while (data!=10)
    {
        // Produce a string and store in the queue.
        std::stringstream tmp; tmp<<"Number: "<<data++;
        shm_string str(tmp.str().c_str(), alloc);
        queue->Enqueue(str);
        std::cout<<"Data produced: "<<str<<std::endl;

        // Sleep a second.
        boost::this_thread::sleep(boost::posix_time::seconds(1));
    }

    // Insert empty string to indicate end.
    queue->Enqueue(shm_string("", alloc));

    // Producer finished.
    std::cout<<"Producer finished."<<std::endl;
}

// Consumer using queue in shared memory.
void Consumer()
{
    // Create or open the managed shared memory.
    managed_shared_memory segment(open_or_create, "MySharedMemory", 1024);

    // Create an allocator with the segment manager to use.
    // We create an allocator for voids. Since the allocator has a templated converting constructor,
    // we can pass this one to both the string and queue classes.
    allocator<void, managed_shared_memory::segment_manager> alloc(segment.get_segment_manager());

    // Create or find the string queue object (is already synchronised).
    StringQueue* queue=segment.find_or_construct<StringQueue>("MyQueue")(alloc);

    // Consumer started.
    std::cout<<"Consumer started. Waiting for data..."<<std::endl;

    // Retrieve strings from the queue.
    shm_string data(alloc);
    do
    {
        // Retrieve data from the queue.
        data=queue->Dequeue();

        // Display the extracted data.
        if (data!="") std::cout<<"Data consumed: "<<data<<std::endl;
    }
    while (data!="");

    // The queue and shared memory can now be deleted.

    // Remove the queue.
    segment.destroy_ptr(queue);

    // Remove the shared memory.
    if (shared_memory_object::remove("MySharedMemory")==true) std::cout<<"Shared memory removed."<<std::endl;
    else std::cout<<"Error while removing shared memory."<<std::endl;
}

int main()
{
    char choice;
    std::cout<<"Producer or Consumer (P/C): ";
    std::cin>>choice;

    switch (choice)
    {
    case 'p':
    case 'P':
        Producer();
        break;

    case 'c':
    case 'C':
        Consumer();
        break;

    case 'd':
    case 'D':
        // Remove the shared memory.
        if (shared_memory_object::remove("MySharedMemory")==true) std::cout<<"Shared memory removed."<<std::endl;
        else std::cout<<"Error while removing shared memory."<<std::endl;
    }

    return 0;
}
 
Synched queue

Code:
// SynchronisedQueue.hpp
//
// A queue class that has interprocess synchronisation.
//
// (C) Datasim Education BV  2011

#ifndef SynchronisedQueue_hpp
#define SynchronisedQueue_hpp

#include <boost/interprocess/containers/deque.hpp>
#include <boost/interprocess/allocators/allocator.hpp>

#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>

using namespace boost::interprocess;


// Queue class that has interprocess synchronisation and notification.
// Since a regular STL queue can't be used in IPC memory, we use the boost::interprocess::deque.
// Our synchronised queue has two template arguments, one for the type to store and one for the allocator to use.
template <typename T, typename TAllocator>
class SynchronisedQueue
{
private:
    deque<T, TAllocator> m_queue;        // Use boost::interprocess deque to store data.
    interprocess_mutex m_mutex;            // The mutex to synchronise on.
    interprocess_condition m_cond;        // The condition to wait for.

public:

    // Constructor with the allocator to use for the queue.
    SynchronisedQueue(const TAllocator& a): m_queue(a)
    {
    }

    // Add data to the queue and notify others.
    void Enqueue(const T& data)
    {
        // Acquire lock on the queue.
        scoped_lock<interprocess_mutex> lock(m_mutex);

        // Add the data to the queue.
        m_queue.push_back(data);

        // Notify others that data is ready.
        m_cond.notify_one();

    } // Lock is automatically released here.

    // Get data from the queue. Wait for data if not available.
    T Dequeue()
    {
        // Acquire lock on the queue.
        scoped_lock<interprocess_mutex> lock(m_mutex);

        // When there is no data, wait till someone fills it.
        // Lock is automatically released in the wait and obtained again after the wait.
        while (m_queue.size()==0) m_cond.wait(lock);

        // Retrieve the data from the queue
        T result=m_queue.front(); m_queue.pop_front();
        return result;

    } // Lock is automatically released here.
};


#endif
 
Plan B: Message Queue
Code:
// Introduction to the Boost C++ Libraries - Volume II - Advanced Libraries
//
// Chapter 12.4 - Boost Interprocess: Process Synchronisation - Message Queue
//
// Producer inserts messages in the queue with a priority.
// Consumer retrieves the messages from the queue.
//
// (C) Datasim Education BV  2011

#include <iostream>

#include <boost/thread.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>

using namespace boost::interprocess;

const int numbers=20;

void Producer()
{
    // Open existing or create a message queue.
    message_queue queue(open_or_create, "MyQueue", 10, sizeof(int));

    // Send a sequence of numbers. Also a priority is generated.
    for (int i=0; i!=numbers; i++)
    {
        // Generate a priority.
        unsigned int priority=i+10;

        // Enqueue the sequence number.
        queue.send(&i, sizeof(i), priority);
        std::cout<<"Number queued: "<<i<<", priority: "<<priority<<std::endl;

        // Sleep a second.
        boost::this_thread::sleep(boost::posix_time::seconds(1));
    }
}

void Consumer()
{
    // Open existing or create a message queue.
    message_queue queue(open_or_create, "MyQueue", 10, sizeof(int));

    // Variable to receive data in.
    int number;                // The number.
    std::size_t size;        // The number of bytes received.
    unsigned int priority;    // The priority of the received message.

    // Dequeue the numbers.
    for (int i=0; i!=numbers; i++)
    {
        // Dequeue the number.
        queue.receive(&number, sizeof(number), size, priority);
        std::cout<<"Number dequeued: "<<number<<", priority: "<<priority<<", size: "<<size<<std::endl;
    }

    // Remove the message queue.
    message_queue::remove("MyQueue");
}

int main()
{
    char choice;
    std::cout<<"Producer or Consumer (P/C): ";
    std::cin>>choice;

    switch (choice)
    {
    case 'p':
    case 'P':
        Producer();
        break;

    case 'c':
    case 'C':
        Consumer();
        break;
    }

    return 0;
}
 
Some Q

1. Do you have a catch block? Then you can tell what the error is.
2. My SynchronisedQueue is thread-safe; AFAIR that is essential(?)
3. Try SynchronisedQueue instead of IPC::deque? One possible issue?
4. ipc:: is nice
5. Allocation thread-safety: Allocation and deallocation are not thread-safe.

Code:
    catch(const ipc::interprocess_exception& ex)
    {
        logError("failed with exception \"%s\"", ex.what());
        return false;
    }
    ...
 
It’s actually working now with the same code I posted above. The only thing I did was that I resorted to deleting my build directory and recreated it.

One additional question: how do you design it when the producer can enqueue messages all day (e.g., think buy/sell orders generated by new quotes being ingested by a feed handler) and the consumer needs to read them as soon as possible?

Does the consumer simply busy spin and check the deque for a new message? Is there some way to notify the consumer that it has a new message to read?
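One way to frame the two options: the consumer can block on a condition variable (as Dequeue() in the SynchronisedQueue posted earlier in this thread does with interprocess_condition), or it can busy-spin on a lock-free ring buffer and trade a CPU core for lower wake-up latency. Below is a minimal sketch of the second option, assuming exactly one producer and one consumer. The names SpscRing, Quote, try_push and try_pop are hypothetical, and the sketch lives in ordinary memory; constructing it inside a mapped shared memory region is not shown.

```cpp
#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <type_traits>

// Hypothetical message type, mirroring SharedData from earlier in the thread.
struct Quote
{
    double bid;
    double ask;
    std::uint32_t size;
};

// Single-producer/single-consumer ring buffer (assumption: exactly one thread
// or process calls try_push and exactly one calls try_pop).
// It stores no pointers, so in principle it could be placement-constructed
// in a shared memory region; that step is not shown here.
template <typename T, std::size_t Capacity>
class SpscRing
{
    static_assert(Capacity > 0 && (Capacity & (Capacity - 1)) == 0,
                  "Capacity must be a power of two");
    static_assert(std::is_trivially_copyable_v<T>,
                  "T must be trivially copyable");
    static_assert(std::atomic<std::size_t>::is_always_lock_free,
                  "indices must be lock-free");

    std::array<T, Capacity> slots_{};
    alignas(64) std::atomic<std::size_t> head_{0}; // next slot to read
    alignas(64) std::atomic<std::size_t> tail_{0}; // next slot to write

public:
    // Producer side: returns false if the ring is full.
    bool try_push(const T& value)
    {
        const std::size_t tail = tail_.load(std::memory_order_relaxed);
        const std::size_t head = head_.load(std::memory_order_acquire);
        if (tail - head == Capacity)
        {
            return false;
        }
        slots_[tail & (Capacity - 1)] = value;
        // Publish the slot: the consumer's acquire load of tail_ sees the data.
        tail_.store(tail + 1, std::memory_order_release);
        return true;
    }

    // Consumer side: returns an empty optional if there is nothing to read.
    std::optional<T> try_pop()
    {
        const std::size_t head = head_.load(std::memory_order_relaxed);
        const std::size_t tail = tail_.load(std::memory_order_acquire);
        if (head == tail)
        {
            return std::nullopt;
        }
        const T value = slots_[head & (Capacity - 1)];
        // Hand the slot back to the producer.
        head_.store(head + 1, std::memory_order_release);
        return value;
    }
};
```

A busy-spinning consumer would simply call try_pop() in a loop until it returns a value, typically on a dedicated core. The blocking version is kinder to the machine but adds the cost of waking the consumer, so which one fits depends on how much latency matters relative to CPU usage.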
 
This is a producer-consumer (PC) example using C++20 and the Agents library.
I'm showing off my skills here 🤖

(Rust supports something similar == channels)

Code:
// Test101ActorConcepts.cpp
//
// Simplest example of a system. Context diagram consists only
// of Input and Output systems.
//
// We use C++20 Concepts to replace V1 (outdated) policy-based design (PBD)
//
// Composition
//
//  V2: design using C++20 Concepts
//  V3: V2 + embedded actors
//
// Problem now becomes a pipeline Source -> SUD -> Sink
//
// Summary: this approach is feasible, so worth investigating..
// 1992 .. everything is an object
// 2024 .. everything is an actor
//
// (C) Datasim Education BV 2015-2024
//

#include <string>
#include <iostream>
#include <type_traits>
#include <concepts>
#include <mutex>
#include <agents.h>

// Interface contract specification
// 1. Each concept is an abstract method
// 2. interface == concept conjunction (e.g. ISUD)

// 1. Define abstract methods (building blocks)
template<typename Message, template <typename> class T> // abstract method
    concept ISource = requires (T<Message> x) { x.message(); };
template<typename Message, template <typename> class T> // abstract method
    concept ISink = requires (T<Message> x, const Message& s) { x.print(s); };

// 2. Interface relating to context diagram; conjunctional concepts are an alternative
// to multiple inheritance but no danger of fragile base class problem
template<typename Message, template <typename> class Input, template <typename> class Output>
    concept ISUD = ISource<Message, Input> && ISink<Message, Output>;

// Agent actor-based constraints
template<typename Derived>
    concept IActor1 = std::derived_from<Derived, concurrency::agent>;
template<typename T>
    concept IActor2 = requires (T x) { x.run(); };

// 3. Interface relating to defining constraints pertaining to Actor technology
// https://learn.microsoft.com/en-us/cpp/parallel/concrt/asynchronous-agents-library?view=msvc-170
template<typename Derived>
    concept IActor = IActor1<Derived> && IActor2<Derived>;
   
// The mediator using template template parameter "trick" => can use generic messages
template <typename Message, template <typename> class Source, template <typename> class Sink>
                requires ISUD<Message, Source, Sink> &&  IActor<Source<Message>> && IActor<Sink<Message>>
    class SUD : public concurrency::agent
{ // SUD is in a chain from Input to Output

private:
    concurrency::ISource<Message>& _source; // formerly known as src
    concurrency::ITarget<Message>& _target; // formerly known as snk
public:
    explicit SUD(concurrency::ISource<Message>& source, concurrency::ITarget<Message>& target)
        : _source(source), _target(target) {}

    void run()
    {
        Message info = concurrency::receive(_source);
        concurrency::send(_target, info);

        done();
    }
};    

// Instance Systems
template <typename Message>
    class MySource : public concurrency::agent
{
    concurrency::ITarget<Message>& _target; // send to SUD
    Message _msg;
public:
    explicit MySource(const Message& msg, concurrency::ITarget<Message>& target)
        : _target(target), _msg(msg) {}

    Message message() const
    {
        // Get data from hardware device
        return Message(_msg);
    }

    void run()
    {
        concurrency::send(_target, message());

        done();
    }

};

template <typename Message>
    class MySink : public concurrency::agent
{
    concurrency::ISource<Message>& _source; // received from SUD
    int _id;
    Message info;
    std::mutex myMutex;
public:
    explicit MySink(concurrency::ISource<Message>& source, int ID = 0) : _source(source), _id(ID) {}
    void print(const Message& s)
    {
        std::lock_guard<std::mutex> guard(myMutex);
        std::cout << "\nin a sink: " << _id << ": " << s << std::endl;
    }

    void run()
    {
        info = concurrency::receive(_source);
        print(info);
        done();
    }

    double compute(int val)
    {
        info *= val*_id;
        return info;
    }
};

int main()
{
    { // Single sink
        using Message = std::string;
        Message m(" good morning");

        // All actors access (read from/write to) a single buffer
        concurrency::overwrite_buffer<Message> buffer;
        MySource i(m, buffer);
        SUD<Message, MySource, MySink> s(buffer, buffer);
        MySink o(buffer);

        i.start(); o.start(); s.start();
        concurrency::agent::wait(&i); concurrency::agent::wait(&s); concurrency::agent::wait(&o);
    }

    {    // Multiple sinks
        using Message = int;
        Message m(23);

        // All actors access (read from/write to) a single buffer
        concurrency::overwrite_buffer<Message> buffer;
        MySource i(m, buffer);
        SUD<Message, MySource, MySink> s(buffer, buffer);
        MySink o1(buffer, 1);
        MySink o2(buffer, 2);
        MySink o3(buffer, 3);
        i.start(); s.start();
        o1.start(); o2.start(); o3.start();
        concurrency::agent::wait(&i); concurrency::agent::wait(&s);
        concurrency::agent::wait(&o1);
        concurrency::agent::wait(&o2);
        concurrency::agent::wait(&o3);

        std::cout << "\ncompute: " << o1.compute(2) << '\n';
        std::cout << "\ncompute: " << o2.compute(2) << '\n';
        std::cout << "\ncompute: " << o3.compute(2) << '\n';
    }

    return 0;
}
 