Skip to content

Commit 00e2eb5

Browse files
committed
refactor: Improve path handling and error messages in ShadowTrackrConnector
- Replaced string manipulation with `Path` for constructing the config file path. - Enhanced error messages for label creation failures and IP processing. - Simplified score adjustment logic for false positive estimates. - Streamlined label handling and description updates for STIX entities. - Improved exception handling for IP validation.
1 parent f628665 commit 00e2eb5

File tree

1 file changed

+93
-160
lines changed

1 file changed

+93
-160
lines changed

internal-enrichment/shadowtrackr/src/shadowtrackr.py

Lines changed: 93 additions & 160 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import json
33
import os
44
from datetime import datetime, timedelta
5-
from typing import Dict
5+
from pathlib import Path
66

77
import requests
88
import yaml
@@ -18,7 +18,7 @@ class ShadowTrackrConnector:
1818
def __init__(self):
1919
# Instantiate the connector helper from config
2020

21-
config_file_path = os.path.dirname(os.path.abspath(__file__)) + "/config.yml"
21+
config_file_path = str((Path(__file__).resolve().parent / "config.yml"))
2222
config = (
2323
yaml.load(open(config_file_path), Loader=yaml.FullLoader)
2424
if os.path.isfile(config_file_path)
@@ -71,17 +71,19 @@ def __init__(self):
7171
)
7272
if self.label_bogon is None:
7373
raise ValueError(
74-
"The Bogon label could not be created. If your connector does not have the permission to create labels,"
75-
" please create it manually before launching"
74+
"The Bogon label could not be created. If your connector does not have the permission to create labels, "
75+
"please create it manually before launching"
7676
)
77+
7778
self.label_cloud = self.helper.api.label.read_or_create_unchecked(
7879
value="cloud", color="#145578"
7980
)
8081
if self.label_cloud is None:
8182
raise ValueError(
82-
"The cloud label could not be created. If your connector does not have the permission to create labels,"
83-
" please create it manually before launching"
83+
"The cloud label could not be created. If your connector does not have the permission to create labels, "
84+
"please create it manually before launching."
8485
)
86+
8587
self.label_cdn = self.helper.api.label.read_or_create_unchecked(
8688
value="cdn", color="#145578"
8789
)
@@ -90,6 +92,7 @@ def __init__(self):
9092
"The cdn label could not be created. If your connector does not have the permission to create labels, "
9193
"please create it manually before launching"
9294
)
95+
9396
self.label_vpn = self.helper.api.label.read_or_create_unchecked(
9497
value="vpn", color="#145578"
9598
)
@@ -98,6 +101,7 @@ def __init__(self):
98101
"The vpn label could not be created. If your connector does not have the permission to create labels, "
99102
"please create it manually before launching"
100103
)
104+
101105
self.label_tor = self.helper.api.label.read_or_create_unchecked(
102106
value="tor", color="#145578"
103107
)
@@ -119,33 +123,33 @@ def __init__(self):
119123
def _process_entity(self, stix_objects, stix_entity, opencti_entity) -> str:
120124
# Search in ShadowTrackr
121125
is_indicator = False
122-
if opencti_entity["entity_type"] == "Indicator":
126+
entity_type = opencti_entity["entity_type"]
127+
if entity_type == "Indicator":
128+
pattern = opencti_entity["pattern"]
123129
is_indicator = True
124-
if "ipv4-addr" in opencti_entity["pattern"]:
125-
ip = opencti_entity["pattern"].split("'")[1]
126-
elif "ipv6-addr" in opencti_entity["pattern"]:
127-
ip = opencti_entity["pattern"].split("'")[1]
130+
131+
if "ipv4-addr" in pattern or "ipv6-addr" in pattern:
132+
ip = pattern.split("'")[1]
128133
else:
129-
return (
130-
"No ip address in indicator, skipping " + opencti_entity["pattern"]
131-
)
134+
return f"No ip address in indicator, skipping {pattern} {entity_type}"
135+
132136
else:
133137
ip = opencti_entity["observable_value"]
134138

