What we’ve been
(a)waiting for?

Hana Dusíková

Hana

  • staff software engineer at Woven by Toyota
  • co-chair of Evolution Working Group in ISO C++ committee
  • chair of Study Group for Compile Time Programming

Hana

  • CTRE (Compile Time Regular Expressions)
  • Lightning Updates (distributed push update system)
  • CTHASH (Compile Time SHA-2 & SHA-3 implementation)
  • co_CURL

I love JavaScript!

fetch in JavaScript


							// await suspends the surrounding async code until the promise returned by fetch() settles
							const result = await fetch("https://hanicka.net/");
							
						

Promise.all() in JavaScript


							// Requests <prefix>/a.txt and <prefix>/b.txt and returns a single
							// promise combining both fetches via Promise.all().
							async function download_both(prefix) {
								// both fetch() calls are issued before anything is awaited
								const things = [
									fetch(prefix+"/a.txt"),
									fetch(prefix+"/b.txt")
								];
								return Promise.all(things);
							}
						

libcurl

libcurl

  • 27-year-old, proven library
  • nice and simple C API
  • many supported protocols

simple libcurl example


							// initialization: an easy handle describes a single transfer
							CURL * handle = curl_easy_init();
							
							if (handle == NULL) {
								return 1;
							}
							
							// usage: configure the transfer on the handle
							curl_easy_setopt(handle, CURLOPT_URL, "https://curl.se");
							curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
							
							// run the transfer on the handle configured above
							// (the original passed an undeclared `curl` here)
							CURLcode res = curl_easy_perform(handle);
							
							if (res != CURLE_OK) {
								printf("curl_easy_perform() failed: %s\n", curl_easy_strerror(res));
							}
							
							// free all resources used
							curl_easy_cleanup(handle);
						

writing result into an object


							// buffer that receives the response body
							std::string output_string{};
							
							// the pointer given here is handed back to the write callback as `udata`
							curl_easy_setopt(handle, CURLOPT_WRITEDATA, &output_string);
							
							// unary + turns the capture-less lambda into a plain function pointer,
							// which is what the C API expects
							auto writer = +[](char * ptr, size_t, size_t nmemb, void * udata) -> size_t {
								auto & output = *static_cast<std::string *>(udata);
								// the unnamed second parameter is libcurl's `size`, which its
								// documentation states is always 1, so nmemb is the byte count
								output.append(std::string_view(ptr, nmemb));
								
								// report every byte as consumed
								return nmemb;
							};
							
							curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, writer);
							
							const auto res = curl_easy_perform(handle);
							
							if (res != CURLE_OK) {
								// std::runtime_error is the standard exception type
								// (there is no std::runtime_exception)
								throw std::runtime_error{curl_easy_strerror(res)};
							}
							
							return output_string;
						

co_curl: RAII wrapper of CURL


							namespace co_curl {
								// Wrapper type around a libcurl easy handle: constructors and a
								// destructor are declared here, definitions are not shown.
								struct easy_handle {
									easy_handle();
									// explicit: a URL must not silently convert into a handle
									explicit easy_handle(std::string_view url);
									~easy_handle();
								
									// options:
									void url(std::string_view);
									void follow_location(bool = true);
									// accepts any container satisfying AppendableContainer
									void write_into(AppendableContainer auto & output);
									// ...
									
									// synchronous variant of performing the request
									result sync_perform();
									// ...
								};
							}
						

RAII example with co_curl


							// Downloads `url` synchronously and returns the response body.
							// Throws std::runtime_error when the result converts to false.
							auto fetch(std::string_view url) -> std::string {
								auto handle = co_curl::easy_handle{url};
							
								handle.follow_location();
							
								// the handle appends received data to this string
								std::string output_string{};
								handle.write_into(output_string);
							
								// will block here until the request is finished
								const auto res = handle.sync_perform();
							
								if (!res) {
									throw std::runtime_error{res.string()};
								}
								
								return output_string;
							}
						

