304 lines
8.7 KiB
Python
304 lines
8.7 KiB
Python
|
"""
|
||
|
Borrowed from https://gist.github.com/schinckel/aeea9c0f807dd009bf47566df7ac5054
|
||
|
|
||
|
This module overrides the Range.__and__ function, so that it returns a boolean value
|
||
|
based on if the two objects overlap.
|
||
|
The rationale behind this is that it mirrors the `range && range` operator in postgres.
|
||
|
There are tests for this, that hit the database with randomly generated ranges and ensure
|
||
|
that the database and this method agree upon the results.
|
||
|
There is also a more complete `isempty()` method, which examines the bounds types and values,
|
||
|
and determines if the object is indeed empty. This is required when python-created range objects
|
||
|
are dealt with, as these are not normalised the same way that postgres does.
|
||
|
"""
|
||
|
import datetime
|
||
|
|
||
|
from psycopg2.extras import Range
|
||
|
|
||
|
# Smallest step between two neighbouring values, keyed by the exact element
# type of a discrete range. normalise() adds this step to turn an exclusive
# lower bound or an inclusive upper bound into the [start,finish) form.
OFFSET = {
    int: 1,
    datetime.date: datetime.timedelta(days=1),
}
|
||
|
|
||
|
|
||
|
def normalise(instance):
    """
    Return ``instance`` in the canonical form that postgres would use.

    Discrete ranges (element types listed in ``OFFSET``: integer, date) are
    rewritten as [start,finish). A side without a value is always marked
    exclusive, so a missing lower value gives (None,finish). A range whose
    two values coincide without both bounds being inclusive becomes the
    empty range.

    An empty ``instance`` is returned as-is; in every other case a new object
    of the same class is built and ``instance`` is left untouched.
    """
    if instance.isempty:
        return instance

    range_type = instance.__class__
    start, finish = instance.lower, instance.upper
    opening, closing = instance._bounds

    # Same value on both sides only describes something when written as [v,v].
    if start is not None and start == finish and (opening, closing) != ("[", "]"):
        return range_type(empty=True)

    if start is None:
        opening = "("
    elif opening == "(" and type(start) in OFFSET:
        # (v, ...  ->  [v + step, ...
        start = start + OFFSET[type(start)]
        opening = "["

    if finish is None:
        closing = ")"
    elif closing == "]" and type(finish) in OFFSET:
        # ..., v]  ->  ..., v + step)
        finish = finish + OFFSET[type(finish)]
        closing = ")"

    # Shifting the bounds may have collapsed the range, e.g. (3,4) on integers.
    if start is not None and start == finish and (opening, closing) != ("[", "]"):
        return range_type(empty=True)

    return range_type(start, finish, opening + closing)
|
||
|
|
||
|
|
||
|
def __and__(self, other):
    """
    Return True when the two ranges overlap, False otherwise.

    Installed as ``Range.__and__`` so that ``a & b`` mirrors the
    ``range && range`` operator in postgres.

    Raises TypeError when ``other`` is not an instance of ``self``'s class.
    """
    if not isinstance(other, self.__class__):
        message = "unsupported operand type(s) for &: '{}' and '{}'".format(
            self.__class__.__name__, other.__class__.__name__
        )
        raise TypeError(message)

    left = normalise(self)
    right = normalise(other)

    # An empty range never overlaps anything, not even another empty range.
    if left.isempty or right.isempty:
        return False

    # Always evaluate with the range that sorts first on the left-hand side.
    if right < left:
        return right & left

    # None cannot be compared with e.g. a datetime.date, so the unbounded
    # cases are settled before any values are compared.
    if left.lower is None and (left.upper is None or right.lower is None):
        return True

    if left.lower is not None and left.upper is None:
        if right.upper is None:
            return True
        if left.lower_inc and right.upper_inc:
            return left.lower <= right.upper
        return left.lower < right.upper

    # What remains depends only on where left ends and where right begins;
    # touching at one value counts only if both sides include that value.
    if left.upper_inc and right.lower_inc:
        return left.upper >= right.lower
    return left.upper > right.lower
|
||
|
|
||
|
|
||
|
def __eq__(self, other):
    """
    Compare two ranges after normalising both of them.

    Returns False for anything that is not a ``Range``. Otherwise the lower
    value, upper value and bounds of the normalised operands must all match,
    so differently written but equivalent discrete ranges compare equal.
    """
    if not isinstance(other, Range):
        return False

    mine = normalise(self)
    theirs = normalise(other)

    return all(
        getattr(mine, attribute) == getattr(theirs, attribute)
        for attribute in ("_lower", "_upper", "_bounds")
    )
