Skip to content

Commit 8952bc7

Browse files
committed
Add cross-node sync via central server
Signed-off-by: Skye <[email protected]>
1 parent 3facc05 commit 8952bc7

File tree

10 files changed

+479
-21
lines changed

10 files changed

+479
-21
lines changed

central_server/chat/signals.py

Lines changed: 332 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,19 @@
1+
import threading
2+
import logging
3+
import requests
4+
from django.conf import settings
5+
from django.utils import timezone
6+
from django.db.models.signals import post_save, post_delete
17
from django.dispatch import receiver
2-
from django.db.models.signals import post_save
38
from django.contrib.auth import get_user_model
49
from users.models import UserProfile
10+
from django.db import transaction
11+
from nodes.models import Node
12+
13+
logger = logging.getLogger(__name__)
14+
15+
# Thread-local storage for tracking sync origins
16+
_sync_origin_local = threading.local()
517

618

719
@receiver(post_save, sender=get_user_model())
@@ -16,3 +28,322 @@ def save_user_profile(sender, instance, **kwargs):
1628
"""Automatically save user profile when user is saved"""
1729
if hasattr(instance, "profile"):
1830
instance.profile.save()
31+
32+
33+
class CentralSyncSignalHandler:
34+
_instance = None
35+
_initialized = False
36+
37+
def __new__(cls):
38+
if cls._instance is None:
39+
cls._instance = super().__new__(cls)
40+
return cls._instance
41+
42+
def __init__(self):
43+
if not self._initialized:
44+
self.is_central_server = getattr(settings, "IS_CENTRAL_SERVER", True)
45+
self._initialized = True
46+
47+
def set_sync_origin(self, node_id):
48+
"""Set the originating node for the current sync operation"""
49+
_sync_origin_local.node_id = node_id
50+
51+
def get_sync_origin(self):
52+
"""Get the originating node for the current sync operation"""
53+
return getattr(_sync_origin_local, "node_id", None)
54+
55+
def clear_sync_origin(self):
56+
"""Clear the sync origin after operation"""
57+
if hasattr(_sync_origin_local, "node_id"):
58+
delattr(_sync_origin_local, "node_id")
59+
60+
def should_sync_to_node(self, target_node_id, originating_node_id):
61+
"""Check if we should sync to a specific node (avoid loops)"""
62+
return target_node_id != originating_node_id
63+
64+
def get_active_nodes(self, exclude_node_id=None):
65+
"""Get all active nodes excluding the specified one"""
66+
try:
67+
nodes = Node.objects.filter(status="online")
68+
if exclude_node_id:
69+
nodes = nodes.exclude(id=exclude_node_id)
70+
return nodes
71+
except Exception as e:
72+
logger.error(f"Error fetching active nodes: {e}")
73+
return []
74+
75+
def send_sync_to_nodes(self, model_name, instance, action, originating_node_id):
76+
"""Send sync updates to all other nodes except the originating one"""
77+
if not self.is_central_server:
78+
return
79+
80+
# Use thread pool for async execution
81+
thread = threading.Thread(
82+
target=self._send_sync_to_nodes_async,
83+
args=(model_name, instance, action, originating_node_id),
84+
daemon=True,
85+
)
86+
thread.start()
87+
88+
def _send_sync_to_nodes_async(
89+
self, model_name, instance, action, originating_node_id
90+
):
91+
"""Async implementation of node sync"""
92+
try:
93+
active_nodes = self.get_active_nodes(exclude_node_id=originating_node_id)
94+
95+
if not active_nodes:
96+
logger.debug("No active nodes to sync with")
97+
return
98+
99+
sync_data = {
100+
"model": model_name,
101+
"action": action,
102+
"data": self.serialize_instance(instance),
103+
"timestamp": timezone.now().isoformat(),
104+
# Track where it came from
105+
"origin_node_id": str(originating_node_id),
106+
"central_sync": True, # Mark as coming from central
107+
}
108+
109+
successful_syncs = 0
110+
total_nodes = len(active_nodes)
111+
112+
for node in active_nodes:
113+
if not self.should_sync_to_node(node.id, originating_node_id):
114+
continue
115+
116+
try:
117+
response = requests.post(
118+
# Node's sync endpoint
119+
f"{node.url}/api/sync/receive/",
120+
json=sync_data,
121+
headers={
122+
"X-Central-API-Key": getattr(
123+
settings, "CENTRAL_API_KEY", ""
124+
),
125+
"Content-Type": "application/json",
126+
},
127+
timeout=5, # Shorter timeout for nodes
128+
)
129+
130+
if response.status_code == 200:
131+
successful_syncs += 1
132+
logger.debug(f"Sync successful to node {node.name}")
133+
else:
134+
logger.warning(
135+
f"Sync failed to node {node.name}: {response.status_code}"
136+
)
137+
138+
except requests.exceptions.Timeout:
139+
logger.warning(f"Sync timeout to node {node.name}")
140+
except requests.exceptions.ConnectionError:
141+
logger.warning(f"Connection error to node {node.name}")
142+
except Exception as e:
143+
logger.error(f"Sync error to node {node.name}: {e}")
144+
145+
logger.info(f"Sync completed: {successful_syncs}/{total_nodes} nodes")
146+
147+
except Exception as e:
148+
logger.error(f"Error in node sync process: {e}")
149+
150+
def serialize_instance(self, instance):
151+
"""Serialize model instance for sync (same as node version)"""
152+
if hasattr(instance, "to_sync_dict"):
153+
return instance.to_sync_dict()
154+
155+
data = {"id": str(instance.id)}
156+
model_class = type(instance)
157+
158+
field_mappings = {
159+
"Message": self._serialize_message,
160+
"ChatRoom": self._serialize_chatroom,
161+
"RoomMembership": self._serialize_room_membership,
162+
"MessageReadStatus": self._serialize_message_read_status,
163+
"UserSession": self._serialize_user_session,
164+
"CustomUser": self._serialize_custom_user,
165+
}
166+
167+
serializer = field_mappings.get(model_class.__name__)
168+
if serializer:
169+
data.update(serializer(instance))
170+
171+
return data
172+
173+
def _serialize_message(self, instance):
174+
return {
175+
"id": str(instance.id),
176+
"room_id": str(instance.room_id),
177+
"sender_id": str(instance.sender_id),
178+
"content": instance.content,
179+
"message_type": instance.message_type,
180+
"created_at": instance.created_at.isoformat(),
181+
"updated_at": instance.updated_at.isoformat(),
182+
"is_edited": instance.is_edited,
183+
"is_deleted": instance.is_deleted,
184+
}
185+
186+
def _serialize_chatroom(self, instance):
187+
return {
188+
"id": str(instance.id),
189+
"name": instance.name,
190+
"description": instance.description or "",
191+
"room_type": instance.room_type,
192+
"created_by_id": str(instance.created_by_id),
193+
"is_active": instance.is_active,
194+
"max_members": instance.max_members,
195+
"created_at": instance.created_at.isoformat(),
196+
}
197+
198+
def _serialize_room_membership(self, instance):
199+
return {
200+
"id": str(instance.id),
201+
"room_id": str(instance.room_id),
202+
"user_id": str(instance.user_id),
203+
"role": instance.role,
204+
"joined_at": instance.joined_at.isoformat(),
205+
}
206+
207+
def _serialize_message_read_status(self, instance):
208+
return {
209+
"message_id": str(instance.message_id),
210+
"user_id": str(instance.user_id),
211+
"read_at": instance.read_at.isoformat(),
212+
}
213+
214+
def _serialize_user_session(self, instance):
215+
return {
216+
"user_id": str(instance.user.id),
217+
"session_key": instance.session_key,
218+
"ip_address": instance.ip_address or "",
219+
"user_agent": instance.user_agent or "",
220+
"last_activity": instance.last_activity.isoformat(),
221+
}
222+
223+
def _serialize_custom_user(self, instance):
224+
return {
225+
"user_id": str(instance.id),
226+
"username": instance.username,
227+
"email": instance.email,
228+
"is_online": instance.is_online,
229+
"last_seen": instance.last_seen.isoformat() if instance.last_seen else None,
230+
"avatar": str(instance.avatar) if instance.avatar else None,
231+
"bio": instance.bio or "",
232+
"notification_enabled": instance.notification_enabled,
233+
"sound_enabled": instance.sound_enabled,
234+
}
235+
236+
237+
# Global central signal handler instance
238+
_central_sync_handler = None
239+
240+
241+
def get_central_sync_handler():
242+
global _central_sync_handler
243+
if _central_sync_handler is None:
244+
_central_sync_handler = CentralSyncSignalHandler()
245+
return _central_sync_handler
246+
247+
248+
def safe_central_sync(instance, model_name, action, originating_node_id):
249+
"""
250+
Safe wrapper for central sync that prevents loops
251+
"""
252+
try:
253+
handler = get_central_sync_handler()
254+
if handler.is_central_server and originating_node_id:
255+
handler.send_sync_to_nodes(
256+
model_name, instance, action, originating_node_id
257+
)
258+
except Exception as e:
259+
logger.error(f"Central sync error for {model_name} {action}: {e}")
260+
261+
262+
# Central server signal receivers
263+
@receiver(post_save, sender="chat.Message")
264+
def central_message_saved(sender, instance, created, **kwargs):
265+
"""Central: Sync message to other nodes"""
266+
# Check if this save came from a sync operation
267+
sync_origin = get_central_sync_handler().get_sync_origin()
268+
if sync_origin:
269+
# This save came from a node sync, so propagate to other nodes
270+
safe_central_sync(
271+
instance, "message", "create" if created else "update", sync_origin
272+
)
273+
274+
275+
@receiver(post_save, sender="chat.ChatRoom")
276+
def central_chatroom_saved(sender, instance, created, **kwargs):
277+
"""Central: Sync chatroom to other nodes"""
278+
sync_origin = get_central_sync_handler().get_sync_origin()
279+
if sync_origin:
280+
safe_central_sync(
281+
instance, "chatroom", "create" if created else "update", sync_origin
282+
)
283+
284+
285+
@receiver(post_save, sender="chat.RoomMembership")
286+
def central_room_membership_saved(sender, instance, created, **kwargs):
287+
"""Central: Sync membership to other nodes"""
288+
sync_origin = get_central_sync_handler().get_sync_origin()
289+
if sync_origin:
290+
safe_central_sync(
291+
instance, "roommembership", "create" if created else "update", sync_origin
292+
)
293+
294+
295+
@receiver(post_save, sender="users.CustomUser")
296+
def central_user_saved(sender, instance, created, **kwargs):
297+
"""Central: Sync user to other nodes"""
298+
sync_origin = get_central_sync_handler().get_sync_origin()
299+
if sync_origin:
300+
safe_central_sync(
301+
instance, "user", "create" if created else "update", sync_origin
302+
)
303+
304+
305+
@receiver(post_save, sender="chat.MessageReadStatus")
306+
def central_message_status_saved(sender, instance, created, **kwargs):
307+
"""Central: Sync read status to other nodes"""
308+
sync_origin = get_central_sync_handler().get_sync_origin()
309+
if sync_origin:
310+
safe_central_sync(
311+
instance,
312+
"messagereadstatus",
313+
"create" if created else "update",
314+
sync_origin,
315+
)
316+
317+
318+
# Delete handlers
319+
@receiver(post_delete, sender="chat.Message")
320+
def central_message_deleted(sender, instance, **kwargs):
321+
"""Central: Sync message deletion to other nodes"""
322+
sync_origin = get_central_sync_handler().get_sync_origin()
323+
if sync_origin:
324+
safe_central_sync(instance, "message", "delete", sync_origin)
325+
326+
327+
@receiver(post_delete, sender="chat.ChatRoom")
328+
def central_chatroom_deleted(sender, instance, **kwargs):
329+
"""Central: Sync chatroom deletion to other nodes"""
330+
sync_origin = get_central_sync_handler().get_sync_origin()
331+
if sync_origin:
332+
safe_central_sync(instance, "chatroom", "delete", sync_origin)
333+
334+
335+
@receiver(post_delete, sender="chat.RoomMembership")
336+
def central_room_membership_deleted(sender, instance, **kwargs):
337+
"""Central: Sync membership deletion to other nodes"""
338+
sync_origin = get_central_sync_handler().get_sync_origin()
339+
if sync_origin:
340+
safe_central_sync(instance, "roommembership", "delete", sync_origin)
341+
342+
343+
@receiver(post_delete, sender="users.CustomUser")
344+
def central_user_deleted(sender, instance, **kwargs):
345+
"""Central: Sync user deletion to other nodes"""
346+
sync_origin = get_central_sync_handler().get_sync_origin()
347+
if sync_origin:
348+
safe_central_sync(instance, "user", "delete", sync_origin)
349+
safe_central_sync(instance, "user", "delete", sync_origin)