135139
if self._valid_ip(ip):
136140
score = opencti_entity["x_opencti_score"]
137141
old_score = score
138-
labels = [l["value"] for l in opencti_entity["objectLabel"]]
142+
labels = [label["value"] for label in opencti_entity["objectLabel"]]
139143
markings = [
140-
l["definition"]
141-
for l in opencti_entity["objectMarking"]
142-
if l["definition_type"] == "TLP"
144+
marking["definition"]
145+
for marking in opencti_entity["objectMarking"]
146+
if marking["definition_type"] == "TLP"
143147
]
144148

145149
for tlp in markings:
146150
if not OpenCTIConnectorHelper.check_max_tlp(tlp, self.max_tlp):
147151
raise ValueError(
148-
"Do not send any data, TLP of the observable is greater than MAX TLP"
152+
f"Do not send any data, TLP of the observable is greater than MAX TLP: {tlp} > {self.max_tlp}"
149153
)
150154

151155
if is_indicator:
@@ -155,28 +159,30 @@ def _process_entity(self, stix_objects, stix_entity, opencti_entity) -> str:
155159

156160
if description is not None and "[ShadowTrackr] " in description:
157161
return (
158-
"This ip is already processed by the ShadowTrackr connector. We're not doing it again,"
159-
" that might mess up the score"
162+
"This ip is already processed by the ShadowTrackr connector. We're not doing it again, "
163+
"that might mess up the score."
160164
)
161165

162166
data = self._check_ip_in_shadowtrackr(ip, labels)
163-
if "error" in data:
164-
raise Exception("Error: [ShadowTrackr] " + data["error"])
167+
if error := data.get("error"):
168+
raise ValueError(f"Error: [ShadowTrackr] {error}")
165169

166170
score_lowered = False
167171
if self.replace_with_lower_score:
168-
if data["false_positive_estimate"] > 99:
169-
score -= 60
170-
score_lowered = True
171-
elif data["false_positive_estimate"] > 89:
172-
score -= 40
173-
score_lowered = True
174-
elif data["false_positive_estimate"] > 69:
175-
score -= 20
176-
score_lowered = True
177-
elif data["false_positive_estimate"] > 50:
178-
score -= 10
179-
score_lowered = True
172+
false_positive_estimate = data["false_positive_estimate"]
173+
174+
score_steps = [
175+
(99, 60),
176+
(89, 40),
177+
(69, 20),
178+
(50, 10),
179+
]
180+
for threshold, decrement in score_steps:
181+
if false_positive_estimate > threshold:
182+
score -= decrement
183+
score_lowered = True
184+
break
185+
180186
# set a lower boundary
181187
if score < 10:
182188
score = 10
@@ -196,127 +202,70 @@ def _process_entity(self, stix_objects, stix_entity, opencti_entity) -> str:
196202
timespec="milliseconds"
197203
) + "Z"
198204
OpenCTIStix2.put_attribute_in_extension(
199-
stix_entity,
200-
STIX_EXT_OCTI_SCO,
201-
"valid_until",
202-
valid_until,
203-
True,
205+
stix_entity, STIX_EXT_OCTI_SCO, "valid_until", valid_until, True
204206
)
205207
date_shortened = True
206208

