Jul 29, 2025

[C++] Coroutine examples

  1. A C++ coroutine involves two scopes: the caller scope and the coroutine scope. If you are familiar with Python yield/send, it is the same concept, but more flexible and more subtle about object lifetime. Compare also Golang goroutines/channels [stackful] and C++ fibers [stackful].
  2. Suspending pauses the coroutine and returns control to the caller; the caller uses the handle to resume the coroutine.
  3. Underneath, transfers use JMP instead of CALL, since it is a stackless coroutine.
  4. Beware: even for a coroutine like `Co<void> coroutine(const int& arg)` that uses co_await/co_yield/co_return, by the time it resumes after a suspension the caller's stack may have unwound, so the object bound to arg may already be destroyed.
    i.e.
    If a coroutine has a parameter passed by reference, resuming the coroutine after the lifetime of the entity referred to by that parameter has ended is likely to result in undefined behavior.

    The C++ Core Guidelines (CP.53) say not to pass coroutine parameters by reference at all.
  5. "Just avoid references" isn't comprehensive. Types such as span and string_view are not references, yet they can dangle in the same way after co_await/co_yield/co_return.
    Thus, a better contract is:
    Pass by Value with Owning Types.
  6. The way we ensure the unsafe caller doesn't compile is by making Co non-movable and accepting it by value in the co_await implementation in the promise type.

    This means the only way to await it is if you do so immediately in the same full expression as the function call, so that guaranteed copy elision can kick in. 

    Because temporaries aren't destroyed until that full expression has been evaluated, the lifetimes work out perfectly using the usual language rules around lifetimes.

    Design TIP:
    Eliminate the problem with references by declaring an entire pattern of code illegal.
      Co<void> UseInt_Async(const int& x) {
         printf("%d\n", x);
         co_await DoSomething();
         printf("%d\n", x);
      }
      
      k3::Co<void> UseInt_Async(const int&);
      // Works fine
      co_await UseInt_Async(17);
        
      // Compiler error because k3::Co<void> is not movable.
      k3::Co<void> co = UseInt_Async(17);
      co_await std::move(co);
  7. What if we do want to use the pattern above (create first, await later)?
    Use indirection, just like std::bind:
    k3::Co<void> UseInt_Async(const int&);
    // Totally safe
    k3::Future<void> future{UseInt_Async, 17};
    printf("I created the future!\n");
    co_await std::move(future).Run();
  8. Co<void> implements the awaitable interface, as shown in item 10.
  9. Fan out or run in sequence; the implementation is easy to imagine.
      
    // Run all concurrently, finishing once all finish.
    k3::Co<void> FanOut(std::vector<k3::Future<void>> futures);
    // Run all concurrently, finishing when the first finishes.
    template <typename T>
    k3::Co<T> Race(std::vector<k3::Future<T>> futures);
    

  10. co_await awaitable;
    struct awaitable {
        // false: suspend the coroutine; true: do not suspend.
        bool await_ready() { return false; }
        // What to do when suspended.
        void await_suspend(std::coroutine_handle<> h) {}
        // Calling .resume() on await_suspend's coroutine_handle comes here first,
        // and after this, execution continues in the coroutine.
        void await_resume() {}
        // Alternatively, return a value; the co_await expression evaluates to it:
        // int await_resume() { return 42; }
    };
    co_yield
    co_return

Stack:

Go goroutines have a dynamically sized stack that defaults to 2 KB. Linux threads default to an 8 MB stack, 64 KB in production.
So with coroutines, how do we avoid stack overflow?

Tail calls come to the rescue.
A C++ coroutine guarantees a tail call into the other coroutine (symmetric transfer, where await_suspend returns the handle to resume), and then it can do the same again on the way back out. There is no need for a stack frame to return to; everything is a JUMP instruction.
This is the only guaranteed tail call in the standard [as of C++20]; it is actually a rather unique mixing of abstraction levels.
Co<void> Foo();
Co<void> Bar() {
  // Tail call into the body of Foo.
  co_await Foo();
  // Tail call back from the body of Foo.
  [...]
}
This indicates that coroutines nested inside other coroutines all run on the same stack; thus,
if suspensions and resumptions nest as ordinary calls rather than tail calls, the stack can overflow.

c9 solution:
A coroutine is never allowed to resume another coroutine directly.

Possible solution to the Notify/Wait pattern:
class Event {
Co<void> Wait();
// If there is a waiter, it will start running concurrently
// on another thread. The calling thread continues on.
// Look at the co_await example below.
void Notify();
};
We could resolve this by breaking the assumption that there is only one thread available.
Instead we could resume the coroutine on a different thread, letting it run concurrently at the same time as the notifier.


