#
# Copyright (C) 2011 Red Hat, Inc.
#
# This software is licensed to you under the GNU General Public
# License as published by the Free Software Foundation; either version
# 2 of the License (GPLv2) or (at your option) any later version.
# There is NO WARRANTY for this software, express or implied,
# including the implied warranties of MERCHANTABILITY,
# NON-INFRINGEMENT, or FITNESS FOR A PARTICULAR PURPOSE. You should
# have received a copy of GPLv2 along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
"""Simple validation for JSON compatible data structures"""
import re
import copy
import json
class ValidationError(Exception):
    """Error raised (via ``fail_validation``) when a value fails a check."""
def fail_validation(fmt, *args, **kwds):
    """Raise ``ValidationError`` with a message built from ``fmt``.

    ``fmt`` is converted with ``str()`` and expanded with ``str.format``
    using the remaining positional and keyword arguments.
    """
    message = str(fmt).format(*args, **kwds)
    raise ValidationError(message)
def check_value(allowed_values, allow_none=False):
    """Return a validator that accepts only members of ``allowed_values``.

    The returned callable takes ``(value, setting='setting')`` and returns
    ``None`` on success.  ``None`` is accepted when ``allow_none`` is true.
    Any other value is reported through ``fail_validation``.
    """
    def validator(value, setting='setting'):
        none_permitted = allow_none and value is None
        if none_permitted or value in allowed_values:
            return
        fail_validation("Expected one of {0!r} for {1}, got {2!r}",
                        allowed_values, setting, value)
    return validator
def check_type(expected_type, allow_none=False):
    """Return a validator that accepts only instances of ``expected_type``.

    ``expected_type`` is handed straight to ``isinstance``.  The returned
    callable takes ``(value, setting='setting')``; on a mismatch it reports
    the actual type of the value through ``fail_validation``.  ``None`` is
    accepted when ``allow_none`` is true.
    """
    def validator(value, setting='setting'):
        if allow_none and value is None:
            return
        if isinstance(value, expected_type):
            return
        fail_validation("Expected {0!r} for {1}, got {2!r}",
                        expected_type, setting, type(value))
    return validator
def check_text(allow_none=False):
    """Return a validator that accepts text strings.

    On Python 2 both ``str`` and ``unicode`` are accepted (``basestring``).
    On interpreters without ``basestring`` (Python 3) only ``str`` is
    accepted.  ``None`` is accepted when ``allow_none`` is true.
    """
    # Allow either string type for now
    # TODO: Tighten this up to enforce unicode
    #       Means fixing deserialisation interfaces :P
    try:
        text_type = basestring  # Python 2: common base of str and unicode
    except NameError:
        text_type = str  # Python 3: basestring no longer exists
    return check_type(text_type, allow_none)
def check_regex(pattern, expected=None, allow_none=False):
    """Return a validator for text that matches ``pattern``.

    ``pattern`` is applied with ``re.match``, so it is only anchored at the
    start unless the pattern itself anchors the end.  ``expected`` is the
    human-readable description used in the error message; it defaults to
    ``"text matching <pattern repr>"``.  ``None`` is accepted when
    ``allow_none`` is true.  Non-text values are rejected by the
    ``check_text`` validator before any matching is attempted.
    """
    _validate_text = check_text()
    if expected is None:
        expected = "text matching {0!r}".format(pattern)

    def validator(value, setting='setting'):
        if allow_none and value is None:
            return
        _validate_text(value, setting)
        # Original author's note: "We use Unicode storage, but stick with
        # the ASCII rules for pattern matching on whitespace etc."
        # NOTE(review): no flags are passed, so this relies on the re
        # module's defaults for the running interpreter -- confirm if the
        # ASCII-only behaviour matters on Python 3.
        if re.match(pattern, value) is None:
            # ``expected`` is passed as a format *argument*, never spliced
            # into the format string: descriptions containing braces (for
            # example the default one for a pattern such as ``a{2}``) would
            # otherwise be re-parsed as replacement fields and fail with
            # IndexError/KeyError instead of ValidationError.
            fail_validation("Expected {0} for {1}, got {2!r}",
                            expected, setting, value)
    return validator
