
Commit 4ec964f

Merge pull request #9 from nasa/GESDISCUMU-5133-Refactor-and-Clean-Up-Logging-Across-Codebase
refactored all logging across codebase
2 parents 7b38a8a + 19818d2
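
The refactor standardizes on the plain logging setup already visible in these files (the formatter string appears as unchanged context in gapMigrationStreamMessageCompiler.py). A minimal sketch of that setup, where the stream handler and INFO level are assumptions for illustration rather than taken from the commit:

import logging
import sys

# Module-level logger; only the formatter string is taken from the diff below,
# the stdout handler and INFO level are assumed for this sketch.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
logger.addHandler(handler)

logger.debug("routine detail")        # suppressed at INFO and above
logger.info("operational milestone")  # emitted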

8 files changed: +186 -135 lines changed


src/gapConfig/gapConfig.py

Lines changed: 25 additions & 20 deletions
@@ -84,7 +84,8 @@ def get_cmr_time(collection_id: str) -> Tuple[str, str]:
        url = f"https://cmr.earthdata.nasa.gov/search/collections.umm_json_v1_4?short_name={short_name}&version={version}"
    else:
        url = f"https://cmr.{cmr_env}.earthdata.nasa.gov/search/collections.umm_json_v1_4?short_name={short_name}&version={version}"
-   logger.info(f"Requesting granule time from: {url}")
+
+   logger.debug(f"Requesting granule time from: {url}")
    res = requests.get(url)
    data = res.json()
    if not data["items"]:
@@ -116,7 +117,7 @@ def init_collection(collection_name, collection_version, conn) -> str:
    collection_id = f"{collection_name}___{collection_version}"
    try:
        start, end = get_cmr_time(collection_id)
-       logger.info(f"Initializing {collection_id} with {start, end}")
+       logger.debug(f"Retrieved temporal extent for {collection_id}: {start} to {end}")

        # For new collection, partition `gaps` and `reasons` tables and insert into `collections` table
        with conn.cursor() as cur:
@@ -148,6 +149,7 @@ def init_collection(collection_name, collection_version, conn) -> str:
            logger.info(
                f"Created gaps partition {partition_name} for collection {collection_id}"
            )
+
            # Create partition on `reasons` table
            reasons_partition_name = f"reasons_{safe_collection_id}"
            cur.execute(
@@ -189,11 +191,13 @@ def init_collection(collection_name, collection_version, conn) -> str:
            )

        conn.commit()
+       logger.info(f"Successfully initialized collection {collection_id}")
        return f"Initialized collection {collection_id} in table"

    except Exception as e:
        conn.rollback()
-       logger.warning(traceback.format_exc())
+       logger.error(f"Collection {collection_id} initialization failed: {str(e)}")
+       logger.debug(traceback.format_exc())
        return f"Collection {collection_id} initialization failed: {str(e)}"


@@ -236,8 +240,10 @@ def init_migration_stream(collection_name, collection_version):
    )
    payload_response = json.loads(response["Payload"].read().decode())
    if response["StatusCode"] != 200 or payload_response.get("statusCode") != 200:
+       logger.error(f"Migration stream invocation failed for {collection_name} v{collection_version}")
        raise Exception(f"Collection backfill failed: {payload_response.get('body')}")

+   logger.info(f"Migration stream completed for {collection_name} v{collection_version}")
    return {
        "status": "success",
        "statusCode": response["StatusCode"],
@@ -260,11 +266,11 @@ def save_tolerance_to_dynamodb(shortname: str, versionid: str, tolerance: int):
                "granulegap": tolerance,
            }
        )
-       logger.info(
+       logger.debug(
            f"Saved tolerance for {shortname}___{versionid}: {tolerance} seconds. PutItem Response: {response['ResponseMetadata']['HTTPStatusCode']}"
        )
    except Exception as e:
-       logger.error(f"Failed to save tolerance to DynamoDB: {str(e)}")
+       logger.error(f"Failed to save tolerance to DynamoDB for {shortname}___{versionid}: {str(e)}")
        raise


@@ -291,13 +297,13 @@ def lambda_handler(event: events.SQSEvent, context: Context) -> Dict[str, Any]:
    try:
        http_method = event.get("httpMethod", "")
        resource_path = event.get("path", "")
-       logger.info(f"Got HTTP {http_method} for {resource_path}")
+       logger.debug(f"Got HTTP {http_method} for {resource_path}")

        try:
            collections, backfill_behavior = parse_event(event)
        except Exception as e:
-           message = f"Error processing request: {str(e)}"
-           logger.error(traceback.format_exc())
+           message = f"Invalid request format: {str(e)}"
+           logger.warning(message)
            return build_response(400, {"message": message})

        if http_method != "POST":
@@ -308,6 +314,7 @@ def lambda_handler(event: events.SQSEvent, context: Context) -> Dict[str, Any]:
        for collection in collections:
            collection_id = f"{collection['name']}___{collection['version']}"
            tolerance = collection.get("tolerance")
+
            # Update tolerance table even if the collection already exists
            if tolerance is not None:
                try:
@@ -316,45 +323,42 @@ def lambda_handler(event: events.SQSEvent, context: Context) -> Dict[str, Any]:
                        collection["raw_version"],
                        int(tolerance),
                    )
+                   logger.info(f"Updated tolerance for {collection['name']} v{collection['raw_version']}: {tolerance}s")
                except Exception as e:
                    logger.error(
                        f"Error saving tolerance for {collection['name']}___{collection['raw_version']}: {str(e)}"
                    )
+
            # Add collection to collections table, create partition for gaps table, set initial full gap
            if collection_id not in current_collections:
-               message = init_collection(
-                   collection["name"], collection["version"], conn
-               )
-               logger.info(message)
+               init_collection(collection["name"], collection["version"], conn)

                # Kick off the migration stream
                try:
-                   logger.info(f"Starting collection backfill")
                    migration_result = init_migration_stream(
                        collection["name"], collection["version"].replace("_", ".")
                    )
-                   logger.info(f"Backfill result: {migration_result}")
+                   logger.debug(f"Backfill result: {migration_result}")
                except Exception as e:
                    message = (
                        f"Collection backfill failed for {collection_id}: {str(e)}"
                    )
                    logger.error(message)
-                   logger.warn(
+                   logger.warning(
                        f"Collection {collection_id} left in incomplete state, use force=True to rectify"
                    )
                    return build_response(500, {"message": message})
+
            # Skip DB init but still backfill granules from CMR
            elif backfill_behavior.lower() == "force":
                logger.info(
                    f"Force flag detected, proceeding with backfill for existing collection: {collection_id}"
                )
-               # Kick off the migration stream
                try:
-                   logger.info(f"Starting collection backfill")
                    migration_result = init_migration_stream(
                        collection["name"], collection["version"].replace("_", ".")
                    )
-                   logger.info(f"Backfill result: {migration_result}")
+                   logger.debug(f"Backfill result: {migration_result}")
                except Exception as e:
                    message = (
                        f"Collection backfill failed for {collection_id}: {str(e)}"
@@ -366,11 +370,12 @@ def lambda_handler(event: events.SQSEvent, context: Context) -> Dict[str, Any]:
                    f"Skipping initialization of {collection_id}: already exists in collection table"
                )

+       logger.info(f"Collection initialization completed for {len(collections)} collection(s)")
        return build_response(
            200, {"message": f"Collection initialization complete for {collections}"}
        )

    except Exception as e:
-       logger.error(f"Error processing request: {str(e)}")
-       logger.error(traceback.format_exc())
+       logger.error(f"Unexpected error in lambda handler: {str(e)}")
+       logger.debug(traceback.format_exc())
        return build_response(500, {"message": "Unexpected error occurred"})
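
The error paths above converge on one pattern: a concise logger.error carrying the collection context, with the full traceback demoted to logger.debug. A minimal sketch of that pattern (the helper name and simulated failure are illustrative, not part of the commit):

import logging
import traceback

logger = logging.getLogger(__name__)

def init_collection_example(collection_id):
    # Hypothetical helper showing the error/debug split used above.
    try:
        raise RuntimeError("simulated CMR lookup failure")
    except Exception as e:
        logger.error(f"Collection {collection_id} initialization failed: {str(e)}")
        logger.debug(traceback.format_exc())  # full stack trace only at DEBUG
        return f"Collection {collection_id} initialization failed: {str(e)}"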

src/gapMigrationStreamMessageCompiler/gapMigrationStreamMessageCompiler.py

Lines changed: 16 additions & 16 deletions
@@ -19,7 +19,6 @@
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
-logger.info("Logger initialized successfully.")

loop = asyncio.get_event_loop()

@@ -106,11 +105,12 @@ def get_params(short_name, version, max_producers=8, consumer_ratio=1.5):
        queue_size = n_producers * 2 * 2000
        date_ranges = split_date_ranges(beginning_date, ending_date, n_producers)

+       logger.info(f"Collection {short_name} v{version}: {num_granules} granules, {n_producers} producers, {n_consumers} consumers")
        return date_ranges, n_consumers, queue_size, num_granules

    except Exception as e:
        logger.error(f"Error occurred: {e}")
-       logger.error(traceback.format_exc())
+       logger.debug(traceback.format_exc())
        return None, {
            "statusCode": 400,
            "body": json.dumps(
@@ -141,6 +141,7 @@ async def fetch_cmr_range(session, url, params, result_queue, fetch_stats):
    """
    search_after = None
    max_retries = 3
+
    while True:
        headers = {"CMR-Search-After": search_after} if search_after else {}

@@ -152,7 +153,7 @@ async def fetch_cmr_range(session, url, params, result_queue, fetch_stats):
                if response.status != 200:
                    error_body = await response.text()
                    if retry < max_retries:
-                       logger.warning(
+                       logger.debug(
                            f"CMR API error: HTTP {response.status} on {params}: {error_body} "
                            f"Retrying in {retry ** 2}s ({retry+1}/{max_retries})"
                        )
@@ -178,14 +179,12 @@ async def fetch_cmr_range(session, url, params, result_queue, fetch_stats):
                    fetch_stats["total"] += len(granules)
                    if not search_after:
                        return
-                   if fetch_stats["total"] % 10000 == 0:
-                       logger.info(f"{fetch_stats["total"]} granules fetched")
                    break

            except Exception as e:
                if retry < max_retries:
                    retry_delay = retry**2
-                   logger.warning(
+                   logger.debug(
                        f"Error fetching CMR page for {params}: {str(e)}. "
                        f"Retrying in {retry ** 2}s ({retry+1}/{max_retries})"
                    )
@@ -279,7 +278,7 @@ async def process_collection(

    fetch_stats = {"total": 0}
    send_stats = {"total": 0}
-   logger.info(f"Processing {short_name} v{version} with {len(partitions)} producers")
+   logger.info(f"Starting collection processing: {short_name} v{version} ({len(partitions)} producers, {n_consumers} consumers)")

    async with aiohttp.ClientSession() as http_session:
        async with aioboto3.Session().client(
@@ -322,16 +321,15 @@ async def process_collection(

            # Wait for producers to finish
            await asyncio.gather(*producers)
-           logger.info("All producers completed")
+           logger.debug("All producers completed")

            # Signal consumers to complete
            for _ in range(n_consumers):
                await result_queue.put(None)

            # Wait for consumers
            await asyncio.gather(*consumers)
-           logger.info("All tasks complete")
-
+           logger.debug("All consumers completed")

    except Exception as e:
        logger.error(f"Failed to process collection {short_name} v{version}: {e}")
@@ -342,7 +340,8 @@ async def process_collection(
        send_stats["total"] / total_duration if total_duration > 0 else 0
    )
    logger.info(
-       f"{fetch_stats['total']} fetched, {send_stats['total']} sent in {total_duration:.1f}s ({throughput:.1f}/s)"
+       f"Collection processing complete: {short_name} v{version} - "
+       f"{fetch_stats['total']} fetched, {send_stats['total']} sent in {total_duration:.1f}s ({throughput:.1f} msg/s)"
    )


@@ -365,7 +364,7 @@ def lambda_handler(event, context):
        short_name = sns_message.get("short_name")
        version = sns_message.get("version")
        if not short_name or not version:
-           logger.error("Missing short_name or version in the event")
+           logger.warning("Missing short_name or version in the event")
            return {
                "statusCode": 400,
                "body": json.dumps(
@@ -375,7 +374,7 @@ def lambda_handler(event, context):

    except Exception as e:
        logger.error(f"Input Error: {e}")
-       logger.error(traceback.format_exc())
+       logger.debug(traceback.format_exc())
        return None, {
            "statusCode": 400,
            "body": json.dumps(
@@ -407,11 +406,12 @@ def lambda_handler(event, context):
                total_granules,
            )
        )
+       logger.info(f"Lambda execution completed successfully for {short_name} v{version}")
        return {
            "statusCode": 200,
            "body": json.dumps({"message": "Processing complete"}),
        }
    except Exception as e:
-       logger.error(f"Error occurred: {e}")
-       logger.error(traceback.format_exc())
-       return {"statusCode": 500, "body": json.dumps({"error": str(e)})}
+       logger.error(f"Lambda execution failed for {short_name} v{version}: {str(e)}")
+       logger.debug(traceback.format_exc())
+       return {"statusCode": 500, "body": json.dumps({"error": str(e)})}

src/gapReporter/gapReporter.py

Lines changed: 41 additions & 25 deletions
@@ -28,43 +28,45 @@ def parse_collection_id(collection_id):
    shortname, versionid = collection_id.rsplit('___', 1)
    return shortname, versionid.replace('_', '.') # Reverse sanitize_versionid

-
def lambda_handler(event, context):
    """
    AWS Lambda handler that processes all collections from the DB.
    For each collection:
-       - gets granule gap from DynamoDB
-       - fetches time gaps exceeding granule gap
-       - creates and uploads a CSV to S3 if gaps exist
-
+   - gets granule gap from DynamoDB
+   - fetches time gaps exceeding granule gap
+   - creates and uploads a CSV to S3 if gaps exist
    Returns summary of uploads.
    """
    validate_environment_variables(['GAP_REPORT_BUCKET'])
-
    results = []
+
    with get_db_connection() as conn:
        try:
            collections = check_collections(conn)
-           logger.info(f"Found {len(collections)} collections in collections table.")
+           logger.info(f"Processing gap reports for {len(collections)} collections")
        except Exception as e:
            logger.error(f"Failed to fetch collections: {e}")
            return {'statusCode': 500, 'body': 'Failed to fetch collections'}

+       upload_count = 0
+       skip_count = 0
+       error_count = 0
+
        for collection_id in collections:
            try:
                shortname, versionid = parse_collection_id(collection_id)
-               logger.info(f"Processing collection: {shortname} version {versionid}")
-
+               logger.debug(f"Processing collection: {shortname} version {versionid}")
+
                granule_gap = get_granule_gap(shortname, versionid)
-               logger.info(f"Granule gap: {granule_gap} seconds")
-
+               logger.debug(f"Granule gap threshold for {shortname} version {versionid}: {granule_gap}s")
+
                with conn.cursor() as cursor:
                    time_gaps = fetch_time_gaps(shortname, versionid, granule_gap, cursor)
-               logger.info(f"Found {len(time_gaps)} time gaps exceeding threshold.")
-
+
                if not time_gaps:
-                   logger.info(f"No qualifying time gaps for {collection_id}. Skipping upload.")
+                   logger.debug(f"No qualifying time gaps for {collection_id}. Skipping upload.")
                    results.append({'collection_id': collection_id, 'status': 'no gaps'})
+                   skip_count += 1
                    continue

                # Create CSV
@@ -73,24 +75,38 @@ def lambda_handler(event, context):
                    csvwriter = csv.writer(csvfile)
                    csvwriter.writerow(['gap_begin', 'gap_end'])
                    csvwriter.writerows(time_gaps)
-               logger.info(f"Created CSV file {output_csv}")

                # Upload to S3
                s3 = boto3.client('s3')
                bucket_name = os.environ['GAP_REPORT_BUCKET']
                s3_output_key = os.path.basename(output_csv)
-
-               s3.upload_file(output_csv, bucket_name, s3_output_key)
-               logger.info(f"Uploaded CSV to s3://{bucket_name}/{s3_output_key}")
-
-               os.remove(output_csv)
-               logger.info(f"Deleted temporary CSV file: {output_csv}")
-
-               results.append({'collection_id': collection_id, 'status': 'uploaded', 's3_key': s3_output_key})
-
+
+               try:
+                   s3.upload_file(output_csv, bucket_name, s3_output_key)
+                   logger.info(f"Gap report uploaded: {collection_id} ({len(time_gaps)} gaps) -> s3://{bucket_name}/{s3_output_key}")
+                   upload_count += 1
+                   results.append({'collection_id': collection_id, 'status': 'uploaded', 's3_key': s3_output_key})
+               except ClientError as e:
+                   logger.error(f"S3 upload failed for {collection_id}: {str(e)}")
+                   results.append({'collection_id': collection_id, 'status': 'upload_failed', 'error': str(e)})
+                   error_count += 1
+               finally:
+                   # Clean up temp file
+                   if os.path.exists(output_csv):
+                       os.remove(output_csv)
+                       logger.debug(f"Cleaned up temp file: {output_csv}")
+
+           except ValueError as e:
+               logger.warning(f"Invalid collection ID format: {collection_id}")
+               results.append({'collection_id': collection_id, 'status': 'invalid_format', 'error': str(e)})
+               error_count += 1
            except Exception as e:
-               logger.error(f"Error processing collection {collection_id}: {e}")
+               logger.error(f"Failed to process collection {collection_id}: {str(e)}")
                results.append({'collection_id': collection_id, 'status': 'error', 'error': str(e)})
+               error_count += 1
+
+       # Summary logging
+       logger.info(f"Gap report generation complete: {upload_count} uploaded, {skip_count} skipped, {error_count} errors")

    return {
        'statusCode': 200,
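
gapReporter.py now keeps per-collection detail at DEBUG/ERROR and reports totals once at INFO. A small sketch of that summary pattern over the results list built above (the helper name is illustrative):

import logging

logger = logging.getLogger(__name__)

def log_run_summary(results):
    # Counts mirror the statuses appended above: 'uploaded', 'no gaps',
    # and everything else (upload_failed / invalid_format / error).
    upload_count = sum(1 for r in results if r['status'] == 'uploaded')
    skip_count = sum(1 for r in results if r['status'] == 'no gaps')
    error_count = len(results) - upload_count - skip_count
    logger.info(
        f"Gap report generation complete: {upload_count} uploaded, "
        f"{skip_count} skipped, {error_count} errors"
    )
    return upload_count, skip_count, error_count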
