API Reference
fetch(container, ocidir='', sourcecontainer=None)
fetch a container from a registry
Source code in src/sardinecake/ociregistry.py
def fetch(container: str, ocidir: str = "", sourcecontainer: str | None = None) -> bool:
"""fetch a container from a registry"""
srccontainer = sourcecontainer or container
contain, tag = utils.split_imagetag(container)
if not issubclass(type(tag), str):
container = contain
dstcontainer = os.path.join(ocidir or "", container)
dstcontainerdir = os.path.join(ocidir or "", contain)
# skopeo copy docker://ghcr.io/cirruslabs/macos-monterey-vanilla:latest oci:monterey:latest
os.makedirs(dstcontainerdir, exist_ok=True)
cmd = ["skopeo", "copy", f"docker://{srccontainer}", f"oci:{dstcontainer}"]
log.debug(cmd)
args = {}
if log.getEffectiveLevel() >= logging.INFO:
args["stdout"] = subprocess.DEVNULL
args["stderr"] = subprocess.DEVNULL
p = subprocess.run(cmd, **args)
if p.returncode:
raise SystemExit(p.returncode)
return True
login(registry, username=None, password=None)
login to a registry
Source code in src/sardinecake/auth.py
def login(
registry: str, username: str | None = None, password: str | None = None
) -> bool:
"""login to a registry"""
cmd = ["skopeo", "login"]
if username:
cmd += ["--username", username]
if password:
cmd += ["--password-stdin"]
cmd += [registry]
if password:
p = subprocess.Popen(cmd, stdin=subprocess.PIPE)
p.communicate(password.encode())
else:
p = subprocess.run(cmd)
return not p.returncode
logout(registry)
logout from a registry
if
Source code in src/sardinecake/auth.py
def logout(registry: str | None):
"""logout from a registry
if <registry> is 'None', logout from *all* registries
"""
cmd = ["skopeo", "logout"]
if registry is None:
cmd.append("--all")
else:
cmd.append(registry)
p = subprocess.run(cmd)
return not p.returncode
push(container, ocidir='', targetcontainer=None)
push a container to a registry
Source code in src/sardinecake/ociregistry.py
def push(container: str, ocidir: str = "", targetcontainer: str | None = None) -> bool:
"""push a container to a registry"""
srccontainer = os.path.join(ocidir or "", container)
dstcontainer = targetcontainer or container
cmd = ["skopeo", "copy"]
cmd += [f"oci:{srccontainer}", f"docker://{dstcontainer}"]
log.debug(cmd)
p = subprocess.run(cmd)
if p.returncode:
raise SystemExit(p.returncode)
return True
clone(srcname, dstname, startmode=START_DEFINE, libvirtURI=None, mac=None, outdir=None, quiet=False, omit_backingstore=None)
do a fast clone of a VM
srcname: original VMdstname: VM to be createdstartmode: how to start the newly created VMlibvirtURI: connection URI to the libvirt server (note that only local QCOW2 disks can be cloned)mac: MAC-address of cloned network interface (use None for a new random MAC)outdir: place for the shallow cloned disks images (use None to use the same directory as the input disk images)quiet: ifTrue, suppress libvirt errorsomit_backingstore:- if
False, creates a<backingStore/>entry for COW-images - if
True, does not create a<backingStore/>entry for COW-images (might work around a bug with older libvirt) - if
None, tries to autodetect whether the<backingStore/>should be omitted
- if
Source code in src/sardinecake/clone.py
def clone(
srcname: str,
dstname: str,
startmode: int = START_DEFINE,
libvirtURI: str | None = None,
mac: str | None = None,
outdir: str | None = None,
quiet: bool = False,
omit_backingstore: bool | None = None,
):
"""do a fast clone of a VM
- `srcname`: original VM
- `dstname`: VM to be created
- `startmode`: how to start the newly created VM
- `libvirtURI`: connection URI to the libvirt server
(note that only local QCOW2 disks can be cloned)
- `mac`: MAC-address of cloned network interface (use None for a new random MAC)
- `outdir`: place for the shallow cloned disks images (use None to use the same directory as the input disk images)
- `quiet`: if `True`, suppress libvirt errors
- `omit_backingstore`:
- if `False`, creates a `<backingStore/>` entry for COW-images
- if `True`, does not create a `<backingStore/>` entry for COW-images (might work around a bug with older libvirt)
- if `None`, tries to autodetect whether the `<backingStore/>` should be omitted
"""
if quiet:
libvirt.registerErrorHandler((lambda ctx, err: 1), None)
libvirt_open = libvirt.open
if startmode == START_PRINTXML:
libvirt.openReadOnly
with libvirt_open(libvirtURI) as conn:
if omit_backingstore is None:
omit_backingstore = conn.getLibVersion() < 9010000
dom = Domain.fromLibVirt(conn, srcname)
if not dom:
raise KeyError(f"Couldn't find original VM {srcname!r}")
if not dstname:
suffix = 0
while True:
if suffix:
name = f"{srcname}-clone-{suffix}"
else:
name = f"{srcname}-clone"
if not Domain.fromLibVirt(conn, name):
dstname = name
break
suffix += 1
if Domain.fromLibVirt(conn, dstname):
raise KeyError(f"cloned VM {dstname!r} already exists")
dom.anonymize(mac_address=mac)
dom.changeName(dstname)
srcdisks = dom.getClonableDisks()
log.debug(f"cloning disks {srcdisks} to {outdir!r}")
cloneddisks = cloneDisks(srcdisks, outdir)
log.debug(f"cloned disks {cloneddisks}")
backing_files = {}
if not omit_backingstore:
backing_files = {
dev: qcow.getBackingChain(disk) for dev, disk in srcdisks.items()
}
dom.changeDiskFiles(cloneddisks, backing_files=backing_files)
if startmode == START_PRINTXML:
print(dom)
elif startmode == START_EPHEMERAL:
log.debug(f"start ephemeral VM {dstname!r}")
conn.createXML(dom.toXML())
for d in cloneddisks.values():
log.debug(f"deleting ephemeral volume {d!r}")
os.unlink(d)
else:
log.debug(f"define cloned VM {dstname!r}")
vm = conn.defineXML(dom.toXML())
if startmode == START_PERSISTENT:
log.debug(f"starting persistent VM {dstname!r}")
vm.create()
cloneDisks(disks, outputdir)
shallow-clones the given disks into output dir.
disks is a dictionary that maps IDs to disk paths.
a similar map from IDs to cloned paths is returned
Source code in src/sardinecake/clone.py
def cloneDisks(disks: dict[str, str], outputdir: str | None) -> dict[str, str]:
"""shallow-clones the given disks into output dir.
`disks` is a dictionary that maps IDs to disk paths.
a similar map from IDs to cloned paths is returned
"""
return {k: cloneQCOW2(v, outputdir) for k, v in disks.items()}
cloneQCOW2(source, target=None)
copy-on-write clone of a QCOW2 file source to another QCOW2 file.
- if target is
None, the clone will be in the same directory as the source, and the name will be derived from the source basename. - if target is a directory (or ends with "/"), the clone will be in the given directory, and the name will be derived from the source basename.
- otherwise, target is a the prospective name of the output file.
cloneQCOW2() will refuse to overwrite existing images, and use a unique filename when required.
the actual filename of the cloned image is returned
Source code in src/sardinecake/clone.py
def cloneQCOW2(source: str, target: str | None = None) -> str:
"""copy-on-write clone of a QCOW2 file `source` to another QCOW2 file.
- if target is `None`, the clone will be in the same directory as the source, and the name will be derived from the source basename.
- if target is a directory (or ends with "/"), the clone will be in the given directory, and the name will be derived from the source basename.
- otherwise, target is a the prospective name of the output file.
`cloneQCOW2()` will refuse to overwrite existing images, and use a unique filename when required.
the actual filename of the cloned image is returned
"""
import subprocess
# qemu-img create -f qcow2 -b base.qcow2 -F qcow2 clone.qcow2
# check if source exists and can be opened (otherwise raise a standard error)
with open(source) as f:
pass
if not target:
target = source
elif os.path.isdir(target) or not os.path.basename(target):
target = os.path.join(target, os.path.basename(source))
# ensure that output directory exists
outdir = os.path.dirname(target)
os.makedirs(outdir, exist_ok=True)
base, ext = os.path.splitext(target)
i = ""
while True:
target = f"{base}{i}{ext}"
try:
targetfd = open(target, "x")
break
except FileExistsError:
pass
if not i:
i = 0
i -= 1
targetfd.close()
log.debug(f"shallow-cloning {source!r} to {target!r}")
q = qcow.QCOW(target)
q.create(backing_file=source)
if not q:
return
return target
fetch_and_clone(container, vm, ocidir, vmdir, clonedir=None, startmode=START_DEFINE, libvirtURI=None)
fetches a 'container', imports it and creates an ephemeral clone.
- returns the configuration of the VM (as known).
- raises exceptions on failure
Source code in src/sardinecake/clone.py
def fetch_and_clone(
container: str,
vm: str,
ocidir: str,
vmdir: str,
clonedir: str | None = None,
startmode: int = START_DEFINE,
libvirtURI: str | None = None,
):
"""fetches a 'container', imports it and creates an ephemeral clone.
- returns the configuration of the VM (as known).
- raises exceptions on failure
"""
from .ociregistry import fetch
from .importvm import importvm, getconfig
if ocidir is None:
oci = container
else:
oci = os.path.join(ocidir or "", container)
clonedir = clonedir or vmdir
quiet = (log.getEffectiveLevel() >= logging.INFO,)
log.info(f"fetching {container!r}")
if not fetch(container, ocidir=ocidir):
raise RuntimeError(f"failed to fetch {container!r}")
log.info(f"importing {container!r}")
basevm = importvm(
ocidir=oci,
outdir=vmdir,
name=None,
libvirtURI=libvirtURI,
quiet=quiet,
)
if not basevm:
raise RuntimeError(f"failed to import {container}")
config = getconfig(ocidir=oci) or {}
config["name"] = vm
log.info(f"cloning {container!r} as {vm}")
clone(
srcname=basevm[0],
dstname=vm,
startmode=startmode,
libvirtURI=libvirtURI,
outdir=clonedir,
quiet=quiet,
omit_backingstore=None,
)
return config