Jun 12, 2019

[C++] C++ Concurrency In Action, Second edition, recap [Ch.6]

Code is the best explanation, so this note consists mostly of example code excerpted from Ch.6 of the book.

The underlying concepts are covered in the book The Art of Multiprocessor Programming.



Design concurrent data structure

  • Minimize serialization
  • The fewer operations are serialized, the greater the potential for concurrency.
  • Ensure that no thread can see a state where the invariants of the data structure have been broken by the actions of another thread.
  • Take care to avoid race conditions inherent in the interface to the data structure by providing functions for complete operations rather than for operation steps.
  • Pay attention to how the data structure behaves in the presence of exceptions to ensure that the invariants are not broken.
  • Minimize the opportunities for deadlock when using the data structure by restricting the scope of locks and avoiding nested locks where possible.



Lock design

  • Can the scope of locks be restricted to allow some parts of an operation to be performed outside the lock?
  • Can different parts of the data structure be protected with different mutexes?
  • Do all operations require the same level of protection?
  • Can a simple change to the data structure improve the opportunities for concurrency without affecting the operational semantics?



fine-grained locking

Design

Look carefully at the details of the data structure rather than wrapping a pre-existing container such as std::map<>
 
e.g
// Thread-safe FIFO queue with fine-grained locking: one mutex guards the head
// (consumers) and another guards the tail (producers), so a push and a pop can
// proceed concurrently. The list always contains one trailing "dummy" node;
// the queue is empty exactly when head and tail refer to that same node.
//
// Lock ordering: whenever both mutexes are needed, head_mutex is acquired
// first and tail_mutex second (see get_tail() callers). push() only ever
// takes tail_mutex.
template<typename T>
class threadsafe_queue
{
    private:
    // Singly linked node. `data` is empty in the dummy tail node.
    struct node
    {
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    };

    std::mutex head_mutex;              // guards `head`
    std::unique_ptr<node> head;         // owns the whole chain of nodes
    std::mutex tail_mutex;              // guards `tail`
    node* tail;                         // non-owning pointer to the dummy node
    std::condition_variable data_cond;  // signalled by push()

    // Reads the tail pointer under tail_mutex.
    node* get_tail()
    {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        return tail;
    }

    // Unlinks and returns the current head node.
    // Precondition: head_mutex is held and the queue is not empty.
    std::unique_ptr<node> pop_head()
    {
        std::unique_ptr<node> old_head=std::move(head);
        head=std::move(old_head->next);
        return old_head;
    }

    // Blocks until the queue is non-empty and returns the lock on head_mutex,
    // still held, so the caller can pop without a window for another consumer.
    std::unique_lock<std::mutex> wait_for_data()
    {
        std::unique_lock<std::mutex> head_lock(head_mutex);
        data_cond.wait(head_lock, [this]{return head.get()!=get_tail();});
        // Returned by name (no std::move) so copy elision / implicit move applies.
        return head_lock;
    }

    // Blocking pop that hands back the unlinked node.
    std::unique_ptr<node> wait_pop_head()
    {
        std::unique_lock<std::mutex> head_lock(wait_for_data());
        return pop_head();
    }

    // Blocking pop that moves the payload into `value`.
    // The assignment happens before the node is unlinked, so if it throws the
    // element is still in the queue.
    std::unique_ptr<node> wait_pop_head(T& value)
    {
        std::unique_lock<std::mutex> head_lock(wait_for_data());
        value=std::move(*head->data);
        return pop_head();
    }

    // Non-blocking pop; returns a null pointer when the queue is empty.
    std::unique_ptr<node> try_pop_head()
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        if(head.get()==get_tail())
        {
            return std::unique_ptr<node>();
        }
        return pop_head();
    }

    // Non-blocking pop into `value`; returns a null pointer (and leaves
    // `value` untouched) when the queue is empty.
    std::unique_ptr<node> try_pop_head(T& value)
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        if(head.get()==get_tail())
        {
            return std::unique_ptr<node>();
        }
        value=std::move(*head->data);
        return pop_head();
    }

    public:
    // Creates an empty queue consisting of just the dummy node.
    threadsafe_queue(): head(new node), tail(head.get()) {}

    threadsafe_queue(threadsafe_queue const& other)=delete;
    threadsafe_queue& operator=(threadsafe_queue const& other)=delete;

    // Appends `new_value`. The allocations happen before tail_mutex is taken,
    // so the critical section is only a few pointer assignments.
    void push(T new_value)
    {
        std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
        std::unique_ptr<node> new_dummy(new node);
        {
            std::lock_guard<std::mutex> tail_lock(tail_mutex);
            // The old dummy receives the payload; the new node becomes the dummy.
            tail->data=new_data;
            node* const new_tail=new_dummy.get();
            tail->next=std::move(new_dummy);
            tail=new_tail;
        }
        data_cond.notify_one();
    }

    // Blocks until an element is available and returns it.
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_ptr<node> const old_head=wait_pop_head();
        return old_head->data;
    }

    // Blocks until an element is available and moves it into `value`.
    void wait_and_pop(T& value)
    {
        // The node is released when old_head goes out of scope, outside the lock.
        std::unique_ptr<node> const old_head=wait_pop_head(value);
    }

    // Returns the front element, or an empty shared_ptr if the queue is empty.
    std::shared_ptr<T> try_pop()
    {
        std::unique_ptr<node> old_head=try_pop_head();
        return old_head?old_head->data:std::shared_ptr<T>();
    }

    // Moves the front element into `value` and returns true, or returns false
    // if the queue is empty.
    bool try_pop(T& value)
    {
        std::unique_ptr<node> const old_head=try_pop_head(value);
        // unique_ptr's conversion to bool is explicit, so compare explicitly.
        return old_head!=nullptr;
    }

    // Returns true if the queue held no elements at the moment of the check.
    bool empty()
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        return (head.get()==get_tail());
    }
};