Asynchronous RAII example with co_curl


							// Coroutine variant of fetch: same steps as the blocking version, but
							// the request is awaited and the body is delivered through
							// co_curl::promise<std::string>.
							auto fetch(std::string_view url) -> co_curl::promise<std::string> {
								auto handle = co_curl::easy_handle{url};
							
								handle.follow_location();
							
								// the handle appends received data to this string
								std::string output_string{};
								handle.write_into(output_string);
							
								// will wait for the request to finish
								const auto res = co_await handle.perform();
							
								if (!res) {
									throw std::runtime_error{res.string()};
								}
								
								co_return output_string;
							}
						

scary coroutines

why scary?

  • standard library doesn't support coroutines yet
  • it's too complicated
  • it's too new, I will wait...

functions are just specialised coroutines

functions

  • have one starting / entry point
  • can have multiple exit points, which always jump back to the caller
  • behave as a scope: lifetime of all objects is easy to reason about

coroutines

  • have one starting and multiple entry points
  • can be suspended and resumed
  • can have multiple exit points
  • the lifetime of a coroutine and its objects is defined by its library

coroutines != threads

a javascript function


									// returns result value
									// (throws the string "division by zero" when b equals 0)
									function divide(a, b) {
										if (b == 0) {
											throw "division by zero";
										}
										return a / b;
									}
								

a coroutine doing the same thing


									// returns a promise of result value
									// (same body as the plain function; only the async keyword differs)
									async function divide(a, b) {
										if (b == 0) {
											throw "division by zero";
										}
										
										return a / b;
									}
								

a c++ function


									// Returns a / b (integer division).
									// Throws std::invalid_argument when b is zero.
									int divide(int a, int b) {
										if (b == 0) {
											throw std::invalid_argument{"division by zero"};
										}
										return a / b;
									}
								

a coroutine doing the same thing


									// Coroutine form of divide: the quotient is handed over with
									// co_return and delivered through function<int>.
									// Throws std::invalid_argument inside the coroutine when b is zero.
									function<int> divide(int a, int b) {
										if (b == 0) {
											throw std::invalid_argument{"division by zero"};
										}
										co_return a / b;
									}
								

C++ coroutines

A coroutine must use co_await, co_yield,
or co_return at least once inside its definition.

A coroutine must have
std::coroutine_traits<return-type, arguments...> defined for its return type
and arguments.

default std::coroutine_traits


							template <typename T, typename... Args> struct std::coroutine_traits<T, Args...> 
							requires requires { typename T::promise_type; }
							{
								using promise_type = typename T::promise_type;
							}
						

a customization point to select promise_type
implementing coroutine behaviour

model of a coroutine

coroutine handle


							// Synopsis of the handle used to refer to a coroutine.
							// Promise defaults to void, in which case the promise accessors
							// below are not available.
							template <typename Promise = void> struct std::coroutine_handle {
								void destroy();
								void resume();
								
								// if coroutine was finished
								bool done();
								explicit operator bool();
								
								// only if Promise is not void
								Promise & promise() const;
								static coroutine_handle from_promise(Promise &);
								
								// useful for interacting with C api
								void * address() const;
								static coroutine_handle from_address(void *);
							};
						

function type


							// Return type for a coroutine that should behave like an ordinary
							// function producing a T; this object owns the coroutine.
							template <typename T> struct function {
								// declared here, defined later
								struct promise_type;
								
								using handle_type = std::coroutine_handle<promise_type>;
								handle_type handle;
								
								function(handle_type h): handle{h} { }
								
								// NOTE(review): copy operations are not deleted, so a copy would
								// call destroy() on the same handle a second time
								~function() {
									// destroy the coroutines once this owning object is destroyed
									handle.destroy();
								}
								
								// access to the produced value, by call or by explicit conversion
								T get();
								explicit operator T();
							};
						

resume of a coroutine


							// pseudo-code: a coroutine body modelled as a state machine;
							// resume_point remembers where the next resume should continue
							[..., resume_point = &state1] {
								// every resume jumps to the remembered label
								goto *resume_point;
							state1:
								
								// suspend point
								resume_point = &state2;
								return; 
							state2:
								
								// suspend point
								resume_point = &state3;
								return; 
							state3:
							}
						
each suspend point is marked co_await (or co_yield)
and it can be part of an expression

