def Multiplex(iterators, selection_func):
  """Iterate over multiple sources and multiplex values.

  Repeatedly asks selection_func to choose among the current head values of
  the surviving sources, yields the chosen value, and advances that source.
  Sources are dropped as they are exhausted.

  Args:
    iterators: list of source iterators
    selection_func: takes a list of values, selects one and returns its index
  Yields:
    Each value from the source iterators, in the order determined by
    selection_func.
  """
  # Pair each source with its current head value, dropping any source that
  # is empty from the start.  Order of surviving sources is preserved.
  sources = []
  for it in iterators:
    try:
      sources.append([it, next(it)])
    except StopIteration:
      pass

  while sources:
    i = selection_func([head for _, head in sources])
    yield sources[i][1]
    try:
      # Use the next() builtin (Python 2.6+ and Python 3) rather than the
      # Python-2-only .next() method the original called.
      sources[i][1] = next(sources[i][0])
    except StopIteration:
      del sources[i]
  HTTP parameters:
    expr=        timeseries selection expression (see TimeSeries.FromExpr)
    format={csv,json}
    resolution={full,minute,hour,day}
    start=       UNIX timestamp; negative values are relative to now
    end=         UNIX timestamp; negative values are relative to now
  """

  def get(self):
    # Output format: 'json' (default) or 'csv'.
    output_format = self.request.get('format', 'json')
    assert output_format in ('csv', 'json')

    # Map the resolution name to its models.Values constant.
    resolution = self.request.get('resolution', 'full')
    resolution = models.Values.RESOLUTION_NAMES[resolution]

    # start/end are UNIX timestamps; negative values are interpreted as
    # offsets from the current time (e.g. start=-3600 means "last hour").
    start = self.request.get('start', None)
    if start:
      start = int(start)
      if start < 0:
        start = int(time.time()) + start
    end = self.request.get('end', None)
    if end:
      end = int(end)
      if end < 0:
        end = int(time.time()) + end

    # Resolve the expression to {grouping tags tuple: [TimeSeries, ...]}.
    expr = self.request.get('expr')
    data = models.TimeSeries.FromExpr(expr)

    if output_format == 'csv':
      self.response.content_type = 'text/csv'
      fh = csv.writer(self.response.out)
      # CSV requires us to pre-determine column names
      try:
        group = data.iterkeys().next()
      except StopIteration:
        # No matching timeseries at all; emit an empty body.
        return
      group_columns = [x.tag_key for x in group]
      fh.writerow(['timestamp'] + group_columns + ['value'])
    elif output_format == 'json':
      self.response.content_type = 'application/json'
      ret = []

    def SelectMinTime(values):
      # values are (timestamp, value) pairs; pick the earliest timestamp so
      # the merged stream stays chronological.
      return min(range(len(values)), key=lambda x: values[x][0])

    streams = {}

    # Get all datastore queries running in parallel first
    for groupings, timeseries in data.iteritems():
      streams[groupings] = [
          x.GetValues(start=start, end=end, resolution=resolution)
          for x in timeseries
      ]

    for groupings in sorted(data.keys()):
      group_values = dict((x.tag_key, x.tag_value) for x in sorted(groupings))

      # Merge the per-timeseries streams into one chronological stream.
      values = Multiplex(streams[groupings], SelectMinTime)

      if output_format == 'csv':
        group_constants = [group_values[x] for x in group_columns]
        for timestamp, value in values:
          fh.writerow([timestamp] + group_constants + [value])
      elif output_format == 'json':
        ret.append({
            'tags': group_values,
            'timestamps_values': list(values),
        })

    if output_format == 'json':
      # Compact separators keep the payload small.
      json.dump(ret, self.response.out, separators=(',', ':'))
class Put(webapp2.RequestHandler):
  """Add a value to a timeseries.

  HTTP parameters:
    tag== (repeated)
    value=
  """

  def _HandleBlock(self, block):
    # Store one decoded block of values.  Expected keys:
    #   tags: list of 'key=value' strings (required)
    #   values: list of ints (used when 'timestamps_values' is absent)
    #   timestamps_values: list of [unix_timestamp, value] pairs
    #   client_timestamp: sender's idea of "now", used to correct clock skew
    #   offset: if true, values are deltas from the previous stored value
    if 'timestamps_values' not in block:
      # No explicit timestamps: stamp every value with the current time.
      now = int(time.time())
      block['timestamps_values'] = [[now, x] for x in block['values']]

    if 'client_timestamp' in block:
      # Shift client-provided timestamps by the client/server clock delta.
      server_timestamp = int(time.time())
      offset = server_timestamp - block['client_timestamp']
      for pair in block['timestamps_values']:
        pair[0] += offset

    # create=True: writers may introduce new tags.
    tags = [models.Tag.FromStr(x, create=True)
            for x in block['tags']]
    timeseries = models.TimeSeries.GetOrCreate(tags)

    timeseries.AddValues(block['timestamps_values'],
                         offset=block.get('offset', False))

  def post(self):
    # Content-Type may carry parameters (e.g. '; charset=utf-8'); strip them.
    content_type = self.request.headers['Content-Type'].split(';', 1)[0]

    if content_type == 'application/x-www-form-urlencoded':
      # Simple form post: one block built from repeated tag=/value= params.
      self._HandleBlock({
          'offset': bool(self.request.get('offset', 0)),
          'tags': self.request.get_all('tag'),
          'values': [int(x) for x in self.request.get_all('value')],
      })

    elif content_type == 'application/json':
      # JSON post: a list of blocks.
      for block in json.loads(self.request.body):
        self._HandleBlock(block)

    else:
      assert False, content_type


# URL routing for the API.
app = webapp2.WSGIApplication([
    ('/api/get', Get),
    ('/api/put', Put),
])
  z-index: 1;
}

.expr form {
  margin: 0;
}

/* Full-width monospace expression input with a subtle underline. */
.expr input {
  width: 100%;
  border: 0;
  padding: 5px;
  font-family: 'Roboto Mono';
  font-size: 15px;
  border-bottom: 1px solid #757575;
  background: rgba(255,255,255,0.7);
}

/* Highlight the underline while the input has focus. */
.expr input:focus {
  outline: none;
  border-bottom: 1px solid #5264AE;
}

/* Fixed-size chart tiles; disable text selection so drag-to-zoom does not
   select the page. */
.chart {
  width: 900px;
  height: 500px;
  display: inline-block;
  -moz-user-select: none;
  -webkit-user-select: none;
  -ms-user-select: none;
}
// Top-level console UI: wires the expression input to a set of charts at
// several trailing windows/resolutions.
var Console = function(container) {
  this.container_ = container;
  this.client_ = new TSDBClient();
  this.loadJSAPI_();
};


// Load the Google Visualization API, then finish UI construction.
Console.prototype.loadJSAPI_ = function() {
  google.load('visualization', '1.1', {
    packages: ['corechart'],
    callback: this.jsapiLoaded_.bind(this),
  });
};


// Build the DOM (expression input + one container per chart) once the
// charts library is available.
Console.prototype.jsapiLoaded_ = function() {
  var exprContainer = document.createElement('div');
  exprContainer.className = 'expr';
  this.container_.appendChild(exprContainer);

  var form = document.createElement('form');
  exprContainer.appendChild(form);

  this.exprInput_ = document.createElement('input');
  form.appendChild(this.exprInput_);
  form.addEventListener('submit', this.onExprChange_.bind(this));

  var chartContainer = document.createElement('div');
  this.container_.appendChild(chartContainer);

  // One chart per window; coarser resolution for longer windows.
  this.charts_ = [
    {
      'title': 'Last 20 minutes',
      'seconds': 120 * 10,
      'resolution': 'full',
    },
    {
      'title': 'Last 2 hours',
      'seconds': 120 * 60,
      'resolution': 'minute',
    },
    {
      'title': 'Last 5 days',
      'seconds': 120 * 60 * 60,
      'resolution': 'hour',
    },
    {
      'title': 'Last 120 days',
      'seconds': 120 * 60 * 60 * 24,
      'resolution': 'day',
    },
  ];
  for (var i = 0; i < this.charts_.length; i++) {
    var obj = this.charts_[i];
    obj.container = document.createElement('div');
    obj.container.className = 'chart';
    chartContainer.appendChild(obj.container);
  }

  // The expression lives in the URL fragment so console views are linkable.
  this.expr_ = decodeURIComponent(window.location.hash.substring(1));
  // Ensure that the URL is properly encoded for copy & paste.
  window.location.hash = encodeURIComponent(this.expr_);
  this.exprInput_.value = this.expr_;
  window.addEventListener('hashchange', this.onHashChange_.bind(this));

  this.loadCharts_();
};


// Form submit: push the new expression into the URL fragment; the
// hashchange handler performs the actual reload.
Console.prototype.onExprChange_ = function(e) {
  window.location.hash = encodeURIComponent(this.exprInput_.value);
  e.preventDefault();
};


Console.prototype.onHashChange_ = function(e) {
  this.expr_ = decodeURIComponent(window.location.hash.substring(1));
  this.exprInput_.value = this.expr_;
  this.loadCharts_();
};


// (Re)create every chart for the current expression.
Console.prototype.loadCharts_ = function() {
  if (!this.expr_) {
    return;
  }

  for (var i = 0; i < this.charts_.length; i++) {
    var options = this.charts_[i];
    if (options.instance) {
      options.instance.destroy();
    }
    options.instance = new Chart(this.client_, this.expr_, options);
  }
};


// A single chart: subscribes to a watch on the expression and redraws on
// every data refresh.
var Chart = function(client, expr, options) {
  this.options_ = options;
  this.watch_ = client.watch(
      expr, this.options_.seconds, this.options_.resolution,
      this.drawChart_.bind(this));
};


// Convert the API response into a DataTable and (re)draw the line chart.
Chart.prototype.drawChart_ = function(data) {
  var dataTable = new google.visualization.DataTable();
  dataTable.addColumn('date', 'Timestamp');

  // Drop empty series first; iterate backwards so splice() does not disturb
  // the loop index.
  for (var i = data.length - 1; i >= 0; i--) {
    if (!data[i].timestamps_values.length) {
      data.splice(i, 1);
    }
  }
  // BUG FIX: columns used to be added inside the backward loop above, so
  // the column labels came out reversed relative to the row construction
  // below (which maps series i to data column i).  Add them in forward
  // order so each series gets its own tags as its label.
  for (var i = 0; i < data.length; i++) {
    var tags = [];
    for (var key in data[i].tags) {
      tags.push(key + '=' + data[i].tags[key]);
    }
    dataTable.addColumn('number', tags.join(','));
  }

  // One row per (series, point): nulls in every other series' column.
  for (var i = 0; i < data.length; i++) {
    var ts = data[i];
    for (var j = 0; j < ts.timestamps_values.length; j++) {
      var row = [new Date(ts.timestamps_values[j][0] * 1000)];
      for (var k = 0; k < i; k++) {
        row.push(null);
      }
      row.push(ts.timestamps_values[j][1]);
      for (var k = i + 1; k < data.length; k++) {
        row.push(null);
      }
      dataTable.addRow(row);
    }
  };

  var options = {
    title: this.options_.title,
    legend: {
      position: 'bottom'
    },
    hAxis: {
      gridlines: {
        count: -1,
      },
    },
    explorer: {
      actions: [
        'dragToZoom',
        'rightClickToReset',
      ],
    },
  };

  // It'd be great to use the material design charts here, but they fail to load
  // ~25% of the time, which is a non-starter.
  if (this.chartObj_) {
    this.chartObj_.clearChart();
  }
  this.chartObj_ = new google.visualization.LineChart(this.options_.container);
  this.chartObj_.draw(dataTable, options);
};


Chart.prototype.destroy = function() {
  this.watch_.destroy();
  if (this.chartObj_) {
    this.chartObj_.clearChart();
  }
};


document.addEventListener('DOMContentLoaded', function(e) {
  new Console(document.getElementById('container'));
});


// ---- console/lib.js ----

// Thin client for the TSDB HTTP API.
var TSDBClient = function(opt_baseURL) {
  this.baseURL_ = opt_baseURL || '/api';
};


// Start polling `expr` over a trailing window; `callback` receives each
// fresh response.  Returns the watch; call destroy() to stop polling.
TSDBClient.prototype.watch = function(
    expr, windowSeconds, resolution, callback) {
  return new TSDBWatch(
      this.baseURL_, expr, windowSeconds, resolution, callback);
};


var TSDBWatch = function(baseURL, expr, windowSeconds, resolution, callback) {
  this.url_ = baseURL + '/get?start=-' + windowSeconds.toString() +
      '&resolution=' + encodeURIComponent(resolution) +
      '&expr=' + encodeURIComponent(expr);
  this.callback_ = callback;
  // Refresh at 1/40th of the window so charts advance smoothly.
  this.refresh_ = windowSeconds / 40;
  this.delay_ = TSDBWatch.MIN_DELAY;
  this.sendRequest_();
};


// Exponential backoff bounds (seconds) for failed polls.
TSDBWatch.MIN_DELAY = 0.5;
TSDBWatch.MAX_DELAY = 32.0;
TSDBWatch.DELAY_MULT = 2;


TSDBWatch.prototype.destroy = function() {
  if (this.timer_) {
    window.clearTimeout(this.timer_);
  }
};
// Issue one GET; onLoad_/retry_ schedule the next poll.
TSDBWatch.prototype.sendRequest_ = function() {
  this.timer_ = null;

  var xhr = new XMLHttpRequest();
  xhr.responseType = 'json';
  xhr.timeout = 30 * 1000;
  xhr.open('GET', this.url_);
  xhr.addEventListener('load', this.onLoad_.bind(this));
  xhr.addEventListener('error', this.retry_.bind(this));
  xhr.addEventListener('timeout', this.retry_.bind(this));
  xhr.send();
};


// Schedule a retry with exponential backoff, at most one timer at a time.
TSDBWatch.prototype.retry_ = function(e) {
  if (this.timer_) {
    console.log('Duplicate retry call');
    return;
  }
  this.timer_ = window.setTimeout(
      this.sendRequest_.bind(this),
      this.delay_ * 1000);
  this.delay_ = Math.min(this.delay_ * TSDBWatch.DELAY_MULT,
                         TSDBWatch.MAX_DELAY);
};


// Successful load: reset backoff, deliver data, schedule the next poll.
TSDBWatch.prototype.onLoad_ = function(e) {
  var data = e.target.response;

  // A JSON parse failure yields a null response; treat it like an error.
  if (!data) {
    this.retry_();
    return;
  }

  this.delay_ = TSDBWatch.MIN_DELAY;

  this.callback_(data);

  this.timer_ = window.setTimeout(
      this.sendRequest_.bind(this),
      this.refresh_ * 1000);
}
  @classmethod
  def GetOrCreate(cls, key, value):
    """Fetch or create a single Tag.

    May fetch from cache or datastore.  Positive but not negative caching.

    Args:
      key: string
      value: string
    Returns:
      Tag object.
    """
    cache_key = '%s=%s' % (key, value)
    obj = cls._cache.get(cache_key, None)
    if obj:
      return obj
    # get_or_insert is transactional, so concurrent creators converge on a
    # single entity.
    obj = cls.get_or_insert(cache_key, tag_key=key, tag_value=value)
    cls._cache[cache_key] = obj
    return obj

  @classmethod
  def FromStr(cls, tagstring, create=False):
    """Fetch a Tag using a key=value string representation.

    Args:
      tagstring: 'key=value' string
      create: if True, create the Tag if it doesn't exist. Only use from POST
        requests.
    Returns:
      Tag object, or None (if create=False).
    """
    # Split on the first '=' only, so values may themselves contain '='.
    key, value = tagstring.split('=', 1)
    if create:
      return cls.GetOrCreate(key, value)
    else:
      return cls.Get(key, value)

  @classmethod
  def _CacheAndYield(cls, query):
    """Fetch and yield tags from a query, caching the results as we go.

    Args:
      query: already-running datastore query that returns Tag objects
    Yields:
      Tag objects
    """
    for tag in query:
      cls._cache[tag.key().name()] = tag
      yield tag

  @classmethod
  def FromKey(cls, keystring):
    """Fetch all tags with a given key.

    Args:
      keystring: string
    Returns:
      Iterator of Tag objects.
    """
    query = (cls.all()
             .filter('tag_key =', keystring)
             .run())
    return cls._CacheAndYield(query)

  def __cmp__(self, other):
    # Order by tag key first, then value (Python 2 comparison protocol).
    return cmp((self.tag_key, self.tag_value),
               (other.tag_key, other.tag_value))

  def __str__(self):
    return '%s=%s' % (self.tag_key, self.tag_value)

  def __repr__(self):
    return str(self)


class TimeSeries(db.Model):
  """A unique set of tags.

  Immutable once written, so positively cacheable.
  """
  # Parses '{k=v,...}[g1,...]' expressions; group names reconstructed from
  # the .group('selectors')/.group('groupings') calls in FromExpr().
  _EXPR_RE = re.compile('^\\{(?P<selectors>.*)\\}(\\[(?P<groupings>.*)\\])?$')

  # key_name = string representation of sorted tags
  tags = db.ListProperty(item_type=db.Key, required=True)

  # In-process positive cache of key_name -> TimeSeries.
  _cache = {}
  @classmethod
  def GetOrCreate(cls, tags):
    """Fetch a single TimeSeries.

    May fetch from cache or datastore.  Positive but not negative caching.

    Args:
      tags: list of Tag objects
    Returns:
      TimeSeries object.
    """
    # Canonical sort makes the key_name independent of caller ordering.
    tags.sort(key=lambda x: (x.tag_key, x.tag_value))
    cache_key = ','.join(str(x) for x in tags)
    obj = cls._cache.get(cache_key, None)
    if obj:
      return obj
    obj = cls.get_or_insert(cache_key, tags=[x.key() for x in tags])
    cls._cache[cache_key] = obj
    return obj

  @classmethod
  def FromExpr(cls, expr):
    """Fetch TimeSeries using a string expression.

    Expression format is:
      {key=value,key=value,...}[grouping_key,grouping_key,...]

    Args:
      expr: string, see format above
    Returns:
      dict of (grouping key,grouping key,...): list of TimeSeries objects
      Empty tuple key if there are no grouping keys
    Raises:
      InvalidExpression: expr not parseable
    """
    parsed = cls._EXPR_RE.match(expr)
    if not parsed:
      raise InvalidExpression

    tags = []
    for key_value in parsed.group('selectors').split(','):
      tags.append(Tag.FromStr(key_value))

    # Each grouping key expands to all known tags with that key.
    groupings = []
    groupings_str = parsed.group('groupings')
    if groupings_str:
      for grouping in groupings_str.split(','):
        groupings.append(Tag.FromKey(grouping))

    return cls.GetPartial(tags, groupings)
  @classmethod
  def GetPartial(cls, tags, groupings):
    """Get a list of TimeSeries matches for the given tags.

    Does partial matching, e.g. key1=value1 matches the TimeSeries
    key1=value1,key2=value2. This enables forwards-compatibility, i.e. it
    allows data sources to add new tags without breaking existing readers.

    Args:
      tags: list of Tag objects, for selection
      groupings: list of list of Tag objects, for grouping
    Returns:
      dict of (grouping key,grouping key,...): list of TimeSeries objects
      Empty tuple key if there are no grouping keys
    """
    # Build all our query objects, then go through and use them, trying to
    # keep a minimum set in memory at all times.

    # Build and start queries for selectors.
    tag_queries = []
    for tag in tags:
      tag_queries.append(cls.all(keys_only=True)
                         .filter('tags =', tag)
                         .run())

    # Build and start queries for grouping. Also build a dict of sets used
    # later to structure the response.
    tags_by_tag_keys = collections.defaultdict(set)
    grouping_queries = []
    for grouping in groupings:
      or_queries = []
      for tag in grouping:
        tags_by_tag_keys[tag.tag_key].add(tag.key())
        or_queries.append(cls.all(keys_only=True)
                          .filter('tags =', tag)
                          .run())
      grouping_queries.append(or_queries)

    # Pull the results from the selection queries and combine them (AND).
    keys = None
    for query in tag_queries:
      timeseries = set(query)
      if keys is None:
        keys = timeseries
      else:
        keys &= timeseries

    # Pull the results from the grouping queries and combine them (OR within
    # the tags for a group, AND between groups). We eliminate any TimeSeries
    # that lack any values for the grouping tags.
    for or_queries in grouping_queries:
      or_set = set()
      for query in or_queries:
        timeseries = set(query)
        # Pre-filter to trim down things to keep.
        timeseries &= keys
        or_set |= timeseries
      keys &= or_set

    # Fetch all the relevant TimeSeries objects, either from cache or in from
    # datastore in batch. Write anything fetched from datastore back to cache.
    timeseries = []
    to_fetch = []
    for key in keys:
      obj = cls._cache.get(key.name(), None)
      if obj:
        timeseries.append(obj)
      else:
        to_fetch.append(key)
    if to_fetch:
      for obj in cls.get(to_fetch):
        cls._cache[obj.key().name()] = obj
        timeseries.append(obj)

    # Shortcut for queries without grouping.
    if not groupings:
      return {
          (): timeseries,
      }

    # Bucket the TimeSeries objects by their grouping values.
    ret = collections.defaultdict(list)
    for ts in timeseries:
      ts_tags = set(ts.tags)
      intersects = [
          ts_tags & tags
          for tags in tags_by_tag_keys.itervalues()
      ]
      # We handle cases where a given TimeSeries has multiple values for the
      # same tag key. Those go into multiple buckets, hence product().
      for keysets in itertools.product(*intersects):
        tags = tuple(Tag.FromStr(x.name()) for x in keysets)
        ret[tags].append(ts)

    return ret

  @classmethod
  def KeyName(cls, obj):
    # Accept either a model instance or a raw datastore Key.
    if isinstance(obj, db.Model):
      return obj.key().name()
    elif isinstance(obj, db.Key):
      return obj.name()
    else:
      assert False, obj

  def AddValue(self, value, timestamp=None, offset=False):
    """Add a value to this TimeSeries.

    Finds or creates the appropriate Values child object and adds the new
    value to it.

    Args:
      value: integer
      timestamp: UNIX timestamp; defaults to now
      offset: if True, values are offsets from previous value
    """
    timestamp = timestamp or int(time.time())
    Values.AddValue(self, timestamp, value, offset=offset)

  def AddValues(self, timestamp_value_pairs, offset=False):
    """Add values to this TimeSeries.

    Args:
      timestamp_value_pairs: list of (unix_timestamp, value) tuples
      offset: if True, values are offsets from previous value
    """
    if not timestamp_value_pairs:
      return
    Values.AddValues(self, timestamp_value_pairs, offset=offset)

  def _FilterValues(self, values_list, start, end):
    """Filter values based on criteria.

    Separated to move the yield call out of GetValues().

    See GetValues() for args.
    """
    for values in values_list:
      for timestamp, value in values.GetValues():
        if timestamp < start or timestamp > end:
          continue
        yield (timestamp, value)
  def GetValues(self, start=None, end=None, resolution=None):
    """Fetch values from this TimeSeries.

    Args:
      start: UNIX timestamp; defaults to 0
      end: UNIX timestamp; defaults to sys.maxint
      resolution: one of Values.RESOLUTIONS
    Returns:
      Iterator of (unix_timestamp, value) pairs, in ascending chronological
      order.
    """
    start = start or 0
    end = end or sys.maxint
    resolution = resolution or Values.FULL

    # The newest block for hot resolutions lives in memcache rather than
    # datastore; fetch it first so datastore can be skipped entirely when it
    # covers the whole requested range.
    memiter = []
    if resolution in Values.LEADING_BLOCK_IN_MEMCACHE:
      client = memcache.Client()
      namespace = 'TimeSeries:%d' % resolution
      values = client.get(self.KeyName(self), namespace=namespace)
      if values:
        if values.start_time <= start:
          # The memcache block alone covers [start, end].
          return self._FilterValues([values], start, end)
        memiter = [values]

    # The newest block that starts at or before `start` (it may still
    # contain values inside the range)...
    query1 = (Values.all()
              .ancestor(self)
              .filter('resolution =', resolution)
              .filter('start_time <=', start)
              .order('-start_time'))
    values1 = query1.run(limit=1)

    # ...plus every block that starts inside (start, end].
    query2 = (Values.all()
              .ancestor(self)
              .filter('resolution =', resolution)
              .filter('start_time >', start)
              .filter('start_time <=', end)
              .order('start_time'))
    values2 = query2.run()

    return self._FilterValues(itertools.chain(values1, values2, memiter),
                              start, end)

  def __str__(self):
    return self.key().name()
  """
  # parent = TimeSeries

  # Resolution constants.
  FULL = 0
  MINUTE = 1
  HOUR = 2
  DAY = 3

  # Downsample period, in seconds, per resolution.
  SECONDS = {
      MINUTE: 60,
      HOUR: 60 * 60,
      DAY: 60 * 60 * 24,
  }

  # HTTP parameter name -> resolution constant.
  RESOLUTION_NAMES = {
      'full': FULL,
      'minute': MINUTE,
      'hour': HOUR,
      'day': DAY,
  }

  RESOLUTIONS = {FULL, MINUTE, HOUR, DAY}
  DOWNSAMPLES = {MINUTE, HOUR, DAY}

  # Dict of resolution -> (max seconds in memcache, min seconds in memcache)
  LEADING_BLOCK_IN_MEMCACHE = {
      FULL: (60 * 60 * 24, 20 * 60),
      MINUTE: (60 * 60 * 24, 2 * 60 * 60),
  }

  resolution = db.IntegerProperty(required=True, choices=RESOLUTIONS)

  # First (time, value) pair in the block, stored unencoded.
  start_time = db.IntegerProperty(required=True)
  start_value = db.IntegerProperty(required=True)

  # Last (time, value) pair; new values are encoded as deltas from these.
  end_time = db.IntegerProperty(required=True)
  end_value = db.IntegerProperty(required=True)

  # Encoding: pairs of zigzag base128 varint encodings of deltas from the
  # previous time (in seconds) and value (units unspecified)
  times_and_values = db.BlobProperty()

  # Split a block once its encoded blob exceeds this many bytes.
  _BLOB_LIMIT = 2**16

  @classmethod
  def AddValue(cls, timeseries, timestamp, value, offset=False,
               resolution=FULL):
    """Single value wrapper for AddValues()."""
    return cls.AddValues(timeseries, [[timestamp, value]], offset, resolution)

  @classmethod
  def AddValues(cls, timeseries, timestamp_value_pairs, offset=False,
                resolution=FULL):
    """Find or create a block, then add values.

    Args:
      timeseries: parent TimeSeries object
      timestamp_value_pairs: list of (unix_timestamp, value) tuples
      offset: if True, values are offsets from previous value
      resolution: one of Values.RESOLUTIONS
    """
    if not timestamp_value_pairs:
      return
    # The leading (newest) block of hot resolutions is kept in memcache to
    # avoid a datastore transaction per write.
    if resolution in cls.LEADING_BLOCK_IN_MEMCACHE:
      cls._AddValuesMemcache(timeseries, timestamp_value_pairs, offset,
                             resolution)
    else:
      cls._AddValuesDatastore(timeseries, timestamp_value_pairs, offset,
                              resolution)
  @classmethod
  def _AddValuesMemcache(cls, timeseries, timestamp_value_pairs, offset,
                         resolution):
    """AddValues helper for memcache-backed blocks.

    See AddValues() for arguments.
    """
    client = memcache.Client()
    key = TimeSeries.KeyName(timeseries)
    namespace = 'TimeSeries:%d' % resolution
    # gets()/cas() pair gives optimistic concurrency on the leading block.
    values = client.gets(key, namespace=namespace)
    if values:
      values._AddValues(timestamp_value_pairs, offset=offset)
      # NOTE(review): a lost cas() race aborts the request here and drops
      # this batch -- confirm that is acceptable.
      assert client.cas(key, values, namespace=namespace)
    else:
      values = cls._Create(timeseries, timestamp_value_pairs, resolution)
      client.set(key, values, namespace=namespace)

    # Periodically migrate the old half of the block to datastore so a
    # memcache eviction loses at most ~min_age seconds of data.
    max_age, min_age = cls.LEADING_BLOCK_IN_MEMCACHE[resolution]
    now = int(time.time())
    age = now - values.start_time
    if age > max_age:
      new_start = now - min_age
      logging.info('Memcache block is too old (%d > %d); splitting at %d',
                   age, max_age, new_start)
      values._Split(split_timestamp=new_start)
      client.set(key, values, namespace=namespace)

  @classmethod
  @db.transactional()
  def _AddValuesDatastore(cls, timeseries, timestamp_value_pairs, offset,
                          resolution, add_downsamples=True):
    """AddValues helper for datastore-backed blocks.

    See AddValues() for arguments.
    """
    # Append to the newest block for this resolution, or start the first one.
    values = (cls.all()
              .ancestor(timeseries)
              .filter('resolution =', resolution)
              .order('-start_time')).fetch(1)
    if values:
      values = values[0]
      values._AddValues(timestamp_value_pairs, offset=offset,
                        add_downsamples=add_downsamples)
    else:
      values = cls._Create(timeseries, timestamp_value_pairs,
                           resolution, add_downsamples=add_downsamples)

    values.save()

  @classmethod
  def _Create(cls, timeseries, timestamp_value_pairs, resolution,
              add_downsamples=True):
    """Factory for new Values object."""
    first_timestamp, first_value = timestamp_value_pairs[0]
    values = Values(
        parent=timeseries,
        resolution=resolution,
        start_time=first_timestamp,
        start_value=first_value,
        end_time=first_timestamp,
        end_value=first_value)

    if add_downsamples and resolution == cls.FULL:
      # Add to downsamples just in case.
      for downsample in cls.DOWNSAMPLES:
        cls.AddValues(timeseries, [[first_timestamp, first_value]],
                      resolution=downsample)

    remaining = timestamp_value_pairs[1:]
    values._AddValues(remaining, add_downsamples=add_downsamples)

    return values