Implement a thread safe associative container

Design

Three common ways of implementing an associative container like your lookup table:
  • A binary tree, such as a red-black tree - not so good for locking (we have to lock the tree and every node we traverse).
  • A sorted array - a single lock for the whole array (maybe not: locking the previous, current and next elements might be enough).
  • A hash table - one lock per bucket (a reader-writer lock).

e.g
// Thread-safe lookup table implemented as a fixed-size hash table with one
// reader-writer mutex per bucket. Operations on keys that hash to different
// buckets never contend, and lookups within the same bucket can run
// concurrently with each other.
//
// Precondition: the table must be constructed with num_buckets > 0, because
// the bucket index is computed as hash % bucket count.
template<typename Key,typename Value,typename Hash=std::hash<Key> >
class threadsafe_lookup_table
{
    private:
    // One bucket: a list of (key, value) pairs guarded by its own shared mutex.
    class bucket_type
    {
        // get_map() in the enclosing table must lock every bucket at once and
        // read the raw data, so it needs access to `mutex` and `data`.
        friend class threadsafe_lookup_table;

        private:
        typedef std::pair<Key,Value> bucket_value;
        typedef std::list<bucket_value> bucket_data;
        typedef typename bucket_data::iterator bucket_iterator;
        typedef typename bucket_data::const_iterator bucket_const_iterator;
        bucket_data data;
        mutable std::shared_mutex mutex;

        // Read-only lookup; returns data.end() if `key` is absent.
        // Caller must hold `mutex` (shared or exclusive).
        bucket_const_iterator find_entry_for(Key const& key) const
        {
            return std::find_if(data.begin(),data.end(),
                                [&](bucket_value const& item)
                                {return item.first==key;});
        }

        // Mutable lookup; returns data.end() if `key` is absent.
        // Caller must hold `mutex` exclusively.
        bucket_iterator find_entry_for(Key const& key)
        {
            return std::find_if(data.begin(),data.end(),
                                [&](bucket_value const& item)
                                {return item.first==key;});
        }

        public:
        // Returns a copy of the value mapped to `key`, or `default_value`.
        Value value_for(Key const& key,Value const& default_value) const
        {
            std::shared_lock<std::shared_mutex> lock(mutex);
            bucket_const_iterator const found_entry=find_entry_for(key);
            return (found_entry==data.end()) ? default_value : found_entry->second;
        }

        // Inserts (key, value), or overwrites the value if `key` exists.
        void add_or_update_mapping(Key const& key,Value const& value)
        {
            std::unique_lock<std::shared_mutex> lock(mutex);
            bucket_iterator const found_entry=find_entry_for(key);
            if(found_entry==data.end())
            {
                data.push_back(bucket_value(key,value));
            }
            else
            {
                found_entry->second=value;
            }
        }

        // Erases the entry for `key` if there is one; otherwise does nothing.
        void remove_mapping(Key const& key)
        {
            std::unique_lock<std::shared_mutex> lock(mutex);
            bucket_iterator const found_entry=find_entry_for(key);
            if(found_entry!=data.end())
            {
                data.erase(found_entry);
            }
        }
    };

    // The vector itself is never resized after construction, so it can be
    // indexed without a lock; only the buckets it points to are guarded.
    std::vector<std::unique_ptr<bucket_type> > buckets;
    Hash hasher;

    // Maps `key` to its bucket.
    bucket_type& get_bucket(Key const& key) const
    {
        std::size_t const bucket_index=hasher(key)%buckets.size();
        return *buckets[bucket_index];
    }

    public:
    typedef Key key_type;
    typedef Value mapped_type;
    typedef Hash hash_type;

    // Creates a table with `num_buckets` buckets (must be > 0) and the given
    // hash function object.
    threadsafe_lookup_table(unsigned num_buckets=19,Hash const& hasher_=Hash()):
        buckets(num_buckets),hasher(hasher_)
    {
        for(auto& bucket : buckets)
        {
            bucket=std::make_unique<bucket_type>();
        }
    }

    threadsafe_lookup_table(threadsafe_lookup_table const& other)=delete;
    threadsafe_lookup_table& operator=(threadsafe_lookup_table const& other)=delete;

