"""Datastore model definitions and utility functions.""" import bisect import collections import itertools import logging import re import StringIO import sys import time from google.appengine.api import memcache from google.appengine.ext import db class Error(Exception): pass class InvalidExpression(Error): pass class InvalidSerializedData(Error): pass class Tag(db.Model): """A single unique key/value tag pair. Immutable once written, so positively cacheable. """ # key_name = string representation of tag key and value tag_key = db.StringProperty(required=True) tag_value = db.StringProperty(required=True) _cache = {} @classmethod def Get(cls, key, value): """Fetch a single Tag. May fetch from cache or datastore. Positive but not negative caching. Args: key: string value: string Returns: Tag object, or None. """ cache_key = '%s=%s' % (key, value) obj = cls._cache.get(cache_key, None) if obj: return obj obj = cls.get_by_key_name(cache_key) if obj: cls._cache[cache_key] = obj return obj return None @classmethod def GetOrCreate(cls, key, value): """Fetch or create a single Tag. May fetch from cache or datastore. Positive but not negative caching. Args: key: string value: string Returns: Tag object. """ cache_key = '%s=%s' % (key, value) obj = cls._cache.get(cache_key, None) if obj: return obj obj = cls.get_or_insert(cache_key, tag_key=key, tag_value=value) cls._cache[cache_key] = obj return obj @classmethod def FromStr(cls, tagstring, create=False): """Fetch a Tag using a key=value string representation. Args: tagstring: = create: if True, create the Tag if it doesn't exist. Only use from POST requests. Returns: Tag object, or None (if create=False). """ key, value = tagstring.split('=', 1) if create: return cls.GetOrCreate(key, value) else: return cls.Get(key, value) @classmethod def _CacheAndYield(cls, query): """Fetch and yield tags from a query, caching the results sa we go. Args: query: already-running datastore query that returns Tag objects Yields: Tag objects """ for tag in query: cls._cache[tag.key().name()] = tag yield tag @classmethod def FromKey(cls, keystring): """Fetch all tags with a given key. Args: keystring: string Returns: Iterator of Tag objects. """ query = (cls.all() .filter('tag_key =', keystring) .run()) return cls._CacheAndYield(query) def __cmp__(self, other): return cmp((self.tag_key, self.tag_value), (other.tag_key, other.tag_value)) def __str__(self): return '%s=%s' % (self.tag_key, self.tag_value) def __repr__(self): return str(self) class TimeSeries(db.Model): """A unique set of tags. Immutable once written, so positively cacheable. """ _EXPR_RE = re.compile('^\\{(?P.*)\\}(\\[(?P.*)\\])?$') # key_name = string representation of sorted tags tags = db.ListProperty(item_type=db.Key, required=True) _cache = {} @classmethod def GetOrCreate(cls, tags): """Fetch a single TimeSeries. May fetch from cache or datastore. Positive but not negative caching. Args: tags: list of Tag objects Returns: TimeSeries object. """ tags.sort(key=lambda x: (x.tag_key, x.tag_value)) cache_key = ','.join(str(x) for x in tags) obj = cls._cache.get(cache_key, None) if obj: return obj obj = cls.get_or_insert(cache_key, tags=[x.key() for x in tags]) cls._cache[cache_key] = obj return obj @classmethod def FromExpr(cls, expr): """Fetch TimeSeries using a string expression. Expression format is: {=,=,...}[,,...] Args: expr: string, see format above Returns: dict of (grouping key,grouping key,...): list of TimeSeries objects Empty tuple key if there are no grouping keys Raises: InvalidExpression: expr not parseable """ parsed = cls._EXPR_RE.match(expr) if not parsed: raise InvalidExpression tags = [] for key_value in parsed.group('selectors').split(','): tags.append(Tag.FromStr(key_value)) groupings = [] groupings_str = parsed.group('groupings') if groupings_str: for grouping in groupings_str.split(','): groupings.append(Tag.FromKey(grouping)) return cls.GetPartial(tags, groupings) @classmethod def GetPartial(cls, tags, groupings): """Get a list of TimeSeries matches for the given tags. Does partial matching, e.g. key1=value1 matches the TimeSeries key1=value1,key2=value2. This enables forwards-compatibility, i.e. it allows data sources to add new tags without breaking existing readers. Args: tags: list of Tag objects, for selection groupings: list of list of Tag objects, for grouping Returns: dict of (grouping key,grouping key,...): list of TimeSeries objects Empty tuple key if there are no grouping keys """ # Build all our query objects, then go through and use them, trying to # keep a minimum set in memory at all times. # Build and start queries for selectors. tag_queries = [] for tag in tags: tag_queries.append(cls.all(keys_only=True) .filter('tags =', tag) .run()) # Build and start queries for grouping. Also build a dict of sets used later # to structure the response. tags_by_tag_keys = collections.defaultdict(set) grouping_queries = [] for grouping in groupings: or_queries = [] for tag in grouping: tags_by_tag_keys[tag.tag_key].add(tag.key()) or_queries.append(cls.all(keys_only=True) .filter('tags =', tag) .run()) grouping_queries.append(or_queries) # Pull the results from the selection queries and combine them (AND). keys = None for query in tag_queries: timeseries = set(query) if keys is None: keys = timeseries else: keys &= timeseries # Pull the results from the grouping queries and combine them (OR within # the tags for a group, AND between groups). We eliminate any TimeSeries # that lack any values for the grouping tags. for or_queries in grouping_queries: or_set = set() for query in or_queries: timeseries = set(query) # Pre-filter to trim down things to keep. timeseries &= keys or_set |= timeseries keys &= or_set # Fetch all the relevant TimeSeries objects, either from cache or in from # datastore in batch. Write anything fetched from datastore back to cache. timeseries = [] to_fetch = [] for key in keys: obj = cls._cache.get(key.name(), None) if obj: timeseries.append(obj) else: to_fetch.append(key) if to_fetch: for obj in cls.get(to_fetch): cls._cache[obj.key().name()] = obj timeseries.append(obj) # Shortcut for queries without grouping. if not groupings: return { (): timeseries, } # Bucket the TimeSeries objects by their grouping values. ret = collections.defaultdict(list) for ts in timeseries: ts_tags = set(ts.tags) intersects = [ ts_tags & tags for tags in tags_by_tag_keys.itervalues() ] # We handle cases where a given TimeSeries has multiple values for the # same tag key. Those go into multiple buckets, hence product(). for keysets in itertools.product(*intersects): tags = tuple(Tag.FromStr(x.name()) for x in keysets) ret[tags].append(ts) return ret @classmethod def KeyName(cls, obj): if isinstance(obj, db.Model): return obj.key().name() elif isinstance(obj, db.Key): return obj.name() else: assert False, obj def AddValue(self, value, timestamp=None, offset=False): """Add a value to this TimeSeries. Finds or creates the appropriate Values child object and adds the new value to it. Args: value: integer timestamp: UNIX timestamp; defaults to now offset: if True, values are offsets from previous value """ timestamp = timestamp or int(time.time()) Values.AddValue(self, timestamp, value, offset=offset) def AddValues(self, timestamp_value_pairs, offset=False): """Add values to this TimeSeries. Args: timestamp_value_pairs: list of (unix_timestamp, value) tuples offset: if True, values are offsets from previous value """ if not timestamp_value_pairs: return Values.AddValues(self, timestamp_value_pairs, offset=offset) def _FilterValues(self, values_list, start, end): """Filter values based on criteria. Separated to move the yield call out of GetValues(). See GetValues() for args. """ for values in values_list: for timestamp, value in values.GetValues(): if timestamp < start or timestamp > end: continue yield (timestamp, value) def GetValues(self, start=None, end=None, resolution=None): """Fetch values from this TimeSeries. Args: start: UNIX timestamp; defaults to 0 end: UNIX timestamp; defaults to sys.maxint resolution: one of Values.RESOLUTIONS Yields: (unix_timestamp, value) pairs, in ascending chronological order """ start = start or 0 end = end or sys.maxint resolution = resolution or Values.FULL memiter = [] if resolution in Values.LEADING_BLOCK_IN_MEMCACHE: client = memcache.Client() namespace = 'TimeSeries:%d' % resolution values = client.get(self.KeyName(self), namespace=namespace) if values: if values.start_time <= start: return self._FilterValues([values], start, end) memiter = [values] query1 = (Values.all() .ancestor(self) .filter('resolution =', resolution) .filter('start_time <=', start) .order('-start_time')) values1 = query1.run(limit=1) query2 = (Values.all() .ancestor(self) .filter('resolution =', resolution) .filter('start_time >', start) .filter('start_time <=', end) .order('start_time')) values2 = query2.run() return self._FilterValues(itertools.chain(values1, values2, memiter), start, end) def __str__(self): return self.key().name() class Values(db.Model): """A chunk of values for a timeseries. Subject to read/modify/write which must be transactional. """ # parent = TimeSeries FULL = 0 MINUTE = 1 HOUR = 2 DAY = 3 SECONDS = { MINUTE: 60, HOUR: 60 * 60, DAY: 60 * 60 * 24, } RESOLUTION_NAMES = { 'full': FULL, 'minute': MINUTE, 'hour': HOUR, 'day': DAY, } RESOLUTIONS = {FULL, MINUTE, HOUR, DAY} DOWNSAMPLES = {MINUTE, HOUR, DAY} # Dict of resolution -> (max seconds in memcache, min seconds in memcache) LEADING_BLOCK_IN_MEMCACHE = { FULL: (60 * 60 * 24, 20 * 60), MINUTE: (60 * 60 * 24, 2 * 60 * 60), } resolution = db.IntegerProperty(required=True, choices=RESOLUTIONS) start_time = db.IntegerProperty(required=True) start_value = db.IntegerProperty(required=True) end_time = db.IntegerProperty(required=True) end_value = db.IntegerProperty(required=True) # Encoding: pairs of zigzag base128 varint encodings of deltas from the # previous time (in seconds) and value (units unspecified) times_and_values = db.BlobProperty() _BLOB_LIMIT = 2**16 @classmethod def AddValue(cls, timeseries, timestamp, value, offset=False, resolution=FULL): """Single value wrapper for AddValues()""" return cls.AddValues(timeseries, [[timestamp, value]], offset, resolution) @classmethod def AddValues(cls, timeseries, timestamp_value_pairs, offset=False, resolution=FULL): """Find or create a block, then add values. Args: timeseries: parent TimeSeries object timestamp_value_pairs: list of (unix_timestamp, value) tuples offset: if True, values are offsets from previous value resolution: one of Values.RESOLUTIONS """ if not timestamp_value_pairs: return if resolution in cls.LEADING_BLOCK_IN_MEMCACHE: cls._AddValuesMemcache(timeseries, timestamp_value_pairs, offset, resolution) else: cls._AddValuesDatastore(timeseries, timestamp_value_pairs, offset, resolution) @classmethod def _AddValuesMemcache(cls, timeseries, timestamp_value_pairs, offset, resolution): """AddValues helper for memcache-backed blocks. See AddValues() for arguments. """ client = memcache.Client() key = TimeSeries.KeyName(timeseries) namespace = 'TimeSeries:%d' % resolution values = client.gets(key, namespace=namespace) if values: values._AddValues(timestamp_value_pairs, offset=offset) assert client.cas(key, values, namespace=namespace) else: values = cls._Create(timeseries, timestamp_value_pairs, resolution) client.set(key, values, namespace=namespace) max_age, min_age = cls.LEADING_BLOCK_IN_MEMCACHE[resolution] now = int(time.time()) age = now - values.start_time if age > max_age: new_start = now - min_age logging.info('Memcache block is too old (%d > %d); splitting at %d', age, max_age, new_start) values._Split(split_timestamp=new_start) client.set(key, values, namespace=namespace) @classmethod @db.transactional() def _AddValuesDatastore(cls, timeseries, timestamp_value_pairs, offset, resolution, add_downsamples=True): """AddValues helper for datastore-backed blocks. See AddValues() for arguments. """ values = (cls.all() .ancestor(timeseries) .filter('resolution =', resolution) .order('-start_time')).fetch(1) if values: values = values[0] values._AddValues(timestamp_value_pairs, offset=offset, add_downsamples=add_downsamples) else: values = cls._Create(timeseries, timestamp_value_pairs, resolution, add_downsamples=add_downsamples) values.save() @classmethod def _Create(cls, timeseries, timestamp_value_pairs, resolution, add_downsamples=True): """Factory for new Values object.""" first_timestamp, first_value = timestamp_value_pairs[0] values = Values( parent=timeseries, resolution=resolution, start_time=first_timestamp, start_value=first_value, end_time=first_timestamp, end_value=first_value) if add_downsamples and resolution == cls.FULL: # Add to downsamples just in case. for downsample in cls.DOWNSAMPLES: cls.AddValues(timeseries, [[first_timestamp, first_value]], resolution=downsample) remaining = timestamp_value_pairs[1:] values._AddValues(remaining, add_downsamples=add_downsamples) return values @classmethod def ToZigZag(cls, value): """Converts a value to zig zag encoding. Args: value: positive or negative integer Returns: Positive integer encoding """ if value < 0: return (abs(value) << 1) - 1 else: return value << 1 @classmethod def FromZigZag(cls, value): """Converts a value from zig zag encoding. Args: value: positive encoded integer Returns: Positive or negative decoded integer """ if value & 0x01: return 0 - ((value + 1) >> 1) else: return value >> 1 @classmethod def ToVarint(cls, value): """Encodes an integer as a zig zag, variable-length string. Args: value: positive or negative integer Returns: A string with one or more encoded bytes """ value = cls.ToZigZag(value) ret = [] while value >= 2**7: ret.append(chr(0x80 | (value % 2**7))) value >>= 7 ret.append(chr(value)) return ''.join(ret) @classmethod def FromVarint(cls, fh): """Decodes a variable-length, zig zag integer from a file handle. Moves the file pointer forward to the byte after the encoded integer. Args: fh: file handle to read from; use StringIO to use a string Returns: Positive or negative decoded integer, or None if the stream stops immediately Raises: InvalidSerializedData: stream stops in the middle of a varint """ val = 0 shift = 0 while True: byte = fh.read(1) if not byte: if val == 0: return None else: raise InvalidSerializedData('incomplete varint') byte_val = ord(byte) val |= ((byte_val & 0x7f) << shift) if not byte_val & 0x80: break shift += 7 return cls.FromZigZag(val) def _AddValues(self, timestamp_value_pairs, offset=False, add_downsamples=True): """Add values to this block. Args: timestamp_value_pairs: list of (unix_timestamp, value) tuples offset: if True, values are offsets from previous value add_downsamples: if True, add downsample values if necessary """ if not timestamp_value_pairs: return timestamp_value_pairs.sort() if offset: # Convert to absolute values prev_value = self.end_value for pair in timestamp_value_pairs: pair[1] += prev_value prev_value = pair[1] if add_downsamples and self.resolution == self.FULL: self._CheckAddDownsamples(timestamp_value_pairs) parts = [ self.times_and_values or '', ] for timestamp, value in timestamp_value_pairs: if timestamp < self.start_time: logging.warn('Skipping old value (%s): %d < %d', self.key().name(), timestamp, self.start_time) parts.append(self.ToVarint(timestamp - self.end_time)) parts.append(self.ToVarint(value - self.end_value)) self.end_time = timestamp self.end_value = value self.times_and_values = ''.join(parts) if len(self.times_and_values) > self._BLOB_LIMIT: self._Split() def _CheckAddDownsamples(self, timestamp_value_pairs): """Add downsamples if necessary. Args: timestamp_value_pairs: sorted list of (unix_timestamp, value) tuples """ # We add a skew factor so we don't have every timeseries trying to add # downsamples at the same time. skew = hash(self.parent_key().name()) assert self.resolution == self.FULL for resolution, seconds in self.SECONDS.iteritems(): prev_timestamp = self.end_time for timestamp, value in timestamp_value_pairs: if (timestamp + skew) / seconds != (prev_timestamp + skew) / seconds: # We've crossed a downsample boundary Values.AddValue(self.parent_key(), timestamp, value, resolution=resolution) prev_timestamp = timestamp def GetValues(self): """Fetch all values from this block. Returns: Sorted list of (unix_timestamp, value) tuples. Raises: InvalidSerializedData: serialized values stops in the middle of a timestamp and value pair """ values = [(self.start_time, self.start_value)] if not self.times_and_values: return values last_timestamp = self.start_time last_value = self.start_value fh = StringIO.StringIO(self.times_and_values) while True: timestamp = self.FromVarint(fh) if timestamp is None: break value = self.FromVarint(fh) if value is None: raise InvalidSerializedData('time without value') last_timestamp += timestamp last_value += value values.append((last_timestamp, last_value)) values.sort(key=lambda x: x[0]) if self.resolution != self.FULL: seconds = self.SECONDS[self.resolution] # Clean up the values list for potential duplicates i = 1 while i < len(values): if values[i][0] / seconds == values[i - 1][0] / seconds: # Latter value is a dupe del values[i] else: i += 1 return values def _Split(self, split_timestamp=None): """Split this block into two. Decodes the current block and puts the latter half of the values into a new block. Args: split_timestamp: UNIX timestamp around which to split (inclusive in the latter block) """ values = self.GetValues() if split_timestamp is None: split_point = len(values) / 2 else: split_point = bisect.bisect_left(values, (split_timestamp, 0)) logging.info( 'Splitting block from TimeSeries=%s, resolution=%d, ' 'split_timestamp=%s, split_point=%d', TimeSeries.KeyName(self.parent_key()), self.resolution, split_timestamp, split_point) old_values = values[:split_point] if split_timestamp is None: old_obj = self._Create(self.parent_key(), old_values, resolution=self.resolution, add_downsamples=False) old_obj.save() else: # split_timestamp implies that we're doing an uneven block split. To # avoid unnecessary fragmentation, add to the leading block in # datastore, rather than creating a new one. self._AddValuesDatastore(self.parent_key(), old_values, offset=False, resolution=self.resolution, add_downsamples=False) self.times_and_values = None self.start_time = values[split_point][0] self.start_value = values[split_point][1] self.end_time = self.start_time self.end_value = self.start_value self._AddValues(values[split_point + 1:], add_downsamples=False)