diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index a7441ec39..97fddcf06 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -16,6 +16,7 @@ from memos.log import get_logger from memos.mem_cube.base import BaseMemCube from memos.mem_cube.general import GeneralMemCube +from memos.mem_scheduler.general_modules.init_components_for_scheduler import init_components from memos.mem_scheduler.general_modules.misc import AutoDroppingQueue as Queue from memos.mem_scheduler.general_modules.scheduler_logger import SchedulerLoggerModule from memos.mem_scheduler.memory_manage_modules.retriever import SchedulerRetriever @@ -154,7 +155,9 @@ def __init__(self, config: BaseSchedulerConfig): self._context_lock = threading.Lock() self.current_user_id: UserID | str | None = None self.current_mem_cube_id: MemCubeID | str | None = None - self.current_mem_cube: BaseMemCube | None = None + self.components = init_components() + self.current_mem_cube: BaseMemCube | None = self.components["naive_mem_cube"] + self._mem_cubes: dict[str, BaseMemCube] = {} self.auth_config_path: str | Path | None = self.config.get("auth_config_path", None) self.auth_config = None self.rabbitmq_config = None @@ -256,6 +259,43 @@ def mem_cube(self, value: BaseMemCube) -> None: self.current_mem_cube = value self.retriever.mem_cube = value + @property + def mem_cubes(self) -> dict[str, BaseMemCube]: + """All available memory cubes registered to the scheduler. + + Setting this property will also initialize `current_mem_cube` if it is not + already set, following the initialization pattern used in component_init.py + (i.e., calling `init_mem_cube(...)`), without introducing circular imports. 
+ """ + return self._mem_cubes + + @mem_cubes.setter + def mem_cubes(self, value: dict[str, BaseMemCube]) -> None: + self._mem_cubes = value or {} + + # Initialize current_mem_cube if not set yet and mem_cubes are available + try: + if self.current_mem_cube is None and self._mem_cubes: + selected_cube: BaseMemCube | None = None + + # Prefer the cube matching current_mem_cube_id if provided + if self.current_mem_cube_id and self.current_mem_cube_id in self._mem_cubes: + selected_cube = self._mem_cubes[self.current_mem_cube_id] + else: + # Fall back to the first available cube deterministically + first_id, first_cube = next(iter(self._mem_cubes.items())) + self.current_mem_cube_id = first_id + selected_cube = first_cube + + if selected_cube is not None: + # Use init_mem_cube to mirror component_init.py behavior + # This sets self.mem_cube (and retriever.mem_cube), text_mem, and searcher. + self.init_mem_cube(mem_cube=selected_cube) + except Exception as e: + logger.warning( + f"Failed to initialize current_mem_cube from mem_cubes: {e}", exc_info=True + ) + def transform_working_memories_to_monitors( self, query_keywords, memories: list[TextualMemoryItem] ) -> list[MemoryMonitorItem]: diff --git a/src/memos/mem_scheduler/general_modules/init_components_for_scheduler.py b/src/memos/mem_scheduler/general_modules/init_components_for_scheduler.py new file mode 100644 index 000000000..6addb052a --- /dev/null +++ b/src/memos/mem_scheduler/general_modules/init_components_for_scheduler.py @@ -0,0 +1,391 @@ +import json +import os + +from typing import Any + +from memos.api.config import APIConfig +from memos.configs.embedder import EmbedderConfigFactory +from memos.configs.graph_db import GraphDBConfigFactory +from memos.configs.internet_retriever import InternetRetrieverConfigFactory +from memos.configs.llm import LLMConfigFactory +from memos.configs.mem_reader import MemReaderConfigFactory +from memos.configs.reranker import RerankerConfigFactory +from memos.configs.vec_db 
import json
import os

from typing import Any