@classmethod
def ToZigZag(cls, value):
  """Converts a value to zig zag encoding.

  Maps signed integers to unsigned ones so small magnitudes of either sign
  encode to short varints: 0->0, -1->1, 1->2, -2->3, ...

  Args:
    value: positive or negative integer
  Returns:
    Positive integer encoding
  """
  if value < 0:
    return (abs(value) << 1) - 1
  else:
    return value << 1

@classmethod
def FromZigZag(cls, value):
  """Converts a value from zig zag encoding.

  Inverse of ToZigZag().

  Args:
    value: positive encoded integer
  Returns:
    Positive or negative decoded integer
  """
  if value & 0x01:
    return 0 - ((value + 1) >> 1)
  else:
    return value >> 1

@classmethod
def ToVarint(cls, value):
  """Encodes an integer as a zig zag, variable-length string.

  Little-endian base-128: each byte carries 7 payload bits; the high bit is
  set on every byte except the last.

  Args:
    value: positive or negative integer
  Returns:
    A string with one or more encoded bytes
  """
  value = cls.ToZigZag(value)
  ret = []
  while value >= 2**7:
    ret.append(chr(0x80 | (value % 2**7)))
    value >>= 7
  ret.append(chr(value))
  return ''.join(ret)

@classmethod
def FromVarint(cls, fh):
  """Decodes a variable-length, zig zag integer from a file handle.

  Moves the file pointer forward to the byte after the encoded integer.

  Args:
    fh: file handle to read from; use StringIO to use a string
  Returns:
    Positive or negative decoded integer, or None if the stream stops
    immediately
  Raises:
    InvalidSerializedData: stream stops in the middle of a varint
  """
  val = 0
  shift = 0
  # BUG FIX: the old code detected "empty stream" via val == 0, so a
  # truncated varint whose consumed bytes decoded to 0 (e.g. a lone 0x80
  # continuation byte) returned None instead of raising.  Track whether any
  # byte was actually consumed instead.
  read_any = False
  while True:
    byte = fh.read(1)
    if not byte:
      if not read_any:
        return None
      raise InvalidSerializedData('incomplete varint')
    read_any = True
    byte_val = ord(byte)
    val |= ((byte_val & 0x7f) << shift)
    if not byte_val & 0x80:
      break
    shift += 7
  return cls.FromZigZag(val)
