Skip to content

Commit cad50a1

Browse files
committed
feat: add exposure
1 parent c12284f commit cad50a1

File tree

16 files changed

+519
-4
lines changed

16 files changed

+519
-4
lines changed

src/amplitude_experiment/assignment/assignment.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88

99

1010
class Assignment:
11+
"""
12+
@deprecated Assignment tracking is deprecated. Use Exposure with ExposureService instead.
13+
"""
1114

1215
def __init__(self, user: User, results: Dict[str, Variant]):
1316
self.user = user

src/amplitude_experiment/assignment/assignment_config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33

44
class AssignmentConfig(amplitude.Config):
5+
"""
6+
@deprecated Assignment tracking is deprecated. Use ExposureConfig with ExposureService instead.
7+
"""
58
def __init__(self, cache_capacity: int = 65536, send_evaluated_props: bool = False, **kw):
69
super(AssignmentConfig, self).__init__(**kw)
710
self.cache_capacity = cache_capacity

src/amplitude_experiment/assignment/assignment_filter.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55

66
class AssignmentFilter:
7+
"""
8+
@deprecated Assignment tracking is deprecated. Use ExposureFilter with ExposureService instead.
9+
"""
710
def __init__(self, size: int, ttl_millis: int = DAY_MILLIS):
811
self.cache = Cache(size, ttl_millis)
912

src/amplitude_experiment/assignment/assignment_service.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99

1010

1111
def to_event(assignment: Assignment, send_evaluated_props: bool) -> BaseEvent:
12+
"""
13+
@deprecated Assignment tracking is deprecated. Use Exposure tracking.
14+
"""
1215
event = BaseEvent(event_type='[Experiment] Assignment', user_id=assignment.user.user_id,
1316
device_id=assignment.user.device_id, event_properties={}, user_properties={})
1417

@@ -58,6 +61,9 @@ def to_event(assignment: Assignment, send_evaluated_props: bool) -> BaseEvent:
5861

5962