from memos.api.config import APIConfig
from memos.configs.embedder import EmbedderConfigFactory
from memos.configs.graph_db import GraphDBConfigFactory
from memos.configs.internet_retriever import InternetRetrieverConfigFactory
from memos.configs.llm import LLMConfigFactory
from memos.configs.mem_reader import MemReaderConfigFactory
from memos.configs.reranker import RerankerConfigFactory
from memos.configs.vec_db import VectorDBConfigFactory
from memos.embedders.factory import EmbedderFactory
from memos.graph_dbs.factory import GraphStoreFactory
from memos.llms.factory import LLMFactory
from memos.log import get_logger
from memos.mem_cube.navie import NaiveMemCube
from memos.mem_reader.factory import MemReaderFactory
from memos.memories.textual.prefer_text_memory.config import (
    AdderConfigFactory,
    ExtractorConfigFactory,
    RetrieverConfigFactory,
)
from memos.memories.textual.prefer_text_memory.factory import (
    AdderFactory,
    ExtractorFactory,
    RetrieverFactory,
)
from memos.memories.textual.simple_preference import SimplePreferenceTextMemory
from memos.memories.textual.simple_tree import SimpleTreeTextMemory
from memos.memories.textual.tree_text_memory.organize.manager import MemoryManager
from memos.memories.textual.tree_text_memory.retrieve.internet_retriever_factory import (
    InternetRetrieverFactory,
)
from memos.memories.textual.tree_text_memory.retrieve.retrieve_utils import FastTokenizer
from memos.reranker.factory import RerankerFactory
from memos.vec_dbs.factory import VecDBFactory


logger = get_logger(__name__)


def build_graph_db_config(user_id: str = "default") -> dict[str, Any]:
    """
    Build graph database configuration.

    Args:
        user_id: User ID for configuration context (default: "default")

    Returns:
        Validated graph database configuration dictionary
    """
    graph_db_backend_map = {
        "neo4j-community": APIConfig.get_neo4j_community_config(user_id=user_id),
        "neo4j": APIConfig.get_neo4j_config(user_id=user_id),
        "nebular": APIConfig.get_nebular_config(user_id=user_id),
        "polardb": APIConfig.get_polardb_config(user_id=user_id),
    }

    # NOTE(review): backend selection env var is NEO4J_BACKEND even for non-neo4j
    # backends ("nebular" is the default) — confirm this naming is intentional.
    graph_db_backend = os.getenv("NEO4J_BACKEND", "nebular").lower()
    return GraphDBConfigFactory.model_validate(
        {
            "backend": graph_db_backend,
            "config": graph_db_backend_map[graph_db_backend],
        }
    )


def build_vec_db_config() -> dict[str, Any]:
    """
    Build vector database configuration.

    Returns:
        Validated vector database configuration dictionary
    """
    return VectorDBConfigFactory.model_validate(
        {
            "backend": "milvus",
            "config": APIConfig.get_milvus_config(),
        }
    )


def build_llm_config() -> dict[str, Any]:
    """
    Build LLM configuration.

    Returns:
        Validated LLM configuration dictionary
    """
    return LLMConfigFactory.model_validate(
        {
            "backend": "openai",
            "config": APIConfig.get_openai_config(),
        }
    )


def build_chat_llm_config() -> list[dict[str, Any]]:
    """
    Build chat LLM configurations from the CHAT_MODEL_LIST environment variable.

    Returns:
        List of entries, each with a validated LLM config factory under
        ``config_class`` and the model names it serves under ``support_models``.
    """
    # Robustness fix: an unset CHAT_MODEL_LIST previously crashed json.loads(None);
    # treat it as an empty model list instead.
    configs = json.loads(os.getenv("CHAT_MODEL_LIST") or "[]")
    return [
        {
            "config_class": LLMConfigFactory.model_validate(
                {
                    "backend": cfg.get("backend", "openai"),
                    "config": (
                        {k: v for k, v in cfg.items() if k not in ["backend", "support_models"]}
                    )
                    if cfg
                    else APIConfig.get_openai_config(),
                }
            ),
            "support_models": cfg.get("support_models", None),
        }
        for cfg in configs
    ]


def build_embedder_config() -> dict[str, Any]:
    """
    Build embedder configuration.

    Returns:
        Validated embedder configuration dictionary
    """
    return EmbedderConfigFactory.model_validate(APIConfig.get_embedder_config())


def build_mem_reader_config() -> dict[str, Any]:
    """
    Build memory reader configuration.

    Returns:
        Validated memory reader configuration dictionary
    """
    return MemReaderConfigFactory.model_validate(
        APIConfig.get_product_default_config()["mem_reader"]
    )


def build_reranker_config() -> dict[str, Any]:
    """
    Build reranker configuration.

    Returns:
        Validated reranker configuration dictionary
    """
    return RerankerConfigFactory.model_validate(APIConfig.get_reranker_config())


