Skip to content

Commit 1b9579c

Browse files
authored
Feat/complex search (#4)
* feat: support topic search * chore: update dependency * feat: update dependency version * doc
1 parent c1ae697 commit 1b9579c

File tree

6 files changed

+349
-13
lines changed

6 files changed

+349
-13
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
基于 Python `FastMCP` + `FastAPI` 的自定义 Elasticsearch MCP 服务 Demo。
44

55
## 特性
6-
- 支持关键词搜索、二次过滤、按 ID 查询
6+
- 支持关键词搜索、二次过滤、按 ID 查询、复杂筛选词查询逻辑
77
- 基于 FastMCP 提供 MCP 协议工具:`search_news`、`search_news_with_secondary_filter`、`read_single_news`
88
- Prometheus 监控集成(`starlette_prometheus`)
99
- 基于 Redis 的服务端 Session 存储(`RedisSessionMiddleware`)

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ readme = "README.md"
66
requires-python = ">=3.12"
77
dependencies = [
88
"elasticsearch==8.15",
9-
"fastmcp==2.8.1",
9+
"fastmcp==2.10",
1010
"uvicorn>=0.20.0",
1111
"python-dotenv>=0.20.0",
1212
"pydantic>=1.10.0",
@@ -18,6 +18,7 @@ dependencies = [
1818
"itsdangerous>=2.2.0",
1919
"gunicorn>=23.0.0",
2020
"redis>=6.2.0",
21+
"tenacity>=9.1.2",
2122
]
2223

2324
[[tool.uv.index]]

src/news_mcp_server/clients/elastic_client.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
import asyncio
2+
from dataclasses import dataclass
3+
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
4+
from elastic_transport import TransportError
5+
from typing import List
16
from elasticsearch import AsyncElasticsearch
27
from ..config.settings import es_settings
38
from ..exceptions import ToolException
@@ -8,12 +13,26 @@
813

914

1015
class AsyncElasticClient:
16+
@dataclass
class SearchResponse:
    """Result container returned by the topic search.

    Bundles the documents of one result page with the total hit count
    so callers can report both.
    """

    # `_source` payloads of the returned hits, one dict per document.
    data: List[dict]
    # Total number of matches reported by the search; 0 when not supplied.
    total: int = 0
20+
1121
def __init__(self):
1222
# 初始化异步 ElasticSearch 客户端
1323
self._client = AsyncElasticsearch(es_settings.URL,
1424
api_key=es_settings.api_key, verify_certs=False)
1525
self.index = es_settings.ES_INDEX
1626

27+
@retry(
28+
reraise=True,
29+
stop=stop_after_attempt(3),
30+
wait=wait_exponential(multiplier=1, min=1, max=10),
31+
retry=(
32+
retry_if_exception_type(TransportError) |
33+
retry_if_exception_type(asyncio.TimeoutError)
34+
),
35+
)
1736
async def search_news(self, query: str, source: str = None, date_from: str = None, date_to: str = None, max_results: int = 10) -> list:
1837
"""
1938
ElasticSearch 异步搜索新闻
@@ -112,5 +131,82 @@ async def get_by_id(self, news_id: str) -> dict:
112131
except Exception:
113132
raise ToolException(f'Tool call exception with news_id {news_id}')
114133

134+
def _append_common_filters(self, must: list, search_word: str, date_from: str, date_to: str):
135+
"""提炼公共过滤器: 添加 search_word 和时间范围到 must 列表"""
136+
if search_word:
137+
must.append({
138+
'multi_match': {
139+
'query': search_word,
140+
'fields': ['title^5', 'content'],
141+
'operator': 'and'
142+
}
143+
})
144+
if date_from or date_to:
145+
range_filter = {}
146+
if date_from:
147+
range_filter['gte'] = date_from
148+
if date_to:
149+
range_filter['lte'] = date_to
150+
must.append({'range': {'release_time': range_filter}})
151+
152+
def _add_clauses(self, should_clauses: list, base_filters: list, secondary_queries: list[str], search_word: str, date_from: str, date_to: str):
153+
"""根据 base_filters 和 secondary_queries 构建子句并添加到 should_clauses"""
154+
if secondary_queries:
155+
for sec in secondary_queries:
156+
must = base_filters + [{'match_phrase': {'title': sec}}]
157+
self._append_common_filters(must, search_word, date_from, date_to)
158+
should_clauses.append({'bool': {'must': must}})
159+
else:
160+
must = base_filters.copy()
161+
self._append_common_filters(must, search_word, date_from, date_to)
162+
should_clauses.append({'bool': {'must': must}})
163+
164+
@retry(
    reraise=True,
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    retry=(
        retry_if_exception_type(TransportError) |
        retry_if_exception_type(asyncio.TimeoutError)
    ),
)
async def search_topic_news(
    self,
    primary_queries: List[str],
    secondary_query: List[str] = None,
    max_results: int = 10,
    sources: List[str] = None,
    search_word=None,
    date_from: str = None,
    date_to: str = None
) -> SearchResponse:
    """Search news by OR-combining several primary phrases and sources.

    Query shape:
    ``<label1>&<filters> | <label2>&<filters> | <source1>&<filters> | ...``
    Each primary query is a ``match_phrase`` on ``title`` and each source is
    a ``term`` on ``source.keyword``. Every one of them is combined (AND)
    with each secondary query, the optional ``search_word`` and the optional
    date range; the resulting clauses are OR-ed together. Results are sorted
    by ``release_time`` descending.

    The call is attempted up to 3 times with exponential backoff when a
    ``TransportError`` or ``asyncio.TimeoutError`` is raised; the last
    exception is re-raised.

    Args:
        primary_queries: Title phrases, one OR-branch each.
        secondary_query: Title phrases AND-ed with every branch.
        max_results: Requested page size, capped by
            ``es_settings.MAX_RESULTS_LIMIT``.
        sources: Source names, one OR-branch each.
        search_word: Optional keyword AND-ed with every branch.
        date_from: Optional inclusive lower bound for ``release_time``.
        date_to: Optional inclusive upper bound for ``release_time``.

    Returns:
        SearchResponse with the ``_source`` of each hit and the total count.
        Empty when neither ``primary_queries`` nor ``sources`` is given.
    """
    secondary_queries = secondary_query or []
    should_clauses = []
    for primary in primary_queries or []:
        self._add_clauses(should_clauses, [{'match_phrase': {'title': primary}}], secondary_queries, search_word, date_from, date_to)
    for source in sources or []:
        self._add_clauses(should_clauses, [{'term': {'source.keyword': source}}], secondary_queries, search_word, date_from, date_to)

    if not should_clauses:
        # A bool query with no clauses matches every document, which would
        # silently drop search_word and the date range. Nothing was asked
        # for, so return nothing without a round trip.
        return self.SearchResponse(data=[], total=0)

    limit = min(max_results, es_settings.MAX_RESULTS_LIMIT)
    body = {
        'query': {'bool': {'should': should_clauses}},
        # Newest first.
        'sort': [{'release_time': {'order': 'desc'}}],
    }

    response = await self._client.search(
        index=self.index,
        body=body,
        size=limit,
        source_includes=OUTPUT_SOURCE_FIELDS
    )
    raw_hits = response.get('hits', {})
    hits = raw_hits.get('hits', [])
    # `total` is normally an object ({"value": n, ...}) but can be a plain
    # integer when the cluster is asked for integer totals.
    total_info = raw_hits.get("total", {})
    if isinstance(total_info, dict):
        total = total_info.get("value", 0)
    else:
        total = int(total_info or 0)
    return self.SearchResponse(data=[hit.get('_source', {}) for hit in hits], total=total)
209+
210+
115211
async def close(self):
    """Close the underlying async Elasticsearch client."""
    client = self._client
    await client.close()

src/news_mcp_server/mcp_server.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,59 @@ async def read_single_news( ctx: Context,
111111
return news_item.model_dump()
112112

113113

114+
def _coerce_to_str_list(value):
    """Normalize a tool argument that may arrive as a str, a list, or None.

    A non-blank string becomes a one-element list; a blank string or any
    other falsy value becomes an empty list; anything else is returned as-is.
    """
    if isinstance(value, str):
        return [value] if value.strip() else []
    return value or []


@mcp.tool(
    name="search_topic_news",
    description="根据多个主关键词列表、筛选词列表(组)、数据源列表以 OR 关系批量查询新闻,支持时间范围筛选. "
                "基本查询逻辑:<label1>&<filtered_words>|<label2>&<filtered_words>|<source1>&<filtered_words>|...|"
                "允许在基本查询逻辑之上再搜索"
)
async def search_topic_news(
    ctx: Context,
    primary_queries: List[str] = Field(
        description="【必填】根据多个主关键词列表,系统返回包含主关键词与次关键词组合,所有组合的结果以OR关系连接的新闻"
    ),
    secondary_querys: List[str] = Field(
        default_factory=list,
        description="筛选词,将与每个主关键词进行 AND 运算"
    ),
    sources: List[str] = Field(
        default_factory=list,
        # Sources are separate OR-branches, not AND-ed with the primary keywords.
        description="数据源列表,每个数据源与主关键词并列(OR 关系),并分别与筛选词进行 AND 运算"
    ),
    search_word: str = Field(default="", description="搜索词"),
    max_results: int = Field(
        default=15,
        # Description kept in sync with the actual default above.
        description="【可选】希望返回的新闻数量,取值1-100,默认15"
    ),
    date_from: str = Field(
        default="",
        description="【可选】起始发布日期,格式 YYYY-MM-DD"
    ),
    date_to: str = Field(
        default="",
        description="【可选】结束发布日期,格式 YYYY-MM-DD"
    )
) -> List[dict]:
    """MCP tool: batch-search news by primary/secondary keyword combinations.

    Combinations have the shape ``A&D | B&D | ...``: each primary keyword
    (and each source) is AND-ed with the secondary keywords, and all
    combinations are OR-ed.

    Returns:
        One dict per news item (``model_dump()`` of each item). An empty
        list when no usable primary keyword is supplied.
    """
    logger.info("Call search_topic_news", primary_queries=primary_queries, secondary_query=secondary_querys, ctx=ctx.request_context.request['state'])

    # Arguments may arrive as a bare string instead of a list. Wrap them so
    # downstream code never iterates a string character by character, and
    # treat blank strings as "not provided".
    primary_queries = _coerce_to_str_list(primary_queries)
    if not primary_queries:
        return []
    secondary_querys = _coerce_to_str_list(secondary_querys)
    sources = _coerce_to_str_list(sources)

    news_items = await app_services["news_service"].search_topic_news(
        primary_queries=primary_queries,
        secondary_query=secondary_querys,
        max_results=max_results,
        sources=sources,
        search_word=search_word,
        date_from=date_from,
        date_to=date_to
    )
    logger.info("Call search_topic_news", total=news_items.get("total"), primary_queries_count=len(primary_queries), secondary_query_count=len(secondary_querys), ctx=ctx.request_context.request['state'])
    return [item.model_dump() for item in news_items.get("data")]
167+
168+
114169
mcp_app = create_http_app(mcp)

src/news_mcp_server/services/news_service.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,31 @@ async def search_news_with_secondary_filter(self,
5050
date_from=date_from,
5151
date_to=date_to,
5252
)
53-
return [NewsBaseItem(**item) for item in items]
53+
return [NewsBaseItem(**item) for item in items]
54+
55+
async def search_topic_news(
    self,
    primary_queries: List[str],
    secondary_query: Optional[List[str]] = None,
    max_results: int = 10,
    sources: Optional[List[str]] = None,
    search_word: Optional[str] = None,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None
) -> dict:
    """Search news by primary/secondary keyword combinations (A&D | B&D | ...).

    Delegates to the client's ``search_topic_news`` and converts each raw
    document into a ``NewsBaseItem``.

    Args:
        primary_queries: Primary keywords, forwarded to the client.
        secondary_query: Secondary keywords, forwarded to the client.
        max_results: Requested number of results.
        sources: Source names, forwarded to the client as a list.
        search_word: Optional extra keyword.
        date_from: Optional start date.
        date_to: Optional end date.

    Returns:
        A dict with two keys: ``"total"`` (hit count reported by the client)
        and ``"data"`` (list of ``NewsBaseItem``).
    """
    result = await self.client.search_topic_news(
        primary_queries=primary_queries,
        secondary_query=secondary_query,
        sources=sources,
        max_results=max_results,
        search_word=search_word,
        date_from=date_from,
        date_to=date_to
    )
    return {
        "total": result.total,
        "data": [NewsBaseItem(**item) for item in result.data]
    }

0 commit comments

Comments
 (0)