207-
if data["vpn"]:
208-
if is_indicator:
209-
stix_entity["labels"].append("vpn")
210-
else:
211-
OpenCTIStix2.put_attribute_in_extension(
212-
stix_entity,
213-
STIX_EXT_OCTI_SCO,
214-
"labels",
215-
"vpn",
216-
True,
217-
)
218-
if data["cdn"]:
219-
if is_indicator:
220-
stix_entity["labels"].append("cdn")
221-
else:
222-
OpenCTIStix2.put_attribute_in_extension(
223-
stix_entity,
224-
STIX_EXT_OCTI_SCO,
225-
"labels",
226-
"cdn",
227-
True,
228-
)
229-
if data["cloud"]:
230-
if is_indicator:
231-
stix_entity["labels"].append("cloud")
232-
else:
233-
OpenCTIStix2.put_attribute_in_extension(
234-
stix_entity,
235-
STIX_EXT_OCTI_SCO,
236-
"labels",
237-
"cloud",
238-
True,
239-
)
240-
if data["bogon"]:
241-
if is_indicator:
242-
stix_entity["labels"].append("bogon")
243-
else:
244-
OpenCTIStix2.put_attribute_in_extension(
245-
stix_entity,
246-
STIX_EXT_OCTI_SCO,
247-
"labels",
248-
"bogon",
249-
True,
250-
)
251-
if data["tor"]:
252-
if is_indicator:
253-
stix_entity["labels"].append("tor")
254-
else:
255-
OpenCTIStix2.put_attribute_in_extension(
256-
stix_entity,
257-
STIX_EXT_OCTI_SCO,
258-
"labels",
259-
"tor",
260-
True,
261-
)
262-
if data["public_dns"]:
263-
if is_indicator:
264-
stix_entity["labels"].append("public dns server")
265-
else:
266-
OpenCTIStix2.put_attribute_in_extension(
267-
stix_entity,
268-
STIX_EXT_OCTI_SCO,
269-
"labels",
270-
"public dns server",
271-
True,
272-
)
209+
for label in ["vpn", "cdn", "cloud", "bogon", "tor", "public_dns"]:
210+
if data[label]:
211+
if is_indicator:
212+
stix_entity["labels"].append(label)
213+
else:
214+
OpenCTIStix2.put_attribute_in_extension(
215+
stix_entity, STIX_EXT_OCTI_SCO, "labels", label, True
216+
)
273217

274218
text = ""
275219
if data["cdn"]:
276220
if data["cdn_provider"]:
277-
text = (
278-
"This is an ip address in the " + data["cdn_provider"] + " CDN."
279-
)
221+
text = f"This is an ip address in the {data['cdn_provider']} CDN."
280222
else:
281223
text = "This is an ip address in a CDN."
282-
text += " CDN ip addresses change regularly, and are not very useful to track. "
283-
if score_lowered and date_shortened:
284-
text += " The score is adjusted downwards, and the valid until date set to 1 day."
285-
elif score_lowered:
286-
text += " The score is adjusted downwards."
224+
225+
text += " CDN ip addresses change regularly, and are not very useful to track."
226+
if score_lowered:
227+
if date_shortened:
228+
text += " The score is adjusted downwards, and the valid until date set to 1 day."
229+
else:
230+
text += " The score is adjusted downwards."
287231

288232
elif data["cloud"]:
289233
if data["cloud_provider"]:
290234
text = (
291-
"This is an ip address in the "
292-
+ data["cloud_provider"]
293-
+ " cloud."
235+
f" This is an ip address in the {data['cloud_provider']} cloud."
294236
)
295237
else:
296-
text = "This is an ip address in a cloud."
297-
text += " Cloud ip addresses change regularly, and are not very useful to track. "
298-
if score_lowered and date_shortened:
299-
text += " The score is adjusted downwards, and the valid until date set to 1 day."
300-
elif score_lowered:
301-
text += " The score is adjusted downwards."
238+
text = " This is an ip address in a cloud."
239+
240+
text += " Cloud ip addresses change regularly, and are not very useful to track."
241+
if score_lowered:
242+
if date_shortened:
243+
text += " The score is adjusted downwards, and the valid until date set to 1 day."
244+
else:
245+
text += " The score is adjusted downwards."
302246