co_await awaitable


							// pseudo-code: what `co_await awaitable` expands to
							// await_ready() == true skips the suspension entirely
							if (!awaitable.await_ready()) {
								// remember where to continue once resumed
								resume_point = &after;
								
								if constexpr (await_suspend returns void) {
									// always suspends
									awaitable.await_suspend(current-coroutine-handle);
									return;
								}
								else if constexpr (await_suspend returns bool) {
									// true suspends; false falls through to `after` without suspending
									if (awaitable.await_suspend(current-coroutine-handle)) {
										return; // go back to caller or resumer
									}
								}
								else if constexpr (await_suspend returns coroutine-handle) {
									// it's called "symmetric transfer"
									goto *awaitable.await_suspend(current-coroutine-handle);
								}
							}
							
							after:
							// the value of await_resume() is the value of the whole co_await expression
							/* return */ awaitable.await_resume();
							
						

existing standard awaitables


							// The two trivial awaitables shown as they would appear in namespace std.
							namespace std {
							
								// will always suspend and go back to its caller/resumer
								struct suspend_always {
									// false: the await_suspend step is always taken
									constexpr bool await_ready() const noexcept { return false; }
									constexpr void await_suspend(std::coroutine_handle<>) const noexcept { }
									constexpr void await_resume() const noexcept { }
								};
						
								// will never suspend and will just continue
								struct suspend_never {
									// true: the await_suspend step is skipped
									constexpr bool await_ready() const noexcept { return true; }
									constexpr void await_suspend(std::coroutine_handle<>) const noexcept { }
									constexpr void await_resume() const noexcept { }
								};
							}
						

