Skip to content

Commit

Permalink
Merge pull request #62 from bgilbert/live2
Browse files Browse the repository at this point in the history
Add tool to embed Ignition config in live ISO image
  • Loading branch information
bgilbert authored Sep 19, 2019
2 parents f3d194b + 11e2b9a commit 0bc740b
Showing 1 changed file with 197 additions and 0 deletions.
197 changes: 197 additions & 0 deletions coreos-iso-embed-ignition
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
#!/usr/bin/python3
#
# coreos-iso-embed-ignition - embed an Ignition config in a CoreOS live ISO
# Copyright (C) 2019 Red Hat
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import argparse
import gzip
import os
import shutil
import struct
import subprocess
import sys
import tempfile

FILENAME = 'config.ign'

class EmbedException(Exception):
pass


class CopiedFileHolder:
'''
Holder for a file handle which can optionally be copied to another one,
then modified, then finalized. If copied but not finalized, the copy
will be deleted when GCed.
'''
def __init__(self, fh):
self.fh = fh
self._copied = False
self._complete = False

def __del__(self):
if self._copied and not self._complete:
os.remove(self.fh.name)

def copy_to(self, target):
assert not self._copied
assert not self._complete
src = self.fh
self.fh = target
self._copied = True
shutil.copyfileobj(src, target)

def finish(self):
self._complete = True


def make_cpio(ignition_fh):
'''
Make a gzipped CPIO archive containing the specified Ignition config.
'''
with tempfile.TemporaryDirectory(prefix='coreos-iso-') as tempdir:
config_path = os.path.join(tempdir, FILENAME)
with open(config_path, 'w') as out_fh:
shutil.copyfileobj(ignition_fh, out_fh)
proc = subprocess.Popen(['cpio', '-o', '-D', tempdir, '-H', 'newc',
'--quiet'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
archive, _ = proc.communicate(FILENAME.encode())
if proc.returncode != 0:
raise EmbedException(f'cpio -o exited with status {proc.returncode}.')
return gzip.compress(archive)


def extract_cpio(buf):
'''
Extract a gzipped CPIO archive and return the contents of the Ignition
config.
'''
cpio = gzip.decompress(buf)
proc = subprocess.Popen(['cpio', '-i', '--quiet', '--to-stdout',
FILENAME], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
config, _ = proc.communicate(cpio)
if proc.returncode != 0:
raise EmbedException(f'cpio -i exited with status {proc.returncode}.')
return config


def get_location(iso_fh):
'''
Given an ISO image, return the offset and length of the Ignition
embed area.
'''
fmt = '<8s2Q'
size = struct.calcsize(fmt)
# Read the last bytes of the ISO 9660 System Area
iso_fh.seek(32768 - size)
buf = iso_fh.read(size)
if len(buf) < size:
raise EmbedException('Short ISO image file.')
magic, offset, length = struct.unpack(fmt, buf)
if magic != b'coreiso+':
raise EmbedException('Unrecognized CoreOS ISO image.')
if os.fstat(iso_fh.fileno()).st_size < offset + length:
raise EmbedException('Invalid CoreOS ISO image.')
return offset, length


def parse_args():
parser = argparse.ArgumentParser(description="Embed an Ignition config in a CoreOS live ISO image.")
subparsers = parser.add_subparsers(dest='command', title='subcommands',
required=True,
description='Use <subcommand> -h for more info.')

embed = subparsers.add_parser('embed',
description='Embed an Ignition config in an ISO image.',
help='Embed an Ignition config in an ISO image')
embed.add_argument('-c', '--config', metavar='PATH', default='-',
type=argparse.FileType('r'),
help='Ignition config to embed [default: stdin]')
embed.add_argument('-f', '--force', action='store_true',
help='Overwrite an existing embedded Ignition config')
embed.add_argument('-o', '--output', metavar='PATH',
type=argparse.FileType('x+b'),
help='Copy to a new file, instead of modifying in place')
embed.set_defaults(command=do_embed)

show = subparsers.add_parser('show',
description='Show the embedded Ignition config from an ISO image.',
help='Show the embedded Ignition config from an ISO image')
show.set_defaults(command=do_show)

remove = subparsers.add_parser('remove',
description='Remove an existing embedded Ignition config from an ISO image.',
help='Remove an existing embedded Ignition config')
remove.add_argument('-o', '--output', metavar='PATH',
type=argparse.FileType('x+b'),
help='Copy to a new file, instead of modifying in place')
remove.set_defaults(command=do_remove)

for subp in embed, show, remove:
subp.add_argument('image', metavar='ISO',
type=argparse.FileType('r+b'),
help='ISO image')

return parser.parse_args()


def do_embed(args, fh, offset, length):
cpio = make_cpio(args.config)
if len(cpio) > length:
raise EmbedException('Compressed Ignition config is too large: {} > {}'.format(len(cpio), length))
if not args.force:
# Ensure all zero bytes
fh.seek(offset)
if fh.read(length) != bytes(length):
raise EmbedException('This ISO image already has an embedded Ignition config; use -f to force.')
fh.seek(offset)
fh.write(cpio)
# Zero out the tail in case we're overwriting an existing config
fh.write(bytes(length - len(cpio)))


def do_show(args, fh, offset, length):
fh.seek(offset)
buf = fh.read(length)
if len(buf) < length:
raise EmbedException("Couldn't read embedded Ignition config.")
if buf == bytes(length):
raise EmbedException("No embedded Ignition config.")
config = extract_cpio(buf)
print(config.decode(), end='')


def do_remove(args, fh, offset, length):
fh.seek(offset)
fh.write(bytes(length))


def main():
args = parse_args()
image = CopiedFileHolder(args.image)
if getattr(args, 'output', None) is not None:
image.copy_to(args.output)
offset, length = get_location(image.fh)
args.command(args, image.fh, offset, length)
image.finish()


if __name__ == '__main__':
try:
main()
except EmbedException as e:
print(e, file=sys.stderr)
sys.exit(1)

0 comments on commit 0bc740b

Please sign in to comment.