API Reference

fetch(container, ocidir='', sourcecontainer=None)

fetch a container from a registry

Source code in src/sardinecake/ociregistry.py
def fetch(container: str, ocidir: str = "", sourcecontainer: str | None = None) -> bool:
    """Copy a container from a registry into a local OCI directory using skopeo.

    - `container`: name of the container; determines the local destination below `ocidir`
    - `ocidir`: base directory of the local OCI layouts (empty: relative paths)
    - `sourcecontainer`: container to pull from the registry (defaults to `container`)

    Returns `True` on success.
    Raises `SystemExit` (carrying skopeo's exit code) if the copy fails.
    """
    basedir = ocidir or ""
    source = sourcecontainer if sourcecontainer else container
    name, tag = utils.split_imagetag(container)
    # if there is no (string) tag, the destination is the bare name
    # as returned by split_imagetag()
    destination = os.path.join(basedir, container if isinstance(tag, str) else name)
    destinationdir = os.path.join(basedir, name)

    # e.g.: skopeo copy docker://ghcr.io/cirruslabs/macos-monterey-vanilla:latest oci:monterey:latest
    os.makedirs(destinationdir, exist_ok=True)
    cmd = ["skopeo", "copy", f"docker://{source}", f"oci:{destination}"]
    log.debug(cmd)

    # skopeo's output is only shown if the logger is more verbose than INFO
    silent = log.getEffectiveLevel() >= logging.INFO
    redirect = (
        {"stdout": subprocess.DEVNULL, "stderr": subprocess.DEVNULL} if silent else {}
    )

    result = subprocess.run(cmd, **redirect)
    if result.returncode != 0:
        raise SystemExit(result.returncode)
    return True

login(registry, username=None, password=None)

login to a registry

Source code in src/sardinecake/auth.py
def login(
    registry: str, username: str | None = None, password: str | None = None
) -> bool:
    """Log in to a registry via `skopeo login`.

    - `registry`: the registry to log in to
    - `username`: passed as `--username` (if non-empty)
    - `password`: fed to skopeo on stdin via `--password-stdin` (if non-empty)

    Returns `True` if skopeo exited with code 0, `False` otherwise.
    """
    cmd = ["skopeo", "login"]
    if username:
        cmd.extend(["--username", username])
    if password:
        cmd.append("--password-stdin")
    cmd.append(registry)

    if not password:
        # no password to pipe in: skopeo inherits our stdin
        return not subprocess.run(cmd).returncode

    # hand the password over via stdin, so it does not show up in the argument list
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE)
    proc.communicate(password.encode())
    return not proc.returncode

logout(registry)

logout from a registry; if registry is None, logout from all registries

Source code in src/sardinecake/auth.py
def logout(registry: str | None):
    """Log out from a registry via `skopeo logout`.

    If `registry` is `None`, log out from *all* registries (`--all`).
    Returns `True` if skopeo exited with code 0, `False` otherwise.
    """
    target = "--all" if registry is None else registry
    result = subprocess.run(["skopeo", "logout", target])
    return not result.returncode

push(container, ocidir='', targetcontainer=None)

push a container to a registry

Source code in src/sardinecake/ociregistry.py
def push(container: str, ocidir: str = "", targetcontainer: str | None = None) -> bool:
    """Copy a container from a local OCI directory to a registry using skopeo.

    - `container`: name of the container; the local source is looked up below `ocidir`
    - `ocidir`: base directory of the local OCI layouts (empty: relative paths)
    - `targetcontainer`: name of the container in the registry (defaults to `container`)

    Returns `True` on success.
    Raises `SystemExit` (carrying skopeo's exit code) if the copy fails.
    """
    local = os.path.join(ocidir or "", container)
    remote = targetcontainer if targetcontainer else container

    cmd = ["skopeo", "copy", f"oci:{local}", f"docker://{remote}"]
    log.debug(cmd)

    result = subprocess.run(cmd)
    if result.returncode != 0:
        raise SystemExit(result.returncode)
    return True

clone(srcname, dstname, startmode=START_DEFINE, libvirtURI=None, mac=None, outdir=None, quiet=False, omit_backingstore=None)

do a fast clone of a VM

  • srcname: original VM
  • dstname: VM to be created
  • startmode: how to start the newly created VM
  • libvirtURI: connection URI to the libvirt server (note that only local QCOW2 disks can be cloned)
  • mac: MAC-address of cloned network interface (use None for a new random MAC)
  • outdir: place for the shallow-cloned disk images (use None to use the same directory as the input disk images)
  • quiet: if True, suppress libvirt errors
  • omit_backingstore:
    • if False, creates a <backingStore/> entry for COW-images
    • if True, does not create a <backingStore/> entry for COW-images (might work around a bug with older libvirt)
    • if None, tries to autodetect whether the <backingStore/> should be omitted
