Changes from all commits
29 commits
37a992d
Initial plan
Copilot Dec 27, 2025
522d55f
Add cryptography>=42.0.4 to dev dependencies for column encryption tests
Copilot Dec 29, 2025
76733f2
Apply suggested fix to docs/api/cassandra/protocol.rst from Copilot A…
mykaul Dec 16, 2025
f1deca6
Apply suggested fix to docs/api/cassandra/protocol.rst from Copilot A…
mykaul Dec 16, 2025
09cc201
chore(deps): update dependency hatchling to v1.28.0
renovate[bot] Dec 29, 2025
c114180
Fix for Missing call to superclass `__init__` during object initializ…
mykaul Dec 16, 2025
66b81c3
Apply Python style improvements to test assertions
Copilot Dec 23, 2025
7814555
Potential fix for code scanning alert no. 3: Workflow does not contai…
mykaul Dec 22, 2025
e79b4c6
.github/workflows/publish-manually.yml: Potential fix for code scanni…
mykaul Dec 22, 2025
af4d83c
Don't mark node down when control connection fails to connect
dkropachev Dec 30, 2025
f11f55f
(improvement) remove support for protocols <3 from cython files
mykaul Jan 4, 2026
0ebd9f5
Pull version information from system.local, when version info is not
dkropachev Jan 1, 2026
d08d0e2
Fix infinite retry when single host fails with server error
Copilot Dec 27, 2025
e61d265
Use endpoint instead of Host in _try_connect
sylwiaszunejko Dec 29, 2025
296a981
tests/integration/standard: fix test to reflect RR policy randomizing…
sylwiaszunejko Dec 29, 2025
2b7dd50
tests/integration/standard: update test to reflect new behavior
sylwiaszunejko Dec 29, 2025
796b0fc
tests/integration/standard: don't compare Host instances
sylwiaszunejko Dec 29, 2025
1b24880
tests/unit: Provide host_id when initializing Host
sylwiaszunejko Dec 22, 2025
d6459b9
tests/integration/standard: return empty query plan if there are no l…
sylwiaszunejko Dec 30, 2025
7e4bd1f
tests/integration/standard: allow execute to throw Unavailable exception
sylwiaszunejko Jan 12, 2026
2034f95
Don't check if host is in initial contact points when setting default…
sylwiaszunejko Jan 8, 2026
5f7f413
Call on_add before distance to properly initialize lbp
sylwiaszunejko Jan 8, 2026
921f324
Don't create Host instances with random host_id
sylwiaszunejko Dec 29, 2025
f2d9022
(improvement) TokenAwarePolicy::make_query_plan(): remove redundant ch…
mykaul Jan 21, 2026
a00ffa7
test: optimize test_fast_shutdown with event-based synchronization
mykaul Jan 18, 2026
9f27bcf
(Fix) race condition during host IP address update
mykaul Jan 23, 2026
82f99aa
add uv files to .gitignore
dkropachev Jan 29, 2026
1886f8e
Optimize write path in protocol.py to reduce copies
mykaul Jan 9, 2026
a613366
Initial plan
Copilot Dec 27, 2025
3 changes: 3 additions & 0 deletions .github/workflows/docs-pages.yml
@@ -2,6 +2,9 @@ name: "Docs / Publish"
# For more information,
# see https://sphinx-theme.scylladb.com/stable/deployment/production.html#available-workflows

permissions:
contents: write

on:
push:
branches:
3 changes: 3 additions & 0 deletions .github/workflows/publish-manually.yml
@@ -1,5 +1,8 @@
name: Build and upload to PyPi manually

permissions:
contents: read

on:
workflow_dispatch:
inputs:
5 changes: 5 additions & 0 deletions .gitignore
@@ -43,6 +43,11 @@ tests/unit/cython/bytesio_testhelper.c
#iPython
*.ipynb

uv.lock
.venv/



# Files from upstream that we don't need
Jenkinsfile
Jenkinsfile.bak
152 changes: 39 additions & 113 deletions cassandra/cluster.py
@@ -838,8 +838,8 @@ def default_retry_policy(self, policy):
Using ssl_options without ssl_context is deprecated and will be removed in the
next major release.

An optional dict which will be used as kwargs for ``ssl.SSLContext.wrap_socket``
when new sockets are created. This should be used when client encryption is enabled
An optional dict which will be used as kwargs for ``ssl.SSLContext.wrap_socket``
when new sockets are created. This should be used when client encryption is enabled
in Cassandra.

The following documentation only applies when ssl_options is used without ssl_context.
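
A minimal usage sketch, assuming a node with client encryption enabled; the certificate path and addresses are placeholders, and the ssl_options keys are simply forwarded to ssl.SSLContext.wrap_socket:

import ssl
from cassandra.cluster import Cluster

# Build the TLS context up front; ssl_options then only supplies
# per-socket keyword arguments for ssl.SSLContext.wrap_socket.
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.load_verify_locations("/path/to/rootca.crt")  # placeholder path

cluster = Cluster(
    contact_points=["127.0.0.1"],
    ssl_context=ssl_context,
    # forwarded verbatim to wrap_socket; enables hostname verification
    ssl_options={"server_hostname": "127.0.0.1"},
)
session = cluster.connect()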
@@ -1086,10 +1086,10 @@ def default_retry_policy(self, policy):
"""
Specifies a server-side timeout (in seconds) for all internal driver queries,
such as schema metadata lookups and cluster topology requests.

The timeout is enforced by appending `USING TIMEOUT <timeout>` to queries
executed by the driver.

- A value of `0` disables explicit timeout enforcement. In this case,
the driver does not add `USING TIMEOUT`, and the timeout is determined
by the server's defaults.
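
The helper that appends this clause shows up later in the diff as maybe_add_timeout_to_query. A minimal sketch of what such a helper could look like; the signature and the millisecond formatting are assumptions, not taken from the source:

def maybe_add_timeout_to_query(query, timeout):
    # A falsy timeout (0 or None) means "leave it to the server's defaults":
    # return the statement unchanged, with no USING TIMEOUT appended.
    if not timeout:
        return query
    # ScyllaDB accepts duration literals such as 2000ms in USING TIMEOUT.
    return "%s USING TIMEOUT %dms" % (query, int(timeout * 1000))

# maybe_add_timeout_to_query("SELECT * FROM system.local", 2.0)
# -> "SELECT * FROM system.local USING TIMEOUT 2000ms"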
@@ -1683,14 +1683,7 @@ def protocol_downgrade(self, host_endpoint, previous_version):
"http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version", self.protocol_version, new_version, host_endpoint)
self.protocol_version = new_version

def _add_resolved_hosts(self):
for endpoint in self.endpoints_resolved:
host, new = self.add_host(endpoint, signal=False)
if new:
host.set_up()
for listener in self.listeners:
listener.on_add(host)

def _populate_hosts(self):
self.profile_manager.populate(
weakref.proxy(self), self.metadata.all_hosts())
self.load_balancing_policy.populate(
@@ -1717,17 +1710,10 @@ def connect(self, keyspace=None, wait_for_all_pools=False):
self.contact_points, self.protocol_version)
self.connection_class.initialize_reactor()
_register_cluster_shutdown(self)

self._add_resolved_hosts()

try:
self.control_connection.connect()

# we set all contact points up for connecting, but we won't infer state after this
for endpoint in self.endpoints_resolved:
h = self.metadata.get_host(endpoint)
if h and self.profile_manager.distance(h) == HostDistance.IGNORED:
h.is_up = None
self._populate_hosts()

log.debug("Control connection created")
except Exception:
@@ -2016,14 +2002,14 @@ def on_add(self, host, refresh_nodes=True):

log.debug("Handling new host %r and notifying listeners", host)

self.profile_manager.on_add(host)
self.control_connection.on_add(host, refresh_nodes)

distance = self.profile_manager.distance(host)
if distance != HostDistance.IGNORED:
self._prepare_all_queries(host)
log.debug("Done preparing queries for new host %r", host)

self.profile_manager.on_add(host)
self.control_connection.on_add(host, refresh_nodes)

if distance == HostDistance.IGNORED:
log.debug("Not adding connection pool for new host %r because the "
"load balancing policy has marked it as IGNORED", host)
@@ -3534,28 +3520,22 @@ def _set_new_connection(self, conn):
if old:
log.debug("[control connection] Closing old connection %r, replacing with %r", old, conn)
old.close()
def _connect_host_in_lbp(self):

def _try_connect_to_hosts(self):
errors = {}
lbp = (
self._cluster.load_balancing_policy
if self._cluster._config_mode == _ConfigMode.LEGACY else
self._cluster._default_load_balancing_policy
)

for host in lbp.make_query_plan():
lbp = self._cluster.load_balancing_policy \
if self._cluster._config_mode == _ConfigMode.LEGACY else self._cluster._default_load_balancing_policy

for endpoint in chain((host.endpoint for host in lbp.make_query_plan()), self._cluster.endpoints_resolved):
try:
return (self._try_connect(host), None)
except ConnectionException as exc:
errors[str(host.endpoint)] = exc
log.warning("[control connection] Error connecting to %s:", host, exc_info=True)
self._cluster.signal_connection_failure(host, exc, is_host_addition=False)
return (self._try_connect(endpoint), None)
except Exception as exc:
errors[str(host.endpoint)] = exc
log.warning("[control connection] Error connecting to %s:", host, exc_info=True)
errors[str(endpoint)] = exc
log.warning("[control connection] Error connecting to %s:", endpoint, exc_info=True)
if self._is_shutdown:
raise DriverException("[control connection] Reconnection in progress during shutdown")

return (None, errors)
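
The new method folds the old separate contact-point pass into a single loop by chaining the policy's query plan with the resolved contact points. A standalone sketch of the iteration pattern; the names here are illustrative:

from itertools import chain

def candidate_endpoints(query_plan_hosts, resolved_contact_points):
    # Endpoints from the load-balancing policy are tried first; the raw
    # resolved contact points act as a fallback. Duplicates are possible
    # and harmless, since the first successful connection wins.
    return chain((host.endpoint for host in query_plan_hosts),
                 resolved_contact_points)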

def _reconnect_internal(self):
@@ -3567,43 +3547,43 @@ def _reconnect_internal(self):
to the exception that was raised when an attempt was made to open
a connection to that host.
"""
(conn, _) = self._connect_host_in_lbp()
(conn, _) = self._try_connect_to_hosts()
if conn is not None:
return conn

# Try to re-resolve hostnames as a fallback when all hosts are unreachable
self._cluster._resolve_hostnames()

self._cluster._add_resolved_hosts()
self._cluster._populate_hosts()

(conn, errors) = self._connect_host_in_lbp()
(conn, errors) = self._try_connect_to_hosts()
if conn is not None:
return conn

raise NoHostAvailable("Unable to connect to any servers", errors)
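
On the caller side, the per-endpoint failures collected above arrive as the errors attribute of NoHostAvailable; a usage sketch with placeholder addresses:

from cassandra.cluster import Cluster, NoHostAvailable

try:
    cluster = Cluster(contact_points=["10.0.0.1", "10.0.0.2"])
    session = cluster.connect()
except NoHostAvailable as exc:
    # exc.errors maps each attempted endpoint (stringified) to the
    # exception raised while connecting to it.
    for endpoint, error in exc.errors.items():
        print("%s failed: %r" % (endpoint, error))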

def _try_connect(self, host):
def _try_connect(self, endpoint):
"""
Creates a new Connection, registers for pushed events, and refreshes
node/token and schema metadata.
"""
log.debug("[control connection] Opening new connection to %s", host)
log.debug("[control connection] Opening new connection to %s", endpoint)

while True:
try:
connection = self._cluster.connection_factory(host.endpoint, is_control_connection=True)
connection = self._cluster.connection_factory(endpoint, is_control_connection=True)
if self._is_shutdown:
connection.close()
raise DriverException("Reconnecting during shutdown")
break
except ProtocolVersionUnsupported as e:
self._cluster.protocol_downgrade(host.endpoint, e.startup_version)
self._cluster.protocol_downgrade(endpoint, e.startup_version)
except ProtocolException as e:
# protocol v5 is out of beta in C* >=4.0-beta5 and is now the default driver
# protocol version. If the protocol version was not explicitly specified,
# and that the server raises a beta protocol error, we should downgrade.
if not self._cluster._protocol_version_explicit and e.is_beta_protocol_error:
self._cluster.protocol_downgrade(host.endpoint, self._cluster.protocol_version)
self._cluster.protocol_downgrade(endpoint, self._cluster.protocol_version)
else:
raise

@@ -3818,67 +3798,10 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
self._cluster.metadata.cluster_name = cluster_name

partitioner = local_row.get("partitioner")
tokens = local_row.get("tokens")

host = self._cluster.metadata.get_host(connection.original_endpoint)
if host:
datacenter = local_row.get("data_center")
rack = local_row.get("rack")
self._update_location_info(host, datacenter, rack)

# support the use case of connecting only with public address
if isinstance(self._cluster.endpoint_factory, SniEndPointFactory):
new_endpoint = self._cluster.endpoint_factory.create(local_row)

if new_endpoint.address:
host.endpoint = new_endpoint

host.host_id = local_row.get("host_id")

found_host_ids.add(host.host_id)
found_endpoints.add(host.endpoint)

host.listen_address = local_row.get("listen_address")
host.listen_port = local_row.get("listen_port")
host.broadcast_address = _NodeInfo.get_broadcast_address(local_row)
host.broadcast_port = _NodeInfo.get_broadcast_port(local_row)

host.broadcast_rpc_address = _NodeInfo.get_broadcast_rpc_address(local_row)
host.broadcast_rpc_port = _NodeInfo.get_broadcast_rpc_port(local_row)
if host.broadcast_rpc_address is None:
if self._token_meta_enabled:
# local rpc_address is not available, use the connection endpoint
host.broadcast_rpc_address = connection.endpoint.address
host.broadcast_rpc_port = connection.endpoint.port
else:
# local rpc_address has not been queried yet, try to fetch it
# separately, which might fail because C* < 2.1.6 doesn't have rpc_address
# in system.local. See CASSANDRA-9436.
local_rpc_address_query = QueryMessage(
query=maybe_add_timeout_to_query(self._SELECT_LOCAL_NO_TOKENS_RPC_ADDRESS, self._metadata_request_timeout),
consistency_level=ConsistencyLevel.ONE)
success, local_rpc_address_result = connection.wait_for_response(
local_rpc_address_query, timeout=self._timeout, fail_on_error=False)
if success:
row = dict_factory(
local_rpc_address_result.column_names,
local_rpc_address_result.parsed_rows)
host.broadcast_rpc_address = _NodeInfo.get_broadcast_rpc_address(row[0])
host.broadcast_rpc_port = _NodeInfo.get_broadcast_rpc_port(row[0])
else:
host.broadcast_rpc_address = connection.endpoint.address
host.broadcast_rpc_port = connection.endpoint.port

host.release_version = local_row.get("release_version")
host.dse_version = local_row.get("dse_version")
host.dse_workload = local_row.get("workload")
host.dse_workloads = local_row.get("workloads")
tokens = local_row.get("tokens", None)

if partitioner and tokens:
token_map[host] = tokens
peers_result.insert(0, local_row)

self._cluster.metadata.update_host(host, old_endpoint=connection.endpoint)
connection.original_endpoint = connection.endpoint = host.endpoint
# Check metadata.partitioner to see if we haven't built anything yet. If
# every node in the cluster was in the contact points, we won't discover
# any new nodes, so we need this additional check. (See PYTHON-90)
@@ -3908,14 +3831,16 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
host = self._cluster.metadata.get_host_by_host_id(host_id)
if host and host.endpoint != endpoint:
log.debug("[control connection] Updating host ip from %s to %s for (%s)", host.endpoint, endpoint, host_id)
old_endpoint = host.endpoint
host.endpoint = endpoint
self._cluster.metadata.update_host(host, old_endpoint)
reconnector = host.get_and_set_reconnection_handler(None)
if reconnector:
reconnector.cancel()
self._cluster.on_down(host, is_host_addition=False, expect_host_to_be_down=True)

old_endpoint = host.endpoint
host.endpoint = endpoint
self._cluster.metadata.update_host(host, old_endpoint)
self._cluster.on_up(host)

if host is None:
log.debug("[control connection] Found new host to connect to: %s", endpoint)
host, _ = self._cluster.add_host(endpoint, datacenter=datacenter, rack=rack, signal=True, refresh_nodes=False, host_id=host_id)
@@ -4177,8 +4102,9 @@ def _get_peers_query(self, peers_query_type, connection=None):
query_template = (self._SELECT_SCHEMA_PEERS_TEMPLATE
if peers_query_type == self.PeersQueryType.PEERS_SCHEMA
else self._SELECT_PEERS_NO_TOKENS_TEMPLATE)
host_release_version = self._cluster.metadata.get_host(connection.original_endpoint).release_version
host_dse_version = self._cluster.metadata.get_host(connection.original_endpoint).dse_version
original_endpoint_host = self._cluster.metadata.get_host(connection.original_endpoint)
host_release_version = None if original_endpoint_host is None else original_endpoint_host.release_version
host_dse_version = None if original_endpoint_host is None else original_endpoint_host.dse_version
uses_native_address_query = (
host_dse_version and Version(host_dse_version) >= self._MINIMUM_NATIVE_ADDRESS_DSE_VERSION)

@@ -4547,7 +4473,7 @@ def _make_query_plan(self):
# or to the explicit host target if set
if self._host:
# returning a single value effectively disables retries
self.query_plan = [self._host]
self.query_plan = iter([self._host])
else:
# convert the list/generator/etc to an iterator so that subsequent
# calls to send_request (which retries may do) will resume where
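
The iter() call is the actual fix: a plain single-element list would hand the host out again on every retry, while an iterator is consumed exactly once. A two-line illustration:

plan = iter(["host-a"])               # what _make_query_plan now builds
assert next(plan, None) == "host-a"   # the first attempt gets the host
assert next(plan, None) is None       # a retry finds the plan exhausted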