# ``\Z`` (rather than ``$``) anchors the end of the value: ``$`` also matches
# just before a trailing newline, which would let "name\n" pass validation.
SIMPLE_ID_REGEX = r'^[\w\-]+\Z'


def check_simple_id(expected='simple ID (alphanumeric, underscores, hyphens)',
                    allow_none=False):
    """Return a validator for non-empty IDs matching ``SIMPLE_ID_REGEX``.

    ``expected`` is the description used in the error message; ``None`` is
    accepted when ``allow_none`` is true.
    """
    return check_regex(SIMPLE_ID_REGEX, expected, allow_none)
# Letters and underscores only.  ``\Z`` (rather than ``$``) anchors the end
# so that a trailing newline is rejected instead of silently accepted.
PULP_ID_REGEX = r'^[_A-Za-z]+\Z'


def check_pulp_id(expected='valid Pulp ID', allow_none=False):
    """Return a validator for non-empty IDs matching ``PULP_ID_REGEX``.

    ``expected`` is the description used in the error message; ``None`` is
    accepted when ``allow_none`` is true.
    """
    return check_regex(PULP_ID_REGEX, expected, allow_none)
# Characters accepted in a filter: brackets, ``* ? @ % + = : , . / ~ _ -``
# and word characters.  ``\Z`` (rather than ``$``) anchors the end so that a
# trailing newline is rejected instead of silently accepted.
VALID_FILTER_REGEX = r'^[][*?@%+=:,./~_\w\-]+\Z'


def check_rsync_filter(allow_none=False):
    """Return a validator for non-empty text matching ``VALID_FILTER_REGEX``.

    ``None`` is accepted when ``allow_none`` is true.
    """
    return check_regex(VALID_FILTER_REGEX, 'valid rsync filter', allow_none)
def check_rsync_filter_sequence():
    """Return a validator for a sequence whose items are rsync filters.

    Each item is checked with the validator from ``check_rsync_filter()``.
    """
    filter_validator = check_rsync_filter()
    return check_sequence(filter_validator)
# We seriously need some better URL handling infrastructure in the stdlib...
# From http://stackoverflow.com/questions/106179/regular-expression-to-match-hostname-or-ip-address
#
# Raw strings keep the regex escapes (``\.``, ``\-``) from being read as
# (invalid) string escapes.  ``\Z`` (rather than ``$``) anchors the end so
# that a trailing newline is rejected instead of silently accepted.
IPv4_REGEX = r"^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\Z"
# TODO: Allow IPv6 addresses as well (for now: just use hostnames if you need to access an IPv6-only host)
HOSTNAME_REGEX = r"^(([a-zA-Z]|[a-zA-Z][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z]|[A-Za-z][A-Za-z0-9\-]*[A-Za-z0-9])\Z"
# A host is either a dotted-quad IPv4 address or a hostname.
VALID_HOST_REGEX = "({0})|({1})".format(IPv4_REGEX, HOSTNAME_REGEX)


def check_host(allow_none=False):
    """Return a validator for text matching ``VALID_HOST_REGEX``.

    Accepts IPv4 dotted-quad addresses and hostnames; ``None`` is accepted
    when ``allow_none`` is true.
    """
    return check_regex(VALID_HOST_REGEX, 'valid host', allow_none)
# Word characters plus ``@ % + = : , . / -``; the empty string is allowed.
# ``\Z`` (rather than ``$``) anchors the end so that a trailing newline is
# rejected instead of silently accepted.
VALID_PATH_REGEX = r'^[\w@%+=:,./-]*\Z'


def check_path(allow_none=False):
    """Return a validator for text matching ``VALID_PATH_REGEX``.

    ``None`` is accepted when ``allow_none`` is true.
    """
    return check_regex(VALID_PATH_REGEX, 'valid filesystem path', allow_none)
def check_remote_path(allow_none=False):
    """Return a validator for paths that start and end with ``'/'``.

    The value must first pass the ``check_path()`` validator, and must then
    both begin and end with a slash.  ``None`` is accepted when
    ``allow_none`` is true.
    """
    path_validator = check_path()

    def validator(value, setting='setting'):
        if allow_none and value is None:
            return
        path_validator(value, setting)
        if value.startswith('/') and value.endswith('/'):
            return
        fail_validation("{0!r} must start and end with '/' "
                        "characters, got {1!r}",
                        setting, value)
    return validator