central_server/nodes/middleware.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import requests
22
from django.conf import settings
3-
from django.utils import timezone
43
import logging
54
from django.http import JsonResponse
65

central_server/nodes/models.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,22 @@ def update_load(self):
8282
self.load = 0
8383
self.save()
8484

85+
@property
86+
def health_score(self):
87+
"""Get cached health score"""
88+
from django.core.cache import cache
89+
90+
cache_key = f"node_health_{self.id}"
91+
health_data = cache.get(cache_key, {})
92+
return health_data.get("health_score", 100)
93+
94+
def get_system_metrics(self):
95+
"""Get cached system metrics"""
96+
from django.core.cache import cache
97+
98+
cache_key = f"node_metrics_{self.id}"
99+
return cache.get(cache_key, {})
100+
85101
def to_dict(self):
86102
return {
87103
"url": self.url,
@@ -92,8 +108,8 @@ def to_dict(self):
92108
"max_rooms": self.max_rooms,
93109
"load": self.load,
94110
"status": self.status,
95-
"last_heartbeat": self.last_heartbeat,
96-
"last_sync": self.last_sync,
111+
"last_heartbeat": str(self.last_heartbeat) if self.last_heartbeat else None,
112+
"last_sync": str(self.last_sync) if self.last_sync else None,
97113
}
98114

99115

0 commit comments

Comments
 (0)