diff --git a/.github/workflows/biggraphite.yml b/.github/workflows/biggraphite.yml index 09a5920b18..77c344c0a3 100644 --- a/.github/workflows/biggraphite.yml +++ b/.github/workflows/biggraphite.yml @@ -11,7 +11,7 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest] - python-version: [ '3.x', 'pypy-3.6' ] + python-version: [ '3.9', 'pypy-3.6' ] env: JAVA: false # Default diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 2cac76e136..7858298630 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -26,7 +26,8 @@ This is the main backend for BigGraphite and the one that should be used in prod - ```BG_CASSANDRA_META_WRITE_CONSISTENCY```: Data write consistency (default: ```ONE```) - ```BG_CASSANDRA_REPLICA_ID```: Identifier of this replica (default: ```0```) - ```BG_CASSANDRA_READ_ON_SAMPLING_RATE```: Sampling rate to update metadata field ```read_on```. Setting to ```0``` disables updating ````read-on```` (default: ```0.1```) -- ```BG_CREATION_RATE_LIMIT```: Maximun number of new metadata to create per second (default: ```300```) +- ```BG_CREATION_RATE_LIMIT```: Maximum number of new metadata to create per second (default: ```300```) +- ```BG_CASSANDRA_COMPACTION_STRATEGY```: Sets the compaction strategy. If not set, it defaults to ```TimeWindowCompactionStrategy``` on Cassandra 3.9 and above, and to ```DateTieredCompactionStrategy``` otherwise (default: ```None```) ### SSL options diff --git a/biggraphite/drivers/cassandra.py b/biggraphite/drivers/cassandra.py index 31a24411b6..19a1dee0a5 100755 --- a/biggraphite/drivers/cassandra.py +++ b/biggraphite/drivers/cassandra.py @@ -203,77 +203,67 @@ SYNCDB_DATA = prometheus_client.Summary( - "bg_cassandra_syncdb_data_latency_seconds", - "DB data sync latency in seconds" + "bg_cassandra_syncdb_data_latency_seconds", "DB data sync latency in seconds" ) SYNCDB_METADATA = prometheus_client.Summary( "bg_cassandra_syncdb_metadata_latency_seconds", - "DB metadata sync latency in seconds" + "DB metadata sync latency in seconds", ) CLEAN_EMPTY_DIR = prometheus_client.Summary( - "bg_cassandra_clean_empty_dir_latency_seconds", - "clean empty dir latency in seconds" + "bg_cassandra_clean_empty_dir_latency_seconds", "clean empty dir latency in seconds" ) CLEAN_BROKEN_METRICS = prometheus_client.Summary( "bg_cassandra_clean_broken_metrics_latency_seconds", - "clean broken metrics latency in seconds" + "clean broken metrics latency in seconds", ) CLEAN_EXPIRED_METRICS = prometheus_client.Summary( "bg_cassandra_clean_expired_metrics_latency_seconds", - "clean expired metrics latency in seconds" + "clean expired metrics latency in seconds", ) REPAIR_MISSING_DIR = prometheus_client.Summary( "bg_cassandra_repair_missing_dir_latency_seconds", - "repair missing directory latency in seconds" + "repair missing directory latency in seconds", ) SELECT_METRIC_METADATA_LATENCY = prometheus_client.Summary( "bg_cassandra_select_metric_metadata_latency_seconds", - "select metric metadata latency in seconds" + "select metric metadata latency in seconds", ) DELETE_DIRECTORY_LATENCY = prometheus_client.Summary( "bg_cassandra_delete_directory_latency_seconds", - "delete directory latency in seconds" + "delete directory latency in seconds", ) HAS_DIRECTORY = prometheus_client.Summary( - "bg_cassandra_has_directory_latency_seconds", - "has directory latency in seconds" + "bg_cassandra_has_directory_latency_seconds", "has directory latency in seconds" ) UPDATE_METRIC = prometheus_client.Summary( - "bg_cassandra_update_metric_latency_seconds", - "update_metric latency in seconds" + 
"bg_cassandra_update_metric_latency_seconds", "update_metric latency in seconds" ) CLEAN_START_TS_METRIC = prometheus_client.Gauge( - "bg_util_clean_ts_start", - "clean last start timestamp" + "bg_util_clean_ts_start", "clean last start timestamp" ) CLEAN_STOP_TS_METRIC = prometheus_client.Gauge( - "bg_util_clean_ts_stop", - "clean last end timestamp" + "bg_util_clean_ts_stop", "clean last end timestamp" ) CLEAN_STEP_TS = prometheus_client.Gauge( - "bg_util_clean_step_duration", - "Duration of any step of the cleaning", - ['step'] + "bg_util_clean_step_duration", "Duration of any step of the cleaning", ["step"] ) CLEAN_CURRENT_STEP = prometheus_client.Gauge( "bg_util_clean_current_step", - "clean current step (0: none, 1: expired directories, 2: expired metrics)" + "clean current step (0: none, 1: expired directories, 2: expired metrics)", ) CLEAN_CURRENT_OFFSET = prometheus_client.Gauge( - "bg_util_clean_current_offset", - "clean current offset" + "bg_util_clean_current_offset", "clean current offset" ) CLEAN_SKIPPED_OFFSET = prometheus_client.Counter( - "bg_util_clean_skipped_offset", - "skipped offset while cleaning" + "bg_util_clean_skipped_offset", "skipped offset while cleaning" ) ERROR_BIND_METRIC = prometheus_client.Counter( "bg_error_bind_metric", "Number of incomplete or invalid record read from backend", - ["type"] + ["type"], ) log = logging.getLogger(__name__) @@ -302,7 +292,7 @@ DEFAULT_TIMEOUT_QUERY_UTIL = 120 DEFAULT_READ_ON_SAMPLING_RATE = 0.1 DEFAULT_USE_LUCENE = False -DEFAULT_SSL_VERIFY_MODE = 'required' +DEFAULT_SSL_VERIFY_MODE = "required" # Exceptions that are not considered fatal during batch jobs. # The affected range will simply be ignored. @@ -361,6 +351,11 @@ def _consistency_validator(k): "updated_on_ttl_sec": int, "read_on_sampling_rate": float, "use_lucene": bool, + # allowed compaction strategies: DateTieredCompactionStrategy or TimeWindowCompactionStrategy + # None is legacy behaviour, using cassandra version + "compaction_strategy": lambda k: k + if k in ["DateTieredCompactionStrategy", "TimeWindowCompactionStrategy"] + else None, } @@ -373,35 +368,29 @@ def add_argparse_arguments(parser): default=DEFAULT_KEYSPACE, ) parser.add_argument( - "--cassandra_username", - help="Cassandra username.", - default=None + "--cassandra_username", help="Cassandra username.", default=None ) parser.add_argument( - "--cassandra_password", - help="Cassandra password.", - default=None + "--cassandra_password", help="Cassandra password.", default=None ) parser.add_argument( "--cassandra_ssl_enable", help="Cassandra enable SSL", default=False, - action='store_true' + action="store_true", ) parser.add_argument( "--cassandra_ssl_verify_locations", help="Cassandra SSL verify locations (CA path)", - default=None + default=None, ) parser.add_argument( - "--cassandra_ssl_cert_file", - help="Cassandra SSL certificate file", - default=None + "--cassandra_ssl_cert_file", help="Cassandra SSL certificate file", default=None ) parser.add_argument( "--cassandra_ssl_key_file", help="Cassandra SSL certificate's key file", - default=None + default=None, ) parser.add_argument( "--cassandra_ssl_verify_mode", @@ -412,28 +401,28 @@ def add_argparse_arguments(parser): parser.add_argument( "--cassandra_ssl_check_hostname", help="Cassandra SSL hostname to check", - default=None + default=None, ) parser.add_argument( "--cassandra_ssl_enable_metadata", help="Cassandra enable SSL (metadata)", default=False, - action='store_true', + action="store_true", ) parser.add_argument( 
"--cassandra_ssl_verify_locations_metadata", help="Cassandra SSL verify locations (CA path, for metadata)", - default=None + default=None, ) parser.add_argument( "--cassandra_ssl_cert_file_metadata", help="Cassandra SSL certificate file (metadata)", - default=None + default=None, ) parser.add_argument( "--cassandra_ssl_key_file_metadata", help="Cassandra SSL certificate's key file (metadata)", - default=None + default=None, ) parser.add_argument( "--cassandra_ssl_verify_mode_metadata", @@ -444,7 +433,7 @@ def add_argparse_arguments(parser): parser.add_argument( "--cassandra_ssl_check_hostname_metadata", help="Cassandra SSL hostname to check", - default=None + default=None, ) parser.add_argument( "--cassandra_contact_points", @@ -584,6 +573,12 @@ def add_argparse_arguments(parser): help="Use the Stratio Lucene Index.", default=DEFAULT_USE_LUCENE, ) + parser.add_argument( + "--cassandra_compaction_strategy", + help="Cassandra compaction strategy", + type=str, + default=None, + ) class Error(bg_accessor.Error): @@ -888,6 +883,7 @@ def __init__( bulkimport, data_write_consistency, data_read_consistency, + compaction_strategy=None, ): self._keyspace = keyspace self._session = session @@ -898,12 +894,13 @@ def __init__( self._shard = shard self._data_write_consistency = data_write_consistency self._data_read_consistency = data_read_consistency - - release_version = list(session.get_pools())[0].host.release_version - if version.LooseVersion(release_version) >= version.LooseVersion("3.9"): - self._COMPACTION_STRATEGY = "TimeWindowCompactionStrategy" - else: - self._COMPACTION_STRATEGY = _COMPACTION_STRATEGY + self._COMPACTION_STRATEGY = compaction_strategy + if not compaction_strategy: + release_version = list(session.get_pools())[0].host.release_version + if version.LooseVersion(release_version) >= version.LooseVersion("3.9"): + self._COMPACTION_STRATEGY = "TimeWindowCompactionStrategy" + else: + self._COMPACTION_STRATEGY = _COMPACTION_STRATEGY def __bulkimport_filename(self, filename): current = multiprocessing.current_process() @@ -1197,6 +1194,7 @@ def __init__( read_on_sampling_rate=DEFAULT_READ_ON_SAMPLING_RATE, use_lucene=DEFAULT_USE_LUCENE, enable_metadata=True, + compaction_strategy=None, ): """Record parameters needed to connect. @@ -1247,6 +1245,8 @@ def __init__( read_on_sampling_rate: int, Read_on update frequency (per call) updated_on_ttl_sec: int, Updated_on update frequency (per seconds) use_lucene: bool, Use the Stratio Lucene Index + compaction_strategy: string or None, compaction strategy to use, if not, + it will use a default depending on cassandra version. """ backend_name = "cassandra:" + keyspace super(_CassandraAccessor, self).__init__(backend_name) @@ -1323,6 +1323,7 @@ def __init__( ] self._data_write_consistency = consistency_name_to_value[data_write_consistency] self._data_read_consistency = consistency_name_to_value[data_read_consistency] + self.__compaction_strategy = compaction_strategy if writer is None: # TODO: Currently a random shard is good enough. # We should use a counter stored in cassandra instead. 
@@ -1387,14 +1388,14 @@ def __prepare(cql, consistency=self._meta_write_consistency): __prepare( 'INSERT INTO "%s".metrics (name, parent, %s) VALUES (?, ?, %s);' % (self.keyspace_metadata, components_names, components_marks) - ) + ), ) self.__insert_directory_statement = _CassandraExecutionRequest( INSERT_DIRECTORY, __prepare( 'INSERT INTO "%s".directories (name, parent, %s) VALUES (?, ?, %s) IF NOT EXISTS;' % (self.keyspace_metadata, components_names, components_marks) - ) + ), ) self.__insert_directory_statement.statement.serial_consistency_level = ( self._meta_serial_consistency @@ -1406,7 +1407,7 @@ def __prepare(cql, consistency=self._meta_write_consistency): "SELECT id, config, toUnixTimestamp(updated_on), name" ' FROM "%s".metrics_metadata WHERE name = ?;' % self.keyspace_metadata, self._meta_read_consistency, - ) + ), ) self.__select_metric_statement = __prepare( 'SELECT * FROM "%s".metrics WHERE name = ?;' % self.keyspace_metadata, @@ -1421,41 +1422,42 @@ def __prepare(cql, consistency=self._meta_write_consistency): __prepare( 'UPDATE "%s".metrics_metadata SET config=?, updated_on=now()' " WHERE name=?;" % self.keyspace_metadata - ) + ), ) self.__touch_metrics_metadata_statement = _CassandraExecutionRequest( TOUCH_METRIC_METADATA, __prepare( 'UPDATE "%s".metrics_metadata SET updated_on=now()' " WHERE name=?;" % self.keyspace_metadata - )) + ), + ) self.__insert_metrics_metadata_statement = _CassandraExecutionRequest( INSERT_METRIC_METADATA, __prepare( 'INSERT INTO "%s".metrics_metadata (name, created_on, updated_on, id, config)' " VALUES (?, now(), now(), ?, ?);" % self.keyspace_metadata - ) + ), ) self.__update_metric_read_on_metadata_statement = _CassandraExecutionRequest( UPDATE_READ_ON_METADATA, __prepare( 'UPDATE "%s".metrics_metadata SET read_on=now()' " WHERE name=? 
IF EXISTS;" % self.keyspace_metadata - ) + ), ) self.__delete_metric = _CassandraExecutionRequest( DELETE_METRIC, __prepare( 'DELETE FROM "%s".metrics WHERE name=?;' % (self.keyspace_metadata), consistency=cassandra.ConsistencyLevel.QUORUM, - ) + ), ) self.__delete_directory = _CassandraExecutionRequest( DELETE_DIRECTORY, __prepare( 'DELETE FROM "%s".directories WHERE name=?;' % (self.keyspace_metadata), consistency=cassandra.ConsistencyLevel.QUORUM, - ) + ), ) self.__delete_metric_metadata = _CassandraExecutionRequest( DELETE_METRIC_METADATA, @@ -1463,7 +1465,7 @@ def __prepare(cql, consistency=self._meta_write_consistency): 'DELETE FROM "%s".metrics_metadata WHERE name=?;' % (self.keyspace_metadata), consistency=cassandra.ConsistencyLevel.QUORUM, - ) + ), ) def _get_ssl_context(self): @@ -1473,7 +1475,7 @@ def _get_ssl_context(self): ssl_context = SSLContext(PROTOCOL_TLS) ssl_context.load_default_certs() - if self.__ssl_verify_mode == 'required': + if self.__ssl_verify_mode == "required": ssl_context.verify_mode = CERT_REQUIRED else: ssl_context.verify_mode = CERT_NONE @@ -1482,17 +1484,14 @@ def _get_ssl_context(self): if self.__ssl_check_hostname is not None: ssl_context.check_hostname = True - ssl_options = {'server_hostname': self.__ssl_check_hostname} + ssl_options = {"server_hostname": self.__ssl_check_hostname} if self.__ssl_verify_locations is not None: - ssl_context.load_verify_locations( - self.__ssl_verify_locations - ) + ssl_context.load_verify_locations(self.__ssl_verify_locations) if self.__ssl_cert_file is not None and self.__ssl_key_file is not None: ssl_context.load_cert_chain( - certfile=self.__ssl_cert_file, - keyfile=self.__ssl_key_file + certfile=self.__ssl_cert_file, keyfile=self.__ssl_key_file ) return ssl_context, ssl_options @@ -1504,7 +1503,7 @@ def _get_ssl_context_metadata(self): ssl_context = SSLContext(PROTOCOL_TLS) ssl_context.load_default_certs() - if self.__ssl_verify_mode_metadata == 'required': + if self.__ssl_verify_mode_metadata == "required": ssl_context.verify_mode = CERT_REQUIRED else: ssl_context.verify_mode = CERT_NONE @@ -1513,17 +1512,18 @@ def _get_ssl_context_metadata(self): if self.__ssl_check_hostname_metadata is not None: ssl_context.check_hostname = True - ssl_options = {'server_hostname': self.__ssl_check_hostname_metadata} + ssl_options = {"server_hostname": self.__ssl_check_hostname_metadata} if self.__ssl_verify_locations_metadata is not None: - ssl_context.load_verify_locations( - self.__ssl_verify_locations_metadata - ) + ssl_context.load_verify_locations(self.__ssl_verify_locations_metadata) - if self.__ssl_cert_file_metadata is not None and self.__ssl_key_file_metadata is not None: + if ( + self.__ssl_cert_file_metadata is not None + and self.__ssl_key_file_metadata is not None + ): ssl_context.load_cert_chain( certfile=self.__ssl_cert_file_metadata, - keyfile=self.__ssl_key_file_metadata + keyfile=self.__ssl_key_file_metadata, ) return ssl_context, ssl_options @@ -1546,6 +1546,7 @@ def _connect_clusters(self): self.__bulkimport, self._meta_write_consistency, self._data_read_consistency, + self.__compaction_strategy, ) if self.__enable_metrics: self.__metrics["data"] = expose_metrics(self.__cluster_data.metrics, "data") @@ -1553,7 +1554,10 @@ def _connect_clusters(self): # metadata cluster if self.metadata_enabled: if self.contact_points_data != self.contact_points_metadata: - ssl_context_metadata, ssl_options_metadata = self._get_ssl_context_metadata() + ( + ssl_context_metadata, + ssl_options_metadata, + ) = 
self._get_ssl_context_metadata() self.__cluster_metadata, self.__session_metadata = self._connect( self.__cluster_metadata, self.__session_metadata, @@ -1571,7 +1575,9 @@ def _connect_clusters(self): self.__cluster_metadata.metrics, "metadata" ) - def _connect(self, cluster, session, contact_points, port, ssl_context=None, ssl_options=None): + def _connect( + self, cluster, session, contact_points, port, ssl_context=None, ssl_options=None + ): lb_policy = c_policies.TokenAwarePolicy(c_policies.DCAwareRoundRobinPolicy()) # See https://datastax-oss.atlassian.net/browse/PYTHON-643 lb_policy.shuffle_replicas = True @@ -1667,6 +1673,7 @@ def _execute_async_metadata(self, execution_request, **kwargs): def _execute_concurrent(self, session, execution_requests, **kwargs): """Wrapper for concurrent.execute_concurrent().""" + def marked_args(request): # Hackish but this is the last checkpoint where we can mark # the execution request as executed @@ -1680,9 +1687,7 @@ def marked_args(request): if not self.__trace: args = [marked_args(ec) for ec in execution_requests] - return c_concurrent.execute_concurrent( - session, args, **kwargs - ) + return c_concurrent.execute_concurrent(session, args, **kwargs) query_results = [] for execution_request in execution_requests: @@ -1698,10 +1703,14 @@ def marked_args(request): return query_results def _execute_concurrent_data(self, execution_requests, **kwargs): - return self._execute_concurrent(self.__session_data, execution_requests, **kwargs) + return self._execute_concurrent( + self.__session_data, execution_requests, **kwargs + ) def _execute_concurrent_metadata(self, execution_requests, **kwargs): - return self._execute_concurrent(self.__session_metadata, execution_requests, **kwargs) + return self._execute_concurrent( + self.__session_metadata, execution_requests, **kwargs + ) def create_metric(self, metric): """See bg_accessor.Accessor.""" @@ -1759,25 +1768,20 @@ def update_metric(self, name, updated_metadata): UPDATE_METRIC, self.__update_metric_metadata_statement.with_param_list( [metadata_dict, encoded_metric_name] - ) + ), ) def delete_metric(self, name): """See bg_accessor.Accessor.""" super(_CassandraAccessor, self).delete_metric(name) - self._execute_async_metadata( - self.__delete_metric.with_params(name) - ) - self._execute_async_metadata( - self.__delete_metric_metadata.with_params(name) - ) + self._execute_async_metadata(self.__delete_metric.with_params(name)) + self._execute_async_metadata(self.__delete_metric_metadata.with_params(name)) def delete_directory(self, directory): """See bg_accessor.Accessor.""" super(_CassandraAccessor, self).delete_directory(directory) self._execute_metadata( - DELETE_DIRECTORY_LATENCY, - self.__delete_directory.with_params(directory) + DELETE_DIRECTORY_LATENCY, self.__delete_directory.with_params(directory) ) def _create_parent_dirs_queries(self, components): @@ -1819,23 +1823,23 @@ def drop_all_metrics_in_keyspace(keyspace, session): ) select_request = _CassandraExecutionRequest( - SELECT_TABLES_FROM_KEYSPACE, - statement_str, - keyspace) + SELECT_TABLES_FROM_KEYSPACE, statement_str, keyspace + ) tables = [r[0] for r in self._execute(session, select_request)] for table in tables: self._execute( session, _CassandraExecutionRequest( - DROP_ALL_METRICS, - 'TRUNCATE "%s"."%s";' % (keyspace, table) - ) + DROP_ALL_METRICS, 'TRUNCATE "%s"."%s";' % (keyspace, table) + ), ) drop_all_metrics_in_keyspace(self.keyspace, self.__session_data) if self.metadata_enabled: - drop_all_metrics_in_keyspace(self.keyspace_metadata, 
self.__session_metadata) + drop_all_metrics_in_keyspace( + self.keyspace_metadata, self.__session_metadata + ) if self.__downsampler: self.__downsampler.clear() @@ -1940,7 +1944,9 @@ def _select_metric(self, metric_name): result = list( self._execute_metadata( SELECT_METRIC_METADATA_LATENCY, - self.__select_metric_metadata_statement.with_params(encoded_metric_name) + self.__select_metric_metadata_statement.with_params( + encoded_metric_name + ), ) ) if not result: @@ -1982,8 +1988,8 @@ def has_directory(self, directory): _CassandraExecutionRequest( SELECT_DIRECTORY, self.__select_directory_statement, - encoded_directory - ) + encoded_directory, + ), ) ) return bool(result) @@ -1998,12 +2004,18 @@ def __query_key(self, statement, params): return str(statement) + str(params) def _get_metrics(self, metric_names, strict_checks=False): - metric_names = [".".join(self._components_from_name(metric_name)[:-1]) - for metric_name in metric_names] - metric_names = [bg_metric.encode_metric_name(metric_name) for metric_name in metric_names] + metric_names = [ + ".".join(self._components_from_name(metric_name)[:-1]) + for metric_name in metric_names + ] + metric_names = [ + bg_metric.encode_metric_name(metric_name) for metric_name in metric_names + ] - execution_requests = [self._make_select_metric_execution_request(metric_name) - for metric_name in metric_names] + execution_requests = [ + self._make_select_metric_execution_request(metric_name) + for metric_name in metric_names + ] results = self._select_metrics(execution_requests) @@ -2028,8 +2040,12 @@ def _select_metrics(self, execution_requests): def get_metric(self, metric_name, strict_checks=False): """See bg_accessor.Accessor.""" super(_CassandraAccessor, self).get_metric(metric_name) - tracing.add_attr_to_trace("metric.name", bg_metric.encode_metric_name(metric_name)) - return next(iter(self._get_metrics([metric_name], strict_checks=strict_checks)), None) + tracing.add_attr_to_trace( + "metric.name", bg_metric.encode_metric_name(metric_name) + ) + return next( + iter(self._get_metrics([metric_name], strict_checks=strict_checks)), None + ) def _bind_metric(self, row, strict_checks=False): uid = row[0] @@ -2120,7 +2136,9 @@ def __glob_names(self, table, glob): if isinstance(query, six.string_types): execution_request = _CassandraExecutionRequest( counter, - c_query.SimpleStatement(query, consistency_level=self._meta_read_consistency) + c_query.SimpleStatement( + query, consistency_level=self._meta_read_consistency + ), ) else: execution_request = _CassandraExecutionRequest(counter, query) @@ -2131,7 +2149,8 @@ def __glob_names(self, table, glob): # (which always contains the query and the parameters). # WARNING: With the current code PrepareStatement would not be cached. 
keys_to_queries = { - self.__query_key(ec.statement, ec.params): ec for ec in execution_requests + self.__query_key(ec.statement, ec.params): ec + for ec in execution_requests } cached_results = self.cache.get_many(keys_to_queries.keys()) for key in cached_results: @@ -2312,7 +2331,7 @@ def syncdb(self, retentions=None, dry_run=False): if not dry_run: self._execute_metadata( SYNCDB_METADATA, - _CassandraExecutionRequest(METADATA_SCHEMA_UPDATE, query) + _CassandraExecutionRequest(METADATA_SCHEMA_UPDATE, query), ) self.__cluster_data.refresh_schema_metadata() @@ -2346,8 +2365,7 @@ def syncdb(self, retentions=None, dry_run=False): schema += query + "\n\n" if not dry_run: self._execute_data( - SYNCDB_DATA, - _CassandraExecutionRequest(SYNCDB, query) + SYNCDB_DATA, _CassandraExecutionRequest(SYNCDB, query) ) if not was_connected: @@ -2416,12 +2434,8 @@ def map( while token < stop_token: # Schedule read first. future = self._execute_async_metadata( - _CassandraExecutionRequest( - MAP_ITERATION, - select, - int(token) - ), - timeout=DEFAULT_TIMEOUT_QUERY_UTIL + _CassandraExecutionRequest(MAP_ITERATION, select, int(token)), + timeout=DEFAULT_TIMEOUT_QUERY_UTIL, ) # Then execute callback for the *previous* result while C* is @@ -2516,12 +2530,8 @@ def sync_map( while token < stop_token: # Schedule read first. future = self._execute_async_metadata( - _CassandraExecutionRequest( - MAP_ITERATION, - select, - int(token) - ), - timeout=DEFAULT_TIMEOUT_QUERY_UTIL + _CassandraExecutionRequest(MAP_ITERATION, select, int(token)), + timeout=DEFAULT_TIMEOUT_QUERY_UTIL, ) # Then, read new data. @@ -2622,7 +2632,7 @@ def directories_to_check(result, stop_token): yield _CassandraExecutionRequest( CLEAN_DIRECTORIES_TO_CHECK, has_metric_query, - name + DIRECTORY_SEPARATOR + "%" + name + DIRECTORY_SEPARATOR + "%", ) # Reset the error count. @@ -2635,12 +2645,8 @@ def directories_to_check(result, stop_token): try: result = self._execute_metadata( CLEAN_EMPTY_DIR, - _CassandraExecutionRequest( - CLEAN, - dir_query, - int(token) - ), - timeout=DEFAULT_TIMEOUT_QUERY_UTIL + _CassandraExecutionRequest(CLEAN, dir_query, int(token)), + timeout=DEFAULT_TIMEOUT_QUERY_UTIL, ) # Reset the error count. 
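The `map()` and `sync_map()` hunks above share a read-ahead pattern: each iteration schedules the next token-range read before processing the previous batch, so Cassandra works while the callback runs. A minimal sketch of that loop, with illustrative names (`execute_async`, `process`) standing in for the accessor's real calls:

```python
# Read-ahead scan: overlap the in-flight read with processing of the
# previous batch; a sketch of the pattern, not the accessor's real API.
def scan_token_ranges(execute_async, process, tokens):
    pending = None
    for token in tokens:
        future = execute_async(token)   # schedule the next read first
        if pending is not None:
            process(pending)            # process while the read is in flight
        pending = future.result()       # block only after the callback is done
    if pending is not None:
        process(pending)                # flush the final batch
```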
@@ -2680,13 +2686,15 @@ def directories_to_check(result, stop_token): continue dir_name = response.result_or_exc.response_future.query.values[0] - dir_name = str(dir_name.decode('ascii')) + dir_name = str(dir_name.decode("ascii")) dir_name = str(dir_name).rpartition(".")[0] callback(dir_name, list(response.result_or_exc)) if callback_on_progress: - callback_on_progress(token - start_token, stop_token - start_token, token) + callback_on_progress( + token - start_token, stop_token - start_token, token + ) def repair( self, @@ -2703,13 +2711,13 @@ def repair( """ super(_CassandraAccessor, self).repair(start_key, end_key, shard, nshards) - disable_repair = os.getenv('DISABLE_REPAIR_MISSING_DIR') + disable_repair = os.getenv("DISABLE_REPAIR_MISSING_DIR") if disable_repair is None: self._repair_missing_dir( start_key, end_key, shard, nshards, callback_on_progress ) - disable_invalid_metrics = os.getenv('DISABLE_REPAIR_INVALID_METRICS') + disable_invalid_metrics = os.getenv("DISABLE_REPAIR_INVALID_METRICS") if disable_invalid_metrics is None: self._delete_invalid_metrics( start_key, end_key, shard, nshards, callback_on_progress @@ -2752,13 +2760,11 @@ def _delete_invalid_metrics( check_query = self._prepare_background_request( 'SELECT name, token(name) FROM "%s".metrics_metadata ' - 'WHERE name = ?;' - % (self.keyspace_metadata) + "WHERE name = ?;" % (self.keyspace_metadata) ) delete_query = self._prepare_background_request( - 'DELETE FROM "%s".metrics WHERE name = ?;' - % (self.keyspace_metadata) + 'DELETE FROM "%s".metrics WHERE name = ?;' % (self.keyspace_metadata) ) ignored_errors = 0 @@ -2768,11 +2774,9 @@ def _delete_invalid_metrics( result = self._execute_metadata( CLEAN_BROKEN_METRICS, _CassandraExecutionRequest( - CLEAN_BROKEN_METRICS_COUNT, - metrics_query, - int(token) + CLEAN_BROKEN_METRICS_COUNT, metrics_query, int(token) ), - timeout=DEFAULT_TIMEOUT_QUERY_UTIL + timeout=DEFAULT_TIMEOUT_QUERY_UTIL, ) # Reset the error count. 
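Note that the `repair()` kill switches above test only for presence: `os.getenv(...)` returning anything other than `None` disables the step, so even `DISABLE_REPAIR_MISSING_DIR=0` or `=false` turns the repair off. A two-line illustration of that gotcha:

```python
import os

# Any value disables the step; the variable's content is never inspected.
os.environ["DISABLE_REPAIR_MISSING_DIR"] = "false"
assert os.getenv("DISABLE_REPAIR_MISSING_DIR") is not None  # step would be skipped
```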
@@ -2796,9 +2800,7 @@ def metrics_to_check(rows): for row in result: name, next_token = row yield _CassandraExecutionRequest( - BROKEN_METRICS_TO_CHECK, - check_query, - name + BROKEN_METRICS_TO_CHECK, check_query, name ) token = result[-1][1] @@ -2818,13 +2820,11 @@ def metrics_to_remove(result): continue name = response.result_or_exc.response_future.query.values[0] - name = str(name.decode('ascii')) + name = str(name.decode("ascii")) log.info("Scheduling delete for metric '%s'" % name) yield _CassandraExecutionRequest( - BROKEN_METRICS_TO_REMOVE, - delete_query, - name + BROKEN_METRICS_TO_REMOVE, delete_query, name ) rets = self._execute_concurrent_metadata( @@ -2839,7 +2839,9 @@ def metrics_to_remove(result): log.warning(str(ret.result_or_exc)) if callback_on_progress: - callback_on_progress(token - start_token, stop_token - start_token, token) + callback_on_progress( + token - start_token, stop_token - start_token, token + ) def _repair_missing_dir( self, @@ -2871,9 +2873,7 @@ def directories_to_check(result): parent_dir = name.rpartition(".")[0] if parent_dir: yield _CassandraExecutionRequest( - REPAIR_DIRECTORIES_TO_CHECK, - has_directory_query, - parent_dir + REPAIR_DIRECTORIES_TO_CHECK, has_directory_query, parent_dir ) def directories_to_create(result): @@ -2893,8 +2893,7 @@ def directories_to_create(result): for query in queries: PM_REPAIRED_DIRECTORIES.inc() yield _CassandraExecutionRequest( - REPAIR_DIRECTORIES_TO_CREATE, - query + REPAIR_DIRECTORIES_TO_CREATE, query ) log.info("Start creating missing directories") @@ -2903,11 +2902,9 @@ def directories_to_create(result): result = self._execute_metadata( REPAIR_MISSING_DIR, _CassandraExecutionRequest( - REPAIR_MISSING_DIR_COUNT, - dir_query, - int(token) + REPAIR_MISSING_DIR_COUNT, dir_query, int(token) ), - timeout=DEFAULT_TIMEOUT_QUERY_UTIL + timeout=DEFAULT_TIMEOUT_QUERY_UTIL, ) if len(result.current_rows) == 0: break @@ -2931,7 +2928,9 @@ def directories_to_create(result): log.warning(str(ret.result_or_exc)) if callback_on_progress: - callback_on_progress(token - start_token, stop_token - start_token, token) + callback_on_progress( + token - start_token, stop_token - start_token, token + ) def _clean_empty_dir( self, @@ -2968,7 +2967,7 @@ def directories_to_check(result): yield _CassandraExecutionRequest( CLEAN_DIRECTORIES_TO_CHECK, has_metric_query, - name + DIRECTORY_SEPARATOR + "%" + name + DIRECTORY_SEPARATOR + "%", ) def directories_to_remove(result): @@ -2980,14 +2979,12 @@ def directories_to_remove(result): if results: continue dir_name = response.result_or_exc.response_future.query.values[0] - dir_name = str(dir_name.decode('ascii')) + dir_name = str(dir_name.decode("ascii")) dir_name = str(dir_name).rpartition(".")[0] log.info("Scheduling delete for empty dir '%s'" % dir_name) PM_DELETED_DIRECTORIES.inc() yield _CassandraExecutionRequest( - CLEAN_DIRECTORIES_TO_REMOVE, - delete_empty_dir_stm, - dir_name + CLEAN_DIRECTORIES_TO_REMOVE, delete_empty_dir_stm, dir_name ) # Reset the error count. @@ -3000,12 +2997,8 @@ def directories_to_remove(result): try: result = self._execute_metadata( CLEAN_EMPTY_DIR, - _CassandraExecutionRequest( - CLEAN, - dir_query, - int(token) - ), - timeout=DEFAULT_TIMEOUT_QUERY_UTIL + _CassandraExecutionRequest(CLEAN, dir_query, int(token)), + timeout=DEFAULT_TIMEOUT_QUERY_UTIL, ) # Reset the error count. 
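Both the repair and clean paths above derive a parent directory by dropping the last dotted component with `rpartition`, and the empty-string result for top-level names is exactly what the `if parent_dir:` guard filters out. The metric names below are made up for illustration:

```python
# rpartition(".")[0] keeps everything before the last dot of a metric name.
assert "carbon.agents.host-1.cpu".rpartition(".")[0] == "carbon.agents.host-1"
assert "toplevel".rpartition(".")[0] == ""  # no dot: no parent directory
```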
@@ -3047,7 +3040,9 @@ def directories_to_remove(result): log.warning(str(ret.result_or_exc)) if callback_on_progress: - callback_on_progress(token - start_token, stop_token - start_token, token) + callback_on_progress( + token - start_token, stop_token - start_token, token + ) def clean( self, @@ -3082,15 +3077,15 @@ def clean( first_exception = None # 0.006% of database - num_token_ignore_on_error = 2 ** 50 + num_token_ignore_on_error = 2**50 - env_num_token = environ.get('CLEAN_IGNORE_BATCH_SIZE_ON_ERROR') + env_num_token = environ.get("CLEAN_IGNORE_BATCH_SIZE_ON_ERROR") if env_num_token is not None and env_num_token.isdigit(): exponent = int(env_num_token) if exponent < 16 or exponent > 62: log.exception("Invalid CLEAN_IGNORE_BATCH_SIZE_ON_ERROR given: ignored") else: - num_token_ignore_on_error = 2 ** exponent + num_token_ignore_on_error = 2**exponent # First, clean metrics... if not disable_clean_metrics: @@ -3110,7 +3105,7 @@ def clean( first_exception = e log.exception("Failed to clean metrics.") CLEAN_CURRENT_STEP.set(2) - CLEAN_STEP_TS.labels('expired_metrics').set(time.time() - ts_start) + CLEAN_STEP_TS.labels("expired_metrics").set(time.time() - ts_start) else: log.info("Cleaning metrics was disabled") @@ -3131,7 +3126,7 @@ def clean( first_exception = e log.exception("Failed to clean directories.") CLEAN_CURRENT_STEP.set(4) - CLEAN_STEP_TS.labels('empty_directories').set(time.time() - ts_start) + CLEAN_STEP_TS.labels("empty_directories").set(time.time() - ts_start) else: log.info("Cleaning directories was disabled") @@ -3183,15 +3178,15 @@ def _clean_expired_metrics( log.info("Cleaning with cutoff time %d", cutoff) # select method: traditional, unfiltered, adaptative - available_methods = ['traditional', 'unfiltered', 'adaptative'] - method = 'traditional' + available_methods = ["traditional", "unfiltered", "adaptative"] + method = "traditional" # use env var to select method. - env_method = environ.get('CLEAN_OUTDATED_METHOD') + env_method = environ.get("CLEAN_OUTDATED_METHOD") if env_method is not None and env_method in available_methods: method = env_method - env_batch_size = environ.get('CLEAN_BATCH_SIZE') + env_batch_size = environ.get("CLEAN_BATCH_SIZE") if env_batch_size is not None and env_batch_size.isdigit(): batch_sizes = [int(env_batch_size)] else: @@ -3200,42 +3195,44 @@ def _clean_expired_metrics( current_select_index = 0 # Metrics dump is only compatible with METHOD='unfiltered' - metrics_dump_file = environ.get('CLEAN_DUMP_METRICS', None) + metrics_dump_file = environ.get("CLEAN_DUMP_METRICS", None) metrics_dump_file_fd = None if metrics_dump_file is not None: - if env_method != 'unfiltered': - log.warning('Can not dump all metrics when not using "unfiltered" method') + if env_method != "unfiltered": + log.warning( + 'Can not dump all metrics when not using "unfiltered" method' + ) else: - log.info(f'Opening {metrics_dump_file} to dump all metrics.') + log.info(f"Opening {metrics_dump_file} to dump all metrics.") # Using gzip for small files. - metrics_dump_file_fd = gzip.open(metrics_dump_file, 'wt') + metrics_dump_file_fd = gzip.open(metrics_dump_file, "wt") # statements for batch_size in batch_sizes: - if method in ['traditional', 'adaptative']: + if method in ["traditional", "adaptative"]: # Enforcing consistency one on this one, as it will eventually fail if not. 
select = _CassandraExecutionRequest( CLEAN_EXPIRED_METRICS_SELECT, self._prepare_background_request( - 'SELECT name, token(name), toUnixTimestamp(updated_on) ' + "SELECT name, token(name), toUnixTimestamp(updated_on) " 'FROM "%s".metrics_metadata ' - 'WHERE updated_on <= maxTimeuuid(%d) and token(name) > ? LIMIT %d;' + "WHERE updated_on <= maxTimeuuid(%d) and token(name) > ? LIMIT %d;" % (self.keyspace_metadata, cutoff, batch_size), consistency=cassandra.ConsistencyLevel.ONE, timeout=None, - ) + ), ) else: select = _CassandraExecutionRequest( CLEAN_EXPIRED_METRICS_SELECT, self._prepare_background_request( - 'SELECT name, token(name), toUnixTimestamp(updated_on) ' + "SELECT name, token(name), toUnixTimestamp(updated_on) " 'FROM "%s".metrics_metadata ' - 'WHERE token(name) > ? LIMIT %d;' + "WHERE token(name) > ? LIMIT %d;" % (self.keyspace_metadata, DEFAULT_MAX_BATCH_UTIL), timeout=None, - ) + ), ) select_queries.append(select) @@ -3244,22 +3241,25 @@ def _clean_expired_metrics( CLEAN_EXPIRED_METRICS_DELETE, self._prepare_background_request( 'DELETE FROM "%s".metrics WHERE name = ? ;' % (self.keyspace_metadata) - ) + ), ) delete_metadata = _CassandraExecutionRequest( CLEAN_EXPIRED_METRICS_DELETE_METADATA, self._prepare_background_request( 'DELETE FROM "%s".metrics_metadata WHERE name = ? ;' % (self.keyspace_metadata) - ) + ), ) def run(method, cutoff, rows, metrics_dump_file_fd=None): for name, _, updated_on in rows: - if method == 'unfiltered': + if method == "unfiltered": to_delete = True if updated_on is None: - log.info("Skipping metric %s as no updated_on date was present.", name) + log.info( + "Skipping metric %s as no updated_on date was present.", + name, + ) to_delete = False if to_delete is True and updated_on > cutoff: @@ -3287,7 +3287,7 @@ def run(method, cutoff, rows, metrics_dump_file_fd=None): rows = self._execute_metadata( CLEAN_EXPIRED_METRICS, select_queries[current_select_index].with_params(int(token)), - timeout=DEFAULT_TIMEOUT_QUERY_UTIL + timeout=DEFAULT_TIMEOUT_QUERY_UTIL, ) # Reset the error count. @@ -3305,7 +3305,10 @@ def run(method, cutoff, rows, metrics_dump_file_fd=None): # After a few retries on the same query, let's move on. if ignored_errors % 3 == 0: - if method == 'adaptative' and current_select_index != len(select_queries) - 1: + if ( + method == "adaptative" + and current_select_index != len(select_queries) - 1 + ): # Too many errors with this batch size: limiting number of results. 
current_select_index += 1 else: @@ -3342,7 +3345,7 @@ def run(method, cutoff, rows, metrics_dump_file_fd=None): # End of the process if metrics_dump_file_fd is not None: - log.info(f'Closing {metrics_dump_file} and process is ending.') + log.info(f"Closing {metrics_dump_file} and process is ending.") metrics_dump_file_fd.close() def metadata_enabled(self): diff --git a/requirements.txt b/requirements.txt index 0c0b84a254..92d51dcf95 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,6 +14,7 @@ sortedcontainers # Cassandra driver cassandra-driver==3.24.0 scales +pyyaml<6.0 # ElasticSearch elasticsearch<7.0.0,>=6.0.0 diff --git a/setup.py b/setup.py index 1442fe06ea..4607044cff 100755 --- a/setup.py +++ b/setup.py @@ -39,7 +39,7 @@ def _read_reqs(relpath): setuptools.setup( name="biggraphite", - version="0.14.28", + version="0.14.29", maintainer="Criteo Graphite Team", maintainer_email="github@criteo.com", description="Simple Scalable Time Series Database.", diff --git a/tests-requirements.txt b/tests-requirements.txt index 9d1b411375..85f5aef9c9 100644 --- a/tests-requirements.txt +++ b/tests-requirements.txt @@ -13,6 +13,7 @@ pylama # Cassandra testing.cassandra3 +pyyaml<6.0 # Elasticsearch testing.elasticsearch6
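For reference, the new option can be exercised end to end through the argparse wiring added in this diff; `BG_CASSANDRA_COMPACTION_STRATEGY` (documented in the CONFIGURATION.md hunk above) is the environment-variable equivalent. A sketch, assuming the module layout shown here and purely illustrative argument values:

```python
import argparse
from biggraphite.drivers import cassandra as bg_cassandra

parser = argparse.ArgumentParser()
bg_cassandra.add_argparse_arguments(parser)  # registers --cassandra_compaction_strategy
opts = parser.parse_args(
    ["--cassandra_compaction_strategy", "TimeWindowCompactionStrategy"]
)
assert opts.cassandra_compaction_strategy == "TimeWindowCompactionStrategy"
```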