def build_internet_retriever_config() -> dict[str, Any]:
    """
    Build internet retriever configuration.

    Returns:
        Validated internet retriever configuration dictionary
    """
    return InternetRetrieverConfigFactory.model_validate(APIConfig.get_internet_config())


def build_pref_extractor_config() -> dict[str, Any]:
    """
    Build preference memory extractor configuration.

    Returns:
        Validated extractor configuration dictionary
    """
    return ExtractorConfigFactory.model_validate({"backend": "naive", "config": {}})


def build_pref_adder_config() -> dict[str, Any]:
    """
    Build preference memory adder configuration.

    Returns:
        Validated adder configuration dictionary
    """
    return AdderConfigFactory.model_validate({"backend": "naive", "config": {}})


def build_pref_retriever_config() -> dict[str, Any]:
    """
    Build preference memory retriever configuration.

    Returns:
        Validated retriever configuration dictionary
    """
    return RetrieverConfigFactory.model_validate({"backend": "naive", "config": {}})


def _get_default_memory_size(cube_config: Any) -> dict[str, int]:
    """
    Get default memory size configuration.

    Attempts to retrieve memory size from cube config, falls back to defaults
    if not found.

    Args:
        cube_config: The cube configuration object

    Returns:
        Dictionary with memory sizes for different memory types
    """
    return getattr(cube_config.text_mem.config, "memory_size", None) or {
        "WorkingMemory": 20,
        "LongTermMemory": 1500,
        "UserMemory": 480,
    }


def _init_chat_llms(chat_llm_configs: list[dict]) -> dict[str, Any]:
    """
    Initialize chat language models from configuration.

    NOTE(review): currently not invoked by init_components(); retained for
    external callers — confirm it is still needed.

    Args:
        chat_llm_configs: List of chat LLM configuration dictionaries

    Returns:
        Dictionary mapping model names to initialized LLM instances
    """
    # Fixed typo'd names (model_name_instrance_maping) and removed the dead
    # inner `_list_models` helper that was never called.
    model_name_to_llm: dict[str, Any] = {}
    for cfg in chat_llm_configs:
        llm = LLMFactory.from_config(cfg["config_class"])
        # One LLM instance may serve several model names.
        for model_name in cfg["support_models"] or []:
            model_name_to_llm[model_name] = llm
    return model_name_to_llm


