Let’s build something Outrageous – Part 14: Command Logging

You may have noticed that homes are selling for ridiculously high prices lately. Some are calling it inflation, others see it as a supply chain squeeze the likes of which we’ve never seen before. I’m in the latter camp and I think things will eventually return to normal levels but the “inflationistas” yearn for a simpler time. A return to the Gold Standard of currency which kept inflation in check. Do you know who else wanted to return to the gold standard?

Cobra Commander.

That’s right, the “bad guy” from the G.I. Joe series wanted to burn all paper currency and use Cobra gold currency. Maybe he wasn’t so bad after all? But he did spend most of his time shouting orders at people. He commanded things to change, much in the same way we command our data to change. There is a great post on StackOverflow on this subject, I’ll reproduce the relevant bits here. If you define:

  • State : stored information at a given point of time
  • Command : directive to the storage to change its state

Then suppose your database has 6 blocks of data representing the current state and though a series of commands you alter those six blocks into a new state…

…and you keep in mind that:

  1. A command may affect many stored entities, so many blocks can be affected at the same time
  2. The next state is a function of the current state and the command

Then to guarantee data integrity you have some options:

  • Write-Ahead Logging – the idea that State changes should be logged before any update.
  • Command Logging – the idea that only the Command used to produce the state should be logged.

Write Ahead Logging is what most major databases use (Postgres, MySQL, SQLServer, Oracle, etc.) VoltDB uses command logging. We are going to be using command logging as well. Today we’ll talk about the “easy” way to implement this feature. For starters, the Seastar library has a “logger” class meant to for syslog, but it also lets us point the log to a file and capture our changes there. So what are we going to log for our commands?

Well… the only way to talk to RageDB right now is via the http interface, so let’s just log the http requests one per line:

static const seastar::logger::format_info format("{1}&method={0}");
 
void Graph::Log(const seastar::sstring method, const seastar::sstring url) {
   logger.info(format, method.c_str(), url.c_str());
}

Hold on, we also have JSON payloads coming and Lua scripts, both of which allow newline characters. I really wasn’t sure what to do here, so I did the simplest thing I knew would work. I base64 encoded them. We could encrypt the line instead, that would also work and give us encryption at rest. We’ll leave it for now.

static const seastar::logger::format_info with_body_format("{1}&method={0}&body={2}");

void Graph::Log(const seastar::sstring method, const seastar::sstring url, const seastar::sstring body) {
    logger.info(with_body_format, method.c_str(), url.c_str(), base64::encode(body).c_str());
}

Ok, let’s add logging to our http requests. For example when we want to “Put” a node property, we first validate the parameters and call the parent.graph.Log function with the method, the url and the content of the request:

future<std::unique_ptr<reply>> NodeProperties::PutNodePropertyHandler::handle([[maybe_unused]] const sstring &path, std::unique_ptr<request> req, std::unique_ptr<reply> rep) {
    bool valid_type = Utilities::validate_parameter(Utilities::TYPE, req, rep, "Invalid type");
    bool valid_key = Utilities::validate_parameter(Utilities::KEY, req, rep, "Invalid key");
    bool valid_property = Utilities::validate_parameter(Utilities::PROPERTY, req, rep, "Invalid property");

    if(valid_type && valid_key && valid_property) {
        parent.graph.Log(req->_method, req->get_url(), req->content);
        return parent.graph.shard.invoke_on(this_shard_id(), [req = std::move(req)] (Shard &local_shard) {
            return local_shard.NodePropertySetFromJsonPeered(req->param[Utilities::TYPE], req->param[Utilities::KEY], req->param[Utilities::PROPERTY], req->content.c_str());
        }).then([rep = std::move(rep)] (bool success) mutable {
            if(success) {
                rep->set_status(reply::status_type::no_content);
            } else {
                rep->set_status(reply::status_type::not_modified);
            }
            return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
        });
    }

    return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
}

