Commit 18e22b5
New tool: ingest_email.py (#111)
This does the same as `@add_messages` in test_gmail.py, but the storage provider interface has changed to allow storing the "is it ingested" flag per message id in the database, so it is transactionally safe. (Almost all by Claude Opus 4.5 Preview.) I'm not in a hurry to remove test_email.py -- we need to make sure that all its use cases are now incorporated into either ingest_email.py or query.py; I think @parse_messages is the only one left. Also updated the docs about getting a fresh secret for gmail_dump.py -- I'd forgotten, and it was a bit painful to recover the knowledge.
1 parent b320141 commit 18e22b5

9 files changed: +359 −21 lines

docs/demos.md (+26 −8)

@@ -30,19 +30,21 @@ python tools/query.py -d mp.db
 ```
 (You just type questions and it prints answers.)
 
-## How we did the GMail demo
+## How we did the Gmail demo
 
 The demo consisted of loading a large number (around 500) of email messages
 into a database, then querying the database about those messages.
 The loading (ingestion) was done ahead of time, as it takes a long time.
 
-We used the GMail API to download 550 messages from Guido's GMail
+We used the Gmail API to download 550 messages from Guido's Gmail
 (details below).
 
 Given a folder with `*.eml` files in MIME format, we ran our email
 ingestion tool, `tools/test_email.py`. (All these details will change
 in the future, hopefully to become more similar to `ingest_vtt.py`.)
 
+**TODO: Switch to describing ingest_email.py.**
+
 The tool takes one positional argument, a directory, in which it will
 create a SQLite database named `gmail.db`.
 ```sh
@@ -60,16 +62,32 @@ next file.
 We can then query the `gmail.db` database using the same `query.py`
 tool that we used for the Monty Python demo.
 
-### How to use the GMail API to download messages
+### How to use the Gmail API to download messages
 
 In the `gmail/` folder you'll find a tool named `gmail_dump.py` which
-will download any number of messages (default 50) using the GMail API.
-In order to use the GMail API, however, you have to create a
+will download any number of messages (default 50) using the Gmail API.
+In order to use the Gmail API, however, you have to create a (free)
 Google Cloud app and configure it appropriately.
 
-In order to figure out how to set up the (free) Google Cloud app we
-used the instructions at [GeeksForGeeks
-](https://www.geeksforgeeks.org/devops/how-to-create-a-gcp-project/).
+We created an app in test mode at the
+[Google Cloud Console](https://console.cloud.google.com) and gave it
+access to the Gmail API (I forget exactly how we did this part).
+
+To create the needed client secret, we navigated to Clients (side bar),
+clicked "+ Create client" (in the row of actions at the top),
+selected "Desktop app", gave it a name, hit Create, scrolled down in the
+resulting dialog box, and hit "Download JSON". This produced a JSON file
+which should be copied to _client_secret.json_ in the gmail folder.
+(The Cloud Console interface may look different for you.)
+
+The first time you run the gmail_dump.py script, it takes you to
+a browser where you have to log in and agree to various warnings about
+using an app in test mode, etc. The gmail_dump.py script then writes a
+file _token.json_ and you're good for a week or so. When token.json
+expires, unfortunately you get a crash, and you have to delete the file
+manually to trigger the login flow again.
+(Sometimes starting a browser may fail, e.g. under WSL. Take the URL
+that's printed and go there manually.)
 
 The rest of the email ingestion pipeline doesn't care where you got
 your `*.eml` files from -- every email provider has its own quirks.
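
The token dance above is the standard Google OAuth installed-app flow. As insurance against forgetting it again, here is a minimal sketch of the usual pattern using google-auth and google-auth-oauthlib; whether gmail_dump.py does exactly this (e.g. attempting a silent refresh before falling back to the browser) is an assumption:

```python
import os

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow

SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]


def get_credentials() -> Credentials:
    creds = None
    if os.path.exists("token.json"):
        creds = Credentials.from_authorized_user_file("token.json", SCOPES)
    if creds and creds.expired and creds.refresh_token:
        try:
            # A silent refresh avoids the delete-token.json-and-relogin dance.
            creds.refresh(Request())
        except Exception:
            creds = None  # Refresh failed; fall through to the browser flow.
    if creds is None or not creds.valid:
        flow = InstalledAppFlow.from_client_secrets_file("client_secret.json", SCOPES)
        creds = flow.run_local_server(port=0)  # Prints the URL if no browser opens.
        with open("token.json", "w") as f:
            f.write(creds.to_json())
    return creds
```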

docs/gmail.md (+4 −0)

@@ -6,3 +6,7 @@ Until we have time to write this up, your best bet is to
 ask your favorite search engine or LLM-based chat bot for help.
 
 More TBD.
+In the meantime there are some hints in [Demos][def].
+
+[def]: demos.md#how-to-use-the-gmail-api-to-download-messages

tools/ingest_email.py (+243 −0, new file)

#!/usr/bin/env python3
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""
Email Ingestion Tool

This script ingests email (.eml) files into a SQLite database
that can be queried using tools/query.py.

Usage:
    python tools/ingest_email.py -d email.db inbox_dump/
    python tools/ingest_email.py -d email.db message1.eml message2.eml
    python tools/query.py --database email.db --query "What was discussed?"
"""

import argparse
import asyncio
import sys
import time
from pathlib import Path

from typeagent.aitools import utils
from typeagent.emails.email_import import import_email_from_file
from typeagent.emails.email_memory import EmailMemory
from typeagent.emails.email_message import EmailMessage
from typeagent.knowpro.convsettings import ConversationSettings
from typeagent.storage.utils import create_storage_provider


def create_arg_parser() -> argparse.ArgumentParser:
    """Create argument parser for the email ingestion tool."""
    parser = argparse.ArgumentParser(
        description="Ingest email (.eml) files into a database for querying",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    parser.add_argument(
        "paths",
        nargs="+",
        help="Path to one or more .eml files or directories containing .eml files",
    )

    parser.add_argument(
        "-d",
        "--database",
        required=True,
        help="Path to the SQLite database file to create/use",
    )

    parser.add_argument(
        "-v", "--verbose", action="store_true", help="Show verbose/debug output"
    )

    return parser


def collect_email_files(paths: list[str], verbose: bool) -> list[Path]:
    """Collect all .eml files from the given paths (files or directories)."""
    email_files: list[Path] = []

    for path_str in paths:
        path = Path(path_str)
        if not path.exists():
            print(f"Error: Path '{path}' not found", file=sys.stderr)
            sys.exit(1)

        if path.is_file():
            if path.suffix.lower() == ".eml":
                email_files.append(path)
            else:
                print(f"Error: Not a .eml file: {path}", file=sys.stderr)
                sys.exit(1)
        elif path.is_dir():
            eml_files = sorted(path.glob("*.eml"))
            if verbose:
                print(f"  Found {len(eml_files)} .eml files in {path}")
            email_files.extend(eml_files)
        else:
            print(f"Error: Not a file or directory: {path}", file=sys.stderr)
            sys.exit(1)

    return email_files


async def ingest_emails(
    paths: list[str],
    database: str,
    verbose: bool = False,
) -> None:
    """Ingest email files into a database."""

    # Collect all .eml files
    if verbose:
        print("Collecting email files...")
    email_files = collect_email_files(paths, verbose)

    if not email_files:
        print("Error: No .eml files found", file=sys.stderr)
        sys.exit(1)

    if verbose:
        print(f"Found {len(email_files)} email files to ingest")

    # Load environment for model API access
    if verbose:
        print("Loading environment...")
    utils.load_dotenv()

    # Create conversation settings and storage provider
    if verbose:
        print("Setting up conversation settings...")

    settings = ConversationSettings()
    settings.storage_provider = await create_storage_provider(
        settings.message_text_index_settings,
        settings.related_term_index_settings,
        database,
        EmailMessage,
    )

    # Create EmailMemory
    email_memory = await EmailMemory.create(settings)

    if verbose:
        print(f"Target database: {database}")

    batch_size = settings.semantic_ref_index_settings.batch_size
    if verbose:
        print(f"Batch size: {batch_size}")

    # Parse and import emails
    if verbose:
        print("\nParsing and importing emails...")

    successful_count = 0
    failed_count = 0
    skipped_count = 0
    start_time = time.time()

    semref_coll = await settings.storage_provider.get_semantic_ref_collection()
    storage_provider = settings.storage_provider

    for i, email_file in enumerate(email_files):
        try:
            if verbose:
                print(f"\n[{i + 1}/{len(email_files)}] {email_file}")

            email = import_email_from_file(str(email_file))
            email_id = email.metadata.id

            # Check if this email was already ingested
            if email_id and storage_provider.is_source_ingested(email_id):
                skipped_count += 1
                if verbose:
                    print("  [Already ingested, skipping]")
                continue

            if verbose:
                print(f"  From: {email.metadata.sender}")
                if email.metadata.subject:
                    print(f"  Subject: {email.metadata.subject}")
                print(f"  Date: {email.timestamp}")
                print(f"  Body chunks: {len(email.text_chunks)}")
                for chunk in email.text_chunks:
                    # Show the first 200 chars of each chunk
                    preview = chunk[:200].replace("\n", " ")
                    if len(chunk) > 200:
                        preview += "..."
                    print(f"    {preview}")

            # Pass source_id to mark as ingested atomically with the message
            source_ids = [email_id] if email_id else None
            await email_memory.add_messages_with_indexing(
                [email], source_ids=source_ids
            )
            successful_count += 1

            # Print progress periodically
            if not verbose and (i + 1) % batch_size == 0:
                elapsed = time.time() - start_time
                semref_count = await semref_coll.size()
                print(
                    f"  [{i + 1}/{len(email_files)}] {successful_count} imported | "
                    f"{semref_count} refs | {elapsed:.1f}s elapsed"
                )

        except Exception as e:
            failed_count += 1
            print(f"Error processing {email_file}: {e}", file=sys.stderr)
            if verbose:
                import traceback

                traceback.print_exc()

    # Final summary
    elapsed = time.time() - start_time
    semref_count = await semref_coll.size()

    print()
    if verbose:
        print(f"Successfully imported {successful_count} email(s)")
        if skipped_count:
            print(f"Skipped {skipped_count} already-ingested email(s)")
        if failed_count:
            print(f"Failed to import {failed_count} email(s)")
        print(f"Extracted {semref_count} semantic references")
        print(f"Total time: {elapsed:.1f}s")
    else:
        print(
            f"Imported {successful_count} emails to {database} "
            f"({semref_count} refs, {elapsed:.1f}s)"
        )
        if skipped_count:
            print(f"Skipped: {skipped_count} (already ingested)")
        if failed_count:
            print(f"Failed: {failed_count}")

    # Show usage information
    print()
    print("To query the emails, use:")
    print(
        f"  python tools/query.py --database '{database}' --query 'Your question here'"
    )


def main() -> None:
    """Main entry point."""
    parser = create_arg_parser()
    args = parser.parse_args()

    asyncio.run(
        ingest_emails(
            paths=args.paths,
            database=args.database,
            verbose=args.verbose,
        )
    )


if __name__ == "__main__":
    main()
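
Re-running the tool over the same input should be a near no-op thanks to the per-Message-ID tracking. A quick idempotency check (a sketch; it assumes `tools` is importable as a package and that the paths exist):

```python
# Hypothetical idempotency check: the second run should skip every
# message that has a Message-ID instead of re-indexing it.
import asyncio

from tools.ingest_email import ingest_emails  # assumes tools/ is importable

asyncio.run(ingest_emails(["inbox_dump/"], database="email.db"))
asyncio.run(ingest_emails(["inbox_dump/"], database="email.db"))  # all skipped
```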

typeagent/emails/email_import.py (+1 −1)

@@ -62,7 +62,7 @@ def import_email_message(msg: Message, max_chunk_length: int) -> EmailMessage:
         recipients=_import_address_headers(msg.get_all("To", [])),
         cc=_import_address_headers(msg.get_all("Cc", [])),
         bcc=_import_address_headers(msg.get_all("Bcc", [])),
-        subject=msg.get("Subject"),
+        subject=msg.get("Subject"),  # TODO: Remove newlines
         id=msg.get("Message-ID", None),
     )
     timestamp: str | None = None
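
The new TODO refers to RFC 5322 header folding: a long Subject header can arrive with embedded line breaks followed by whitespace. A possible fix is sketched below; it is not part of this commit, just the obvious whitespace-collapsing approach:

```python
import re


def unfold_header(value: str | None) -> str | None:
    """Collapse folded-header line breaks (CRLF + whitespace) into single spaces."""
    if value is None:
        return None
    return re.sub(r"\s+", " ", value).strip()


# e.g.: subject=unfold_header(msg.get("Subject")),
```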

typeagent/knowpro/conversation_base.py (+12 −1)

@@ -117,6 +117,8 @@ async def add_metadata_to_index(self) -> None:
     async def add_messages_with_indexing(
         self,
         messages: list[TMessage],
+        *,
+        source_ids: list[str] | None = None,
     ) -> AddMessagesResult:
         """
         Add messages and build all indexes incrementally in a single transaction.
@@ -128,16 +130,25 @@
 
         Args:
             messages: Messages to add
+            source_ids: Optional list of source IDs to mark as ingested. These are
+                marked within the same transaction, so if the indexing fails, the
+                source IDs won't be marked as ingested (for SQLite storage).
 
         Returns:
             Result with counts of messages/semrefs added
 
         Raises:
-            BaseException: Any error
+            Exception: Any error
         """
         storage = await self.settings.get_storage_provider()
 
         async with storage:
+            # Mark source IDs as ingested before adding messages.
+            # This way, if indexing fails, the rollback will also undo the marks.
+            if source_ids:
+                for source_id in source_ids:
+                    storage.mark_source_ingested(source_id)
+
             start_points = IndexingStartPoints(
                 message_count=await self.messages.size(),
                 semref_count=await self.semantic_refs.size(),
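
For callers, the contract is that the messages, their index entries, and the ingested-marks commit or roll back as one unit. A minimal sketch of what that guarantees (assuming `email_memory`, `storage`, and `email` are set up as in tools/ingest_email.py):

```python
async def ingest_one(email_memory, storage, email) -> None:
    # Sketch: the marks and the message live in one transaction (SQLite storage).
    email_id = email.metadata.id
    try:
        await email_memory.add_messages_with_indexing(
            [email], source_ids=[email_id] if email_id else None
        )
    except Exception:
        # The rollback undid the message, its index entries, AND the
        # "ingested" mark, so a later re-run will retry this message.
        assert not storage.is_source_ingested(email_id)
        raise
```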

typeagent/knowpro/interfaces.py (+10 −11)

@@ -863,12 +863,7 @@ async def get_conversation_threads(self) -> IConversationThreads: ...
 
     # Metadata management
     def get_conversation_metadata(self) -> ConversationMetadata:
-        """Get conversation metadata.
-
-        Always returns a ConversationMetadata instance. Fields not found in
-        the database will be None. If no metadata exists at all, returns
-        an instance with all fields None.
-        """
+        """Get conversation metadata (missing fields set to None)."""
         ...
 
     def set_conversation_metadata(self, **kwds: str | list[str] | None) -> None:
@@ -887,12 +882,16 @@ def update_conversation_timestamps(
         created_at: Datetime | None = None,
         updated_at: Datetime | None = None,
     ) -> None:
-        """Update conversation timestamps.
-
-        Args:
-            created_at: Optional creation timestamp
-            updated_at: Optional last updated timestamp
-        """
+        """Update conversation timestamps."""
+        ...
 
+    # Ingested source tracking
+    def is_source_ingested(self, source_id: str) -> bool:
+        """Check if a source has already been ingested."""
+        ...
+
+    def mark_source_ingested(self, source_id: str) -> None:
+        """Mark a source as ingested (no commit; call within transaction context)."""
         ...
 
     # Transaction management
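
The SQLite side of these two methods isn't part of this diff. A minimal sketch of one plausible implementation (the table name `IngestedSources` and the `self.db` connection attribute are assumptions, not from the commit):

```python
import sqlite3


class IngestTrackingMixin:
    """Hypothetical provider mixin; self.db is an open sqlite3.Connection."""

    db: sqlite3.Connection

    def is_source_ingested(self, source_id: str) -> bool:
        cur = self.db.execute(
            "SELECT 1 FROM IngestedSources WHERE source_id = ?", (source_id,)
        )
        return cur.fetchone() is not None

    def mark_source_ingested(self, source_id: str) -> None:
        # Deliberately no commit here: the surrounding transaction context
        # commits or rolls back, which is what makes ingestion atomic.
        self.db.execute(
            "INSERT OR IGNORE INTO IngestedSources (source_id) VALUES (?)",
            (source_id,),
        )
```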