Avoid executors in the coroutine library design:

Don't offer unnecessary configurability.
The library should be agnostic to executors. All it has is one thread-local queue of things that have been resumed by the running coroutine.
Of course, individual things you wait for might have an executor internally. For example, if you wait for an RPC to finish, you'll probably be resumed on one of a team of threads reading RPC replies from the network.
e.g.


// Hop to a specific executor. We need to run there because…
// see the co_await example below.
co_await Reschedule(my_executor);

Cancellation in library design

  • RPCs to stuck machines
  • Request hedging (idempotent operations)
  • Avoiding wasted work
  • Timeouts

The callee exists only to serve the caller. It must stop promptly if the caller loses interest.

  • The caller of a coroutine is always in control.
  • If the caller no longer wants the callee to run, the callee should stop running.
  • And it should do so promptly. Not after getting a response to its RPC. Not after a timeout expires. Immediately.
    All it should get to do is the kind of thing you want to do before you release a lock or unwind after an exception. Ensure internal invariants are restored; that kind of thing.
  • A coroutine's local automatic variables are not destructed until it has been resumed through to the end of the coroutine, or until the frame is destroyed (meanwhile they live in the coroutine frame on the heap).
  • If cancellation happens, the suspended coroutine is resumed with a cancellation bit passed via the coroutine handle. The awaitable consumes the bit: its resume step checks it and cancels the original code path, and the coroutine then continues to its end.

Notes on structured concurrency; idea:
Children should not out-live parent.
Prefer structured concurrency wherever possible.
Good API design also helps with usability and safety.

Handler:

// task, aka the handler, is created through the promise_type instance's get_return_object()
// and is destructed like any other object, e.g. right after the coroutine function returns
// to the caller if the caller discards it.
// Destructing the task does not destruct the promise_type instance; the promise still
// resides on the heap, where the coroutine frame is located.
#include <coroutine>

template <typename T>
struct task
{
    struct promise_type
    {
        // Always called first when the coroutine is called.
        task get_return_object() { return task{std::coroutine_handle<promise_type>::from_promise(*this)}; }
        // Always called second when the coroutine is called.
        std::suspend_never initial_suspend() { return {}; }
        // Always called last, when the coroutine is finished (i.e. done).
        std::suspend_never final_suspend() noexcept { return {}; }

        std::suspend_always yield_value(T value) noexcept
        {
          // Used for co_yield: once co_yield is called, the yielded value is installed here,
          // and the coroutine suspends afterwards because this returns `std::suspend_always`.
          return {};
        }

        void return_value(T value)
        {
          // Used for co_return: once co_return is called with an expr, the expr value is installed here.
          // return_value must return void. For a plain `co_return;`, define
          // `void return_void()` instead; a promise cannot declare both.
        }
        void unhandled_exception() {}
    };

    task(std::coroutine_handle<promise_type> h) : handle(h)
    {
      // Store the coroutine_handle inside the task.
    }

    std::coroutine_handle<promise_type> handle;
};

#include <iostream>
#include <coroutine>

// 1. The Promise Type
struct MyTaskPromise {
    // The compiler calls this to get the return object.
    // We return a 'MyTask' object, and in its constructor, we pass it the handle.
    struct MyTask get_return_object();

    std::suspend_never initial_suspend() noexcept { return {}; }
    std::suspend_always final_suspend() noexcept { return {}; }
    void return_void() noexcept {}
    void unhandled_exception() noexcept {}
};

// 2. The Task Type
// This is the wrapper around the coroutine_handle.
struct MyTask {
    using promise_type = MyTaskPromise;
    std::coroutine_handle<MyTaskPromise> handle;

    // The constructor takes the handle from the promise's get_return_object() call.
    MyTask(std::coroutine_handle<MyTaskPromise> h) : handle(h) {}
};

// Now we can define get_return_object() because MyTask is defined.
MyTask MyTaskPromise::get_return_object() {
    // This is the key line: we construct the MyTask object with the handle
    // to the coroutine that owns this promise.
    return MyTask{std::coroutine_handle<MyTaskPromise>::from_promise(*this)};
}

// 3. A simple awaitable to demonstrate suspension.
struct Awaitable {
    bool await_ready() { return false; }
    void await_resume() {}

    // This await_suspend returns `true`, which suspends the coroutine
    // and returns control to the caller (main).
    bool await_suspend(std::coroutine_handle<>) noexcept {
        std::cout << "-> Awaitable: Coroutine is suspending." << std::endl;
        return true;
    }
};

