Showing posts with label structure_concurrency. Show all posts
Showing posts with label structure_concurrency. Show all posts

Aug 18, 2021

[concurrent] notes from Leslie Lamport

Leslie Lamport’s "How to Make a Multiprocessor Computer That Correctly Executes Multiprocess Programs" defines sequential consistency:


The customary approach to designing and proving the correctness of

multiprocess algorithms for such a computer assumes that the following

condition is satisfied: the result of any execution is the same as if

the operations of all the processors were executed in some sequential

order, and the operations of each individual processor appear in this

sequence in the order specified by its program. A multiprocessor

satisfying this condition will be called sequentially consistent.



Even on ARM/POWER: threads in the system must agree about a total order for the writes to a single memory location.


“weakly ordered” defined as follows:

Let a synchronization model be a set of constraints on memory accesses

that specify how and when synchronization needs to be done.

Hardware is weakly ordered with respect to a synchronization model

iff it appears sequentially consistent to all software that

obey the synchronization model.

Jul 9, 2020

[unix][programming] EINTR and What It Is Good For (http://250bpm.com/blog:12)

Reference:
EINTR and What It Is Good For Martin Sústrik, zeromq


Before we dive, this concept is well mentioned in Richard Stevens's UNIX Network Programming - Ch.20.5, thus Martin Sústrik's blog post can be considered as a recap of EINTR error.

Rule of thumb: 

When handling EINTR error, check any conditions that may have been altered by signal handlers.
Then restart the blocking function.

Additionally, If you are implementing a blocking function yourself, take care to return EINTR when you encounter a signal.

Beware those 2 POSIX functions which don't honor EINTR


Consider this code:
volatile int stop = 0;

void handler (int)
{
    stop = 1;
}

void event_loop (int sock)
{
    signal (SIGINT, handler);

    while (1) {
        if (stop) {  // never hit if recv is blocked
            printf ("do cleanup\n");
            return;
        }
        char buf [1];
        recv (sock, buf, 1, 0);  // block call
        printf ("perform an action\n");
    }
}

Above is the reason POSIX has EINTR error.

Modify code to this:
noted that to make blocking functions like recv return EINTR you may have to use sigaction() with SA_RESTART set to zero instead of signal() on some operating systems.
volatile int stop = 0;

void handler (int)
{
    stop = 1;
}

void event_loop (int sock)
{
    signal (SIGINT, handler);

    while (1) {
        if (stop) {
            printf ("do cleanup\n");
            return;
        }
        char buf [1];
        int rc = recv (sock, buf, 1, 0);
        if (rc == -1 && errno == EINTR)  // if interrupted by signal, continue while loop
            continue;
        printf ("perform an action\n");
    }
}


But, this isn't a graceful shutdown.
We have to exhaust the incoming message before exit.
When you press Ctrl+C, program exits performing the clean-up beforehand.

The morale of this story is that common advice to just restart the blocking function when EINTR is returned doesn't quite work:
volatile int stop = 0;

void handler (int)
{
    stop = 1;
}

void event_loop (int sock)
{
    signal (SIGINT, handler);

    while (1) {
        if (stop) {
            printf ("do cleanup\n");
            return;
        }
        char buf [1];
        while (1) {
            // even signaled with stop == 1, and no more incoming data, we are stucked here..
            int rc = recv (sock, buf, 1, 0); 
            // if signaled, continue to recv, otherwise, message consumed, break inner loop
            if (rc == -1 && errno == EINTR) 
                continue;
            break;
        }
        printf ("perform an action\n");
    }
}


Even EINTR is not completely water-proof, check this code:
volatile int stop = 0;

void handler (int)
{
    stop = 1;
}

void event_loop (int sock)
{
    signal (SIGINT, handler);

    while (1) {
        if (stop) {
            printf ("do cleanup\n");
            return;
        }

        /*  What if signal handler is executed at this point? */
        /* pressing Ctrl+C for the second time sorts the problem out */

        char buf [1];
        // even stop == 1, and no more data coming, we are stucked here...
        int rc = recv (sock, buf, 1, 0);
        if (rc == -1 && errno == EINTR)
            continue;
        printf ("perform an action\n");
    }
}



Ultimate solution

use pselect, which mask the signals before calling pselect, and allow signal to pass during the pselect(which if signal occurs, pselect returns).

select
int select(int nfds,
                    fd_set *readfds,
                    fd_set *writefds,
                    fd_set *exceptfds, 
                    struct timeval *timeout);
                    

nfds should be n + 1 (exclusive bound), this optimizing the linear check of fds.

Be sure to check the definition under what conditions is a Descriptor ready for network FDs.

Notice that when an error occurs on a socket, both readable and writable is marked by select.

Although the timeval structure lets us specify a resolution in microseconds, the actual resolution supported by the kernel is often more coarse.
Many Unix kernels round the timeout value up to a multiple of 10ms. There is also a scheduling latency involved, meaning it takes some time after the timer expires before the kernel schedules this process to run.


void FD_ZERO(fd_set *fdset);
void FD_SET(int fd, fd_set *fdset);
void FD_CLR(int fd, fd_set *fdset);
int FD_ZERO(int fd, fd_set *fdset);




pselect
 int pselect(
            int nfds,
            fd_set *restrict readfds,
            fd_set *restrict writefds,
            fd_set *restrict errorfds,
            const struct timespec *restrict timeout,
            const sigset_t *restrict sigmask);

example:
// https://github.com/k84d/unpv13e/blob/master/bcast/dgclibcast4.c
#include "unp.h"

static void recvfrom_alarm(int);

void
dg_cli(FILE *fp, int sockfd, const SA *pservaddr, socklen_t servlen)
{
 int    n;
 const int  on = 1;
 char   sendline[MAXLINE], recvline[MAXLINE + 1];
 fd_set   rset;
 sigset_t  sigset_alrm, sigset_empty;
 socklen_t  len;
 struct sockaddr *preply_addr;
 
 preply_addr = Malloc(servlen);

 Setsockopt(sockfd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on));

 FD_ZERO(&rset);

 Sigemptyset(&sigset_empty);
 Sigemptyset(&sigset_alrm);
 Sigaddset(&sigset_alrm, SIGALRM);

 Signal(SIGALRM, recvfrom_alarm);

 while (Fgets(sendline, MAXLINE, fp) != NULL) {
  Sendto(sockfd, sendline, strlen(sendline), 0, pservaddr, servlen);

  Sigprocmask(SIG_BLOCK, &sigset_alrm, NULL);
  alarm(5);
  for ( ; ; ) {
   FD_SET(sockfd, &rset);
   n = pselect(sockfd+1, &rset, NULL, NULL, NULL, &sigset_empty);
   if (n < 0) {
    if (errno == EINTR)
     break;
    else
     err_sys("pselect error");
   } else if (n != 1)
    err_sys("pselect error: returned %d", n);

   len = servlen;
   n = Recvfrom(sockfd, recvline, MAXLINE, 0, preply_addr, &len);
   recvline[n] = 0; /* null terminate */
   printf("from %s: %s",
     Sock_ntop_host(preply_addr, len), recvline);
  }
 }
 free(preply_addr);
}

