-
Notifications
You must be signed in to change notification settings - Fork 4
updated to use correct comparison for distinctUntilChanged #97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
d1b1fc6
4ca4524
7411967
bc9651b
dc7b326
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,13 @@ import io.getunleash.android.errors.ServerException | |
| import io.getunleash.android.events.HeartbeatEvent | ||
| import io.getunleash.android.http.Throttler | ||
| import io.getunleash.android.unleashScope | ||
| import java.io.Closeable | ||
| import java.io.IOException | ||
| import java.util.concurrent.TimeUnit | ||
| import java.util.concurrent.atomic.AtomicReference | ||
| import kotlin.coroutines.CoroutineContext | ||
| import kotlin.coroutines.resume | ||
| import kotlin.coroutines.resumeWithException | ||
| import kotlinx.coroutines.Dispatchers | ||
| import kotlinx.coroutines.channels.BufferOverflow | ||
| import kotlinx.coroutines.flow.MutableSharedFlow | ||
|
|
@@ -30,54 +37,55 @@ import okhttp3.OkHttpClient | |
| import okhttp3.Request | ||
| import okhttp3.Response | ||
| import okhttp3.internal.closeQuietly | ||
| import java.io.Closeable | ||
| import java.io.IOException | ||
| import java.util.concurrent.TimeUnit | ||
| import java.util.concurrent.atomic.AtomicReference | ||
| import kotlin.coroutines.CoroutineContext | ||
| import kotlin.coroutines.resume | ||
| import kotlin.coroutines.resumeWithException | ||
|
|
||
| /** | ||
| * Http Client for fetching data from Unleash Proxy. | ||
| * By default creates an OkHttpClient with readTimeout set to 2 seconds and a cache of 10 MBs | ||
| * @param httpClient - the http client to use for fetching toggles from Unleash proxy | ||
| * Http Client for fetching data from Unleash Proxy. By default creates an OkHttpClient with | ||
| * readTimeout set to 2 seconds and a cache of 10 MBs | ||
| * @param httpClient | ||
| * - the http client to use for fetching toggles from Unleash proxy | ||
| */ | ||
| open class UnleashFetcher( | ||
| unleashConfig: UnleashConfig, | ||
| private val httpClient: OkHttpClient, | ||
| private val unleashContext: StateFlow<UnleashContext>, | ||
| unleashConfig: UnleashConfig, | ||
| private val httpClient: OkHttpClient, | ||
| private val unleashContext: StateFlow<UnleashContext>, | ||
| ) : Closeable { | ||
| companion object { | ||
| private const val TAG = "UnleashFetcher" | ||
| } | ||
|
|
||
| private var contextForLastFetch: UnleashContext? = null | ||
| private val proxyUrl = unleashConfig.proxyUrl?.toHttpUrl() | ||
| private val applicationHeaders = unleashConfig.getApplicationHeaders(unleashConfig.pollingStrategy) | ||
| private val applicationHeaders = | ||
| unleashConfig.getApplicationHeaders(unleashConfig.pollingStrategy) | ||
| private val appName = unleashConfig.appName | ||
| private var etag: String? = null | ||
| private val featuresReceivedFlow = MutableSharedFlow<UnleashState>( | ||
| replay = 1, | ||
| onBufferOverflow = BufferOverflow.DROP_OLDEST | ||
| ) | ||
| private val fetcherHeartbeatFlow = MutableSharedFlow<HeartbeatEvent>( | ||
| extraBufferCapacity = 5, | ||
| onBufferOverflow = BufferOverflow.DROP_OLDEST | ||
| ) | ||
| private val featuresReceivedFlow = | ||
| MutableSharedFlow<UnleashState>( | ||
| replay = 1, | ||
| onBufferOverflow = BufferOverflow.DROP_OLDEST | ||
| ) | ||
| private val fetcherHeartbeatFlow = | ||
| MutableSharedFlow<HeartbeatEvent>( | ||
| extraBufferCapacity = 5, | ||
| onBufferOverflow = BufferOverflow.DROP_OLDEST | ||
| ) | ||
| private val coroutineContextForContextChange: CoroutineContext = Dispatchers.IO | ||
| private val currentCall = AtomicReference<Call?>(null) | ||
| private val throttler = | ||
| Throttler( | ||
| TimeUnit.MILLISECONDS.toSeconds(unleashConfig.pollingStrategy.interval), | ||
| longestAcceptableIntervalSeconds = 300, | ||
| proxyUrl.toString() | ||
| ) | ||
| Throttler( | ||
| TimeUnit.MILLISECONDS.toSeconds(unleashConfig.pollingStrategy.interval), | ||
| longestAcceptableIntervalSeconds = 300, | ||
| proxyUrl.toString() | ||
| ) | ||
|
|
||
| fun getFeaturesReceivedFlow() = featuresReceivedFlow.asSharedFlow() | ||
|
|
||
| fun startWatchingContext() { | ||
| unleashScope.launch { | ||
| unleashContext.distinctUntilChanged { old, new -> old != new }.collect { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| unleashContext.collect { | ||
| if (it == contextForLastFetch) { | ||
| Log.d(TAG, "Context unchanged, skipping refresh toggles") | ||
| return@collect | ||
| } | ||
| withContext(coroutineContextForContextChange) { | ||
| Log.d(TAG, "Unleash context changed: $it") | ||
| refreshToggles() | ||
|
|
@@ -89,7 +97,7 @@ open class UnleashFetcher( | |
| suspend fun refreshToggles(): ToggleResponse { | ||
| if (throttler.performAction()) { | ||
| Log.d(TAG, "Refreshing toggles") | ||
| val response = refreshTogglesWithContext(unleashContext.value) | ||
| val response = doFetchToggles(unleashContext.value) | ||
| fetcherHeartbeatFlow.emit(HeartbeatEvent(response.status, response.error?.message)) | ||
| return response | ||
| } | ||
|
|
@@ -98,15 +106,28 @@ open class UnleashFetcher( | |
| return ToggleResponse(Status.THROTTLED) | ||
| } | ||
|
|
||
| internal suspend fun refreshTogglesWithContext(ctx: UnleashContext): ToggleResponse { | ||
| suspend fun refreshTogglesWithContext(ctx: UnleashContext): ToggleResponse { | ||
| if (throttler.performAction()) { | ||
| Log.d(TAG, "Refreshing toggles") | ||
| val response = doFetchToggles(ctx) | ||
| fetcherHeartbeatFlow.emit(HeartbeatEvent(response.status, response.error?.message)) | ||
| return response | ||
| } | ||
| Log.i(TAG, "Skipping refresh toggles due to throttling") | ||
| fetcherHeartbeatFlow.emit(HeartbeatEvent(Status.THROTTLED)) | ||
| return ToggleResponse(Status.THROTTLED) | ||
| } | ||
|
|
||
| internal suspend fun doFetchToggles(ctx: UnleashContext): ToggleResponse { | ||
| contextForLastFetch = ctx | ||
| val response = fetchToggles(ctx) | ||
| if (response.isSuccess()) { | ||
|
|
||
| val toggles = response.config!!.toggles.groupBy { it.name } | ||
| .mapValues { (_, v) -> v.first() } | ||
| val toggles = | ||
| response.config!!.toggles.groupBy { it.name }.mapValues { (_, v) -> v.first() } | ||
| Log.d( | ||
| TAG, | ||
| "Fetched new state with ${toggles.size} toggles, emitting featuresReceivedFlow" | ||
| TAG, | ||
| "Fetched new state with ${toggles.size} toggles, emitting featuresReceivedFlow" | ||
| ) | ||
| featuresReceivedFlow.emit(UnleashState(ctx, toggles)) | ||
| return ToggleResponse(response.status, toggles) | ||
|
|
@@ -124,26 +145,31 @@ open class UnleashFetcher( | |
|
|
||
| private suspend fun fetchToggles(ctx: UnleashContext): FetchResponse { | ||
| if (proxyUrl == null) { | ||
| return FetchResponse(Status.FAILED, error = IllegalStateException("Proxy URL is not set")) | ||
| return FetchResponse( | ||
| Status.FAILED, | ||
| error = IllegalStateException("Proxy URL is not set") | ||
| ) | ||
| } | ||
| val contextUrl = buildContextUrl(ctx) | ||
| try { | ||
| val request = Request.Builder().url(contextUrl) | ||
| .headers(applicationHeaders.toHeaders()) | ||
| val request = Request.Builder().url(contextUrl).headers(applicationHeaders.toHeaders()) | ||
| if (etag != null) { | ||
| request.header("If-None-Match", etag!!) | ||
| } | ||
| val call = this.httpClient.newCall(request.build()) | ||
| val inFlightCall = currentCall.get() | ||
| if (!currentCall.compareAndSet(inFlightCall, call)) { | ||
| return FetchResponse( | ||
| Status.FAILED, | ||
| error = IllegalStateException("Failed to set new call while ${inFlightCall?.request()?.url} is in flight") | ||
| Status.FAILED, | ||
| error = | ||
| IllegalStateException( | ||
| "Failed to set new call while ${inFlightCall?.request()?.url} is in flight" | ||
| ) | ||
| ) | ||
| } else if (inFlightCall != null && !inFlightCall.isCanceled()) { | ||
| } else if (inFlightCall != null && !inFlightCall.isCanceled() && !inFlightCall.isExecuted()) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do not cancel completed requests |
||
| Log.d( | ||
| TAG, | ||
| "Cancelling previous ${inFlightCall.request().method} ${inFlightCall.request().url}" | ||
| TAG, | ||
| "Cancelling previous ${inFlightCall.request().method} ${inFlightCall.request().url}" | ||
| ) | ||
| inFlightCall.cancel() | ||
| } | ||
|
|
@@ -159,23 +185,21 @@ open class UnleashFetcher( | |
| res.body?.use { b -> | ||
| try { | ||
| val proxyResponse: ProxyResponse = | ||
| proxyResponseAdapter.fromJson(b.string())!! | ||
| proxyResponseAdapter.fromJson(b.string())!! | ||
| FetchResponse(Status.SUCCESS, proxyResponse) | ||
| } catch (e: Exception) { | ||
| // If we fail to parse, just keep data | ||
| FetchResponse(Status.FAILED, error = e) | ||
| } | ||
| } ?: FetchResponse(Status.FAILED, error = NoBodyException()) | ||
| } | ||
| ?: FetchResponse(Status.FAILED, error = NoBodyException()) | ||
| } | ||
|
|
||
| res.code == 304 -> { | ||
| FetchResponse(Status.NOT_MODIFIED) | ||
| } | ||
|
|
||
| res.code == 401 -> { | ||
| FetchResponse(Status.FAILED, error = NotAuthorizedException()) | ||
| } | ||
|
|
||
| else -> { | ||
| FetchResponse(Status.FAILED, error = ServerException(res.code)) | ||
| } | ||
|
|
@@ -188,31 +212,33 @@ open class UnleashFetcher( | |
|
|
||
| private suspend fun Call.await(): Response { | ||
| return suspendCancellableCoroutine { continuation -> | ||
| enqueue(object : Callback { | ||
| override fun onResponse(call: Call, response: Response) { | ||
| continuation.resume(response) | ||
| } | ||
| enqueue( | ||
| object : Callback { | ||
| override fun onResponse(call: Call, response: Response) { | ||
| continuation.resume(response) | ||
| } | ||
|
|
||
| override fun onFailure(call: Call, e: IOException) { | ||
| // Don't bother with resuming the continuation if it is already cancelled. | ||
| if (continuation.isCancelled) return | ||
| continuation.resumeWithException(e) | ||
| } | ||
| }) | ||
| override fun onFailure(call: Call, e: IOException) { | ||
| // Don't bother with resuming the continuation if it is already | ||
| // cancelled. | ||
| if (continuation.isCancelled) return | ||
| continuation.resumeWithException(e) | ||
| } | ||
| } | ||
| ) | ||
|
|
||
| continuation.invokeOnCancellation { | ||
| try { | ||
| cancel() | ||
| } catch (ex: Throwable) { | ||
| //Ignore cancel exception | ||
| // Ignore cancel exception | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private fun buildContextUrl(ctx: UnleashContext): HttpUrl { | ||
| var contextUrl = proxyUrl!!.newBuilder() | ||
| .addQueryParameter("appName", appName) | ||
| var contextUrl = proxyUrl!!.newBuilder().addQueryParameter("appName", appName) | ||
| if (ctx.userId != null) { | ||
| contextUrl.addQueryParameter("userId", ctx.userId) | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hold the last fetched so we can ignore refreshing toggles when context has change due to a sync setContext which avoids 2 fetch calls for the same context