Cert chain verification
This commit is contained in:
46
fetcher.py
46
fetcher.py
@@ -1,12 +1,17 @@
|
|||||||
#!/usr/bin/python3
|
#!/usr/bin/python3
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
|
import codecs
|
||||||
import json
|
import json
|
||||||
import hashlib
|
import hashlib
|
||||||
from OpenSSL import crypto
|
import os
|
||||||
|
import shutil
|
||||||
import socket
|
import socket
|
||||||
import struct
|
import struct
|
||||||
|
import subprocess
|
||||||
|
import tempfile
|
||||||
import urllib.request
|
import urllib.request
|
||||||
|
from OpenSSL import crypto
|
||||||
|
|
||||||
|
|
||||||
parser = argparse.ArgumentParser(description='iconograph fetcher')
|
parser = argparse.ArgumentParser(description='iconograph fetcher')
|
||||||
@@ -35,17 +40,44 @@ class Fetcher(object):
|
|||||||
def __init__(self, base_url, image_type, ca_cert):
|
def __init__(self, base_url, image_type, ca_cert):
|
||||||
self._base_url = base_url
|
self._base_url = base_url
|
||||||
self._image_type = image_type
|
self._image_type = image_type
|
||||||
with open(ca_cert, 'r') as fh:
|
self._ca_cert_path = ca_cert
|
||||||
self._ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM, fh.read())
|
|
||||||
|
def _VerifyChain(self, untrusted_certs, cert):
|
||||||
|
tempdir = tempfile.mkdtemp()
|
||||||
|
|
||||||
|
try:
|
||||||
|
untrusted_path = os.path.join(tempdir, 'untrusted.pem')
|
||||||
|
with open(untrusted_path, 'w') as fh:
|
||||||
|
for cert_str in untrusted_certs:
|
||||||
|
fh.write(cert_str)
|
||||||
|
|
||||||
|
cert_path = os.path.join(tempdir, 'cert.pem')
|
||||||
|
with open(cert_path, 'w') as fh:
|
||||||
|
fh.write(cert)
|
||||||
|
|
||||||
|
# Rely on pipe buffering to eat the stdout junk
|
||||||
|
subprocess.check_call([
|
||||||
|
'openssl', 'verify',
|
||||||
|
'-CAfile', self._ca_cert_path,
|
||||||
|
'-untrusted', untrusted_path,
|
||||||
|
cert_path,
|
||||||
|
], stdout=subprocess.PIPE)
|
||||||
|
finally:
|
||||||
|
shutil.rmtree(tempdir)
|
||||||
|
|
||||||
def _Unwrap(self, wrapped):
|
def _Unwrap(self, wrapped):
|
||||||
|
self._VerifyChain(wrapped.get('other_certs', []), wrapped['cert'])
|
||||||
|
|
||||||
cert = crypto.load_certificate(crypto.FILETYPE_PEM, wrapped['cert'])
|
cert = crypto.load_certificate(crypto.FILETYPE_PEM, wrapped['cert'])
|
||||||
|
sig = codecs.decode(wrapped['sig'], 'hex')
|
||||||
crypto.verify(
|
crypto.verify(
|
||||||
cert,
|
cert,
|
||||||
wrapped['sig'].encode('ascii'),
|
sig,
|
||||||
wrapped['inner'].encode('ascii'),
|
wrapped['inner'].encode('utf8'),
|
||||||
'sha256')
|
'sha256')
|
||||||
|
|
||||||
|
return json.loads(wrapped['inner'])
|
||||||
|
|
||||||
def _GetManifest(self):
|
def _GetManifest(self):
|
||||||
url = '%s/%s.manifest.json' % (self._base_url, self._image_type)
|
url = '%s/%s.manifest.json' % (self._base_url, self._image_type)
|
||||||
resp = urllib.request.urlopen(url).read().decode('utf8')
|
resp = urllib.request.urlopen(url).read().decode('utf8')
|
||||||
@@ -63,8 +95,8 @@ class Fetcher(object):
|
|||||||
|
|
||||||
def Fetch(self):
|
def Fetch(self):
|
||||||
manifest = self._GetManifest()
|
manifest = self._GetManifest()
|
||||||
#image = self._ChooseImage(manifest)
|
image = self._ChooseImage(manifest)
|
||||||
#print(image)
|
print(image)
|
||||||
|
|
||||||
|
|
||||||
fetcher = Fetcher(FLAGS.base_url, FLAGS.image_type, FLAGS.ca_cert)
|
fetcher = Fetcher(FLAGS.base_url, FLAGS.image_type, FLAGS.ca_cert)
|
||||||
|
|||||||
Reference in New Issue
Block a user