def check_sequence(item_validator, allow_none=False):
    """Return a validator for a non-string, non-mapping iterable.

    Strings, objects with a ``keys`` attribute (treated as mappings) and
    non-iterables are rejected through ``fail_validation``.  Every item is
    then passed to ``item_validator(item, "<setting>[<index>]")``.  ``None``
    is accepted when ``allow_none`` is true.
    """
    try:
        string_types = basestring  # Python 2: str and unicode
    except NameError:
        # Python 3 has no basestring; reject both text and byte strings so
        # that neither is iterated element by element.
        string_types = (str, bytes)

    def validator(value, setting='setting'):
        if allow_none and value is None:
            return
        # Check we've been given a sequence
        if isinstance(value, string_types):
            fail_validation("Strings not accepted for {0!r}, got {1!r}",
                            setting, value)
        if hasattr(value, 'keys'):
            fail_validation("Mappings not accepted for {0!r}, got {1!r}",
                            setting, type(value))
        try:
            itr = iter(value)
        except (TypeError, AttributeError):
            fail_validation("Expected sequence for {0!r}, got {1!r}",
                            setting, type(value))
        # Check individual items
        for i, item in enumerate(itr):
            item_setting = setting + "[{0}]".format(i)
            item_validator(item, item_setting)
    return validator
def check_mapping_items(key_validator, value_validator, allow_none=False):
    """Return a validator that checks every key and value of a mapping.

    For each ``(key, value)`` pair, ``key_validator`` and then
    ``value_validator`` are called with the item and the label
    ``"<setting>[<key repr>]"``.  A value without a usable ``items()``
    method is reported through ``fail_validation`` (the same message
    ``check_mapping`` uses) rather than leaking a raw ``AttributeError``.
    ``None`` is accepted when ``allow_none`` is true.
    """
    def validator(value, setting='setting'):
        if allow_none and value is None:
            return
        # Check we've been given a mapping
        try:
            items = value.items()
        except (AttributeError, TypeError):
            fail_validation("Expected mapping for {0}, got {1!r}",
                            setting, type(value))
        for k, v in items:
            field = setting + "[{0!r}]".format(k)
            key_validator(k, field)
            value_validator(v, field)
    return validator
def check_mapping(spec, allow_none=False, allow_extra=False):
    """Return a validator that checks a mapping against ``spec``.

    ``spec`` maps each required key to one of:

    * an object with a ``check()`` method, whose result is used as the
      validator for that key;
    * a list, whose first element's ``check()`` result is applied to every
      item of a sequence via ``check_sequence``;
    * a plain validator callable taking ``(value, setting)``.

    Every key in ``spec`` must be present.  Keys outside ``spec`` are an
    error unless ``allow_extra`` is true, in which case they are skipped.
    ``None`` is accepted when ``allow_none`` is true.
    """
    def _as_validator(entry):
        # Turn a spec entry into the callable that checks one value.
        if hasattr(entry, "check"):
            return entry.check()
        if isinstance(entry, list):
            return check_sequence(entry[0].check())
        return entry

    def validator(value, setting='setting'):
        if allow_none and value is None:
            return
        # Check we've been given a mapping
        try:
            pairs = value.items()
        except (AttributeError, TypeError):
            fail_validation("Expected mapping for {0}, got {1!r}",
                            setting, type(value))
        # Check for missing and extra attributes
        given_keys = set(value)
        required_keys = set(spec)
        absent = required_keys - given_keys
        if absent:
            fail_validation("{0!r} missing from {1}, got {2!r}",
                            sorted(absent), setting, value)
        if not allow_extra:
            surplus = given_keys - required_keys
            if surplus:
                fail_validation("{0!r} unexpected in {1}, got {2!r}",
                                sorted(surplus), setting, value)
        # Check the validation of the individual items
        for key, item in pairs:
            if allow_extra and key not in spec:
                continue
            item_setting = setting + "[{0!r}]".format(key)
            check_item = _as_validator(spec[key])
            check_item(item, item_setting)
    return validator