303247
elif data["vpn"]:
304-
text = "This ip address is a VPN."
305-
text += " VPN ip addresses change regularly, and are not very useful to track. "
306-
if score_lowered and date_shortened:
307-
text += " The score is adjusted downwards, and the valid until date set to 1 day."
308-
elif score_lowered:
309-
text += " The score is adjusted downwards."
248+
text = (
249+
"This ip address is a VPN. "
250+
"VPN ip addresses change regularly, and are not very useful to track."
251+
)
252+
if score_lowered:
253+
if date_shortened:
254+
text += " The score is adjusted downwards, and the valid until date set to 1 day."
255+
else:
256+
text += " The score is adjusted downwards."
310257

311258
elif data["public_dns"]:
312-
text = "This ip address is a public DNS server."
313-
text += " Public DNS servers are often used in malware to check for an internet connection, "
314-
text += "and automated analysis tools regularly extract them as indicators. This is not very useful."
259+
text = (
260+
"This ip address is a public DNS server. "
261+
"Public DNS servers are often used in malware to check for an internet connection, "
262+
"and automated analysis tools regularly extract them as indicators. This is not very useful."
263+
)
315264
if self.replace_with_lower_score:
316265
text += " The score is adjusted downwards."
317266

318267
if text:
319-
description += "\n[ShadowTrackr] " + text
268+
description += f"\n[ShadowTrackr] {text}"
320269
if is_indicator:
321270
stix_entity["description"] = description
322271
else:
@@ -332,39 +281,23 @@ def _process_entity(self, stix_objects, stix_entity, opencti_entity) -> str:
332281
if date_shortened:
333282
serialized_bundle = self.helper.stix2_create_bundle(stix_objects)
334283
self.helper.send_stix2_bundle(serialized_bundle)
335-
return (
336-
"Found data on "
337-
+ ip
338-
+ ". Score not changed, but valid_until shortened to 1 day."
339-
)
284+
return f"Found data on {ip}. Score not changed, but valid_until shortened to 1 day."
340285
else:
341-
return "Found data on " + ip + ". Score not changed."
286+
return f"Found data on {ip}. Score not changed."
342287
else:
343288
serialized_bundle = self.helper.stix2_create_bundle(stix_objects)
344289
self.helper.send_stix2_bundle(serialized_bundle)
345290
if date_shortened:
346291
return (
347-
"Found data on "
348-
+ ip
349-
+ ". Score changed from "
350-
+ str(old_score)
351-
+ " to "
352-
+ str(score)
353-
+ ", valid_until shortened to 1 day."
292+
f"Found data on {ip}. Score changed from {old_score} to {score}, "
293+
"valid_until shortened to 1 day."
354294
)
355295
else:
356-
return (
357-
"Found data on "
358-
+ ip
359-
+ ". Score changed from "
360-
+ str(old_score)
361-
+ " to "
362-
+ str(score)
363-
)
296+
return f"Found data on {ip}. Score changed from {old_score} to {score}."
364297

365-
return "Invalid ip: " + ip
298+
return f"Invalid ip: {ip}"
366299

367-
def _process_message(self, data: Dict) -> str:
300+
def _process_message(self, data: dict) -> str:
368301
stix_objects = data["stix_objects"]
369302
stix_entity = data["stix_entity"]
370303
opencti_entity = data["enrichment_entity"]
@@ -381,18 +314,18 @@ def _check_ip_in_shadowtrackr(self, ip, labels=None) -> dict:
381314
labels = ""
382315

383316
base_url = "https://shadowtrackr.com/api/v3/ip_info"
384-
full_url = (
385-
base_url + "?api_key=" + self.api_key + "&ip=" + ip + "&tags=" + labels
317+
response = requests.get(
318+
base_url, params={"api_key": self.api_key, "ip": ip, "tags": labels or ""}
386319
)
387-
response = requests.get(full_url)
388320
data = json.loads(response.text)
389321
return data
390322

391323
def _valid_ip(self, ip):
392324
try:
393325
ipaddress.ip_address(ip)
394326
return True
395-
except:
327+
except Exception as e:
328+
self.helper.log_error("Error validating ip", {"error": str(e)})
396329
return False
397330

398331

0 commit comments

Comments
 (0)