def _AddValues(self, timestamp_value_pairs, offset=False,
               add_downsamples=True):
  """Add values to this block.

  Args:
    timestamp_value_pairs: list of (unix_timestamp, value) tuples
    offset: if True, values are offsets from previous value
    add_downsamples: if True, add downsample values if necessary
  """
  if not timestamp_value_pairs:
    return

  timestamp_value_pairs.sort()

  if offset:
    # Convert to absolute values
    prev_value = self.end_value
    for pair in timestamp_value_pairs:
      pair[1] += prev_value
      prev_value = pair[1]

  if add_downsamples and self.resolution == self.FULL:
    self._CheckAddDownsamples(timestamp_value_pairs)

  parts = [
      self.times_and_values or '',
  ]

  for timestamp, value in timestamp_value_pairs:
    if timestamp < self.start_time:
      # BUG FIX: the warning claimed the value was skipped, but the code
      # fell through and appended it anyway, storing data older than the
      # block's start time.  Actually skip it, as documented.
      logging.warn('Skipping old value (%s): %d < %d',
                   self.key().name(), timestamp, self.start_time)
      continue
    # Append (time delta, value delta) varints relative to the running end.
    parts.append(self.ToVarint(timestamp - self.end_time))
    parts.append(self.ToVarint(value - self.end_value))
    self.end_time = timestamp
    self.end_value = value
  self.times_and_values = ''.join(parts)

  if len(self.times_and_values) > self._BLOB_LIMIT:
    self._Split()