We only need to keep track of the POST, PUT and DELETE operations, so I’ll go ahead and add all of those. Let’s try it by creating 1 million “Address” nodes. First we need to tell the database our schema:

curl -X POST http://localhost:7243/db/rage/schema/nodes/Address
curl -X POST http://localhost:7243/db/rage/schema/nodes/Address/properties/uuid/string
curl -X POST http://localhost:7243/db/rage/schema/nodes/Address/properties/number/integer
curl -X POST http://localhost:7243/db/rage/schema/nodes/Address/properties/street_address/string
curl -X POST http://localhost:7243/db/rage/schema/nodes/Address/properties/state/string
curl -X POST http://localhost:7243/db/rage/schema/nodes/Address/properties/zip_code/string

Then we’ll use the benchmarks project to create the million nodes:

================================================================================
---- Global Information --------------------------------------------------------
> request count                                    1000000 (OK=1000000 KO=0     )
> min response time                                      0 (OK=0      KO=-     )
> max response time                                     33 (OK=33     KO=-     )
> mean response time                                     0 (OK=0      KO=-     )
> std deviation                                          0 (OK=0      KO=-     )
> response time 50th percentile                          0 (OK=0      KO=-     )
> response time 75th percentile                          0 (OK=0      KO=-     )
> response time 95th percentile                          1 (OK=1      KO=-     )
> response time 99th percentile                          1 (OK=1      KO=-     )
> mean requests/sec                                17857.143 (OK=17857.143 KO=-     )
---- Response Time Distribution ------------------------------------------------
> t < 800 ms                                       1000000 (100%)
> 800 ms < t < 1200 ms                                   0 (  0%)
> t > 1200 ms                                            0 (  0%)
> failed                                                 0 (  0%)
================================================================================

Great, now let’s take a look at our log:

INFO  2021-09-08 02:23:48,917 [shard 0]  - http://172.31.65.109:7243/db/rage/node/Address/1668&method=POST&body=eyAibnVtYmVyIjogNzc4MywgInN0cmVldF9hZGRyZXNzIjogIjQwNSBCZWNodGVsYXIgSnVuY3Rpb24gQXB0LiA2NTIiLCAiY2l0eSI6ICJMYWtldG9uIiwic3RhdGUiOiAiSUQiLCJ6aXBfY29kZSI6ICI2MDU5MyIgfQ
INFO  2021-09-08 02:23:48,917 [shard 3]  - http://172.31.65.109:7243/db/rage/node/Address/1667&method=POST&body=eyAibnVtYmVyIjogMjI1MDgsICJzdHJlZXRfYWRkcmVzcyI6ICI5MDUgT2JlcmJydW5uZXIgQ3Jvc3NpbmcgU3VpdGUgMDg1IiwgImNpdHkiOiAiV2VzdGJ1cnkiLCJzdGF0ZSI6ICJNVCIsInppcF9jb2RlIjogIjc4MTQ0LTE0MjIiIH0
INFO  2021-09-08 02:23:48,917 [shard 3]  - http://172.31.65.109:7243/db/rage/node/Address/1667&method=POST&body=eyAibnVtYmVyIjogMjI1MDgsICJzdHJlZXRfYWRkcmVzcyI6ICI5MDUgT2JlcmJydW5uZXIgQ3Jvc3NpbmcgU3VpdGUgMDg1IiwgImNpdHkiOiAiV2VzdGJ1cnkiLCJzdGF0ZSI6ICJNVCIsInppcF9jb2RlIjogIjc4MTQINFO  2021-09-08 02:23:48,917 [shard 1]  - http://172.31.65.109:7243/db/rage/node/Address/1669&method=POST&body=eyAibnVtYmVyIjogMjUzNTQsICJzdHJlZXRfYWRkcmVzcyI6ICI4NzQ2OCBEYW1pYW4gTGFrZXMgQXB0LiA0NTAiLCAiY2l0eSI6ICJXZXN0Ym9yb3VnaCIsInN0YXRlIjogIk9SIiwiemlwX2NvZGUiOiAiMjYwNDEiIH0
INFO  2021-09-08 02:23:48,918 [shard 0]  - http://172.31.65.109:7243/db/rage/node/Address/1670&method=POST&body=eyAibnVtYmVyIjogNzM0NiwgInN0cmVldF9hZGRyZXNzIjogIjE5NTAgU2F1ZXIgSXNsYW5kcyBBcHQuIDAyOSIsICJjaXR5IjogIkVhc3RzaGlyZSIsInN0YXRlIjogIlRYIiwiemlwX2NvZGUiOiAiOTI1NjQtMzc5MCIgfQ

