2021-03-24 17:26:38 +01:00
|
|
|
"""Functions that help us generate and use info.json files.
|
|
|
|
"""
|
|
|
|
import json
|
|
|
|
from collections.abc import Mapping
|
2021-08-17 00:33:30 +02:00
|
|
|
from functools import lru_cache
|
2021-03-24 17:26:38 +01:00
|
|
|
from pathlib import Path
|
|
|
|
|
|
|
|
import hjson
|
|
|
|
import jsonschema
|
|
|
|
from milc import cli
|
|
|
|
|
|
|
|
|
2022-11-09 16:50:43 +01:00
|
|
|
def _dict_raise_on_duplicates(ordered_pairs):
|
|
|
|
"""Reject duplicate keys."""
|
|
|
|
d = {}
|
|
|
|
for k, v in ordered_pairs:
|
|
|
|
if k in d:
|
|
|
|
raise ValueError("duplicate key: %r" % (k,))
|
|
|
|
else:
|
|
|
|
d[k] = v
|
|
|
|
return d
|
|
|
|
|
|
|
|
|
|
|
|
def json_load(json_file, strict=True):
    """Load a JSON/HJSON document and return the parsed data.

    Args:
        json_file: A ``pathlib.Path`` to read from disk, or an already-open
            text stream (for example stdin), which is handed to the parser
            unchanged.
        strict: When true (the default), an object containing the same key
            twice is rejected. When false, no duplicate-key check is installed.

    Returns:
        The parsed document.

    Raises:
        SystemExit: With exit code 1, after logging an error, if the input
            cannot be opened or parsed.
    """
    pairs_hook = _dict_raise_on_duplicates if strict else None

    try:
        # Get the IO Stream for Path objects
        # Not necessary if the data is provided via stdin
        if isinstance(json_file, Path):
            # We opened this stream, so we are responsible for closing it.
            # json_file itself is left untouched so error messages below
            # show the path rather than a file-object repr.
            with json_file.open(encoding='utf-8') as json_stream:
                return hjson.load(json_stream, object_pairs_hook=pairs_hook)

        # Caller-supplied streams stay open; their lifetime belongs to the caller.
        return hjson.load(json_file, object_pairs_hook=pairs_hook)

    except (json.decoder.JSONDecodeError, hjson.HjsonDecodeError) as e:
        cli.log.error('Invalid JSON encountered attempting to load {fg_cyan}%s{fg_reset}:\n\t{fg_red}%s', json_file, e)
        # Raise SystemExit directly; the exit() builtin is injected by the
        # site module and is not guaranteed to exist in every interpreter.
        raise SystemExit(1)

    except Exception as e:
        # Deliberately broad: any failure here (missing file, duplicate key,
        # permission error, ...) is reported and terminates the command.
        cli.log.error('Unknown error attempting to load {fg_cyan}%s{fg_reset}:\n\t{fg_red}%s', json_file, e)
        raise SystemExit(1)
|
2021-03-24 17:26:38 +01:00
|
|
|
|
|
|
|
|
2021-08-17 00:33:30 +02:00
|
|
|
# maxsize=0 makes lru_cache store nothing, so a real bound is used to get the intended caching.
@lru_cache(maxsize=32)
def load_jsonschema(schema_name):
    """Read a jsonschema file from disk.

    Results are cached per ``schema_name``, so the returned object is shared
    between callers and should be treated as read-only.

    Args:
        schema_name: Either a path (``str`` or ``Path``) to an existing schema
            file, or a bare schema name that is resolved to
            ``data/schemas/<schema_name>.jsonschema``.

    Returns:
        The parsed schema. When a bare name does not match a file under
        ``data/schemas``, the contents of ``data/schemas/false.jsonschema``
        are returned instead.
    """
    direct_path = Path(schema_name)

    if direct_path.exists():
        # Always pass a Path: json_load only opens Path objects, so a plain
        # string would be given to the parser as if it were a stream.
        return json_load(direct_path)

    schema_path = Path(f'data/schemas/{schema_name}.jsonschema')

    if not schema_path.exists():
        schema_path = Path('data/schemas/false.jsonschema')

    return json_load(schema_path)
|
|
|
|
|
|
|
|
|
2021-08-17 00:33:30 +02:00
|
|
|
# maxsize=0 makes lru_cache store nothing; this function takes no arguments,
# so an unbounded cache holds at most one entry.
@lru_cache(maxsize=None)
def compile_schema_store():
    """Compile all our schemas into a schema store.

    Every ``*.jsonschema`` file directly under ``data/schemas`` is loaded and
    indexed by its ``$id``. The result is cached, so the returned dict is
    shared between callers and should be treated as read-only.

    Returns:
        A dict mapping each schema's ``$id`` to its parsed schema data.

    Raises:
        KeyError: If a schema file parses to a dict that has no ``$id`` key.
    """
    schema_store = {}

    for schema_file in Path('data/schemas').glob('*.jsonschema'):
        schema_data = load_jsonschema(schema_file)

        # Only dict-shaped documents can carry an `$id`; anything else is skipped.
        if not isinstance(schema_data, dict):
            cli.log.debug('Skipping schema file %s', schema_file)
            continue

        schema_store[schema_data['$id']] = schema_data

    return schema_store
|
|
|
|
|
|
|
|
|
|
|
|
# maxsize=0 makes lru_cache store nothing, so a real bound is used to reuse validators per schema id.
@lru_cache(maxsize=32)
def create_validator(schema):
    """Creates a validator for the given schema id.

    Args:
        schema: The ``$id`` of a schema present in the compiled schema store.

    Returns:
        The bound ``validate`` method of a ``Draft202012Validator`` built for
        that schema, with a resolver backed by the whole schema store.

    Raises:
        KeyError: If ``schema`` is not an ``$id`` found in the schema store.
    """
    schema_store = compile_schema_store()
    schema_data = schema_store[schema]

    # The resolver is given the full store; presumably so `$ref`s between
    # schemas resolve from it -- confirm against the jsonschema docs.
    resolver = jsonschema.RefResolver.from_schema(schema_data, store=schema_store)

    return jsonschema.Draft202012Validator(schema_data, resolver=resolver).validate
|
2021-03-24 17:26:38 +01:00
|
|
|
|
|
|
|
|
2021-06-25 05:48:53 +02:00
|
|
|
def validate(data, schema):
    """Check ``data`` against the schema identified by ``schema``.

    Args:
        data: The object to validate.
        schema: The schema id handed to ``create_validator``.

    Returns:
        Whatever the validator callable returns for ``data``.
    """
    return create_validator(schema)(data)
|
|
|
|
|
|
|
|
|
|
|
|
def deep_update(origdict, newdict):
    """Recursively merge ``newdict`` into ``origdict`` in place.

    Mapping values are merged key by key; every other value overwrites what
    was there. Nested mappings taken from ``newdict`` are rebuilt as new
    dicts rather than stored by reference.

    Args:
        origdict: The dictionary to update. It is modified in place.
        newdict: The mapping whose contents take precedence.

    Returns:
        ``origdict``, after the merge.
    """
    for key, value in newdict.items():
        if isinstance(value, Mapping):
            existing = origdict.get(key)

            if not isinstance(existing, Mapping):
                # A missing key or a non-mapping value (e.g. a scalar or None)
                # cannot be merged into, so the incoming mapping replaces it.
                existing = {}

            origdict[key] = deep_update(existing, value)
        else:
            origdict[key] = value

    return origdict
|