// 4. The Coroutine Function
MyTask MyCoroutine() {
    std::cout << "Coroutine: Starting." << std::endl;
    co_await Awaitable{}; // Coroutine suspends here.
    std::cout << "Coroutine: Resumed and finishing." << std::endl;
    co_return;
}

// 5. The Caller (main function)
int main() {
    // 1. The call to MyCoroutine() returns a 'MyTask' object.
    // This object contains the handle to the suspended coroutine.
    MyTask task = MyCoroutine();

    std::cout << "\nMain: Coroutine is suspended. I am the caller.\n" << std::endl;

    // 2. We can now use the handle stored inside the 'task' object
    // to resume the coroutine.
    if (task.handle) {
        std::cout << "Main: Resuming the suspended coroutine." << std::endl;
        task.handle.resume();
    }

    std::cout << "\nMain: Coroutine has finished its execution." << std::endl;

    // 3. Clean up the coroutine's memory.
    if (task.handle) {
        task.handle.destroy();
    }

    return 0;
}



co_yield Example:

#include <coroutine>
#include <iostream>
#include <optional>

template<std::movable T>
class Generator
{
public:
    struct promise_type
    {
        Generator<T> get_return_object()
        {
          std::cout << "get_return_object()\n"; // -2; Generator created with pointer to the heap
          return Generator{Handle::from_promise(*this)};
        }
        static std::suspend_always initial_suspend() noexcept
        {
          std::cout << "suspend_always initial_suspend()\n"; // -4 init. suspend. Go to the caller.
          return {};
        }
        static std::suspend_always final_suspend() noexcept
        {
          std::cout << "suspend_always final_suspend()\n"; // -15 coroutine ends, call this and suspend.
          // Back to the caller. Go to (13)
          return {};
        }
        std::suspend_always yield_value(T value) noexcept
        {
          std::cout << "yield_value() : " << value << "\n"; // -9, suspend. Go to the caller.
          current_value = std::move(value);
          return {};
        }
        // Disallow co_await in generator coroutines.
        void await_transform() = delete;
        [[noreturn]]
        static void unhandled_exception() { throw; }

        std::optional<T> current_value;
    };

    using Handle = std::coroutine_handle<promise_type>;

    explicit Generator(const Handle coroutine) :
        m_coroutine{coroutine}
    {
      std::cout << "Generator constructor\n"; // -3
    }

    Generator() = default;
    ~Generator()
    {
        // final_suspend() returns std::suspend_always, so the frame stays alive
        // even after the coroutine is done; destroy it whenever we own a handle.
        // A moved-from Generator holds a null handle, which prevents a double free.
        if (m_coroutine)
            m_coroutine.destroy();
      std::cout << "Generator destructor\n";
    }

    Generator(const Generator&) = delete;
    Generator& operator=(const Generator&) = delete;

    Generator(Generator&& other) noexcept :
        m_coroutine{other.m_coroutine}
    {
      std::cout << "Generator move constructor\n";
      other.m_coroutine = {};
    }
    Generator& operator=(Generator&& other) noexcept
    {
      std::cout << "Generator assign operator=\n";
        if (this != &other)
        {
            if (m_coroutine)
                m_coroutine.destroy();
            m_coroutine = other.m_coroutine;
            other.m_coroutine = {};
        }
        return *this;
    }

    // Range-based for loop support.
    class Iter
    {
    public:
        void operator++()
        {
          std::cout << "Iter ++ currnet value: " << *m_coroutine.promise().current_value << "\n";
          
          m_coroutine.resume(); // -11, resume from (9) yield's suspend; JUMP to the COROUTINE RIGHT AWAY!
          // The following cout does not run until the coroutine suspends again.
          
          // -13, follows (12), after (9) yield's suspend.
          std::cout << "Iter ++ resumed currnet value: " << *m_coroutine.promise().current_value << "\n";
        }
        const T& operator*() const
        {
          // -10, caller print out the value.
          std::cout << " Iter* return value: " << *m_coroutine.promise().current_value << "\n";
          return *m_coroutine.promise().current_value;
        }
        bool operator==(std::default_sentinel_t) const
        {
          // -16, the caller calls this after the coroutine suspends.
          std::cout << "Iter == called: !m_coroutine: " << (!m_coroutine) << " m_coroutine.done(): " << m_coroutine.done() << "\n";
            return !m_coroutine || m_coroutine.done();
        }