static void
recvfrom_alarm(int signo)
{
 return;  /* just interrupt the recvfrom() */
}


poll
int poll(
        struct pollfd *fds,
        nfds_t nfds,
        const struct timespec *tmo_p,
        const sigset_t *sigmask);
        

Oct 13, 2019

[Concurrency] Optimistic concurrency control

Optimistic concurrency control (sometimes referred to as optimistic locking) is a method where instead of locking a piece of data and preventing it from being read or updated while the lock is in place, the piece of data includes a version number. 

In C++ memory model, which is referred in transactional memory as implementation detail.

Every time the data is updated, the version number increases.
Since version number can be used with the scalar type, which can be add/minus with single instruction without lock.

When updating the data, the version number is checked to see if it has increased between the time the client read the data and the time it submits the update.
(perfect use case for atomic compare and exchange)

If this happens, the update is rejected and the client must re-read the new data and try to update it again.

The result is that when two clients try to update the same data entry, only the first one succeeds.

All k8s resources include a metadata.resourceVersion field, which clients need to pass back to the API server when updating an object.
If the version doesn’t match the one stored in etcd, the API server rejects the update.

Feb 17, 2019

[structure concurrency][Go][design] Graceful Shutdown

Recently I'm working on Actor design utilizing with Golang's channel.

Link: https://github.com/vsdmars/actor

While bumped into Martin Sústrik's blog post:
Graceful Shutdown: http://250bpm.com/blog:146
Along with the discussion: https://trio.discourse.group/t/graceful-shutdown/93
Structured concurrency resources: https://trio.discourse.group/t/structured-concurrency-resources/21

It's a nice summary, touched the well known concept of pthread cancellation point as well as Golang's context.Done()/Cancel() pattern.

Notes:
In-band cancel:
through application level channel

out-of-band cancel:
through language run-time

Sending a graceful shutdown request cannot possibly be a feature of the language. It must be done manually by the application.


POSIX C thread:
Reference:
https://stackoverflow.com/a/27374983
http://man7.org/linux/man-pages/man7/pthreads.7.html (Cancellation points)

The rules of hard cancellation are strict:
Once a coroutine has been hard-canceled the very next blocking call will immediately return ECANCELED.
In other words, every single blocking call is a cancellation point.

As an additional note, if looking at the POSIX requirement for cancel points, virtually all blocking interfaces are required to be cancel points.
Otherwise, on any completely blocked thread (in such call), there would be no safe way to terminate that thread.

Graceful shutdown can terminate the coroutine only when it is idling and waiting for a new request.

Reasoning:
If we allowed graceful shutdown to terminate the coroutine while it is trying to send the reply, it would mean that a request could go unanswered.
And that doesn't deserve to be called "graceful shutdown".
(e.g. Golang, context.Done() pattern)


Definition summarize:
Hard cancellation:
- Is triggered via an invisible communication channel created by the language runtime.
- It manifests itself inside the target coroutine as an error code (ECANCELED in libdill) or an exception (Cancelled in Trio).
- The error (or the exception) can be returned from any blocking call.
- In response to it, the coroutine is not expected to do any application-specific work. It should just exit.

Graceful shutdown:
- Is triggered via an application-specific channel.
- Manifests itself inside the target coroutine as a plain old message.
- The message may only be received at specific, application-defined points in the coroutine.
- In response to it, the coroutine can do arbitrary amount of application-specific work.

Hard cancellation is fully managed by the language.
Graceful shutdown is fully managed by the application.

Golang library reference:
go-resiliency/deadline: https://github.com/eapache/go-resiliency/tree/master/deadline