    // Returns a copy of the value mapped to `key`, or `default_value` if the
    // key is not present.
    Value value_for(Key const& key,
    Value const& default_value=Value()) const
    {
        return get_bucket(key).value_for(key,default_value);
    }

    // Inserts the mapping, or replaces the value if `key` is already present.
    void add_or_update_mapping(Key const& key,Value const& value)
    {
        get_bucket(key).add_or_update_mapping(key,value);
    }

    // Removes the mapping for `key` if present.
    void remove_mapping(Key const& key)
    {
        get_bucket(key).remove_mapping(key);
    }

    // Returns a consistent snapshot of the whole table as a std::map.
    // Every bucket is locked before any is read, always in index order, so
    // concurrent get_map() calls cannot deadlock against each other. Shared
    // locks suffice because the snapshot only reads.
    std::map<Key,Value> get_map() const
    {
        std::vector<std::shared_lock<std::shared_mutex> > locks;
        locks.reserve(buckets.size());
        for(auto const& bucket : buckets)
        {
            locks.emplace_back(bucket->mutex);
        }

        std::map<Key,Value> res;
        for(auto const& bucket : buckets)
        {
            res.insert(bucket->data.begin(),bucket->data.end());
        }
        return res;
    }
};



Implement a thread safe list container

Design

Public API:
  • Add an item to the list.
  • Remove an item from the list if it meets a certain condition.
  • Find an item in the list that meets a certain condition.
  • Update an item that meets a certain condition.
  • Copy each item in the list to another container.

e.g
// Thread-safe singly linked list with one mutex per node. Traversals use
// hand-over-hand locking: the lock on the next node is acquired before the
// lock on the current node is released, so threads can work on different
// parts of the list at the same time but can never overtake each other.
template<typename T>
class threadsafe_list
{
    struct node
    {
        std::mutex m;                 // guards `next` (and, for non-head nodes, `data`)
        std::shared_ptr<T> data;      // empty for the sentinel head node
        std::unique_ptr<node> next;
        node(): next() {}
        // explicit: a T must never silently convert into a freshly allocated node.
        explicit node(T const& value): data(std::make_shared<T>(value)) {}
    };

    node head;  // sentinel; the first real element is head.next

    public:
    threadsafe_list() {}

    // Unlinks and destroys the nodes one at a time via remove_if, so
    // destruction is iterative rather than a recursive unique_ptr chain.
    ~threadsafe_list()
    {
        // remove_if invokes the predicate with the stored T, not with a node.
        remove_if([](T const&){return true;});
    }

    threadsafe_list(threadsafe_list const& other)=delete;
    threadsafe_list& operator=(threadsafe_list const& other)=delete;

    // Inserts a copy of `value` at the front. The node is allocated before the
    // head lock is taken, keeping the critical section to two pointer moves.
    void push_front(T const& value)
    {
        std::unique_ptr<node> new_node=std::make_unique<node>(value);
        std::lock_guard<std::mutex> lk(head.m);
        new_node->next=std::move(head.next);
        head.next=std::move(new_node);
    }

    // Calls f(item) for every element, front to back. `f` receives a mutable
    // reference and runs while that element's node mutex is held.
    template<typename Function>
    void for_each(Function f)
    {
        node* current=&head;
        std::unique_lock<std::mutex> lk(head.m);
        while(node* const next=current->next.get())
        {
            // Hand-over-hand: lock the next node, then let go of the current one.
            std::unique_lock<std::mutex> next_lk(next->m);
            lk.unlock();
            f(*next->data);
            current=next;
            lk=std::move(next_lk);
        }
    }

    // Returns the first element for which p(item) is true, or an empty
    // shared_ptr if no element matches.
    template<typename Predicate>
    std::shared_ptr<T> find_first_if(Predicate p)
    {
        node* current=&head;
        std::unique_lock<std::mutex> lk(head.m);
        while(node* const next=current->next.get())
        {
            std::unique_lock<std::mutex> next_lk(next->m);
            lk.unlock();
            if(p(*next->data))
            {
                return next->data;
            }
            current=next;
            lk=std::move(next_lk);
        }

        return std::shared_ptr<T>();
    }

    // Removes every element for which p(item) is true.
    template<typename Predicate>
    void remove_if(Predicate p)
    {
        node* current=&head;
        std::unique_lock<std::mutex> lk(head.m);
        while(node* const next=current->next.get())
        {
            std::unique_lock<std::mutex> next_lk(next->m);
            if(p(*next->data))
            {
                // Keep holding the lock on `current`: its `next` is rewritten.
                std::unique_ptr<node> old_next=std::move(current->next);
                current->next=std::move(next->next);
                // Release the doomed node's mutex before old_next destroys it;
                // destroying a locked mutex is undefined behavior.
                next_lk.unlock();
            }
            else
            {
                lk.unlock();
                current=next;
                lk=std::move(next_lk);
            }
        }
    }
};

No comments:

Post a Comment

Note: Only a member of this blog may post a comment.