(don't do this at home)


							struct run_in_different_thread {
								constexpr bool await_ready() const noexcept { return false; }
								constexpr void await_suspend(std::coroutine_handle<> coro) const {
									std::thread{[coro]{ coro.resume() }}.detach();
								}
								constexpr void await_resume() const noexcept { }
							}
							
							co_await run_in_different_thread(); // 🤯
						

							// Coroutine return type that makes a coroutine behave like a plain
							// function: the body runs immediately, stays suspended at its end,
							// and the result or exception is read back through get().
							template <typename T> struct function {
								struct promise_type {
									// implicit suspend points behaviour
									// start running right away...
									auto initial_suspend() const noexcept { return std::suspend_never{}; }
									// ...and stay suspended at the end so the promise remains readable
									auto final_suspend() const noexcept { return std::suspend_always{}; }
									
									// constructing of object referencing coroutine
									auto get_return_object() noexcept {
										return function{std::coroutine_handle<promise_type>::from_promise(*this)};
									}
									
									// it's a slideware!
									T result;
									std::exception_ptr exception;
									
									// called with co_return something
									void return_value(T value) {
										// value is already a local copy, so move it into place
										result = std::move(value);
									}
									
									// when there is an exception
									void unhandled_exception() {
										exception = std::current_exception();
									}
								};
								
								using handle_type = std::coroutine_handle<promise_type>;
								handle_type handle;
								
								function(handle_type h): handle{h} { }
								
								// this object is the only owner of the coroutine; a copy would
								// destroy the same coroutine a second time
								function(const function &) = delete;
								function & operator=(const function &) = delete;
								
								~function() {
									// this should behave as a function
									handle.destroy();
								}
								
								// Returns the value passed to co_return, or rethrows the exception
								// that escaped the coroutine body.
								T get() {
									assert(handle.done());
									
									if (auto eptr = handle.promise().exception) {
										std::rethrow_exception(eptr);
									}
									
									return handle.promise().result;
								}
								
								explicit operator T() {
									return get();
								}
							};
						

							// What the compiler conceptually generates for the coroutine divide():
							// the body is wrapped with the promise's suspend points and handlers.
							function<int> divide(int a, int b) {
								// pseudo-code!!
								// the arguments are copied into the coroutine frame
								return function<int>([a=a, b=b] coroutine-frame {
									co_await promise.initial_suspend(); // function will never suspend
								
									try {
										if (b == 0) {
											throw std::invalid_argument{"division by zero"};
										}
									
										// co_return a/b;
										promise.return_value(a / b);
										goto finish;
									
									} catch (...) {
										// exceptions do not leave the frame; the promise receives them
										promise.unhandled_exception();
									}
								
								finish:
									co_await promise.final_suspend(); // function will always suspend
								
									// destruction of coroutine-frame (will never reach)
								});
							}
						

co_curl::promise<T>


							template <typename R> struct promise {
								struct promise_type;
								using handle_type = std::coroutine_handle<promise_type>;
								
								promise(handle_type h): handle{h} { }
								promise(const promise &) = delete;
								promise(promise && other) noexcept: handle{std::exchange(other.handle, nullptr)} { }
								~promise() { if (handle) { handle.destroy(); } }
								
								promise & operator=(promise && other) noexcept {
									std::swap(handle, other.handle);
									return *this;
								}
								
								// promise<T> is also awaitable...
								bool await_ready() const noexcept {
									return handle.done();
								}
								
								auto await_suspend(std::coroutine_handle<> awaiter) {
									return handle.promise().someone_is_waiting_for_me(awaiter);
								}
								
								T await_resume() {
									if (auto eptr = handle.promise().exception) {
										std::rethrow_exception(eptr);
									}
									
									return handle.promise().get_result();
								}
							};
						

							template <typename R> struct promise::promise_type: function<R>::promise_type {
								std::coroutine_handle<> awaiter{};
								
								auto initial_suspend() noexcept {
									scheduler.inform_about_start();
									return std::suspend_never();
								}
								
								auto final_suspend() noexcept {
									struct custom_final_suspend {
										promise_type & promise;
										
										constexpr bool await_ready() const noexcept { return false; }
										constexpr void await_resume() const noexcept { }
										auto await_suspend(std::coroutine_handle<>) noexcept {
											return scheduler.final_suspend_and_select_next(promise.awaiter);
										}
									};
									
									return custom_final_suspend{*this};
								}
								
								auto someone_is_waiting_for_me(std::coroutine_handle<> other) noexcept {
									awaiter = other; // remember whom to wake up later
									return scheduler.suspend_and_select_next();
								}
							};
						

							// auto r = co_await handle.perform();
							
							// Awaitable behind the expression above.
							struct co_curl::lazy_perform { // result of co_curl::easy_handle::perform()
								co_curl::easy_handle & curl_handle;
								
								constexpr bool await_ready() const noexcept {
									return false; // will always suspend
								}
								
								auto await_suspend(std::coroutine_handle<> awaiter) {
									// link the awaiting coroutine to the curl handle
									curl_handle.associate_coroutine(awaiter);
									
									// will mark current coroutine blocked and will run something else
									return scheduler.suspend_and_select_next();
								}
								
								// declared only; its co_curl::result becomes the value of the co_await
								auto await_resume() -> co_curl::result;
							};
						

							// Bookkeeping that decides which coroutine runs next, based on how
							// many coroutines were started and how many of them are blocked.
							struct scheduler_type {
								unsigned running_tasks{0};
								unsigned blocked_tasks{0};
								
								// a coroutine has started
								void inform_about_start() {
									++running_tasks;
								}
								
								// a coroutine reached its end; possible_next is the coroutine
								// awaiting it, if any
								auto final_suspend_and_select_next(std::coroutine_handle<> possible_next = nullptr) -> std::coroutine_handle<> {
									--running_tasks;
									return select_next(possible_next);
								}
								
								// the current coroutine has to wait for something
								auto suspend_and_select_next() -> std::coroutine_handle<> {
									++blocked_tasks;
									return select_next();
								}
								
								auto select_next(std::coroutine_handle<> possible_next = nullptr)  -> std::coroutine_handle<> {
									if (possible_next) { // (direct awaiter in final_suspend)
										--blocked_tasks;
										return possible_next;
										
									}
									
									if (running_tasks != blocked_tasks) {
										// when whole call-tree is not blocked yet, backtrack and traverse until blocked
										return std::noop_coroutine(); 
										
									}
									
									// everything is blocked: finish some I/O and unblock its coroutine
									std::coroutine_handle<> completed = complete_io_and_return_coroutine();
									--blocked_tasks;
									return completed;
								}
								
								auto complete_io_and_return_coroutine() -> std::coroutine_handle<>;
							};
						

libcurl: multi API

libcurl's multi api


							// Function declarations from libcurl's multi interface
							// (see the libcurl documentation for the authoritative contracts).
							
							// add / remove an easy handle to / from a multi handle
							CURLMcode curl_multi_add_handle(CURLM * multi_handle, CURL * easy_handle);
							CURLMcode curl_multi_remove_handle(CURLM * multi_handle, CURL * easy_handle);
							
							// running_handles is an output parameter
							CURLMcode curl_multi_perform(CURLM * multi_handle, int * running_handles);
							
							// timeout_ms is given in milliseconds; numfds is an output parameter
							CURLMcode curl_multi_poll(CURLM * multi_handle,
							                          struct curl_waitfd extra_fds[],
							                          unsigned int extra_nfds,
							                          int timeout_ms,
							                          int * numfds);
							
							// msgs_in_queue is an output parameter
							CURLMsg * curl_multi_info_read(CURLM *multi_handle, int *msgs_in_queue);
						

finishing scheduler


							// multi_curl is a member of the scheduler
							
							// Drives the transfers until one of them finishes, then returns the
							// coroutine associated with the finished handle.
							auto complete_io_and_return_coroutine() {
								for (;;) {
									// do all I/O (communication, handshakes)
									multi_curl.perform();
									
									// return first finished coroutine
									if (auto raw_handle = multi_curl.get_finished()) {
										return get_associated_coroutine_from(raw_handle);
									}
								
									// wait until something happens (timeout, response)
									multi_curl.poll();
								} // repeat forever
							} 
						

JavaScript's fetch


							// Downloads `url`, retrying a failed transfer up to `attempts` times.
							// Throws no_attempts_left when the attempts run out and
							// url_not_available when the response code is not co_curl::http_2XX.
							auto fetch(std::string url, int attempts = 10) -> co_curl::promise<std::string> {
								std::string output{};
								
								auto handle = co_curl::easy_handle{url};
								handle.write_into(output);
								handle.follow_location();
								
								// try multiple times...
								for (;;) {
									// the counter is checked and decremented before every attempt
									if (attempts-- <= 0) {
										throw no_attempts_left(url);
									}
									
									if (!co_await handle.perform()) {
										handle.resume(output.size()); // keep what was downloaded
										continue; // try again
									}
									
									if (handle.get_response_code() != co_curl::http_2XX) {
										throw url_not_available(url);
									}
									
									co_return output;
								} 
							}
						

utilities

wait for all promises...


						// Awaits every promise of the range in order and returns all results
						// in one vector.
						template <range_of_promises R> 
						auto all(R && promises) -> promise<std::vector<range_promise_results<R>>> {
							using value_type = std::ranges::range_value_t<R>;
							using result_type = result_of_promise<value_type>;
							
							// in case we have view we need to keep lifetime of the promises
							// (std::ranges::to has to be called to obtain something usable with |)
							auto temporary = promises | std::ranges::to<std::vector>();
						
							// but results will go to other vector
							std::vector<result_type> output{};
							output.reserve(temporary.size());
						
							// go thru and co_await
							for (auto && p: temporary) {
								output.emplace_back(co_await std::move(p));
							}
						
							co_return output;
						}
						

wait for all promises... (std::optional<T>)


						// Variant of all() for promises yielding optional values: stops at the
						// first empty result and returns std::nullopt, otherwise returns the
						// collected results.
						template <range_of_promises_with_optional R> 
						auto all(R && promises) -> promise<std::optional<std::vector<range_promise_results<R>>>> {
							using value_type = std::ranges::range_value_t<R>;
							using result_type = result_of_promise_with_optional<value_type>;
							
							// in case we have view we need to keep lifetime of the promises
							// (std::ranges::to has to be called to obtain something usable with |)
							auto temporary = promises | std::ranges::to<std::vector>();
						
							// but results will go to other vector
							std::vector<result_type> output{};
							output.reserve(temporary.size());
						
							// go thru and co_await
							for (auto && p: temporary) {
								auto r = co_await std::move(p);
								
								// one missing result makes the whole result missing
								if (!r) co_return std::nullopt;
								
								// NOTE(review): r is stored as-is; whether result_type is the
								// optional or the value inside it is not visible here
								output.emplace_back(std::move(r));
							}
						
							co_return output;
						}
						

std::ranges interop


							// Tag type that allows `range | co_curl::all` at the end of a pipeline.
							struct co_curl::all_tag {
								// range of promises -> promise of a vector of results
								template <range_of_promises R> 
								friend auto operator|(R && promises, co_curl::all_tag) ->
									promise<std::vector<range_promise_results<R>>>;
								
								// range of promises with optional results -> promise of an optional vector
								template <range_of_promises_with_optional R> 
								friend auto operator|(R && promises, co_curl::all_tag) ->
									promise<std::optional<std::vector<range_promise_results<R>>>>;
							};
							
							// the object written on the right-hand side of the pipe
							constexpr auto co_curl::all = all_tag{};
						

everything together

download referenced files from an index


							// A downloaded file: the URL it was requested from and its content.
							struct resource {
								std::string url;
								std::string content;
							};
							
							// Downloads `url` and pairs the content with the URL it came from.
							auto fetch_resource(const std::string & url) -> promise<resource> {
								co_return resource{.url = url, .content = co_await fetch(url)};
							}
							
							// Downloads the page at `url`, then downloads every http(s) link found in it.
							auto fetch_all(const std::string & url) -> promise<std::vector<resource>> {
								// inner co_await: the page itself; outer co_await: all linked resources
								co_return co_await (
									co_await fetch(url) // download a webpage
									| ctre::range<R"(https?://[^"'\s]++)"> // extract links
									| std::views::transform(fetch_resource) // download links
									| co_curl::all // make sure everything is done
								);
							}
						

							// Downloads and parses the index shard of one ngram and shifts every
							// entry's offset by the ngram's position.
							// arg: (position of the ngram, the three characters of the ngram).
							// Throws invalid_json_format when parsing yields a false result.
							auto fetch_ngram(std::tuple<size_t, std::array<char, 3>> arg) -> promise<shard> {
								auto && [offset, ngram] = arg;
								
								// one JSON file per ngram
								const auto url = std::format("{}/index/{}.json", url_prefix, as_hexdec(ngram));
								
								// only the URL is passed: fetch's second parameter is the number of
								// attempts, not the ngram
								auto json = parse_json_with_document_and_offset(co_await fetch(url));
								
								if (!json) {
									throw invalid_json_format{url};
								}
								
								auto shift = [offset](document_and_offset v) { v.offset += offset; return v; };
								
								// std::views::transform is the pipeable adaptor
								// (std::ranges::transform is the eager algorithm)
								co_return std::move(json) | std::views::transform(std::move(shift));
							}
						

reverse index search with remote index shards


							// Searches the remote reverse index for `text` and returns at most
							// `top` documents, ordered by descending number of hits.
							auto search(std::string text, size_t top = 100) -> promise<documents_with_count> {
								// view text as ngrams with offsets...
								// NOTE(review): std::views::adjacent_transform<3> normally takes a
								// callable; confirm whether std::views::adjacent<3> was meant
								const auto ngrams = text | std::views::adjacent_transform<3> | std::views::enumerate;
								
								// download all necessary files and parse them...
								// (fetch_ngram downloads one shard and already applies the offset shift)
								auto shards = co_await all(ngrams | std::views::transform(fetch_ngram));
								
								// calculate intersection between ngram sets (index shards, sorted by sizes)...
								std::ranges::sort(shards, std::less{}, std::ranges::size);
								auto hits = std::ranges::fold_left_first(shards, index_intersection);
								
								// group them by document's id (already presorted) and calculate number of hits per document...
								auto documents = std::ranges::fold_left(hits, documents_with_count{}, reduce_sorted_by_id);
								
								// sort documents in descending order by number of hits and preserve only n-top matches...
								// n is clamped so the partial sort never reaches past the end
								const auto n = std::min(documents.size(), top);
								std::ranges::partial_sort(documents, documents.begin() + n, std::greater{}, by_count);
								documents.resize(n);
								
								// return result
								co_return documents;
							}
						

hello world


							// both fetch() calls are made before the single co_await; the
							// results are unpacked in argument order
							auto [a, b] = co_await co_curl::all(
								fetch("https://hanicka.net/a.txt"),
								fetch("https://hanicka.net/b.txt")
							);
							
							std::print("{} {}", a, b); 
						

What we’ve been
(co_a)waiting for?

github.com/hanickadot/co_curl
hanickadot.github.io/search