def _CheckAddDownsamples(self, timestamp_value_pairs):
  """Add downsamples if necessary.

  Args:
    timestamp_value_pairs: sorted list of (unix_timestamp, value) tuples
  """
  # We add a skew factor so we don't have every timeseries trying to add
  # downsamples at the same time.
  skew = hash(self.parent_key().name())

  assert self.resolution == self.FULL
  for resolution, seconds in self.SECONDS.iteritems():
    prev_timestamp = self.end_time
    for timestamp, value in timestamp_value_pairs:
      if (timestamp + skew) / seconds != (prev_timestamp + skew) / seconds:
        # We've crossed a downsample boundary
        Values.AddValue(self.parent_key(), timestamp, value,
                        resolution=resolution)
      prev_timestamp = timestamp
  def GetValues(self):
    """Fetch all values from this block.

    Returns:
      Sorted list of (unix_timestamp, value) tuples.
    Raises:
      InvalidSerializedData: serialized values stops in the middle of a
        timestamp and value pair
    """
    values = [(self.start_time, self.start_value)]
    if not self.times_and_values:
      return values

    # Decode the delta stream, accumulating absolute times and values.
    last_timestamp = self.start_time
    last_value = self.start_value

    fh = StringIO.StringIO(self.times_and_values)
    while True:
      timestamp = self.FromVarint(fh)
      if timestamp is None:
        break
      value = self.FromVarint(fh)
      if value is None:
        raise InvalidSerializedData('time without value')
      last_timestamp += timestamp
      last_value += value
      values.append((last_timestamp, last_value))

    # Out-of-order writes produce out-of-order deltas; normalize here.
    values.sort(key=lambda x: x[0])

    if self.resolution != self.FULL:
      seconds = self.SECONDS[self.resolution]
      # Clean up the values list for potential duplicates: keep only the
      # first value in each downsample period.
      i = 1
      while i < len(values):
        if values[i][0] / seconds == values[i - 1][0] / seconds:
          # Latter value is a dupe
          del values[i]
        else:
          i += 1

    return values
  def _Split(self, split_timestamp=None):
    """Split this block into two.

    Decodes the current block and puts the latter half of the values into a
    new block.

    Args:
      split_timestamp: UNIX timestamp around which to split (inclusive in the
        latter block)
    """
    values = self.GetValues()

    if split_timestamp is None:
      # Size-triggered split: halve the block.
      split_point = len(values) / 2
    else:
      # Age-triggered split: everything before split_timestamp moves out.
      split_point = bisect.bisect_left(values, (split_timestamp, 0))

    # NOTE(review): assumes 0 < split_point < len(values); a split_timestamp
    # beyond the newest value would index past the end below -- confirm
    # callers guarantee this.
    logging.info(
        'Splitting block from TimeSeries=%s, resolution=%d, '
        'split_timestamp=%s, split_point=%d',
        TimeSeries.KeyName(self.parent_key()), self.resolution,
        split_timestamp, split_point)

    old_values = values[:split_point]
    if split_timestamp is None:
      old_obj = self._Create(self.parent_key(),
                             old_values,
                             resolution=self.resolution,
                             add_downsamples=False)
      old_obj.save()
    else:
      # split_timestamp implies that we're doing an uneven block split. To
      # avoid unnecessary fragmentation, add to the leading block in
      # datastore, rather than creating a new one.
      self._AddValuesDatastore(self.parent_key(),
                               old_values,
                               offset=False,
                               resolution=self.resolution,
                               add_downsamples=False)

    # Reset this block to start at the split point and re-encode the tail.
    self.times_and_values = None
    self.start_time = values[split_point][0]
    self.start_value = values[split_point][1]
    self.end_time = self.start_time
    self.end_value = self.start_value
    self._AddValues(values[split_point + 1:], add_downsamples=False)