Source code in src/sardinecake/clone.py
def clone(
    srcname: str,
    dstname: str,
    startmode: int = START_DEFINE,
    libvirtURI: str | None = None,
    mac: str | None = None,
    outdir: str | None = None,
    quiet: bool = False,
    omit_backingstore: bool | None = None,
):
    """do a fast clone of a VM

    - `srcname`: original VM
    - `dstname`: VM to be created
        (if empty, an unused name `<srcname>-clone[-<N>]` is picked)
    - `startmode`: how to start the newly created VM
        - `START_PRINTXML`: only print the XML definition of the clone
        - `START_EPHEMERAL`: start a transient VM and unlink the cloned disk images
        - `START_PERSISTENT`: define the VM and start it
        - anything else: only define the VM
    - `libvirtURI`: connection URI to the libvirt server
        (note that only local QCOW2 disks can be cloned)
    - `mac`: MAC-address of cloned network interface (use None for a new random MAC)
    - `outdir`: place for the shallow-cloned disk images (use None to use the same directory as the input disk images)
    - `quiet`: if `True`, suppress libvirt errors
    - `omit_backingstore`:
        - if `False`, creates a `<backingStore/>` entry for COW-images
        - if `True`, does not create a `<backingStore/>` entry for COW-images (might work around a bug with older libvirt)
        - if `None`, tries to autodetect whether the `<backingStore/>` should be omitted

    Raises `KeyError` if the original VM cannot be found,
    or if a VM named `dstname` already exists.
    """

    if quiet:
        # install a no-op error handler, so libvirt does not print errors itself
        libvirt.registerErrorHandler((lambda ctx, err: 1), None)

    # when only printing the XML, nothing is changed on the libvirt side,
    # so a read-only connection is sufficient
    # NOTE(review): assumes Domain.fromLibVirt() works on read-only connections - confirm
    libvirt_open = libvirt.open
    if startmode == START_PRINTXML:
        libvirt_open = libvirt.openReadOnly

    with libvirt_open(libvirtURI) as conn:
        if omit_backingstore is None:
            # autodetect: omit the <backingStore/> for libvirt versions below 9010000
            omit_backingstore = conn.getLibVersion() < 9010000

        dom = Domain.fromLibVirt(conn, srcname)
        if not dom:
            raise KeyError(f"Couldn't find original VM {srcname!r}")

        if not dstname:
            # pick the first unused name of "<src>-clone", "<src>-clone-1", "<src>-clone-2",...
            suffix = 0
            while True:
                if suffix:
                    name = f"{srcname}-clone-{suffix}"
                else:
                    name = f"{srcname}-clone"
                if not Domain.fromLibVirt(conn, name):
                    dstname = name
                    break
                suffix += 1

        if Domain.fromLibVirt(conn, dstname):
            raise KeyError(f"cloned VM {dstname!r} already exists")

        dom.anonymize(mac_address=mac)
        dom.changeName(dstname)

        srcdisks = dom.getClonableDisks()

        log.debug(f"cloning disks {srcdisks} to {outdir!r}")
        cloneddisks = cloneDisks(srcdisks, outdir)
        log.debug(f"cloned disks {cloneddisks}")

        backing_files = {}
        if not omit_backingstore:
            backing_files = {
                dev: qcow.getBackingChain(disk) for dev, disk in srcdisks.items()
            }
        dom.changeDiskFiles(cloneddisks, backing_files=backing_files)

        if startmode == START_PRINTXML:
            print(dom)
        elif startmode == START_EPHEMERAL:
            log.debug(f"start ephemeral VM {dstname!r}")
            conn.createXML(dom.toXML())
            # the cloned images are unlinked as soon as the VM has been started
            # (presumably the running VM keeps them open until it terminates)
            for d in cloneddisks.values():
                log.debug(f"deleting ephemeral volume {d!r}")
                os.unlink(d)
        else:
            log.debug(f"define cloned VM {dstname!r}")
            vm = conn.defineXML(dom.toXML())
            if startmode == START_PERSISTENT:
                log.debug(f"starting persistent VM {dstname!r}")
                vm.create()

cloneDisks(disks, outputdir)

shallow-clones the given disks into output dir.

disks is a dictionary that maps IDs to disk paths. a similar map from IDs to cloned paths is returned

Source code in src/sardinecake/clone.py
def cloneDisks(disks: dict[str, str], outputdir: str | None) -> dict[str, str]:
    """Create a shallow clone of each of the given disks in `outputdir`.

    `disks` maps IDs to disk paths; the result maps the same IDs
    to the paths of the clones (as returned by `cloneQCOW2()`).
    """
    cloned = {}
    for diskid, diskpath in disks.items():
        cloned[diskid] = cloneQCOW2(diskpath, outputdir)
    return cloned

cloneQCOW2(source, target=None)