|
||
|
|
||
|
|
||
|
def range_merge(self, other):
    """
    Union: return the single range that covers both ``self`` and ``other``.

    Installed as ``Range.__add__``. Both operands are normalised first and
    the result is normalised before it is returned. The empty range is the
    identity: if either operand is empty, the other operand is the result.

    Raises ValueError when the operands neither overlap nor touch, because
    the union could then not be expressed as one contiguous range.
    """
    self = normalise(self)
    other = normalise(other)

    # Union with the empty range yields the other operand.
    if self.isempty:
        return other

    if other.isempty:
        return self

    # Work with the range that sorts first as `self`, so that only the gap
    # between self.upper and other.lower has to be examined below.
    if self > other:
        self, other = other, self

    if self.upper is not None and other.lower is not None:
        # Touching at a single value is contiguous only when at least one of
        # the two ranges actually includes that value.
        leaves_gap = other.lower > self.upper
        touches_open = other.lower == self.upper and not (
            self.upper_inc or other.lower_inc
        )
        if leaves_gap or touches_open:
            raise ValueError("Result of range union would not be contiguous")

    if self.lower is None or other.lower is None:
        lower = None
        lower_bound = "("
    elif self.lower == other.lower:
        # Same starting value: it is included if either range includes it.
        lower = self.lower
        lower_bound = "[" if (self.lower_inc or other.lower_inc) else "("
    else:
        # `self` sorts first, so it supplies the lower value and its bound.
        lower = self.lower
        lower_bound = self._bounds[0]

    if self.upper is None or other.upper is None:
        upper = None
        upper_bound = ")"
    elif self.upper == other.upper:
        # Same final value: it is included if either range includes it.
        upper = self.upper
        upper_bound = "]" if (self.upper_inc or other.upper_inc) else ")"
    elif self.upper > other.upper:
        upper = self.upper
        upper_bound = self._bounds[1]
    else:
        upper = other.upper
        upper_bound = other._bounds[1]

    return normalise(self.__class__(lower, upper, lower_bound + upper_bound))
|
||
|
|
||
|
|
||
|
def range_intersection(self, other):
    """
    Return the range of values that belong to both ``self`` and ``other``.

    Installed as ``Range.__mul__``. Both operands are normalised first. When
    they do not overlap, an empty range of ``self``'s class is returned.
    Otherwise the result takes each bound (value and inclusiveness) from
    whichever operand is more restrictive on that side, and is normalised
    before it is returned.
    """
    self = normalise(self)
    other = normalise(other)

    if not self & other:
        return self.__class__(empty=True)

    # We need to use custom comparisons because non-number range types will
    # fail to compare with None. A missing value means "unbounded", so the
    # other operand decides that side.
    if self.lower is None:
        lower, lower_bound = other.lower, other._bounds[0]
    elif other.lower is None:
        lower, lower_bound = self.lower, self._bounds[0]
    elif self.lower == other.lower:
        # Same starting value: only included when both ranges include it.
        lower = self.lower
        lower_bound = "[" if (self.lower_inc and other.lower_inc) else "("
    elif self.lower > other.lower:
        lower, lower_bound = self.lower, self._bounds[0]
    else:
        lower, lower_bound = other.lower, other._bounds[0]

    if self.upper is None:
        upper, upper_bound = other.upper, other._bounds[1]
    elif other.upper is None:
        upper, upper_bound = self.upper, self._bounds[1]
    elif self.upper == other.upper:
        # Same final value: only included when both ranges include it.
        upper = self.upper
        upper_bound = "]" if (self.upper_inc and other.upper_inc) else ")"
    elif self.upper < other.upper:
        upper, upper_bound = self.upper, self._bounds[1]
    else:
        upper, upper_bound = other.upper, other._bounds[1]

    return normalise(self.__class__(lower, upper, lower_bound + upper_bound))
|
||
|
|
||
|
|
||
|
def range_contains(self, other):
    """
    Implement ``other in self`` for a range or for a single element value.

    Installed as ``Range.__contains__``.

    * An empty ``self`` (``_bounds`` is None) contains nothing.
    * When ``other`` has exactly the same type as ``self``, it is contained
      if the two overlap and merging them gives ``self`` back.
    * ``None`` is not an element value and is never contained (it used to
      raise TypeError from the comparisons below).
    * Any other value is checked against the lower and upper bounds, taking
      into account whether each bound is inclusive.
    """
    if self._bounds is None:
        return False

    if type(self) is type(other):
        return self & other and self + other == self

    # None stands for "no bound" inside a range; it cannot be ordered against
    # the bound values, so treat it as not contained instead of failing.
    if other is None:
        return False

    # We have two tests to make in each case - is the value out of the lower
    # bound, and is the value out on the upper bound. As soon as we find a
    # situation where we _are_ out of bounds, return at that point.
    if self.lower is not None:
        if self.lower_inc:
            if other < self.lower:
                return False
        elif other <= self.lower:
            return False

    if self.upper is not None:
        if self.upper_inc:
            if other > self.upper:
                return False
        elif other >= self.upper:
            return False

    return True