        explicit Iter(const Handle coroutine) :
            m_coroutine{coroutine}
        {}

    private:
        Handle m_coroutine;
    };

    Iter begin()
    {
      std::cout << "Iter begin\n"; // -5, caller range for calls begin()
        if (m_coroutine)
            m_coroutine.resume(); // -6 (4) suspended resumed. Go to coroutine.
        return Iter{m_coroutine};
    }

    std::default_sentinel_t end() { return {}; }

private:
    Handle m_coroutine;
};

template<std::integral T>
Generator<T> range(T first, const T last)
{
  // suspended right away since initial_suspend() returns std::suspend_always
  // returns to the caller with Generator instance
  std::cout << "Range\n"; // -7 from (6)
  while (first < last){
    std::cout << "Range first: " << first << "\n";
    co_yield first++; // -8, go to yield_value(), install the value of `first`, increment first, and suspend.
    // -12, resume from (11) operator++()
    std::cout << "Range after first++: " << first << "\n";
  }
  // -14, coroutine end, suspend_always final_suspend() called.
}

int main()
{
  std::cout << "Start for loop\n"; // -1
  // The Generator is destructed only once because the range-based for loop extends the temporary's lifetime.
  for (const char i : range(65, 67))
      std::cout << i << "\n";
  // -17, out of range for loop scope, 
  // range(65, 67) returned `Generator` destructs.

  std::cout << "End for loop\n";
    std::cout << '\n';
}
stdout:
Start for loop
get_return_object()
Generator constructor
suspend_always initial_suspend()
Iter begin
Range
Range first: 65
yield_value() : 65
Iter == called: !m_coroutine: 0 m_coroutine.done(): 0
 Iter* return value: 65
A
Iter ++ currnet value: 65
Range after first++: 66
Range first: 66
yield_value() : 66
Iter ++ resumed currnet value: 66
Iter == called: !m_coroutine: 0 m_coroutine.done(): 0
 Iter* return value: 66
B
Iter ++ currnet value: 66
Range after first++: 67
suspend_always final_suspend()
Iter ++ resumed currnet value: 66
Iter == called: !m_coroutine: 0 m_coroutine.done(): 1
Generator destructor
End for loop


co_await Example:

#include <coroutine>
#include <iostream>
#include <stdexcept>
#include <thread>
 
auto switch_to_new_thread(std::jthread& out) {
    std::cout << "switch_to_new_thread start\n";
    struct awaitable {
        std::jthread* p_out;
        bool await_ready() {
            std::cout << "await ready\n";
            return false;
        }

        void await_suspend(std::coroutine_handle<> h) {
            std::cout << "await_suspend\n";
            std::jthread& out = *p_out;
            if (out.joinable())
                throw std::runtime_error("Output jthread parameter not empty");
            out = std::jthread([h] {
                std::cout << "calling handler.resume()\n";
                h.resume();
            });

            // Potential undefined behavior: accessing potentially destroyed *this
            // std::cout << "New thread ID: " << p_out->get_id() << '\n';
            std::cout << "New thread ID: " << out.get_id() << '\n'; // this is OK
        }

        void await_resume() {
            std::cout << "await_resume\n";
        }
    };

    std::cout << "switch_to_new_thread about to return\n";
    
    return awaitable{&out};
}
 
struct task {
    struct promise_type {
        task get_return_object() {
            std::cout << "get_return_object()\n";
            return {};
        }

        std::suspend_never initial_suspend() {
            std::cout << "inital_suspend()\n";
            return {};
        }

        std::suspend_never final_suspend() noexcept {
            std::cout << "final_suspend()\n";
            return {};
        }

        void return_void() {
            std::cout << "return_void\n";
        }

        void unhandled_exception() {}
    };

    ~task() {
	    std::cout << "task destruct\n";
    }
};
 
task resuming_on_new_thread(std::jthread& out) {
    std::cout << "Coroutine started on thread: " << std::this_thread::get_id() << '\n';
    co_await switch_to_new_thread(out);
    // awaiter destroyed here
    std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << '\n';
}
 
int main() {
    std::jthread out;
    std::cout << "start\n";
    resuming_on_new_thread(out);
    std::cout << "ending main()\n";
}
stdout:
start
get_return_object()
inital_suspend()
Coroutine started on thread: 140248882116480
switch_to_new_thread start
switch_to_new_thread about to return
await ready
await_suspend
New thread ID: 140248877143744
task destruct
ending main()
calling handler.resume()
await_resume
Coroutine resumed on thread: 140248877143744
return_void
final_suspend()