Oh no. We have the logging for Address 1667 getting duplicated and mangled. Turns out syslog output is atomic but stdout output is not. So we can add locks (yuck) or we can find some other way to log.

I went digging around online and ran into “Reckless“, which is a low-latency, high-throughput, asynchronous logging library for C++ by Mattias Flodin. I love the name so I had to use it. It even comes with a way to handle crashes by calling panic flush on our logger. Perfect… except it wasn’t on Conan Center. So I got help from Antoine Charpentier to add it. I tested, it and it no longer mangles our output.

Now we have to be able to restore from the log file. We need an http client, looked around and found “cpr” a spiritual port of Python Requests written by Huu Nguyen and maintained by Fabian Sauter. We will put our restore feature behind an http endpoint “/db/{name}/restore” and it will return right away, runing the actual work on a separate thread.

    seastar::future<std::string> Shard::RestorePeered(const std::string& name) {
        std::thread t(Restore, name);
        t.detach();

        std::string message = "Restoring " + name;
        return seastar::make_ready_future<std::string>(message);
    }

The Restore function basically goes down the log file one line at a time, parses it, decodes the body if it has one and uses cpr to send http requests back to ourselves:

std::string line;
while (std::getline(restore_file, line)) {
    try {
        std::map<std::string, std::string> parsed = parseURL(line);
        session.SetUrl(cpr::Url{parsed["address"]});
        
        auto body_search = parsed.find("body");
        if (body_search != std::end(parsed)) {
             session.SetBody(cpr::Body{base64::decode<std::string>(body_search->second)});
        } else {
            session.SetBody(cpr::Body{});
        }
        
        cpr::Response r;
        
        if (parsed["method"] == "POST")
            r = session.Post();
        else if (parsed["method"] == "PUT")
            r = session.Put();
        else if (parsed["method"] == "DELETE")
            r = session.Delete();

Awesome let’s try it by sending a POST request to http://localhost:7243/db/rage/restore:

2021-09-14 12:58:02.630 http://localhost:7243/db/rage/schema/nodes/Address&method=POST
2021-09-14 12:58:02.630 http://localhost:7243/db/rage/schema/nodes/Address/properties/uuid/string&method=POST
2021-09-14 12:58:02.634 http://localhost:7243/db/rage/schema/nodes/Address/properties/number/integer&method=POST
...
2021-09-14 13:00:36.419 http://localhost:7243/db/rage/node/Address/999999&method=POST&body=eyAibnVtYmVyIjogODYxNTgsICJzdHJlZXRfYWRkcmVzcyI6ICIxMDAgUmV5bmFsZG8gR2xlbiBBcHQuIDQ1NSIsICJjaXR5IjogIlNvdXRoaGF2ZW4iLCJzdGF0ZSI6ICJBTCIsInppcF9jb2RlIjogIjcwODMxIiB9

Looks like it took about 2 and a half minutes to replay the 1 million writes. It’s not terrible, but it’s not super fast either. We can use this same strategy to do replication by just sending the same http request to another server or two or ten. However this is probably pretty brittle, we may need a pull mechanism as well.

Let’s make sure our data is actually there:

Ok good. Command logging is only part of the story though. We’ll need some way to snapshot the data and trim our command logs otherwise it will get out of control and take too long to recover. We’ll tackle that another time. At least we have the start of persistence. Please join me on Slack! Until next time.

Tagged , , , , ,

Leave a comment