Latest posts for tag devel

Suppose you have a tool that archives images, or scientific data, and it has a test suite. It would be good to collect sample files for the test suite, but they are often so big one can't really bloat the repository with them.

But does the test suite need everything that is in those files? Not necessarily. For example, if one's testing code that reads EXIF metadata, one doesn't care about what is in the image.

That technique works extremely well. I can take GRIB files that are several megabytes in size, zero out their data payload, and get nice 1Kb samples for the test suite.
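
A minimal sketch of the same idea for JPEGs (not the actual mktestsample code, and assuming Pillow is available): keep the EXIF block, throw away the pixels.

from PIL import Image

def shrink_jpeg_sample(src: str, dst: str) -> None:
    with Image.open(src) as img:
        exif = img.info.get("exif", b"")
        # Same size as the original, but a flat grey rectangle
        blank = Image.new("RGB", img.size, (128, 128, 128))
    # Lowest quality, maximum chroma subsampling, original EXIF reattached
    blank.save(dst, "JPEG", quality=1, subsampling=2, exif=exif)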

I've started to collect and organise the little hacks I use for this into a tool I called mktestsample:

$ mktestsample -v samples1/*
2021-11-23 20:16:32 INFO common samples1/cosmo_2d+0.grib: size went from 335168b to 120b
2021-11-23 20:16:32 INFO common samples1/grib2_ifs.arkimet: size went from 4993448b to 39393b
2021-11-23 20:16:32 INFO common samples1/polenta.jpg: size went from 3191475b to 94517b
2021-11-23 20:16:32 INFO common samples1/test-ifs.grib: size went from 1986469b to 4860b

Those are massive savings, but I'm not satisfied with those almost 94Kb of JPEG:

$ ls -la samples1/polenta.jpg
-rw-r--r-- 1 enrico enrico 94517 Nov 23 20:16 samples1/polenta.jpg
$ gzip samples1/polenta.jpg
$ ls -la samples1/polenta.jpg.gz
-rw-r--r-- 1 enrico enrico 745 Nov 23 20:16 samples1/polenta.jpg.gz

I believe I did all I could: completely blank out image data, set quality to zero, maximize subsampling, and tweak quantization to throw everything away.

Still, the result is a 94Kb file that can be gzipped down to 745 bytes. Is there something I'm missing?

I suppose JPEG is better at storing an image than at storing the lack of an image. I cannot really complain :)

I can still commit compressed samples of large images to a git repository, taking very little data indeed. That's really nice!

I wrote and maintain some C++ code to stream large quantities of data as fast as possible, and I try to use splice and sendfile when available.

The availability of those system calls varies at runtime according to a number of factors, and the code needs to be written to fall back to read/write loops depending on what the splice and sendfile syscalls say.
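
The shape of that fallback is roughly this (a simplified sketch, not the actual code; a real implementation would also loop until the whole range is copied):

#include <sys/sendfile.h>
#include <unistd.h>
#include <cerrno>
#include <system_error>

// Try sendfile(); fall back to a read/write loop when the kernel or
// filesystem cannot handle it for these file descriptors.
void copy_range(int in_fd, int out_fd, size_t count)
{
    ssize_t res = sendfile(out_fd, in_fd, nullptr, count);
    if (res >= 0)
        return;
    if (errno != EINVAL && errno != ENOSYS)
        throw std::system_error(errno, std::system_category(), "sendfile failed");

    // sendfile is not usable here: plain read/write loop
    char buf[4096];
    while (count > 0)
    {
        ssize_t rd = read(in_fd, buf, count < sizeof(buf) ? count : sizeof(buf));
        if (rd < 0)
            throw std::system_error(errno, std::system_category(), "read failed");
        if (rd == 0)
            break;
        // Simplification: assumes write() writes everything in one go
        ssize_t wr = write(out_fd, buf, rd);
        if (wr < 0)
            throw std::system_error(errno, std::system_category(), "write failed");
        count -= rd;
    }
}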

The tricky issue is unit testing: since the code path chosen depends on the kernel, the test suite will test one path or the other depending on the machine and filesystems where the tests are run.

It would be nice to be able to mock the syscalls, and replace them during tests, and it looks like I managed.

First I made catalogues of the syscalls I want to be able to mock: one with function pointers, for performance, and one with std::function, for flexibility:

/**
 * Linux versions of syscalls to use for concrete implementations.
 */
struct ConcreteLinuxBackend
{
    static ssize_t (*read)(int fd, void *buf, size_t count);
    static ssize_t (*write)(int fd, const void *buf, size_t count);
    static ssize_t (*writev)(int fd, const struct iovec *iov, int iovcnt);
    static ssize_t (*sendfile)(int out_fd, int in_fd, off_t *offset, size_t count);
    static ssize_t (*splice)(int fd_in, loff_t *off_in, int fd_out,
                             loff_t *off_out, size_t len, unsigned int flags);
    static int (*poll)(struct pollfd *fds, nfds_t nfds, int timeout);
    static ssize_t (*pread)(int fd, void *buf, size_t count, off_t offset);
};

/**
 * Mockable versions of syscalls to use for testing concrete implementations.
 */
struct ConcreteTestingBackend
{
    static std::function<ssize_t(int fd, void *buf, size_t count)> read;
    static std::function<ssize_t(int fd, const void *buf, size_t count)> write;
    static std::function<ssize_t(int fd, const struct iovec *iov, int iovcnt)> writev;
    static std::function<ssize_t(int out_fd, int in_fd, off_t *offset, size_t count)> sendfile;
    static std::function<ssize_t(int fd_in, loff_t *off_in, int fd_out,
                                 loff_t *off_out, size_t len, unsigned int flags)> splice;
    static std::function<int(struct pollfd *fds, nfds_t nfds, int timeout)> poll;
    static std::function<ssize_t(int fd, void *buf, size_t count, off_t offset)> pread;

    static void reset();
};

Then I converted the code to templates, parameterized on the catalogue class.

Explicit template instantiation helps in making sure that one doesn't need to include template code in all sorts of places.
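
Roughly like this (a sketch with invented names, not the actual library code): the streaming code becomes a template on the backend catalogue, and the explicit instantiations live in a single .cc file.

#include <sys/types.h>
#include <cerrno>
#include <system_error>

template<typename Backend>
struct ConcreteStream
{
    int out_fd;

    explicit ConcreteStream(int out_fd) : out_fd(out_fd) {}

    // Uses whatever write() the catalogue provides: the real syscall via a
    // function pointer in production, a std::function in the test suite.
    size_t send_buffer(const void* data, size_t size)
    {
        ssize_t res = Backend::write(out_fd, data, size);
        if (res < 0)
            throw std::system_error(errno, std::system_category(), "write failed");
        return static_cast<size_t>(res);
    }
};

// Explicit instantiation: other translation units only need the declaration,
// not the template body.
template struct ConcreteStream<ConcreteLinuxBackend>;
template struct ConcreteStream<ConcreteTestingBackend>;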

Finally, I can have a RAII class for mocking:

/**
 * RAII mocking of syscalls for concrete stream implementations
 */
struct MockConcreteSyscalls
{
    std::function<ssize_t(int fd, void *buf, size_t count)> orig_read;
    std::function<ssize_t(int fd, const void *buf, size_t count)> orig_write;
    std::function<ssize_t(int fd, const struct iovec *iov, int iovcnt)> orig_writev;
    std::function<ssize_t(int out_fd, int in_fd, off_t *offset, size_t count)> orig_sendfile;
    std::function<ssize_t(int fd_in, loff_t *off_in, int fd_out,
                                 loff_t *off_out, size_t len, unsigned int flags)> orig_splice;
    std::function<int(struct pollfd *fds, nfds_t nfds, int timeout)> orig_poll;
    std::function<ssize_t(int fd, void *buf, size_t count, off_t offset)> orig_pread;

    MockConcreteSyscalls();
    ~MockConcreteSyscalls();
};

MockConcreteSyscalls::MockConcreteSyscalls()
    : orig_read(ConcreteTestingBackend::read),
      orig_write(ConcreteTestingBackend::write),
      orig_writev(ConcreteTestingBackend::writev),
      orig_sendfile(ConcreteTestingBackend::sendfile),
      orig_splice(ConcreteTestingBackend::splice),
      orig_poll(ConcreteTestingBackend::poll),
      orig_pread(ConcreteTestingBackend::pread)
{
}

MockConcreteSyscalls::~MockConcreteSyscalls()
{
    ConcreteTestingBackend::read = orig_read;
    ConcreteTestingBackend::write = orig_write;
    ConcreteTestingBackend::writev = orig_writev;
    ConcreteTestingBackend::sendfile = orig_sendfile;
    ConcreteTestingBackend::splice = orig_splice;
    ConcreteTestingBackend::poll = orig_poll;
    ConcreteTestingBackend::pread = orig_pread;
}

And here's the specialization to pretend sendfile and splice aren't available:

/**
 * Mock sendfile and splice as if they weren't available on this system
 */
struct DisableSendfileSplice : public MockConcreteSyscalls
{
    DisableSendfileSplice();
};

DisableSendfileSplice::DisableSendfileSplice()
{
    ConcreteTestingBackend::sendfile = [](int out_fd, int in_fd, off_t *offset, size_t count) -> ssize_t {
        errno = EINVAL;
        return -1;
    };
    ConcreteTestingBackend::splice = [](int fd_in, loff_t *off_in, int fd_out,
                                        loff_t *off_out, size_t len, unsigned int flags) -> ssize_t {
        errno = EINVAL;
        return -1;
    };
}

It's now also possible to reproduce in the test suite all sorts of system-related issues we might observe in production over time.
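
For example, a test can force the fallback code path regardless of the kernel it runs on (a sketch; the function under test is a placeholder name):

{
    DisableSendfileSplice mock;
    // Anything streamed here sees sendfile()/splice() fail with EINVAL,
    // so the code under test has to take the read/write fallback path.
    run_stream_copy_under_test();
}   // the destructor restores the previous syscall catalogue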

Next time we'll iterate on Himblick design and development: the Raspberry Pi 4 can now run plain standard Debian, which should make a lot of things easier and cleaner when developing products based on it.

Somewhat related to nspawn-runner, here are some random links related to my feeling that nspawn comes from an ecosystem which gives me a bigger sense of focus on security and solidity than Docker:

I did a lot of work on A38, a Python library to deal with FatturaPA electronic invoicing, and it was a wonderful surprise to see a positive review spontaneously appear! ♥: Fattura elettronica, come visualizzarla con python | TuttoLogico

A beautiful, hands-on explanation of git internals, as a step by step guide to reimplementing your own git: Git Internals - Learn by Building Your Own Git

I recently tried meson and liked it a lot. I then gave unity builds a try, since it supports them out of the box, and found myself with doubts. I found I wasn't alone, and I liked The Evils of Unity Builds as a summary of the situation.

A point of view I liked on technical debt: Technical debt as a lack of understanding

Finally, a classic, and a masterful explanation for a question that keeps popping up: RegEx match open tags except XHTML self-contained tags

While traveling around Germany, one notices that most towns have a Greek or Italian restaurant, and they all kind of have the same names. How bad is that lack of imagination?

Let's play with https://overpass-turbo.eu/. Select a bounding box and run this query:

node
  [cuisine=greek]
  ({{bbox}});
out;

Export the results as gpx and have some fun on the command line:

sed -nre 's/^name=([^<]+).*/\1/p' /tmp/greek.gpx \
   | sed -re 's/ *(Grill|Restaurant|Tavern[ae]) *//g' \
   | sort | uniq -c | sort -nr > /tmp/greek.txt

Likewise, with Italian restaurants, you can use cuisine=italian and something like:

sed -nre 's/^name=([^<]+).*/\1/p' /tmp/italian.gpx \
   | sed -re 's/ *(Restaurant|Ristorante|Pizzeria) *//g' \
   | sort | uniq -c | sort -nr > /tmp/italian.txt

Here are the top 20 that came out for Greek:

    162 Akropolis
     91 Delphi
     86 Poseidon
     78 Olympia
     78 Mykonos
     78 Athen
     76 Hellas
     74 El Greco
     71 Rhodos
     57 Dionysos
     53 Kreta
     50 Syrtaki
     49 Korfu
     43 Santorini
     43 Athos
     40 Mythos
     39 Zorbas
     35 Artemis
     33 Meteora
     29 Der Grieche

Here are the top 20 that came out for Italian, with a sadly ubiquitous franchise as an outlier:

     66 Vapiano
     64 Bella Italia
     59 L'Osteria
     54 Roma
     43 La Piazza
     38 La Dolce Vita
     38 Dolce Vita
     35 Italia
     32 Pinocchio
     31 Toscana
     30 Venezia
     28 Milano
     28 Mamma Mia
     27 Bella Napoli
     25 San Marco
     24 Portofino
     22 La Piazzetta
     22 La Gondola
     21 Da Vinci
     21 Da Pino

One can play a game while traveling: being the first to spot a Greek or Italian restaurant earns more points the more unusual its name is. But beware of being too quick! If you try to claim points for one of the restaurants with the top-5 most common names, you will actually lose points!

Have fun playing with other combinations of areas and cuisine: the Overpass API is pretty cool!

Update:

Rather than running xml through sed, one can export geojson, then parse it with the excellent jq:

jq -r '.features[].properties.name' italian.json \
   | sed -re 's/ *(Restaurant|Ristorante|Pizzeria) *//g' \
   | sort | uniq -c | sort -nr > /tmp/italian.txt

One-page guide to ES2015+: usage, examples, and more. A quick overview of new JavaScript features in ES2015, ES2016, ES2017, ES2018 and beyond.

Rich offline experiences, periodic background syncs, push notifications (functionality that would normally require a native application) are coming to the web. Service workers provide the technical foundation that all these features rely on.

The Service Worker Cookbook is a collection of working, practical examples of using service workers in modern web sites.

One overriding problem that web users have suffered with for years is loss of connectivity. The best web app in the world will provide a terrible user experience if you can't download it. There have been various attempts to create technologies to solve this problem, as our Offline page shows, and some of the issues have been solved.

One of the pieces of software I maintain for work is a GUI data browser that uses Tornado as a backend and a web browser as a front-end.

It is quite convenient to start the command and have the browser open automatically on the right URL. It's quite annoying to start the command and be told that the default port is already in use.

I've needed this trick quite often, also when writing unit tests, and it's time I note it down somewhere, so it's easier to find than going through Tornado's unittest code where I found it the first time.

This is how to start Tornado on a free random port:

from tornado.options import define, options
import tornado.netutil
import tornado.httpserver

define("web_port", type=int, default=None, help="listening port for web interface")

application = Application(self.db_url)

if options.web_port is None:
    # Bind to port 0: the kernel picks a free port for us
    sockets = tornado.netutil.bind_sockets(0, '127.0.0.1')
    # getsockname() returns (host, port, ...): take the port that was assigned
    self.web_port = sockets[0].getsockname()[1]
    server = tornado.httpserver.HTTPServer(application)
    server.add_sockets(sockets)
else:
    server = tornado.httpserver.HTTPServer(application)
    server.listen(options.web_port)
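
And since the point is to then open the browser on the right URL, something along these lines (a hedged sketch, not the actual application code; web_port stands for whichever port ended up being used above) completes the picture:

import webbrowser

# Point the default browser at the freshly bound server
webbrowser.open("http://127.0.0.1:%d/" % web_port)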

I am writing a little application server for microservices written as compiled binaries, and I would like to log execution statistics from getrusage(2).

The application server is written using asyncio, and processes are managed using asyncio subprocesses.

Unfortunately, asyncio uses os.waitpid instead of os.wait4 to reap child processes, so to get rusage information one has to delve into asyncio's innards and provide a custom ChildWatcher implementation. Here's how I did it:

import asyncio
from asyncio.log import logger
from contextlib import contextmanager
import os


class ExtendedResults:
    def __init__(self):
        self.rusage = None
        self.returncode = None


class SafeChildWatcherWithRusage(asyncio.SafeChildWatcher):
    """
    SafeChildWatcher that uses os.wait4 to also get rusage information.
    """
    rusage_results = {}

    @classmethod
    @contextmanager
    def monitor(cls, proc):
        """
        Return an ExtendedResults that gets filled when the process exits
        """
        assert proc.pid > 0
        pid = proc.pid
        extended_results = ExtendedResults()
        cls.rusage_results[pid] = extended_results
        try:
            yield extended_results
        finally:
            cls.rusage_results.pop(pid, None)

    def _do_waitpid(self, expected_pid):
        # The original is in asyncio/unix_events.py; on new python versions, it
        # makes sense to check changes to it and port them here
        assert expected_pid > 0

        try:
            pid, status, rusage = os.wait4(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            rusage = None
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        extended_results = self.rusage_results.get(pid)
        if extended_results is not None:
            extended_results.rusage = rusage
            extended_results.returncode = returncode

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)

    @classmethod
    def install(cls):
        loop = asyncio.get_event_loop()
        child_watcher = cls()
        child_watcher.attach_loop(loop)
        asyncio.set_child_watcher(child_watcher)

To use it:

from .hacks import SafeChildWatcherWithRusage
SafeChildWatcherWithRusage.install()

...

    @coroutine
    def run(self, *args, **kw):
        kw["stdin"] = asyncio.subprocess.PIPE
        kw["stdout"] = asyncio.subprocess.PIPE
        kw["stderr"] = asyncio.subprocess.PIPE
        self.started = time.time()

        self.proc = yield from asyncio.create_subprocess_exec(*args, **kw)

        from .hacks import SafeChildWatcherWithRusage
        with SafeChildWatcherWithRusage.monitor(self.proc) as results:
            yield from asyncio.tasks.gather(
                self.write_stdin(self.proc.stdin),
                self.read_stdout(self.proc.stdout),
                self.read_stderr(self.proc.stderr)
            )
        self.returncode = yield from self.proc.wait()
        self.rusage = results.rusage
        self.ended = time.time()

Debian conveniently distributes JavaScript libraries, and expects packaged software to use them rather than embedding their own copies.

Here is a convenient custom StaticFileHandler for Tornado that looks for the Debian-distributed versions of JavaScript libraries, and falls back to the vendored versions if they are not found:

from tornado import web
import pathlib


class StaticFileHandler(web.StaticFileHandler):
    """
    StaticFileHandler that allows overriding paths in the static directory with
    system provided versions
    """
    SYSTEM_ASSET_PATH = pathlib.Path("/usr/share/javascript")

    @classmethod
    def get_absolute_path(cls, root, path):
        path = pathlib.PurePath(path)
        if not path.parts:
            return super().get_absolute_path(root, path)

        system_dir = cls.SYSTEM_ASSET_PATH.joinpath(path.parts[0])
        if system_dir.is_dir():
            # If that asset directory exists in the system, look for things in
            # there
            return cls.SYSTEM_ASSET_PATH.joinpath(path)
        else:
            # Else go ahead with the default static dir
            return super().get_absolute_path(root, path)

    def validate_absolute_path(self, root, absolute_path):
        """
        Rewrite of tornado's validate_absolute_path not to raise an error for
        paths in /usr/share/javascript/
        """
        root = pathlib.Path(root)
        absolute_path = pathlib.Path(absolute_path)

        is_system_root = absolute_path.parts[:len(self.SYSTEM_ASSET_PATH.parts)] == self.SYSTEM_ASSET_PATH.parts
        is_static_root = absolute_path.parts[:len(root.parts)] == root.parts

        if not is_system_root and not is_static_root:
            raise web.HTTPError(403, "%s is not in root static directory or system assets path",
                                self.path)

        if absolute_path.is_dir() and self.default_filename is not None:
            # need to look at the request.path here for when path is empty
            # but there is some prefix to the path that was already
            # trimmed by the routing
            if not self.request.path.endswith("/"):
                self.redirect(self.request.path + "/", permanent=True)
                return
            absolute_path = absolute_path.joinpath(self.default_filename)
        if not absolute_path.exists():
            raise web.HTTPError(404)
        if not absolute_path.is_file():
            raise web.HTTPError(403, "%s is not a file", self.path)
        return str(absolute_path)

This is how to use it:

class DebianApplication(tornado.web.Application):
    def __init__(self, *args, **settings):
        from .static import StaticFileHandler
        settings.setdefault("static_handler_class", StaticFileHandler)
        super().__init__(*args, **settings)

And from HTML it's simply a matter of matching the first path component to what is used by Debian's packages under /usr/share/javascript:

    <link rel="stylesheet" href="{{static_url('bootstrap4/css/bootstrap.min.css')}}">
    <script src="{{static_url('jquery/jquery.min.js')}}"></script>
    <script src="{{static_url('popper.js/umd/popper.min.js')}}"></script>
    <script src="{{static_url('bootstrap4/js/bootstrap.min.js')}}"></script>

I find it quite convenient: this way I can start writing prototype code without worrying about fetching javascript libraries to bundle.

I only need to start worrying about it if I need to deploy outside of Debian, or to old stable versions of Debian that don't contain the required JavaScript dependencies. In that case, I just cp -r from a working /usr/share/javascript into Tornado's static directory, and I'm done.

Python's mailbox.mbox is not good at opening compressed mailboxes:

>>> import mailbox
>>> print(len(mailbox.mbox("/tmp/test.mbox")))
9
>>> print(len(mailbox.mbox("/tmp/test.mbox.gz")))
0
>>> print(len(mailbox.mbox("/tmp/test1.mbox.xz")))
0

For a prototype rewrite of the MIA team's Echelon (the engine behind mia-query), I needed to scan compressed mailboxes, and I had to work around this limitation.

Here is the alternative mailbox.mbox implementation:

import lzma
import gzip
import bz2
import mailbox
from pathlib import Path
from typing import BinaryIO, Generator


class StreamMbox(mailbox.mbox):
    """
    mailbox.mbox does not support opening a stream, which is sad.

    This is a subclass that works around it
    """
    def __init__(self, fd: BinaryIO, factory=None, create: bool = True):
        # Do not call parent __init__, just redo everything here to be able to
        # open a stream. This will need to be re-reviewed for every new version
        # of python's stdlib.

        # Mailbox constructor
        self._path = None
        self._factory = factory

        # _singlefileMailbox constructor
        self._file = fd
        self._toc = None
        self._next_key = 0
        self._pending = False       # No changes require rewriting the file.
        self._pending_sync = False  # No need to sync the file
        self._locked = False
        self._file_length = None    # Used to record mailbox size

        # mbox constructor
        self._message_factory = mailbox.mboxMessage

    def flush(self):
        raise NotImplementedError("StreamMbox is a readonly class")


class UsageExample:
    DECOMPRESS = {
        ".xz": lzma.open,
        ".gz": gzip.open,
        ".bz2": bz2.open,
    }

    @classmethod
    def scan(cls, path: Path) -> Generator[ScannedEmail, None, None]:
        decompress = cls.DECOMPRESS.get(path.suffix)
        if decompress is None:
            with open(path.as_posix(), "rb") as fd:
                yield from cls.scan_fd(path, fd)
        else:
            with decompress(path.as_posix(), "rb") as fd:
                yield from cls.scan_fd(path, fd)

    @classmethod
    def scan_fd(cls, path: Path, fd: BinaryIO) -> Generator[ScannedEmail, None, None]:
        mbox = StreamMbox(fd)
        for msg in mbox:
            ...

This code:

#!/usr/bin/python3

class Test:
    def __init__(self, items=[]):
        self.items = items

    def add(self, item):
        self.items.append(item)


a = Test()
a.add("foo")
b = Test()
b.add("bar")
print(repr(a.items))
print(repr(b.items))

"obviously" prints:

['foo', 'bar']
['foo', 'bar']

Because the default value of the items argument is a mutable list, constructed just once when the function definition is evaluated, and then shared by every call that doesn't pass its own value.

So, in Python, mutable default arguments are a good way to get some extra fun time with debugging.
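
The usual way to avoid it (a standard Python idiom, not something specific to this example) is to use None as the default and build a fresh list per call:

class Test:
    def __init__(self, items=None):
        # A new list is created on every call that doesn't pass one
        self.items = items if items is not None else []

    def add(self, item):
        self.items.append(item)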