def init_components() -> dict[str, Any]:
    """
    Build and wire all components backing the scheduler's default MemCube.

    Returns:
        Dictionary of initialized components; currently exposes only
        ``naive_mem_cube`` (a NaiveMemCube wired with text and optional
        preference memory).
    """
    # Initialize Redis client first as it is a core dependency for features
    # like scheduler status tracking.
    try:
        from memos.mem_scheduler.orm_modules.api_redis_model import APIRedisDBManager

        redis_client = APIRedisDBManager.load_redis_engine_from_env()
        if redis_client:
            logger.info("Redis client initialized successfully.")
        else:
            logger.error(
                "Failed to initialize Redis client. Check REDIS_HOST etc. in environment variables."
            )
    except Exception as e:
        logger.error(f"Failed to initialize Redis client: {e}", exc_info=True)
        redis_client = None  # Ensure redis_client exists even on failure

    # Get default cube configuration
    default_cube_config = APIConfig.get_default_cube_config()

    # Evaluate the preference-memory feature flag once instead of re-reading
    # the environment for each dependent component.
    pref_enabled = os.getenv("ENABLE_PREFERENCE_MEMORY", "false") == "true"

    # Build component configurations
    graph_db_config = build_graph_db_config()
    llm_config = build_llm_config()
    embedder_config = build_embedder_config()
    mem_reader_config = build_mem_reader_config()
    reranker_config = build_reranker_config()
    internet_retriever_config = build_internet_retriever_config()
    vector_db_config = build_vec_db_config()
    pref_extractor_config = build_pref_extractor_config()
    pref_adder_config = build_pref_adder_config()
    pref_retriever_config = build_pref_retriever_config()

    logger.debug("Component configurations built successfully")

    # Create component instances (vector DB only when preference memory is on)
    graph_db = GraphStoreFactory.from_config(graph_db_config)
    vector_db = VecDBFactory.from_config(vector_db_config) if pref_enabled else None
    llm = LLMFactory.from_config(llm_config)
    embedder = EmbedderFactory.from_config(embedder_config)
    mem_reader = MemReaderFactory.from_config(mem_reader_config)
    reranker = RerankerFactory.from_config(reranker_config)
    internet_retriever = InternetRetrieverFactory.from_config(
        internet_retriever_config, embedder=embedder
    )

    logger.debug("Core components instantiated")

    # Initialize memory manager
    memory_manager = MemoryManager(
        graph_db,
        embedder,
        llm,
        memory_size=_get_default_memory_size(default_cube_config),
        is_reorganize=getattr(default_cube_config.text_mem.config, "reorganize", False),
    )

    logger.debug("Memory manager initialized")

    tokenizer = FastTokenizer()
    # Initialize text memory
    text_mem = SimpleTreeTextMemory(
        llm=llm,
        embedder=embedder,
        mem_reader=mem_reader,
        graph_db=graph_db,
        reranker=reranker,
        memory_manager=memory_manager,
        config=default_cube_config.text_mem.config,
        internet_retriever=internet_retriever,
        tokenizer=tokenizer,
    )

    logger.debug("Text memory initialized")

    # Initialize preference memory components (only when the feature is enabled)
    pref_extractor = (
        ExtractorFactory.from_config(
            config_factory=pref_extractor_config,
            llm_provider=llm,
            embedder=embedder,
            vector_db=vector_db,
        )
        if pref_enabled
        else None
    )

    pref_adder = (
        AdderFactory.from_config(
            config_factory=pref_adder_config,
            llm_provider=llm,
            embedder=embedder,
            vector_db=vector_db,
            text_mem=text_mem,
        )
        if pref_enabled
        else None
    )

    pref_retriever = (
        RetrieverFactory.from_config(
            config_factory=pref_retriever_config,
            llm_provider=llm,
            embedder=embedder,
            reranker=reranker,
            vector_db=vector_db,
        )
        if pref_enabled
        else None
    )

    logger.debug("Preference memory components initialized")

    # Initialize preference memory
    pref_mem = (
        SimplePreferenceTextMemory(
            extractor_llm=llm,
            vector_db=vector_db,
            embedder=embedder,
            reranker=reranker,
            extractor=pref_extractor,
            adder=pref_adder,
            retriever=pref_retriever,
        )
        if pref_enabled
        else None
    )

    # Create MemCube with pre-initialized memory instances
    naive_mem_cube = NaiveMemCube(
        text_mem=text_mem,
        pref_mem=pref_mem,
        act_mem=None,
        para_mem=None,
    )
    # Return all components as a dictionary for easy access and extension
    return {
        "naive_mem_cube": naive_mem_cube,
    }