def validate_config(config, spec, *args, **kwds):
    """Validate ``config`` against ``spec``, labelling errors as 'config'.

    Extra positional and keyword arguments are forwarded to
    ``check_mapping``.
    """
    validator = check_mapping(spec, *args, **kwds)
    validator(config, 'config')
class ValidatedConfig(object):
    """Base class for configurations validated against a declarative spec.

    Subclasses customise behaviour through class attributes:

    * ``_SPEC``: maps each key to a validator callable, a ``ValidatedConfig``
      subclass (nested configuration), or a one-element list holding a
      ``ValidatedConfig`` subclass (sequence of nested configurations).
    * ``_DEFAULTS``: values deep-copied into every new instance before the
      supplied configuration is layered on top.
    * ``_ALLOW_NONE`` / ``_ALLOW_EXTRA``: forwarded to ``check_mapping``.
    """
    _ALLOW_NONE = False
    _ALLOW_EXTRA = False
    _SPEC = {}
    _DEFAULTS = {}

    def __init__(self, config=None):
        self.config = self._init_config(config)

    @staticmethod
    def _is_subspec(spec):
        """Return True when ``spec`` is a ``ValidatedConfig`` subclass."""
        # Spec entries are classes, not instances, so ``isinstance(spec,
        # ValidatedConfig)`` can never be true for them.
        return isinstance(spec, type) and issubclass(spec, ValidatedConfig)

    @classmethod
    def _expand_value(cls, spec, value):
        """Apply nested defaults to ``value`` according to ``spec``.

        Values of the wrong shape are returned untouched so that the
        problem is reported by ``validate()`` rather than raising an
        unrelated error during construction.
        """
        if cls._is_subspec(spec):
            if isinstance(value, dict):
                return spec(value).config
            return value
        if isinstance(spec, list) and isinstance(value, (list, tuple)):
            item_spec = spec[0]
            return [item_spec(entry).config if isinstance(entry, dict)
                    else entry
                    for entry in value]
        return value

    def _init_config(self, config):
        """Merge ``config`` over a deep copy of ``_DEFAULTS``.

        The caller's mapping is copied, never mutated.
        """
        complete = copy.deepcopy(self._DEFAULTS)
        if config is None:
            return complete
        config = config.copy()
        # Check for subspecs first
        for key, spec in self._SPEC.items():
            try:
                value = config.pop(key)
            except KeyError:
                continue
            complete[key] = self._expand_value(spec, value)
        # Make sure any unexpected values get reported on validation
        complete.update(config)
        return complete

    def __iter__(self):
        """Iterate over the keys declared in ``_SPEC``."""
        # iter() over the dict works on both Python 2 and Python 3.
        return iter(self._SPEC)

    def validate(self):
        """Validate ``self.config``, labelling errors as 'config'."""
        self.check()(self.config, "config")

    @classmethod
    def post_validate(cls, value):
        """Hook run by the ``check()`` validator after the mapping check.

        The default implementation does nothing.
        """
        pass

    @classmethod
    def check(cls):
        """Return a validator callable for this configuration class.

        The validator applies ``check_mapping`` with the class's ``_SPEC``,
        ``_ALLOW_NONE`` and ``_ALLOW_EXTRA`` and then calls
        ``post_validate`` with the value.
        """
        mapping_validator = check_mapping(cls._SPEC,
                                          cls._ALLOW_NONE,
                                          cls._ALLOW_EXTRA)

        def validator(value, setting='setting'):
            mapping_validator(value, setting)
            cls.post_validate(value)
        return validator

    @classmethod
    def ensure_validated(cls, config):
        """Returns a mapping that has been validated against the spec"""
        checked_config = cls(config)
        checked_config.validate()
        return checked_config.config

    @classmethod
    def from_json(cls, json_config):
        """Parse the config from a JSON string and ensure it is valid.

        Returns the validated ``ValidatedConfig`` instance.
        """
        checked_config = cls(json.loads(json_config))
        checked_config.validate()
        return checked_config