
Commit a080b18

Merge pull request #15 from nasa/GESDISCUMU-5253-update-operational-scripts-for-gap-detection

brought operational scripts up to date with operations repo

2 parents b250358 + e0d4275

File tree

4 files changed: +359 -13 lines

gap_detection_operations/README.md

Lines changed: 18 additions & 13 deletions
@@ -1,19 +1,24 @@
-Collections of around 1 million granules or more will need to be triggered from EC2, as they will surpass the API gateway timeout limit if ran from gapConfig API.
+Collections of around 1 million granules or more will need to be triggered from EC2, as they will surpass the API gateway timeout limit if run from the gapConfig API.

-There are two functions here, each with different uses:
+There are two folders here, each with a different use:

-invoke_gap_config.sh is used for single collections that are greater than ~1 million granules
+In the invoke_single_collection directory, the invoke_gap_config.sh script runs for a single collection. In the bulk_invoke directory, the lambda_bulk_invoker.py script runs for a list of collections.

-To run:

-1. Launch or use an existing EC2 instance in the same VPC as gapConfig API.
-2. Prepare the input file and script provided in this folder. The event.json file needs to be modified to run for your specified collection.
-3. Run the script: './invoke_gap_config.sh'
-4. Check the response: 'cat response.json'
+To run for a single collection:

-lambda_bulk_invoker.py is used for larger lists of collections and will process them sequentially.
+- Launch or use an existing EC2 instance in the same VPC as the gapConfig API.
+- Prepare the input file and script provided in the invoke_single_collection folder. The event.json file needs to be modified for your specified collection.
+- Usage: './invoke_gap_config.sh'
+- Check the response: 'cat response.json'

-To run:
-1. Create a collections.csv with the collection ID in the first column and the version in the second. A third column for tolerance is optional.
-2. The lambda name for gapConfig and the csv file are specified as command line arguments.
-3. EXAMPLE RUN: python3 lambda_bulk_invoker.py gapConfigLambdaName collections.csv
+
+lambda_bulk_invoker.py is used for larger lists of collections and processes them sequentially.
+
+To run for a list of collections:
+
+- Create lambda_bulk_invoker.py on the EC2 instance. Paste the code from this repository into that file.
+- The EC2 instance needs sqs:GetQueueUrl and sqs:GetQueueAttributes permissions for the gapDetectionIngestQueue.
+- The lambda name for gapConfig, the csv file, and the queue name are specified as command line arguments.
+- Usage: python3 lambda_bulk_invoker.py <lambda_function_name> <csv_file> <queue_name>
+- Example: python3 lambda_bulk_invoker.py gesdisc-cumulus-uat-gapConfig collections.csv gesdisc-cumulus-uat-gapDetectionIngestQueue
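
For reference, a collections.csv consumed by lambda_bulk_invoker.py might look like the sketch below. The header row is optional (the script skips it when the first cell is short_name, shortname, or name); the first two columns are the collection short name and version, and the third tolerance column may be omitted per row. The collection names here are hypothetical placeholders, not real GES DISC collections.

short_name,version,tolerance
EXAMPLE_COLLECTION_A,001,12
EXAMPLE_COLLECTION_B,002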
gap_detection_operations/bulk_invoke/lambda_bulk_invoker.py

Lines changed: 311 additions & 0 deletions
@@ -0,0 +1,311 @@
#!/usr/bin/env python3

import boto3
import csv
import json
import sys
import os
import time
import fcntl
from datetime import datetime
from typing import Optional

def acquire_lock():
    """
    Acquire an exclusive file lock to prevent multiple instances from running.

    Returns:
        file object: The lock file handle (must be kept open to maintain lock)

    Exits:
        If another instance is already running
    """
    lock_file_path = '/tmp/lambda_processor.lock'

    try:
        lock_file = open(lock_file_path, 'w')
        # Try to get an exclusive, non-blocking lock
        fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        lock_file.write(f"{os.getpid()}\n")
        lock_file.flush()
        print(f"Acquired lock (PID: {os.getpid()})")
        return lock_file
    except (IOError, OSError):
        print("Another instance is already running. Exiting.")
        print(f"Lock file: {lock_file_path}")
        sys.exit(1)

def release_lock(lock_file):
    """
    Release the file lock.

    Args:
        lock_file: The lock file handle returned by acquire_lock()
    """
    if lock_file:
        try:
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)  # Unlock
            lock_file.close()
            print("Released lock")
        except Exception as e:
            print(f"Warning: Error releasing lock: {e}")

def check_sqs_message_count(sqs_client, queue_name: str) -> int:
    """
    Check the total number of messages in an SQS queue.

    Returns:
        int: Total number of messages (visible + in-flight)
    """
    try:
        # Get queue URL
        response = sqs_client.get_queue_url(QueueName=queue_name)
        queue_url = response['QueueUrl']

        # Get queue attributes
        attributes = sqs_client.get_queue_attributes(
            QueueUrl=queue_url,
            AttributeNames=[
                'ApproximateNumberOfMessages',
                'ApproximateNumberOfMessagesNotVisible'
            ]
        )

        # Extract message counts
        visible_messages = int(attributes['Attributes'].get('ApproximateNumberOfMessages', 0))
        in_flight_messages = int(attributes['Attributes'].get('ApproximateNumberOfMessagesNotVisible', 0))
        total_messages = visible_messages + in_flight_messages

        return total_messages

    except Exception as e:
        print(f"Error checking queue '{queue_name}': {e}")
        raise

def is_queue_empty(sqs_client, queue_name: str) -> bool:
    """
    Check if the SQS queue is empty.
    Waits indefinitely until the queue is empty.

    Args:
        sqs_client: boto3 SQS client
        queue_name: SQS queue name

    Returns:
        bool: Always returns True when the queue is finally empty
    """
    start_time = time.time()

    print("Starting queue monitoring")

    while True:
        try:
            total_messages = check_sqs_message_count(sqs_client, queue_name)

            print(f"Queue status - Total messages: {total_messages}")

            if total_messages == 0:
                print("Queue is empty, proceeding with next invocation")
                return True

            elapsed_minutes = int((time.time() - start_time) / 60)
            elapsed_seconds = int(time.time() - start_time) % 60
            print(f"Queue not empty ({total_messages} messages), waiting... ({elapsed_minutes}m {elapsed_seconds}s elapsed)")

            time.sleep(10)

        except Exception as e:
            print(f"Error checking queue: {str(e)}")
            time.sleep(10)

def invoke_lambda_for_collection(lambda_client, sqs_client, function_name: str, short_name: str, version: str, queue_name: str, tolerance: Optional[int] = None, response_dir: str = "responses"):
    """
    Asynchronously invoke the Lambda function for a single collection,
    then block until the ingest queue drains before returning.
    """
    collection = {
        "short_name": short_name,
        "version": version
    }

    if tolerance is not None:
        collection["tolerance"] = tolerance

    event_payload = {
        "httpMethod": "POST",
        "path": "/init",
        "body": json.dumps({
            "collections": [collection],
            "backfill": "force"
        })
    }

    tolerance_str = f" (tolerance: {tolerance})" if tolerance is not None else ""
    print(f"Processing: {short_name} v{version}{tolerance_str}")

    try:
        response = lambda_client.invoke(
            FunctionName=function_name,
            InvocationType='Event',  # Asynchronous invocation
            Payload=json.dumps(event_payload)
        )

        # For async invocation, expect 202
        if response['StatusCode'] == 202:
            print(f"Lambda invoked asynchronously: {short_name} v{version}")

            # Wait for Lambda to spawn processes and populate the queue
            initial_wait = 30  # seconds
            print(f"Waiting {initial_wait} seconds for Lambda to populate queue...")
            time.sleep(initial_wait)

            # Now wait for the queue to be empty before proceeding
            print("Monitoring queue until empty before next invocation...")
            is_queue_empty(sqs_client, queue_name)

            # Save response
            response_filename = os.path.join(response_dir, f"response_{short_name}_{version}.json")
            with open(response_filename, 'w') as f:
                json.dump({
                    "async_invocation": True,
                    "status_code": response['StatusCode'],
                    "invoked_at": datetime.now().isoformat()
                }, f, indent=2)

            return True, short_name, version, None
        else:
            error_msg = f"Lambda async invocation failed with status: {response['StatusCode']}"
            print(f"Failed: {short_name} v{version} - {error_msg}")
            return False, short_name, version, error_msg

    except Exception as e:
        error_msg = f"Error invoking Lambda: {str(e)}"
        print(f"Exception: {short_name} v{version} - {error_msg}")
        return False, short_name, version, error_msg

def process_csv_sequential(csv_file: str, function_name: str, queue_name: str):
    """
    Process the CSV file and invoke the Lambda function for each collection.
    """
    collections = []

    # Read CSV file
    try:
        with open(csv_file, 'r', newline='') as csvfile:
            reader = csv.reader(csvfile)

            first_row = next(reader, None)
            if first_row and first_row[0].lower() in ['short_name', 'shortname', 'name']:
                print("Skipping header row...")
            else:
                csvfile.seek(0)
                reader = csv.reader(csvfile)

            for row_num, row in enumerate(reader, start=1):
                if not row or len(row) < 2:
                    print(f"Skipping row {row_num}: insufficient data")
                    continue

                short_name = row[0].strip()
                version = row[1].strip()

                tolerance = None
                if len(row) >= 3 and row[2].strip():
                    try:
                        tolerance = int(row[2].strip())
                    except ValueError:
                        print(f"Warning: Invalid tolerance value '{row[2]}' in row {row_num}")

                collections.append((short_name, version, tolerance))

    except Exception as e:
        print(f"Error processing CSV file: {str(e)}")
        return False

    if not collections:
        print("No valid collections found in CSV file")
        return False

    # Create response directory
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    response_dir = f"responses_{timestamp}"
    os.makedirs(response_dir, exist_ok=True)
    print(f"Created response directory: {response_dir}")

    print(f"Found {len(collections)} collections to process. Processing...")

    # Create AWS clients with increased timeout
    config = boto3.session.Config(
        read_timeout=900,  # 15 minutes
        connect_timeout=60,
        retries={'max_attempts': 3}
    )

    lambda_client = boto3.client('lambda', region_name='us-west-2', config=config)
    sqs_client = boto3.client('sqs', region_name='us-west-2', config=config)

    successful_invocations = 0
    failed_invocations = 0

    # Process each collection sequentially
    for i, (short_name, version, tolerance) in enumerate(collections, 1):
        print(f"\n[{i}/{len(collections)}] ", end="")

        success, _, _, error_msg = invoke_lambda_for_collection(
            lambda_client, sqs_client, function_name, short_name, version, queue_name, tolerance, response_dir
        )

        if success:
            successful_invocations += 1
        else:
            failed_invocations += 1

    return successful_invocations, failed_invocations, response_dir

def main():
    # Acquire lock before doing anything else
    lock_file = acquire_lock()

    try:
        if len(sys.argv) != 4:
            print("Usage: python3 lambda_bulk_invoker.py <lambda_function_name> <csv_file> <queue_name>")
            print("Example: python3 lambda_bulk_invoker.py gesdisc-cumulus-prod-gapConfig collections.csv gesdisc-cumulus-prod-gapDetectionIngestQueue")
            sys.exit(1)

        FUNCTION_NAME = sys.argv[1]
        CSV_FILE = sys.argv[2]
        QUEUE_NAME = sys.argv[3]

        print(f"Using SQS Queue: {QUEUE_NAME}")
        print("Queue monitoring via direct SQS API calls")

        if not os.path.isfile(CSV_FILE):
            print(f"Error: {CSV_FILE} not found.")
            print("Please create a CSV file with columns: short_name, version, tolerance (optional)")
            sys.exit(1)

        result = process_csv_sequential(CSV_FILE, FUNCTION_NAME, QUEUE_NAME)

        if result is False:
            sys.exit(1)

        successful_invocations, failed_invocations, response_dir = result
        total_invocations = successful_invocations + failed_invocations

        print("\n=== Summary ===")
        print(f"Response files saved to: {response_dir}/")
        print(f"Total invocations: {total_invocations}")
        print(f"Successful: {successful_invocations}")
        print(f"Failed: {failed_invocations}")

        if failed_invocations > 0:
            print(f"\nNote: {failed_invocations} collections failed to process")
            sys.exit(1)
        else:
            print(f"\nAll {successful_invocations} collections processed successfully!")

    finally:
        # Always release the lock, even if the script fails
        release_lock(lock_file)

if __name__ == "__main__":
    main()
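
A minimal sketch of the IAM policy the EC2 instance profile would need for this script, assuming the UAT names from the README example and the us-west-2 region hard-coded above; the account ID and resource names are placeholders to adapt:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["sqs:GetQueueUrl", "sqs:GetQueueAttributes"],
      "Resource": "arn:aws:sqs:us-west-2:<ACCOUNT_ID>:gesdisc-cumulus-uat-gapDetectionIngestQueue"
    },
    {
      "Effect": "Allow",
      "Action": "lambda:InvokeFunction",
      "Resource": "arn:aws:lambda:us-west-2:<ACCOUNT_ID>:function:gesdisc-cumulus-uat-gapConfig"
    }
  ]
}

Because the script blocks until the queue drains after each invocation, a long CSV can run for hours; launching it with, for example, nohup python3 lambda_bulk_invoker.py gesdisc-cumulus-uat-gapConfig collections.csv gesdisc-cumulus-uat-gapDetectionIngestQueue > run.log 2>&1 & keeps it alive if the SSH session drops, and the file lock in acquire_lock() refuses a second concurrent run.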
gap_detection_operations/invoke_single_collection/event.json

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
{
  "httpMethod": "POST",
  "path": "/init",
  "body": "{\"collections\":[{\"short_name\":\"short_name\",\"version\":\"00\",\"tolerance\":12}]}"
}
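
As the README notes, this template must be edited per collection. A hypothetical filled-in payload (the collection short name, version, and tolerance below are placeholders, not real values) might be:

{
  "httpMethod": "POST",
  "path": "/init",
  "body": "{\"collections\":[{\"short_name\":\"EXAMPLE_COLLECTION_A\",\"version\":\"001\",\"tolerance\":12}]}"
}

Note that "body" is a JSON-encoded string rather than a nested object, which is why its inner quotes are escaped.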
gap_detection_operations/invoke_single_collection/invoke_gap_config.sh

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
#!/bin/bash

# Exit immediately if a command exits with a non-zero status
set -e

# Variables
FUNCTION_NAME="gesdisc-cumulus-prod-gapConfig"
EVENT_FILE="event.json"
RESPONSE_FILE="response.json"

# Check if event.json exists
if [[ ! -f "$EVENT_FILE" ]]; then
    echo "Error: $EVENT_FILE not found."
    exit 1
fi

echo "Invoking Lambda function: $FUNCTION_NAME"
aws lambda invoke \
    --function-name "$FUNCTION_NAME" \
    --payload file://"$EVENT_FILE" \
    "$RESPONSE_FILE" \
    --cli-read-timeout 0 \
    --cli-connect-timeout 0

echo "Invocation complete. Response saved to $RESPONSE_FILE"