|
||
|
|
||
|
|
||
|
def deconstruct(self):
    """
    Describe this range as a ``(dotted path, args, kwargs)`` triple.

    The path is the module and name of ``self``'s class, the positional
    arguments are the lower value, upper value and bounds string, and there
    are no keyword arguments.

    NOTE(review): presumably consumed by a serializer that rebuilds the
    object from this triple (e.g. Django migrations) - confirm with callers.
    """
    cls = self.__class__
    dotted_path = "{}.{}".format(cls.__module__, cls.__name__)
    positional = [self.lower, self.upper, self._bounds]
    keyword = {}
    return (dotted_path, positional, keyword)
|
||
|
|
||
|
|
||
|
# Install the functions defined in this module on psycopg2's Range class, so
# that every Range (and subclass) instance picks up the new behaviour:
#   a == b  -> equality after normalisation
#   a & b   -> overlap test (boolean)
#   a + b   -> union
#   a * b   -> intersection
#   x in a  -> containment of a range or a single value
# NOTE(review): __hash__ is left untouched here; confirm that it stays
# consistent with the normalising __eq__ before using ranges as dict keys.
Range.__eq__ = __eq__
Range.__and__ = __and__
Range.__add__ = range_merge
Range.__mul__ = range_intersection
Range.__contains__ = range_contains

Range.deconstruct = deconstruct
|
||
|
|
||
|
|
||
|
# Lookup that turns a bound character into the bound of the neighbouring
# range on the other side of the same value: an inclusive end becomes an
# exclusive start (and vice versa), so the value belongs to exactly one side.
# Unknown characters give None, because this is the dict's ``get`` method.
_BOUNDS_SWAP = {
    "[": ")",
    "(": "]",
    "]": "(",
    ")": "[",
}.get
|
||
|
|
||
|
|
||
|
def safe_subtract(initial, subtract):
    """
    Subtract the range "subtract" from the range "initial".

    Always return an array of ranges (which may be empty):

    * ``[]`` when the ranges are equal or "initial" lies within "subtract";
    * ``[initial]`` when the two do not overlap (this includes an empty
      "subtract");
    * one range when "subtract" removes the lower or the upper end;
    * two ranges when "subtract" cuts a hole out of the middle.

    Unbounded (None) values on either range are supported.
    """
    # Simplest case - ranges are the same, or the source one is fully
    # contained within the subtracting one, then we get an empty list.
    if subtract == initial or initial in subtract:
        return []

    # If the ranges don't overlap, then we retain the source.
    if not initial & subtract:
        return [initial]

    # From here on both ranges are known to be non-empty, so their bounds can
    # be read safely. (Doing this earlier failed for an empty "subtract",
    # whose bounds are None.)
    _Range = initial.__class__
    sub_bounds = "".join(map(_BOUNDS_SWAP, subtract._bounds))

    # Work out which end of "initial" is removed. None means "unbounded" and
    # cannot be compared with real values, so an unbounded end is only
    # removed by a range that is unbounded on the same side.
    if initial.upper is None:
        upper_removed = subtract.upper is None
    else:
        upper_removed = initial.upper in subtract or (
            not initial.upper_inc and initial.upper == subtract.upper
        )

    if upper_removed:
        # Only the part below "subtract" survives.
        return [
            _Range(
                initial.lower,
                subtract.lower,
                "{}{}".format(initial._bounds[0], sub_bounds[0]),
            )
        ]

    if initial.lower is None:
        lower_removed = subtract.lower is None
    else:
        lower_removed = initial.lower in subtract or (
            not initial.lower_inc and initial.lower == subtract.lower
        )

    if lower_removed:
        # Only the part above "subtract" survives.
        return [
            _Range(
                subtract.upper,
                initial.upper,
                "{}{}".format(sub_bounds[1], initial._bounds[1]),
            )
        ]

    # Neither end is removed: "subtract" sits strictly inside "initial" and
    # splits it in two.
    return [
        _Range(
            initial.lower,
            subtract.lower,
            "{}{}".format(initial._bounds[0], sub_bounds[0]),
        ),
        _Range(
            subtract.upper,
            initial.upper,
            "{}{}".format(sub_bounds[1], initial._bounds[1]),
        ),
    ]
|
||
|
|
||
|
|
||
|
def array_subtract(initial, subtract):
    """
    Subtract the range ``subtract`` from each item in the ``initial`` array.

    Returns one flat list holding, in order, whatever safe_subtract() leaves
    of every item; items that are removed completely contribute nothing.
    """
    return [
        remnant
        for _range in initial
        for remnant in safe_subtract(_range, subtract)
    ]
|
||
|
|
||
|
|
||
|
def array_subtract_all(initial, subtract):
    """
    Subtract all overlapping ranges in ``subtract`` from all ranges in ``initial``.

    Each range in ``subtract`` is removed in turn from what is left so far.
    The result is always a new list; ``initial`` itself is not modified.
    """
    remaining = list(initial)
    for removal in subtract:
        remaining = array_subtract(remaining, removal)
    return remaining
|