# ---- tools/collector.py ----

import argparse
import glob
import socket
import sys
import time

sys.path.append('..')

from lib import tsdblib


parser = argparse.ArgumentParser()
parser.add_argument('--project', required=True)
parser.add_argument('--base_url', required=True)
FLAGS = parser.parse_args()


class Collector(object):
  """Samples local machine stats and publishes them to the TSDB API."""

  def __init__(self):
    self._start_time = int(time.time())
    self._base_tags = self._BaseTags()
    # jit_callback: the client invokes _PutValues whenever it wants a fresh
    # sample -- presumably on its own schedule; see lib.tsdblib.
    self._client = tsdblib.TSDBClient(FLAGS.base_url,
                                      jit_callback=self._PutValues)

  def _GetHostname(self):
    return socket.gethostname()

  def _GetMACAddresses(self):
    # Yield every non-null NIC hardware address on this machine.
    for path in glob.iglob('/sys/class/net/*/address'):
      mac_address = open(path, 'r').read().strip()
      if mac_address == '00:00:00:00:00:00':
        continue
      yield mac_address

  def _GetCollectorUptime(self):
    return int(time.time()) - self._start_time

  def _GetSystemUptime(self):
    # /proc/uptime: "12345.67 ..."; keep the whole seconds only.
    return int(open('/proc/uptime', 'r').read().split('.', 1)[0])

  def _GetLoadAverage1m(self):
    # Scaled by 100 because the API stores integers only.
    return int(float(open('/proc/loadavg', 'r').read().split(' ', 1)[0]) * 100)

  def _BaseTags(self):
    # Tags attached to every published value, identifying this machine.
    return [
        ('project', FLAGS.project),
        ('hostname', self._GetHostname()),
    ] + [
        ('mac_address', mac_address)
        for mac_address in self._GetMACAddresses()
    ]
  def _CycleValues(self):
    # One sample of every metric, as (metric name, int value) pairs.
    return [
        ('collector_uptime_seconds', self._GetCollectorUptime()),
        ('system_uptime_seconds', self._GetSystemUptime()),
        ('system_load_average_1m', self._GetLoadAverage1m()),
    ]

  def _PutValues(self):
    # Publish one value per metric; the metric name travels in the 'value'
    # tag alongside the machine-identifying base tags.
    cycle_values = self._CycleValues()
    for name, value in cycle_values:
      self._client.PutValue(self._base_tags + [('value', name)], value)

  def Loop(self):
    # NOTE(review): this sleeps once (~317 years) rather than looping; all
    # real work appears to happen via the client's jit_callback -- confirm
    # that is the intent.
    time.sleep(9999999999)


Collector().Loop()