_add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: + logger.info(f"Messages {messages} assigned to {ADD_LABEL} handler.") + # Process the query in a session turn + grouped_messages = group_messages_by_user_and_mem_cube(messages=messages) + + self.validate_schedule_messages(messages=messages, label=ADD_LABEL) + try: + for user_id in grouped_messages: + for mem_cube_id in grouped_messages[user_id]: + batch = grouped_messages[user_id][mem_cube_id] + if not batch: + continue + + # Process each message in the batch + for msg in batch: + prepared_add_items, prepared_update_items_with_original = ( + self.log_add_messages(msg=msg) + ) + logger.info( + f"prepared_add_items: {prepared_add_items};\n prepared_update_items_with_original: {prepared_update_items_with_original}" + ) + # Conditional Logging: Knowledge Base (Cloud Service) vs. Playground/Default + is_cloud_env = ( + os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") + == "memos-memory-change" + ) + + if is_cloud_env: + self.send_add_log_messages_to_cloud_env( + msg, prepared_add_items, prepared_update_items_with_original + ) + else: + self.send_add_log_messages_to_local_env( + msg, prepared_add_items, prepared_update_items_with_original + ) + + except Exception as e: + logger.error(f"Error: {e}", exc_info=True) + def _query_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: """ Process and handle query trigger messages from the queue. @@ -309,7 +348,9 @@ def log_add_messages(self, msg: ScheduleMessageItem): if missing_ids: content_preview = ( - msg.content[:200] + "..." if isinstance(msg.content, str) and len(msg.content) > 200 else msg.content + msg.content[:200] + "..." + if isinstance(msg.content, str) and len(msg.content) > 200 + else msg.content ) logger.warning( "Missing TextualMemoryItem(s) during add log preparation. 
" @@ -340,61 +381,6 @@ def log_add_messages(self, msg: ScheduleMessageItem): ) return prepared_add_items, prepared_update_items_with_original - def send_add_log_messages_to_cloud_env( - self, msg: ScheduleMessageItem, prepared_add_items, prepared_update_items_with_original - ): - # New: Knowledge Base Logging (Cloud Service) - kb_log_content = [] - for item in prepared_add_items: - kb_log_content.append( - { - "log_source": "KNOWLEDGE_BASE_LOG", - "trigger_source": msg.info.get("trigger_source", "Messages") - if msg.info - else "Messages", # Assuming msg.info is available and contains trigger_source - "operation": "ADD", - "memory_id": item.id, - "content": item.memory, - "original_content": None, - "source_doc_id": getattr(item.metadata, "source_doc_id", None), - } - ) - for item_data in prepared_update_items_with_original: - new_item = item_data["new_item"] - kb_log_content.append( - { - "log_source": "KNOWLEDGE_BASE_LOG", - "trigger_source": msg.info.get("trigger_source", "Messages") - if msg.info - else "Messages", - "operation": "UPDATE", - "memory_id": new_item.id, - "content": new_item.memory, - "original_content": item_data["original_content"], # Now correctly fetched - "source_doc_id": getattr(new_item.metadata, "source_doc_id", None), - } - ) - - if kb_log_content: - event = self.create_event_log( - label="knowledgeBaseUpdate", - # 1) Remove log_content parameter - # 2) Add memory_type - from_memory_type=USER_INPUT_TYPE, - to_memory_type=LONG_TERM_MEMORY_TYPE, - user_id=msg.user_id, - mem_cube_id=msg.mem_cube_id, - mem_cube=self.current_mem_cube, - memcube_log_content=kb_log_content, - metadata=None, - memory_len=len(kb_log_content), - memcube_name=self._map_memcube_name(msg.mem_cube_id), - ) - # 3) Assign log_content afterwards - event.log_content = f"Knowledge Base Memory Update: {len(kb_log_content)} changes." 
- event.task_id = msg.task_id - self._submit_web_logs([event], additional_log_info="send_add_log_messages_to_cloud_env") - def send_add_log_messages_to_local_env( self, msg: ScheduleMessageItem, prepared_add_items, prepared_update_items_with_original ): @@ -585,7 +571,8 @@ def process_message(message: ScheduleMessageItem): mem_cube = self.current_mem_cube if mem_cube is None: logger.warning( - f"mem_cube is None for user_id={user_id}, mem_cube_id={mem_cube_id}, skipping processing" + f"mem_cube is None for user_id={user_id}, mem_cube_id={mem_cube_id}, skipping processing", + stack_info=True, ) return @@ -625,7 +612,7 @@ def process_message(message: ScheduleMessageItem): ) except Exception as e: - logger.error(f"Error processing mem_read message: {e}", exc_info=True) + logger.error(f"Error processing mem_read message: {e}", stack_info=True) with ContextThreadPoolExecutor(max_workers=min(8, len(messages))) as executor: futures = [executor.submit(process_message, msg) for msg in messages] @@ -633,7 +620,7 @@ def process_message(message: ScheduleMessageItem): try: future.result() except Exception as e: - logger.error(f"Thread task failed: {e}", exc_info=True) + logger.error(f"Thread task failed: {e}", stack_info=True) def _process_memories_with_reader( self, diff --git a/src/memos/mem_scheduler/schemas/general_schemas.py b/src/memos/mem_scheduler/schemas/general_schemas.py index ae900abc7..954855f90 100644 --- a/src/memos/mem_scheduler/schemas/general_schemas.py +++ b/src/memos/mem_scheduler/schemas/general_schemas.py @@ -66,7 +66,7 @@ DEFAULT_MAX_WEB_LOG_QUEUE_SIZE = 50 # task queue -DEFAULT_STREAM_KEY_PREFIX = "scheduler:messages:stream:v1.3" +DEFAULT_STREAM_KEY_PREFIX = "scheduler:messages:stream:v1.4" exchange_name = os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME", None) if exchange_name is not None: DEFAULT_STREAM_KEY_PREFIX += f":{exchange_name}" diff --git a/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py 
b/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py index 22a044358..5c551b23e 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py +++ b/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py @@ -195,6 +195,58 @@ def _ensure_consumer_group(self, stream_key) -> None: else: logger.error(f"Error creating consumer group: {e}", exc_info=True) + def _get_pending_lock_key(self, stream_key: str) -> str: + """Compose a Redis lock key for pending reads on a specific stream. + + Lock key includes stream prefix and consumer group to avoid collisions + across different deployments/groups. + """ + # Use a stable lock namespace; include group to isolate multiple schedulers + return f"{self.stream_key_prefix}:lock:pending:{self.consumer_group}:{stream_key}" + + def _acquire_pending_lock(self, stream_key: str, ttl_ms: int = 2000) -> str | None: + """Try to acquire a short-lived lock before reading pending messages. + + Returns a unique token if the lock is acquired, otherwise None. 
+ """ + if not self._redis_conn: + return None + token = uuid4().hex + try: + ok = self._redis_conn.set( + self._get_pending_lock_key(stream_key), token, nx=True, px=ttl_ms + ) + if ok: + logger.debug( + f"Acquired pending-read lock for stream '{stream_key}' (ttl_ms={ttl_ms})" + ) + return token + else: + logger.debug(f"Skip pending-read: lock not acquired for stream '{stream_key}'") + return None + except Exception as e: + logger.warning(f"Failed to acquire pending-read lock for '{stream_key}': {e}") + return None + + def _release_pending_lock(self, stream_key: str, token: str) -> None: + """Release the pending-read lock only if owned (token matches).""" + if not self._redis_conn or not token: + return + lock_key = self._get_pending_lock_key(stream_key) + # Compare-and-delete via Lua to ensure we only release our own lock + lua = """ + if redis.call('get', KEYS[1]) == ARGV[1] then + return redis.call('del', KEYS[1]) + else + return 0 + end + """ + try: + self._redis_conn.eval(lua, 1, lock_key, token) + logger.debug(f"Released pending-read lock for stream '{stream_key}'") + except Exception as e: + logger.debug(f"Release lock failed for '{stream_key}': {e}") + def put( self, message: ScheduleMessageItem, block: bool = True, timeout: float | None = None ) -> None: @@ -338,34 +390,46 @@ def get( need_pending_count = need_pending if need_pending > 0 else 0 if need_pending_count: - try: - pending_messages = self._redis_conn.xreadgroup( - self.consumer_group, - self.consumer_name, - {stream_key: "0"}, # read only this consumer's pending - count=need_pending_count, - block=None, # do not block when checking pending - ) - except Exception as read_err: - # Handle missing group/stream by creating and retrying once - err_msg = str(read_err).lower() - if "nogroup" in err_msg or "no such key" in err_msg: - logger.warning( - f"Consumer group or stream missing for '{stream_key}/{self.consumer_group}'. Attempting to create and retry (pending)." 
- ) - self._ensure_consumer_group(stream_key=stream_key) + # Acquire a short-lived lock to avoid multiple processes reading the same pending + # messages concurrently when sharing the same consumer_name. + ttl_ms = 2000 + token = self._acquire_pending_lock(stream_key=stream_key, ttl_ms=ttl_ms) + if token: + try: try: pending_messages = self._redis_conn.xreadgroup( self.consumer_group, self.consumer_name, - {stream_key: "0"}, + {stream_key: "0"}, # read only this consumer's pending count=need_pending_count, - block=None, + block=None, # do not block when checking pending ) - except Exception: - pending_messages = [] - else: - pending_messages = [] + except Exception as read_err: + # Handle missing group/stream by creating and retrying once + err_msg = str(read_err).lower() + if "nogroup" in err_msg or "no such key" in err_msg: + logger.warning( + f"Consumer group or stream missing for '{stream_key}/{self.consumer_group}'. Attempting to create and retry (pending)." + ) + self._ensure_consumer_group(stream_key=stream_key) + try: + pending_messages = self._redis_conn.xreadgroup( + self.consumer_group, + self.consumer_name, + {stream_key: "0"}, + count=need_pending_count, + block=None, + ) + except Exception: + pending_messages = [] + else: + pending_messages = [] + finally: + # Always release the lock + self._release_pending_lock(stream_key=stream_key, token=token) + else: + # If lock not acquired, skip pending read in this round + pending_messages = [] # Combine: new first, then pending messages = []