Latest posts for tag python

I had to package a nontrivial Python codebase, and I needed to put dependencies in setup.py.

I could have run git grep -h import | sort -u and reviewed the output by hand, but I lacked the motivation for it. Much better to take a stab at solving the general problem.

The result is at https://github.com/spanezz/python-devel-tools.

One fun part is scanning a directory tree, using ast to find import statements scattered around the code:

import ast
import logging
import os
import stat
from typing import Set, TextIO

log = logging.getLogger()


class Scanner:
    def __init__(self):
        self.names: Set[str] = set()

    def scan_dir(self, root: str):
        for dirpath, dirnames, filenames, dir_fd in os.fwalk(root):
            for fn in filenames:
                # Scan anything that looks like Python source
                if fn.endswith(".py"):
                    with dirfd_open(fn, dir_fd=dir_fd) as fd:
                        self.scan_file(fd, os.path.join(dirpath, fn))
                # Also scan executable files that start with a Python shebang
                # (an executable .py file may be parsed twice; harmless, since
                # names is a set)
                st = os.stat(fn, dir_fd=dir_fd)
                if st.st_mode & (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH):
                    with dirfd_open(fn, dir_fd=dir_fd) as fd:
                        try:
                            lead = fd.readline()
                        except UnicodeDecodeError:
                            # Not a text file, skip
                            continue
                        if re_python_shebang.match(lead):
                            fd.seek(0)
                            self.scan_file(fd, os.path.join(dirpath, fn))

    def scan_file(self, fd: TextIO, pathname: str):
        log.info("Reading file %s", pathname)
        try:
            tree = ast.parse(fd.read(), pathname)
        except SyntaxError as e:
            log.warning("%s: file cannot be parsed", pathname, exc_info=e)
            return

        self.scan_tree(tree)

    def scan_tree(self, tree: ast.AST):
        for stm in tree.body:
            if isinstance(stm, ast.Import):
                for alias in stm.names:
                    self.names.add(alias.name)
            elif isinstance(stm, ast.ImportFrom):
                if stm.module is not None:
                    self.names.add(stm.module)
            elif hasattr(stm, "body"):
                self.scan_tree(stm)
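
The snippet relies on two helpers that live in the rest of the repository: re_python_shebang and dirfd_open. A minimal, hypothetical version of them could look like this (the real ones are in the linked repository):

import os
import re
from typing import IO, Optional

# Matches shebang lines like "#!/usr/bin/python3" or "#!/usr/bin/env python"
re_python_shebang = re.compile(r"^#!.*\bpython")


def dirfd_open(path: str, mode: str = "rt", dir_fd: Optional[int] = None) -> IO:
    """Open a file relative to an open directory file descriptor."""
    def opener(p: str, flags: int) -> int:
        return os.open(p, flags, dir_fd=dir_fd)
    return open(path, mode, opener=opener)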

Another fun part is grouping the imported module names by where in sys.path they have been found:

    # Excerpted from main(): args.dir is the directory tree to scan
    scanner = Scanner()
    scanner.scan_dir(args.dir)

    sys.path.append(args.dir)
    by_sys_path: Dict[str, List[str]] = collections.defaultdict(list)
    for name in sorted(scanner.names):
        spec = importlib.util.find_spec(name)
        if spec is None or spec.origin is None:
            by_sys_path[""].append(name)
        else:
            for sp in sys.path:
                if spec.origin.startswith(sp):
                    by_sys_path[sp].append(name)
                    break
            else:
                by_sys_path[spec.origin].append(name)

    for sys_path, names in sorted(by_sys_path.items()):
        print(f"{sys_path or 'unidentified'}:")
        for name in names:
            print(f"  {name}")

An example run. It's kind of nice how it can at least tell stdlib modules apart, so one doesn't need to read through those:

$ ./scan-imports …/himblick
unidentified:
  changemonitor
  chroot
  cmdline
  mediadir
  player
  server
  settings
  static
  syncer
  utils
…/himblick:
  himblib.cmdline
  himblib.host_setup
  himblib.player
  himblib.sd
/usr/lib/python3.9:
  __future__
  argparse
  asyncio
  collections
  configparser
  contextlib
  datetime
  io
  json
  logging
  mimetypes
  os
  pathlib
  re
  secrets
  shlex
  shutil
  signal
  subprocess
  tempfile
  textwrap
  typing
/usr/lib/python3/dist-packages:
  asyncssh
  parted
  progressbar
  pyinotify
  setuptools
  tornado
  tornado.escape
  tornado.httpserver
  tornado.ioloop
  tornado.netutil
  tornado.web
  tornado.websocket
  yaml
built-in:
  sys
  time

Maybe such a tool already exists and works much better than this? A quick search didn't turn one up, and it was fun to (re)invent it.

Updates:

Jakub Wilk pointed out an old python-modules script that finds Debian dependencies.

The AST scanning code should be refactored to use ast.NodeVisitor.
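
For reference, a minimal sketch of what that refactor could look like; ast.NodeVisitor's generic_visit() descends into child nodes by default, so this also finds imports nested inside functions, classes and try blocks:

import ast
from typing import Set


class ImportVisitor(ast.NodeVisitor):
    """Collect the names of all imported modules in a parsed tree."""

    def __init__(self):
        self.names: Set[str] = set()

    def visit_Import(self, node: ast.Import):
        for alias in node.names:
            self.names.add(alias.name)

    def visit_ImportFrom(self, node: ast.ImportFrom):
        if node.module is not None:
            self.names.add(node.module)


# Usage: visitor = ImportVisitor(); visitor.visit(tree); print(visitor.names)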

Here's a little toy program that displays a message like a split-flap display:

#!/usr/bin/python3

import sys
import time

def display(line: str):
    cur = '0' * len(line)
    while True:
        print(cur, end="\r")
        if cur == line:
            break
        time.sleep(0.09)
        cur = "".join(chr(min(ord(c) + 1, ord(oc))) for c, oc in zip(cur, line))
    print()

message = " ".join(sys.argv[1:])
display(message.upper())

This only works if the script's stdout is unbuffered or line-buffered. Pipe the output through cat, and you get a long wait followed by the final string, with no animation.

What is happening is that when the output is not going to a terminal, it becomes block-buffered: writes accumulate in a buffer and are sent in bigger chunks, to make processing bulk I/O more efficient.
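
A quick way to check which case one is in, from Python itself, is sys.stdout.isatty(); this little sketch just reports the situation:

import sys

# A terminal gets line buffering: output is flushed at every newline (and,
# for text streams, carriage return). A pipe or file gets block buffering:
# output is flushed only when the buffer fills up, or when the program exits.
if sys.stdout.isatty():
    print("stdout is a terminal: line-buffered")
else:
    print("stdout is a pipe or a file: block-buffered")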

I haven't found a good introductory explanation of buffering in Python's documentation. The details seem to be scattered in the io module documentation and they mostly assume that one is already familiar with concepts like unbuffered, line-buffered or block-buffered. The libc documentation has a good quick introduction that one can read to get up to speed.

Controlling buffering in Python

In Python, one can force a buffer flush with the flush() method of the output file object, like sys.stdout.flush(), to make sure pending buffered output gets sent.

Python's print() function also supports flush=True as an optional argument:

    print(cur, end="\r", flush=True)

If one wants to change the default buffering of a file object, since Python 3.7 there's a convenient reconfigure() method which, as far as buffering goes, allows turning on line buffering:

sys.stdout.reconfigure(line_buffering=True)

Otherwise, the technique is to reassign sys.stdout to something that has the behaviour one wants (code from this StackOverflow thread):

import io
# Python 3, open as binary, then wrap in a TextIOWrapper with write-through.
sys.stdout = io.TextIOWrapper(open(sys.stdout.fileno(), 'wb', 0), write_through=True)

If one needs all this to implement a progressbar, one should make sure to have a look at the progressbar module first.

I'm too lazy to manually check code blocks in autogenerated sphinx documentation to see if they are valid and reasonably up to date. Doing it automatically feels much more interesting to me: here's how I did it.

(continue reading)

Python comes with a built-in way of collecting profile information, documented at https://docs.python.org/3/library/profile.html.

In a nutshell, it boils down to:

python3 -m cProfile -o profile.out command.py arguments…

This post is an attempt to document what's in profile.out, since the Python documentation does not cover it. I tried to figure it out by looking at the sources of the pstats module and of gprof2dot, and by decoding the structure and trying to make sense of it and navigate it.
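
For context, before poking at the raw file, the supported way to read it back is the pstats module, for example:

import pstats

# Load what `python3 -m cProfile -o profile.out …` saved, and print the 10
# entries with the highest cumulative time.
stats = pstats.Stats("profile.out")
stats.sort_stats("cumulative").print_stats(10)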

(continue reading)

A little gitpython recipe to list the paths of all files in a commit:

#!/usr/bin/python3

import git
from pathlib import Path
import sys


def list_paths(root_tree, path=Path(".")):
    for blob in root_tree.blobs:
        yield path / blob.name
    for tree in root_tree.trees:
        yield from list_paths(tree, path / tree.name)


repo = git.Repo(".", search_parent_directories=True)
commit = repo.commit(sys.argv[1])
for path in list_paths(commit.tree):
    print(path)

It can be a good base, for example, for writing a script that, given two git branches, shows which Django migrations are in one and not in the other, without doing any git checkout of the code.
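
Here is a hypothetical sketch of that, reusing list_paths from above; the branch names come from the command line, and spotting migrations by a "migrations" path component is an assumption about the project layout:

#!/usr/bin/python3

import sys
from pathlib import Path

import git


def list_paths(root_tree, path=Path(".")):
    for blob in root_tree.blobs:
        yield path / blob.name
    for tree in root_tree.trees:
        yield from list_paths(tree, path / tree.name)


def migrations(commit):
    # Assume Django migrations live in directories called "migrations"
    return {p for p in list_paths(commit.tree) if "migrations" in p.parts}


repo = git.Repo(".", search_parent_directories=True)
a = repo.commit(sys.argv[1])
b = repo.commit(sys.argv[2])

for path in sorted(migrations(a) - migrations(b)):
    print(f"only in {sys.argv[1]}: {path}")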

One of the pieces of software I maintain for work is a GUI data browser that uses Tornado as a backend and a web browser as a front-end.

It is quite convenient to start the command and have the browser open automatically on the right URL. It's quite annoying to start the command and be told that the default port is already in use.

I've needed this trick quite often, also when writing unit tests, and it's time I note it down somewhere, so it's easier to find than by digging through Tornado's unittest code, where I found it the first time.

This is how to start Tornado on a free random port:

from tornado.options import define, options
import tornado.netutil
import tornado.httpserver

define("web_port", type=int, default=None, help="listening port for web interface")

# Excerpted from a method: Application and self come from the surrounding class
application = Application(self.db_url)

if options.web_port is None:
    # Port 0 asks the kernel to pick a free port
    sockets = tornado.netutil.bind_sockets(0, '127.0.0.1')
    self.web_port = sockets[0].getsockname()[1]
    server = tornado.httpserver.HTTPServer(application)
    server.add_sockets(sockets)
else:
    server = tornado.httpserver.HTTPServer(application)
    server.listen(options.web_port)
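
With the random-port branch above, the port that was actually bound ends up in self.web_port, so (assuming the surrounding class also fills it in when the user picks a port) one can open the browser on the right URL before starting the loop:

import webbrowser
import tornado.ioloop

# Hypothetical continuation of the excerpt above
webbrowser.open(f"http://127.0.0.1:{self.web_port}/")
tornado.ioloop.IOLoop.current().start()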

I am writing a little application server for microservices written as compiled binaries, and I would like to log execution statistics from getrusage(2).

The application server is written using asyncio, and processes are managed using asyncio subprocesses.

Unfortunately, asyncio uses os.waitpid instead of os.wait4 to reap child processes, so to get rusage information one has to delve into the asyncio innards and provide a custom ChildWatcher implementation. Here's how I did it:

import asyncio
from asyncio.log import logger
from contextlib import contextmanager
import os


class ExtendedResults:
    def __init__(self):
        self.rusage = None
        self.returncode = None


class SafeChildWatcherWithRusage(asyncio.SafeChildWatcher):
    """
    SafeChildWatcher that uses os.wait4 to also get rusage information.
    """
    rusage_results = {}

    @classmethod
    @contextmanager
    def monitor(cls, proc):
        """
        Return an ExtendedResults that gets filled when the process exits
        """
        assert proc.pid > 0
        pid = proc.pid
        extended_results = ExtendedResults()
        cls.rusage_results[pid] = extended_results
        try:
            yield extended_results
        finally:
            cls.rusage_results.pop(pid, None)

    def _do_waitpid(self, expected_pid):
        # The original is in asyncio/unix_events.py; on new python versions, it
        # makes sense to check changes to it and port them here
        assert expected_pid > 0

        try:
            pid, status, rusage = os.wait4(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        extended_results = self.rusage_results.get(pid)
        if extended_results is not None:
            extended_results.rusage = rusage
            extended_results.returncode = returncode

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)

    @classmethod
    def install(cls):
        loop = asyncio.get_event_loop()
        child_watcher = cls()
        child_watcher.attach_loop(loop)
        asyncio.set_child_watcher(child_watcher)

To use it:

from .hacks import SafeChildWatcherWithRusage
SafeChildWatcherWithRusage.install()

...

    @coroutine  # old-style coroutine: on recent Python this would be async def
    def run(self, *args, **kw):
        kw["stdin"] = asyncio.subprocess.PIPE
        kw["stdout"] = asyncio.subprocess.PIPE
        kw["stderr"] = asyncio.subprocess.PIPE
        self.started = time.time()

        self.proc = yield from asyncio.create_subprocess_exec(*args, **kw)

        from .hacks import SafeChildWatcherWithRusage
        with SafeChildWatcherWithRusage.monitor(self.proc) as results:
            yield from asyncio.tasks.gather(
                self.write_stdin(self.proc.stdin),
                self.read_stdout(self.proc.stdout),
                self.read_stderr(self.proc.stderr)
            )
        self.returncode = yield from self.proc.wait()
        self.rusage = results.rusage
        self.ended = time.time()
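
The collected rusage is a resource.struct_rusage, with fields straight from getrusage(2). As a sketch of what can be done with it at the end of run() (the log object is assumed):

        # ru_utime/ru_stime are seconds as floats; on Linux, ru_maxrss is in KiB
        log.info(
            "pid %d: exit %d, user %.3fs, system %.3fs, max RSS %d KiB",
            self.proc.pid, self.returncode,
            self.rusage.ru_utime, self.rusage.ru_stime, self.rusage.ru_maxrss)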

Python mailbox.mbox is not good at opening compressed mailboxes: since the compressed data contains no mbox From_ separator lines, it silently reads them as empty mailboxes instead of failing:

>>> import mailbox
>>> print(len(mailbox.mbox("/tmp/test.mbox")))
9
>>> print(len(mailbox.mbox("/tmp/test.mbox.gz")))
0
>>> print(len(mailbox.mbox("/tmp/test.mbox.xz")))
0

For a prototype rewrite of the MIA team's Echelon (the engine behind mia-query), I needed to scan compressed mailboxes, and I had to work around this limitation.

Here is the alternative mailbox.mbox implementation:

import bz2
import gzip
import lzma
import mailbox
from pathlib import Path
from typing import BinaryIO, Generator


class StreamMbox(mailbox.mbox):
    """
    mailbox.mbox does not support opening a stream, which is sad.

    This is a subclass that works around it
    """
    def __init__(self, fd: BinaryIO, factory=None, create: bool = True):
        # Do not call parent __init__, just redo everything here to be able to
        # open a stream. This will need to be re-reviewed for every new version
        # of python's stdlib.

        # Mailbox constructor
        self._path = None
        self._factory = factory

        # _singlefileMailbox constructor
        self._file = fd
        self._toc = None
        self._next_key = 0
        self._pending = False       # No changes require rewriting the file.
        self._pending_sync = False  # No need to sync the file
        self._locked = False
        self._file_length = None    # Used to record mailbox size

        # mbox constructor
        self._message_factory = mailbox.mboxMessage

    def flush(self):
        raise NotImplementedError("StreamMbox is a readonly class")


class UsageExample:
    # ScannedEmail is an application-defined type, not shown here
    DECOMPRESS = {
        ".xz": lzma.open,
        ".gz": gzip.open,
        ".bz2": bz2.open,
    }

    @classmethod
    def scan(cls, path: Path) -> Generator[ScannedEmail, None, None]:
        decompress = cls.DECOMPRESS.get(path.suffix)
        if decompress is None:
            with open(path.as_posix(), "rb") as fd:
                yield from cls.scan_fd(path, fd)
        else:
            with decompress(path.as_posix(), "rb") as fd:
                yield from cls.scan_fd(path, fd)

    @classmethod
    def scan_fd(cls, path: Path, fd: BinaryIO) -> Generator[ScannedEmail, None, None]:
        mbox = StreamMbox(fd)
        for msg in mbox:
            ...

This code:

#!/usr/bin/python3

class Test:
    def __init__(self, items=[]):
        self.items = items

    def add(self, item):
        self.items.append(item)


a = Test()
a.add("foo")
b = Test()
b.add("bar")
print(repr(a.items))
print(repr(b.items))

"obviously" prints:

['foo', 'bar']
['foo', 'bar']

Because the default value of the items argument is a mutable list, constructed just once when the function definition is evaluated, and then shared by all calls that use the default.

So, in Python, mutable default arguments are a good way to get more fun time with debugging.
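
For the record, the usual way out is to default to None and build a fresh list inside the function:

#!/usr/bin/python3

class Test:
    def __init__(self, items=None):
        # A new list is created at each call, instead of one shared list
        # created when the function definition is evaluated
        self.items = items if items is not None else []

    def add(self, item):
        self.items.append(item)


a = Test()
a.add("foo")
b = Test()
b.add("bar")
print(repr(a.items))  # ['foo']
print(repr(b.items))  # ['bar']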