6063
class AssignmentService:
64+
"""
65+
@deprecated Assignment tracking is deprecated. Use ExposureService with Exposure tracking instead.
66+
"""
6167
def __init__(self, amplitude: Amplitude, assignment_filter: AssignmentFilter, send_evaluated_props: bool):
6268
self.amplitude = amplitude
6369
self.assignmentFilter = assignment_filter
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from .exposure import Exposure, DAY_MILLIS
2+
from .exposure_filter import ExposureFilter
3+
from .exposure_service import ExposureService, to_exposure_events
4+
from .exposure_config import ExposureConfig
5+
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import time
2+
from typing import Dict
3+
4+
from .. import Variant
5+
from ..user import User
6+
7+
DAY_MILLIS = 24 * 60 * 60 * 1000
8+
9+
10+
class Exposure:
11+
"""
12+
Exposure is a class that represents a user's exposure to a set of flags.
13+
"""
14+
15+
def __init__(self, user: User, results: Dict[str, Variant]):
16+
self.user = user
17+
self.results = results
18+
self.timestamp = time.time() * 1000
19+
20+
def canonicalize(self) -> str:
21+
user = self.user.user_id.strip() if self.user.user_id else 'None'
22+
device = self.user.device_id.strip() if self.user.device_id else 'None'
23+
canonical = user + ' ' + device + ' '
24+
for flag_key in sorted(self.results):
25+
variant = self.results[flag_key]
26+
if variant.key is None:
27+
continue
28+
value = self.results[flag_key].key.strip()
29+
canonical += flag_key.strip() + ' ' + value + ' '
30+
return canonical
31+
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import amplitude
2+
3+
4+
class ExposureConfig(amplitude.Config):
5+
def __init__(self, cache_capacity: int = 65536, **kw):
6+
super(ExposureConfig, self).__init__(**kw)
7+
self.cache_capacity = cache_capacity
8+
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from .exposure import Exposure
2+
from .exposure import DAY_MILLIS
3+
from ..util.cache import Cache
4+
5+
6+
class ExposureFilter:
7+
def __init__(self, size: int, ttl_millis: int = DAY_MILLIS):
8+
self.cache = Cache(size, ttl_millis)
9+
self.ttl_millis = ttl_millis
10+
11+
def should_track(self, exposure: Exposure) -> bool:
12+
"""
13+
Determines if an exposure should be tracked based on the dedupe filter.
14+
"""
15+
if not exposure.results:
16+
# Don't track empty exposures.
17+
return False
18+
canonical_exposure = exposure.canonicalize()
19+
track = self.cache.get(canonical_exposure) is None
20+
if track:
21+
self.cache.put(canonical_exposure, object())
22+
return track
23+
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
from amplitude import Amplitude, BaseEvent
2+
from .exposure import Exposure
3+
from .exposure import DAY_MILLIS
4+
from .exposure_filter import ExposureFilter
5+
from ..util import hash_code
6+
7+
FLAG_TYPE_MUTUAL_EXCLUSION_GROUP = "mutual-exclusion-group"
8+
9+
10+
def to_exposure_events(exposure: Exposure, ttl_millis: int) -> list[BaseEvent]:
11+
"""
12+
Convert an Exposure to a list of Amplitude events (one per flag).
13+
"""
14+
events = []
15+
canonicalized = exposure.canonicalize()
16+
for flag_key in exposure.results:
17+
variant = exposure.results[flag_key]
18+
19+
# TODO: We don't seem to use trackExposure metadata.
20+
track_exposure = variant.metadata.get('trackExposure') if variant.metadata is not None else True
21+
if track_exposure is False:
22+
continue
23+
24+
is_default = variant.metadata.get('default') if variant.metadata is not None else False
25+
if is_default:
26+
continue
27+
28+
# Determine user properties to set and unset.
29+
set_props = {}
30+
unset_props = {}
31+
flag_type = variant.metadata.get('flagType') if variant.metadata is not None else None
32+
if flag_type != FLAG_TYPE_MUTUAL_EXCLUSION_GROUP:
33+
if is_default:
34+
unset_props[f'[Experiment] {flag_key}'] = '-'
35+
else:
36+
if variant.key:
37+
set_props[f'[Experiment] {flag_key}'] = variant.key
38+
elif variant.value:
39+
set_props[f'[Experiment] {flag_key}'] = variant.value
40+
41+
# Build event properties.
42+
event_properties = {}
43+
if not is_default:
44+
event_properties['[Experiment] Flag Key'] = flag_key
45+
if variant.key:
46+
event_properties['[Experiment] Variant'] = variant.key
47+
elif variant.value:
48+
event_properties['[Experiment] Variant'] = variant.value
49+
if variant.metadata:
50+
event_properties['metadata'] = variant.metadata
51+
52+
# Build event.
53+
event = BaseEvent(
54+
event_type='[Experiment] Exposure',
55+
user_id=exposure.user.user_id,
56+
device_id=exposure.user.device_id,
57+
event_properties=event_properties,
58+
user_properties={
59+
'$set': set_props,
60+
'$unset': unset_props,
61+
},
62+
insert_id=f'{exposure.user.user_id} {exposure.user.device_id} {hash_code(flag_key + " " + canonicalized)} {int(exposure.timestamp / ttl_millis)}'
63+
)
64+
if exposure.user.groups:
65+
event.groups = exposure.user.groups
66+
67+
events.append(event)
68+
69+
return events
70+
71+
72+
class ExposureService:
73+
def __init__(self, amplitude: Amplitude, exposure_filter: ExposureFilter):
74+
self.amplitude = amplitude
75+
self.exposure_filter = exposure_filter
76+
77+
def track(self, exposure: Exposure):
78+
if self.exposure_filter.should_track(exposure):
79+
events = to_exposure_events(exposure, self.exposure_filter.ttl_millis)
80+
for event in events:
81+
self.amplitude.track(event)
82+

src/amplitude_experiment/local/client.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
from amplitude import Amplitude
66

