Files
iconograph/fetcher.py

104 lines
2.6 KiB
Python
Raw Normal View History

2016-03-28 22:06:04 -07:00
#!/usr/bin/python3
import argparse
2016-03-29 17:43:21 -07:00
import codecs
2016-03-28 22:06:04 -07:00
import json
import hashlib
2016-03-29 17:43:21 -07:00
import os
import shutil
2016-03-28 22:06:04 -07:00
import socket
import struct
2016-03-29 17:43:21 -07:00
import subprocess
import tempfile
2016-03-28 22:06:04 -07:00
import urllib.request
2016-03-29 17:43:21 -07:00
from OpenSSL import crypto
2016-03-28 22:06:04 -07:00
# Command-line interface: every flag is a required string option.
parser = argparse.ArgumentParser(description='iconograph fetcher')
_REQUIRED_FLAGS = (
    ('--base-url', 'base_url'),
    ('--ca-cert', 'ca_cert'),
    ('--image-type', 'image_type'),
)
for _flag, _dest in _REQUIRED_FLAGS:
    parser.add_argument(_flag, dest=_dest, action='store', required=True)

# Parsed once at module load; read by the script entry point at the bottom
# of the file.
FLAGS = parser.parse_args()
class Fetcher(object):
    """Download a signed image manifest and choose an image for this host.

    The manifest is read from ``<base_url>/<image_type>.manifest.json``. Its
    signing certificate is checked against the configured CA certificate
    with the ``openssl`` command-line tool, and the signature over the
    payload is checked with pyOpenSSL before the payload is parsed.
    """

    # Upper bound (exclusive) of the per-host bucket that is compared
    # against each image's 'rollout_‱' value in _ChooseImage.
    _MAX_BP = 10000

    def __init__(self, base_url, image_type, ca_cert):
        """Store the fetcher configuration.

        Args:
            base_url: URL prefix under which the manifest is published.
            image_type: Image type name; used to build the manifest file name.
            ca_cert: Path to the CA certificate file handed to
                ``openssl verify -CAfile``.
        """
        self._base_url = base_url
        self._image_type = image_type
        self._ca_cert_path = ca_cert

    def _VerifyChain(self, untrusted_certs, cert):
        """Run ``openssl verify`` on ``cert`` against the configured CA.

        Args:
            untrusted_certs: Iterable of certificate strings written to the
                file passed as ``-untrusted``.
            cert: Certificate string to verify.

        Raises:
            subprocess.CalledProcessError: If ``openssl`` exits with a
                non-zero status.
        """
        # openssl only accepts file paths, so stage the certificates in a
        # private directory that is removed again on exit, even on error.
        with tempfile.TemporaryDirectory() as tempdir:
            untrusted_path = os.path.join(tempdir, 'untrusted.pem')
            with open(untrusted_path, 'w') as fh:
                fh.writelines(untrusted_certs)

            cert_path = os.path.join(tempdir, 'cert.pem')
            with open(cert_path, 'w') as fh:
                fh.write(cert)

            # Discard stdout. An unread PIPE can block the child once the
            # OS pipe buffer fills; DEVNULL cannot.
            subprocess.check_call([
                'openssl', 'verify',
                '-CAfile', self._ca_cert_path,
                '-untrusted', untrusted_path,
                cert_path,
            ], stdout=subprocess.DEVNULL)

    def _Unwrap(self, wrapped):
        """Validate a signed wrapper and return its decoded payload.

        Args:
            wrapped: Dict with keys 'cert' (PEM certificate), 'sig'
                (hex-encoded signature), 'inner' (JSON text that was signed)
                and optionally 'other_certs' (list of certificate strings).

        Returns:
            The object obtained by JSON-decoding ``wrapped['inner']``.

        Raises:
            subprocess.CalledProcessError: If chain verification fails.
            Whatever ``crypto.verify`` raises on a bad signature
            (presumably ``OpenSSL.crypto.Error`` -- confirm against the
            installed pyOpenSSL version).
        """
        # Check the certificate chain before trusting the certificate's key.
        self._VerifyChain(wrapped.get('other_certs', []), wrapped['cert'])

        cert = crypto.load_certificate(crypto.FILETYPE_PEM, wrapped['cert'])
        sig = codecs.decode(wrapped['sig'], 'hex')
        crypto.verify(
            cert,
            sig,
            wrapped['inner'].encode('utf8'),
            'sha256')

        # Only parse the payload once its signature has been checked.
        return json.loads(wrapped['inner'])

    def _GetManifest(self):
        """Fetch the manifest for this image type and return its payload.

        Returns:
            The verified, JSON-decoded manifest (see _Unwrap).
        """
        url = '%s/%s.manifest.json' % (self._base_url, self._image_type)
        # Close the HTTP response deterministically instead of leaking it.
        with urllib.request.urlopen(url) as resp:
            body = resp.read().decode('utf8')
        return self._Unwrap(json.loads(body))

    def _ChooseImage(self, manifest):
        """Return the first image whose rollout includes this host.

        For each image, the host name and the image timestamp are hashed
        into a bucket in ``[0, _MAX_BP)``; the image is selected when that
        bucket is below the image's 'rollout_‱' value.

        Args:
            manifest: Iterable of dicts, each with an integer 'timestamp'
                and a numeric 'rollout_‱' entry.

        Returns:
            The first matching image dict, or None if no image matches.
        """
        hostname = socket.gethostname()
        # Hash the host name once; each image extends a copy of this state.
        hash_base = hashlib.sha256(hostname.encode('ascii'))
        for image in manifest:
            hashobj = hash_base.copy()
            hashobj.update(struct.pack('!L', image['timestamp']))
            # Use the last four digest bytes as an unsigned int bucket.
            my_bp = struct.unpack('!I', hashobj.digest()[-4:])[0] % self._MAX_BP
            if my_bp < image['rollout_‱']:
                return image
        return None

    def Fetch(self):
        """Fetch the manifest, choose an image for this host and print it."""
        manifest = self._GetManifest()
        image = self._ChooseImage(manifest)
        print(image)
2016-03-28 22:06:04 -07:00
2016-03-29 11:27:10 -07:00
# Script entry point: build a Fetcher from the parsed flags and run it.
fetcher = Fetcher(
    base_url=FLAGS.base_url,
    image_type=FLAGS.image_type,
    ca_cert=FLAGS.ca_cert,
)
fetcher.Fetch()