luna::
Fira
Code
font with ligatures (>=
vs =
>
, !=
vs !
=
)"I need to update an object
on 100s of millions of clients
quickly and whenever I want."
staten+1 = staten + difference(n, n+1)
difference(x,y) is a cell in a matrix of states
template <size_t N> using hash = std::array<std::byte, N>;
template <size_t N> using hash_view = std::span<const std::byte, N>;
template <size_t N> struct hash {
std::array<std::byte, N> value{};
// constructors
hash() = default;
hash(const hash &) = default;
hash(hash &&) = default;
explicit hash(std::array<std::byte, N> in) noexcept: value{in} { }
// comparison
friend operator<=>(hash, hash) = default;
friend operator==(hash, hash) = default;
// const iterable
auto begin() const noexcept {
return value.begin();
}
auto end() const noexcept {
return value.end();
}
// non-const iterable
auto begin() noexcept {
return value.begin();
}
auto end() noexcept {
return value.end();
}
// and many more...
};
template <size_t N> struct hash_view {
std::span<const std::byte, N> value;
// constructors
hash_view() = delete;
hash_view(const hash_view &) = default;
hash_view(hash_view &&) = default;
explicit hash_view(std::array<std::byte, N> in) noexcept: value{in} { }
// comparison
friend operator<=>(hash_view lhs, hash_view rhs) {
return std::ranges::lexicographical_compare_three_way(lhs, rhs);
}
friend operator==(hash_view lhs, hash_view rhs) {
return std::ranges::equal(lhs, rhs);
}
// make it iterable
auto begin() const noexcept {
return value.begin();
}
auto end() const noexcept {
return value.end();
}
// and many more...
};
// generic non-semantical hash & view
template <size_t N> struct hash : std::array<std::byte, N> {
// this is totally fine as long as you don't
// define a destructor or add a data member
using super = std::array<std::byte, N>
using super::super;
explicit(true) hash(hash_view<N> orig) {
// I can't use:
// super{orig | std::ranges::to<super>}
std::ranges::uninitialized_copy(orig, super::begin());
}
};
template <size_t N> struct hash_view : std::span<const std::byte, N> {
using super = std::span<const std::byte, N>
using super::super;
explicit(false) hash_view(const hash<N> & orig) : super{orig} { }
// thanks Tony :)
friend operator<=>(hash_view lhs, hash_view rhs) {
return std::ranges::lexicographical_compare_three_way(lhs, rhs) ;
}
friend operator==(hash_view lhs, hash_view rhs) {
return std::ranges::equal(lhs, rhs) ;
}
};
// tagged hash & view
template<typename Tag> struct tagged_hash : hash<Tag::length> {
using super = hash<Tag::length>
using super::super;
using tag = Tag;
explicit(true) tagged_hash(tagged_hash_view<N> orig) : super{orig} { }
};
template<typename Tag> struct tagged_hash_view : hash_view<Tag::length> {
using super = hash_view<Tag::length>
using super::super;
using tag = Tag;
explicit(false) tagged_hash_view(const tagged_hash<N> & orig) : super{orig} { }
};
// tags for various crypto hashes
namespace crypto {
struct sha1 {
static constexpr size_t length = 16;
};
struct sha256 {
static constexpr size_t length = 32;
};
}
// specific tagged hash & value types
using sha1 = tagged_hash<crypto::sha1>;
using sha1_view = tagged_hash<crypto::sha1>;
using sha256 = tagged_hash_view<crypto::sha256>;
using sha256_view = tagged_hash_view<crypto::sha256>;
"Strong types create strong code."
— Tony van Eerd
struct metadata {
// description of subject
luna::hash subject_hash ;
// info about the graph
luna::hash color ;
std::chrono::time_point<std::chrono::utc_clock> ts ;
// links together
std::vector<delta_link> delta_links ;
std::optional<luna::hash> snapshot_hash ;
// info about serialized state
luna::hash hash ;
std::optional<std::vector<std::byte>> bytes{std::nullopt};
};
struct delta_link {
std::chrono::time_point<std::chrono::utc_clock> previous_ts ;
luna::hash delta_hash ;
unsigned distance_to_snapshot ;
// to keep links sorted and comparable
friend auto operator<=>(const delta_link & lhs, const delta_link & rhs) = default ;
friend auto operator==(const delta_link & lhs, const delta_link & rhs) = default ;
};
struct delta {
luna::metadata previous_metadata;
std::vector<std::byte> patch;
};
struct snapshot {
std::vector<std::byte> content;
};
template <typename Tag> struct identifier {
luna::tagged_hash<Tag> hash;
luna::type type;
friend bool operator==(const identifier & id, luna::type type) {
return id.type == type;
}
friend auto operator/(std::string_view lhs, const identifier & id) {
// get URL of the object:
// "https://somewhere" / id => "https://somewhere/aabbccdd.metadata"
return concat(lhs, "/", hexdec_encode(id.hash), extension_of(id.type)) ;
}
};
enum class type {
metadata ,
snapshot ,
delta ,
alias // uh oh?
};
it's a copy of a metadata stored under a name (eg. "latest")
and not the hash of the content (as other objects)
or it can be a hash of its name (eg. hash("latest") = "eb791cf3
...")
(you can change CDN or webserver caching rules based on the file extension)
auto object = deserialize(bytes); // ?!
struct any_object : std::variant<metadata, delta, snapshot> {
using super = std::variant<metadata, delta, snapshot>;
using super::super;
// I know... but these are here for a reason
auto get_metadata(this auto && self ) noexcept {
return std::get_if<metadata>(&self) ;
}
auto get_delta(this auto && self) noexcept {
return std::get_if<delta>(&self);
}
auto get_snapshot(this auto && self) noexcept {
return std::get_if<snapshot>(&self);
}
};
message wrapper {
required type object_type: 0 ;
optional bytes metadata: 1;
optional bytes patch: 2;
optional bytes snapshot: 3 ;
optional bytes signature: 4 ;
}
auto unwrap_and_validate(luna::identifier id , std::span<const std::byte> content )
-> any_object {
// works over span, cheap to do
const auto wrap = pb::deserialize<wrapper>(content) ;
if (wrap.type == type::metadata and wrap.has_metadata() and wrap.has_signature() ) {
return validate_metadata(id , wrap.metadata() , wrap.signature() , content ) ;
} else if (wrap.type == type::delta and wrap.has_metadata() and wrap.has_patch() ) {
return validate_delta(id , wrap.metadata() , wrap.patch() ) ;
} else if (wrap.type == type::snapshot and wrap.has_snapshot() ) {
return validate_snapshot(id , wrap.snapshot() ) ;
}
throw invalid_object{} ;
}
auto validate_metadata(identifier id , std::span<const std::byte> content ,
std::span<const std::byte> signature , std::span<const std::byte> wrapper ) -> metadata {
if (id != type::metadata and id != type::alias ) {
throw asked_for_different_object_type{} ;
}
const auto hash = crypto::calculate<hash>(content) ;
// check only if we asked for metadata (not alias)
if (id == type::metadata and hash != id.hash ) {
throw not_matching_checksum_of_object{} ;
}
// if metadata are not part of delta...
if (not crypto::validate_signature(trusted_keys, hash, signature) ) {
throw invalid_signature_of_metadata{} ;
}
auto result = pb::deserialize<metadata>(content) ;
// add info about serialized state
result.hash = hash ;
result.original_bytes = *wrapper | std::ranges::to<std::vector<std::byte>> ;
return result ;
}
auto validate_delta(identifier id , std::span<const std::byte> metadata_bytes ,
std::span<const std::byte> patch_bytes ) -> delta {
if (id != type::delta ) {
throw asked_for_different_object_type{} ;
}
const auto hash = crypto::calculate<hash>(metadata_bytes , patch_bytes ) ;
if (hash != id.hash ) {
throw not_matching_checksum_of_object{} ;
}
// parse metadata and add its hash (to identify the state)
auto metadata = pb::deserialize<metadata>(metadata_bytes) ;
metadata.hash = crypto::calculate<hash>(metadata_bytes) ;
return delta {
.previous_metadata = std::move(metadata) ,
.patch = patch_bytes | std::ranges::to<std::vector<std::byte>> ;
};
}
auto validate_snapshot(identifier id , std::span<const std::byte> content ) -> snapshot {
if (id != type::snapshot ) {
throw asked_for_different_object_type{} ;
}
const auto hash = crypto::calculate<hash>(content) ;
if (hash != id.hash ) {
throw not_matching_checksum_of_object{};
}
return snapshot {content | std::ranges::to<std::vector<std::byte>> };
}
struct state {
const luna::metadata metadata ;
std::shared_ptr<const T> subject ; // can be shared
// shortcuts to access metadata
auto get_hash() const noexcept -> luna::hash;
auto get_color() const noexcept -> luna::hash;
auto get_ts() const noexcept -> std::chrono::time_point<std::chrono::utc_clock>;
};
std::shared_ptr<const T>
auto select_next(const metadata & md , std::shared_ptr<const state> & current_state )
-> luna::identifier {
// in case we switching to different graph
if (current_state and md.color != current_state.get_color() ) {
current_state = nullptr ;
}
// in case we are going back in time
if (current_state and md.ts < current_state.get_ts() ) {
current_state = nullptr ;
}
assert(md.snapshot_hash.has_value() or std::size(md.delta_links) > 0);
// without current state or any delta use snapshot if present :)
if ((current_state == nullptr or md.delta_links.empty()) and md.snapshot_hash.has_value() ) {
return luna::identifier{*md.snapshot_hash, luna::type::snapshot} ;
}
// towards current state
if (current_state ) {
const auto eligible_link = [&](const luna::metadata::delta_link & link ) {
assert(link.ts != current_state->get_ts());
return link.ts > current_state->get_ts() ;
};
return luna::identifier{
std::ranges::min(
md.delta_links | ranges::views::filter(eligible_link)
) .hash ,
luna::type::delta} ;
}
// towards closest snapshot
return luna::identifier{
std::ranges::min (
md.delta_links , std::less{} , &metadata::delta_link::distance_to_snapshot
).hash , luna::type::delta} ;
}
std::vector<luna::any_object>; // this is the way path
auto get(luna::identifier) -> luna::any_object; // ??
auto convert_to_object(identifier id ,
async_fetch<std::vector<std::byte> > auto fetch_bytes ) -> coro::task<any_object> {
co_return unwrap_and_validate(co_await fetch_bytes(id) ) ;
}
template <typename T, typename Result >
concept async_fetch = requires(T obj, identifier id) {
requires std::invocable<T, identifier> ;
{ obj(id) } -> coro::awaitable_of<Result > ;
};
auto download(identifier id) -> coro::task<std::vector<std::byte>> {
// download (or throw exception) with curl or some other library :)
}
auto fetch_path(std::shared_ptr<const state> current_state , hash_or_name target ,
async_fetch<std::vector<std::byte>> auto && fetch_object )
-> coro::task<std::vector<any_object>> {
// result path...
std::vector<any_object> path{} ;
// this is not a coroutine, just passing it thru
auto fetch_object = [&](luna::identifier id) -> coro::task<any_object> {
return luna::convert_to_object(id, fetch_blob );
};
// if we are asked to update to current state
if (current_state and current_state->get_hash() == target ) {
co_return {} ;
}
// start with downloading HEAD
path.emplace_back(co_await fetch_object(convert_to_identifier(target) ) ) ;
// and rest of the path will follow (until we joined current state or hit snapshot)
for (;;) {
const auto & last = path.back() ;
if (const auto * metadata = last.get_metadata() ) {
// if we are in current state, no need to do anything
if (current_state and metadata->hash == current_state->get_hash() ) {
co_return path ;
}
// but otherwise decide where to go next...
path.emplace_back(co_await fetch_object(select_next(*metadata, current_state) ) ) ;
} else if (const auto * delta = last.get_delta() ) {
// we are interested only in embed previous metadata, the loop will handle it...
path.emplace_back(delta->previous_metadata ) ;
} else if (last.get_snapshot() != nullptr ) {
co_return path ;
}
}
}
statem = staten +
statem = staten +
auto update_to(std::atomic<std::shared_ptr<const state>> & current_state ,
hash_or_name target , async_fetch<std::vector<std::byte>> auto && fetch_blob )
-> coro::task<luna::update_result> {
// and the update...
const auto old_state = current_state.load() ;
const auto new_state = co_await luna::fetch_path(old_state , target , fetch_blob )
| ranges::views::fold_right(old_state , apply<update_traits>{} ) ;
if (old_state != nullptr and new_state->hash() == old_state->hash() ) {
co_return luna::already_on_latest_version(old_state) ;
}
current_state.store(new_state) ;
co_return luna::updated(new_state) ;
}
template <update_traits_for<T> UpdateTraits > struct apply : UpdateTraits {
using traits = UpdateTraits ;
using sh_state = std::shared_ptr<const state> ;
auto operator()(const sh_state & , luna::snapshot & snapshot ) -> sh_state {
return std::make_shared<const state>(
this->traits::deserialize(std::move(snapshot.bytes)) , metadata{}
) ;
}
auto operator()(const sh_state & previous, luna::delta & delta ) -> sh_state {
return std::make_shared<const state>(
this->traits::apply_patch(*previous->subject , std::move(delta.patch) ) , metadata{}
) ;
}
auto operator()(const sh_state & previous, const luna::metadata & md ) -> sh_state {
if (this->traits::calculate_hash(*previous->subject) != md.subject_hash ) {
throw invalid_subject{} ;
}
return std::make_shared<const state>(previous.subject, md ) ;
}
};
struct example_update_traits {
// consistency check
auto calculate_hash (const subject_type & ) -> luna::sha256 ;
// diff & patch
auto calculate_difference (const subject_type & lhs , const subject_type & rhs )
-> std::vector<std::byte> ;
auto apply_difference (const subject_type & prev , std::span<const std::byte> diff )
-> subject_type ;
// IO support
auto serialize (const subject_type & ) -> std::vector<std::byte> ;
auto deserialize (std::span<const std::byte> ) -> subject_type ;
};
template <typename T , update_traits_for<T> UpdateTraits > class stream {
using subject = T;
using update_traits = UpdateTraits;
std::atomic<std::shared_ptr<const state<T>>> current_state{nullptr};
public:
// call me whenever you want me to update
auto update_to(
hash_or_name target, async_fetch<std::vector<std::byte>> auto && fetch_blob
) -> coro::task<update_result> ;
// value is the subject
auto get() const noexcept -> std::shared_ptr<const T> {
const auto state = current_state.load() ;
if (state == nullptr ) {
return nullptr;
}
// maybe limit the API and wrap it?
return state->subject;
};
// serialization / deserialization
auto serialize_current_state() const -> std::vector<std::byte> {
return serialize_state(current_state.load() ) ;
}
static auto serialize_state(std::shared_ptr<const state<T>> s )
-> std::vector<std::byte> {
if (s == nullptr ) {
throw no_state_available{};
}
assert(s->metadata.original_bytes.has_value());
return byte_concat(*s->metadata.original_bytes , update_traits::serialize(*s->subject ) ) ;
}
static auto deserialize_state(std::optional<std::span<const std::byte>> input )
-> std::shared_ptr<const state<T>> {
if (not input) {
return nullptr;
}
const auto [md_bytes , content ] = split_persistent_state(input);
std::vector<any_object> path {snapshot{content} , unwrap_and_validate(md_bytes ) };
return ranges::fold (
std::shared_ptr<const state<T>>{nullptr } ,
path,
apply_path<update_traits>{}
);
}
// constructors
stream() = default;
explicit stream(std::optional<std::span<const std::byte>> persistent_state ) :
current_state{deserialize_state(persistent_state ) }
{
// otherwise nothing
}
};
int main() {
luna::stream <std::map<std::string, std::string>, map_updater > simple_database{
load_file_content("config.bin") ; // returns std::optional<std::vector<std::byte>>
};
auto update = [&]()->std::task<void>{
const auto res = co_await simple_database.update_to("latest", download_using_curl) ;
if (res == luna::updated ) {
save_file_content("config.bin", simple_database.serialize_current_state()) ;
}
};
// initial update at start
coro::async_await(update()) ;
auto periodical_update = scheduler::plan_every(std::chrono::minutes{10}, [&]{
return update();
}) ;
auto do_something_with_the_value = [&]{
const std::shared_ptr handle = simple_database.get();
const T & value = *handle;
// ... your app code
};
// ...
}
// somewhere in your application:
auto always_latest_blob = luna::stream <std::vector<std::byte> , bsdiff >;
// when you need to use it:
void foo() {
std::shared_ptr<const std::vector<std::byte>> handle = always_latest_blob.get() ;
// ...
}
struct bsdiff {
using subject_type = std::vector<std::byte>;
using subject_view = std::span<const std::byte>
auto calculate_hash(subject_view content ) -> luna::sha256 {
return crypto::hasher<luna::sha256>::calculate(content) ;
}
auto calculate_difference(subject_view lhs, subject_view rhs)
-> std::vector<std::byte> {
return bsdiff_into_vector(lhs, rhs);
}
auto apply_difference(subject_view prev, std::span<const std::byte> diff)
-> subject_type {
return bspatch_into_vector(prev, diff);
}
auto serialize(subject_view content) -> std::vector<std::byte> {
return content | ranges::to<std::vector<std::byte>>;
}
auto deserialize(std::span<const std::byte> blob) -> subject_type {
return blob | ranges::to<subject_type>
}
};
"I need to update an object
on 100s of millions of clients
quickly and whenever I want."
"I need to deliver a hash
value (32 bytes) of graph's HEAD
to 100s of millions of clients
quickly and whenever I want."
// let's have a simple virtual file-system...
using vfs = std::map <std::filesystem::path , std::vector<std::byte>> ;
// and update_traits to support it...
struct vfs_traits {
using subject = vfs;
using subject_hash = luna::sha256;
// algorithm for hash calculation of the subject
auto calculate_hash(const subject & vfs) const -> subject_hash {
crypto::hasher<subject_hash> hash{};
for (const auto & [name , content ]: vfs) {
hash.update (crypto::hasher<subject_hash>::calculate(name) );
hash.update (crypto::hasher<subject_hash>::calculate(content) );
}
// it's just a hash of hashes
return hash.final() ;
}
// delta support
auto calculate_difference(const subject & lhs, const subject & rhs) const
-> std::vector<std::byte> {
luna::vfs_patch result ;
for (auto && change : luna::difference_range(lhs, rhs) ) {
if (change.is_addition() ) {
result.add_addition (change.key , change.value );
} else if (change.is_removal() ) {
result.add_removal (change.key );
} else {
assert(change.is_update()) ;
result.add_change (change.key ,
bsdiff::calculate_difference(change.previous_value , change.value ) );
}
}
return bzip::compress(wire::serialize(result) ) ;
}
auto apply_difference(const subject & old, std::span<const std::byte> patch) const
-> subject {
const auto changes = wire::deserialize<luna::vfs_patch>(bzip::decompress(patch) ) ;
if (not changes.has_value()) {
throw invalid_patch{};
}
subject result = auto(old);
for (const auto & change: changes) {
if (change.is_addition() ) {
result.insert (change.key() , change.value() );
} else if (change.is_removal() ) {
result.erase (change.key() );
} else if (change.is_update() ) {
if (auto it = result.find(change.key() ) ; it != result.end() ) {
it->second = bsdiff::apply_difference (it->second , change.diff() );
} else {
throw expected_key_is_missing{};
}
} else {
throw broken_diff{}
}
}
return result;
}
// subject processing (serialize/deserialize)
auto serialize(const subject & vfs) const -> std::vector<std::byte> {
return bzip::compress(wire::serialize(vfs) ) ;
}
auto deserialize(std::span<const std::byte> archive) const -> subject {
return wire::deserialize<luna::vfs>(bzip::decompress(archive) ) ;
}
}
// And we got a simple VCS... :)
using simple_git = luna::stream<vfs, vfs_traits>;