77
from .config import LocalEvaluationConfig
8+
from .evaluate_options import EvaluateOptions
89
from ..assignment import Assignment, AssignmentFilter, AssignmentService
10+
from ..exposure import Exposure, ExposureFilter, ExposureService
911
from ..cohort.cohort import USER_GROUP_TYPE
1012
from ..cohort.cohort_download_api import DirectCohortDownloadApi
1113
from ..cohort.cohort_loader import CohortLoader
@@ -47,6 +49,13 @@ def __init__(self, api_key: str, config: LocalEvaluationConfig = None):
4749
instance = Amplitude(config.assignment_config.api_key, config.assignment_config)
4850
self.assignment_service = AssignmentService(instance, AssignmentFilter(
4951
config.assignment_config.cache_capacity), config.assignment_config.send_evaluated_props)
52+
53+
# Exposure service is always instantiated, using deployment key if no api key provided
54+
exposure_config = self.config.exposure_config
55+
if not exposure_config.api_key:
56+
exposure_config.api_key = api_key
57+
exposure_instance = Amplitude(exposure_config.api_key, exposure_config)
58+
self.exposure_service = ExposureService(exposure_instance, ExposureFilter(exposure_config.cache_capacity))
5059
self.logger = logging.getLogger("Amplitude")
5160
self.logger.addHandler(logging.StreamHandler())
5261
if self.config.debug:
@@ -81,7 +90,8 @@ def start(self):
8190
"""
8291
self.deployment_runner.start()
8392

84-
def evaluate_v2(self, user: User, flag_keys: Set[str] = None) -> Dict[str, Variant]:
93+
# TODO: python backwards compatibility for evaluate_v2 to be looked at again
94+
def evaluate_v2(self, user: User, flag_keys: Set[str] = None, options: EvaluateOptions = None) -> Dict[str, Variant]:
8595
"""
8696
Locally evaluates flag variants for a user.
8797
@@ -91,16 +101,23 @@ def evaluate_v2(self, user: User, flag_keys: Set[str] = None) -> Dict[str, Varia
91101
92102
Parameters:
93103
user (User): The user to evaluate
94-
flag_keys (List[str]): The flags to evaluate with the user. If empty, all flags are evaluated.
104+
flag_keys (Set[str]): The flags to evaluate with the user. If empty, all flags are evaluated.
105+
options (EvaluateOptions): Optional evaluation options.
95106
96107
Returns:
97108
The evaluated variants.
98109
"""
110+
# Handle backwards compatibility: if options is None, create default
111+
if options is None:
112+
options = EvaluateOptions(flag_keys=flag_keys)
113+
# Use flag_keys from options if provided, otherwise fall back to parameter
114+
flag_keys_to_use = options.flag_keys if options.flag_keys is not None else flag_keys
115+
99116
flag_configs = self.flag_config_storage.get_flag_configs()
100117
if flag_configs is None or len(flag_configs) == 0:
101118
return {}
102119
self.logger.debug(f"[Experiment] Evaluate: user={user} - Flags: {flag_configs}")
103-
sorted_flags = topological_sort(flag_configs, flag_keys and list(flag_keys))
120+
sorted_flags = topological_sort(flag_configs, flag_keys_to_use and list(flag_keys_to_use))
104121
if not sorted_flags:
105122
return {}
106123

@@ -120,7 +137,10 @@ def evaluate_v2(self, user: User, flag_keys: Set[str] = None) -> Dict[str, Varia
120137
) for k, v in result.items()
121138
}
122139
self.logger.debug(f"[Experiment] Evaluate Result: {variants}")
140+
if options.tracks_exposure is True:
141+
self.exposure_service.track(Exposure(user, variants))
123142
if self.assignment_service is not None:
143+
# @deprecated Assignment tracking is deprecated. Use ExposureService with Exposure tracking instead.
124144
self.assignment_service.track(Assignment(user, variants))
125145
return variants
126146

0 commit comments

Comments
 (0)