copy-on-write clone of a QCOW2 file source to another QCOW2 file.

  • if target is None, the clone will be in the same directory as the source, and the name will be derived from the source basename.
  • if target is a directory (or ends with "/"), the clone will be in the given directory, and the name will be derived from the source basename.
  • otherwise, target is the prospective name of the output file.

cloneQCOW2() will refuse to overwrite existing images, and use a unique filename when required. the actual filename of the cloned image is returned

Source code in src/sardinecake/clone.py
def cloneQCOW2(source: str, target: str | None = None) -> str:
    """copy-on-write clone of a QCOW2 file `source` to another QCOW2 file.

    - if target is `None`, the clone will be in the same directory as the source, and the name will be derived from the source basename.
    - if target is a directory (or ends with "/"), the clone will be in the given directory, and the name will be derived from the source basename.
    - otherwise, target is the prospective name of the output file.

    `cloneQCOW2()` will refuse to overwrite existing images, and use a unique filename when required.
    the actual filename of the cloned image is returned

    raises an `OSError` (e.g. `FileNotFoundError`) if `source` cannot be opened.
    """
    # qemu-img create -f qcow2 -b base.qcow2 -F qcow2 clone.qcow2

    # check if source exists and can be opened (otherwise raise a standard error)
    with open(source):
        pass

    if not target:
        target = source
    elif os.path.isdir(target) or not os.path.basename(target):
        target = os.path.join(target, os.path.basename(source))

    # ensure that output directory exists
    # (a target without directory component lives in the current directory;
    # os.makedirs("") would raise a FileNotFoundError)
    outdir = os.path.dirname(target)
    if outdir:
        os.makedirs(outdir, exist_ok=True)

    base, ext = os.path.splitext(target)

    # reserve an unused filename by exclusively creating an empty file.
    # the counter runs through "", -1, -2,... so that the minus sign doubles as
    # the separator: "disk.qcow2", "disk-1.qcow2", "disk-2.qcow2",...
    i = ""
    while True:
        target = f"{base}{i}{ext}"
        try:
            open(target, "x").close()
        except FileExistsError:
            i = (i or 0) - 1
        else:
            break

    log.debug(f"shallow-cloning {source!r} to {target!r}")

    q = qcow.QCOW(target)
    q.create(backing_file=source)

    if not q:
        # NOTE(review): returns None (despite the 'str' annotation) and leaves the
        # reserved placeholder file behind - confirm that callers can deal with this
        return None

    return target

fetch_and_clone(container, vm, ocidir, vmdir, clonedir=None, startmode=START_DEFINE, libvirtURI=None)

fetches a 'container', imports it and creates a clone of the imported VM (how the clone is started depends on startmode).

  • returns the configuration of the VM (as known).
  • raises exceptions on failure
Source code in src/sardinecake/clone.py
def fetch_and_clone(
    container: str,
    vm: str,
    ocidir: str,
    vmdir: str,
    clonedir: str | None = None,
    startmode: int = START_DEFINE,
    libvirtURI: str | None = None,
):
    """fetches a 'container', imports it and creates a clone of the imported VM.

    - `container`: the container to fetch from the registry
    - `vm`: name of the cloned VM
    - `ocidir`: base directory for the fetched OCI containers
    - `vmdir`: output directory for the imported VM
    - `clonedir`: place for the cloned disk images (defaults to `vmdir`)
    - `startmode`: how to start the cloned VM (passed on to `clone()`)
    - `libvirtURI`: connection URI to the libvirt server

    - returns the configuration of the VM (as known).
    - raises exceptions on failure
    """
    from .ociregistry import fetch
    from .importvm import importvm, getconfig

    if ocidir is None:
        oci = container
    else:
        oci = os.path.join(ocidir or "", container)

    clonedir = clonedir or vmdir

    # suppress libvirt errors unless the logger is more verbose than INFO.
    # (this must be a plain bool: a 1-tuple would always be truthy)
    quiet = log.getEffectiveLevel() >= logging.INFO

    log.info(f"fetching {container!r}")
    if not fetch(container, ocidir=ocidir):
        raise RuntimeError(f"failed to fetch {container!r}")

    log.info(f"importing {container!r}")
    basevm = importvm(
        ocidir=oci,
        outdir=vmdir,
        name=None,
        libvirtURI=libvirtURI,
        quiet=quiet,
    )
    if not basevm:
        raise RuntimeError(f"failed to import {container}")

    # the returned configuration is the one of the container,
    # with the name replaced by the name of the clone
    config = getconfig(ocidir=oci) or {}
    config["name"] = vm

    log.info(f"cloning {container!r} as {vm}")
    clone(
        srcname=basevm[0],
        dstname=vm,
        startmode=startmode,
        libvirtURI=libvirtURI,
        outdir=clonedir,
        quiet=quiet,
        omit_backingstore=None,
    )

    return config