Skip to content

Commit 16cddff

Browse files
committed
feat: use common streaming utils, reverse sync to template
1 parent 33a8520 commit 16cddff

File tree

8 files changed

+586
-124
lines changed

8 files changed

+586
-124
lines changed

.openapi-generator/FILES

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,9 @@ docs/WriteAuthorizationModelResponse.md
9797
docs/WriteRequest.md
9898
docs/WriteRequestDeletes.md
9999
docs/WriteRequestWrites.md
100+
src/main/java/dev/openfga/sdk/api/BaseStreamingApi.java
100101
src/main/java/dev/openfga/sdk/api/OpenFgaApi.java
102+
src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java
101103
src/main/java/dev/openfga/sdk/api/model/AbortedMessageResponse.java
102104
src/main/java/dev/openfga/sdk/api/model/AbstractOpenApiSchema.java
103105
src/main/java/dev/openfga/sdk/api/model/Any.java
@@ -157,6 +159,7 @@ src/main/java/dev/openfga/sdk/api/model/RelationshipCondition.java
157159
src/main/java/dev/openfga/sdk/api/model/SourceInfo.java
158160
src/main/java/dev/openfga/sdk/api/model/Status.java
159161
src/main/java/dev/openfga/sdk/api/model/Store.java
162+
src/main/java/dev/openfga/sdk/api/model/StreamResult.java
160163
src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java
161164
src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java
162165
src/main/java/dev/openfga/sdk/api/model/Tuple.java

README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,13 @@ It can be used with the following:
8282
* Gradle (Groovy)
8383

8484
```groovy
85-
implementation 'dev.openfga:openfga-sdk:0.9.3'
85+
implementation 'dev.openfga:openfga-sdk:0.9.2'
8686
```
8787

8888
* Gradle (Kotlin)
8989

9090
```kotlin
91-
implementation("dev.openfga:openfga-sdk:0.9.3")
91+
implementation("dev.openfga:openfga-sdk:0.9.2")
9292
```
9393

9494
* Apache Maven
@@ -97,26 +97,26 @@ implementation("dev.openfga:openfga-sdk:0.9.3")
9797
<dependency>
9898
<groupId>dev.openfga</groupId>
9999
<artifactId>openfga-sdk</artifactId>
100-
<version>0.9.3</version>
100+
<version>0.9.2</version>
101101
</dependency>
102102
```
103103

104104
* Ivy
105105

106106
```xml
107-
<dependency org="dev.openfga" name="openfga-sdk" rev="0.9.3"/>
107+
<dependency org="dev.openfga" name="openfga-sdk" rev="0.9.2"/>
108108
```
109109

110110
* SBT
111111

112112
```scala
113-
libraryDependencies += "dev.openfga" % "openfga-sdk" % "0.9.3"
113+
libraryDependencies += "dev.openfga" % "openfga-sdk" % "0.9.2"
114114
```
115115

116116
* Leiningen
117117

118118
```edn
119-
[dev.openfga/openfga-sdk "0.9.3"]
119+
[dev.openfga/openfga-sdk "0.9.2"]
120120
```
121121

122122

examples/streamed-list-objects/Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ openfga_version=latest
66
language=java
77

88
build:
9-
./gradlew -P language=$(language) build
9+
../../gradlew -P language=$(language) build
1010

1111
run:
12-
./gradlew -P language=$(language) run
12+
../../gradlew -P language=$(language) run
1313

1414
run-openfga:
1515
docker pull docker.io/openfga/openfga:${openfga_version} && \
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/*
2+
* OpenFGA
3+
* A high performance and flexible authorization/permission engine built for developers and inspired by Google Zanzibar.
4+
*
5+
* The version of the OpenAPI document: 1.x
6+
* Contact: [email protected]
7+
*
8+
* NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
9+
* https://openapi-generator.tech
10+
* Do not edit the class manually.
11+
*/
12+
13+
package dev.openfga.sdk.api;
14+
15+
import static dev.openfga.sdk.util.StringUtil.isNullOrWhitespace;
16+
17+
import com.fasterxml.jackson.core.type.TypeReference;
18+
import com.fasterxml.jackson.databind.ObjectMapper;
19+
import dev.openfga.sdk.api.client.ApiClient;
20+
import dev.openfga.sdk.api.configuration.Configuration;
21+
import dev.openfga.sdk.api.model.Status;
22+
import dev.openfga.sdk.api.model.StreamResult;
23+
import dev.openfga.sdk.errors.ApiException;
24+
import dev.openfga.sdk.errors.FgaInvalidParameterException;
25+
import java.net.http.HttpRequest;
26+
import java.net.http.HttpResponse;
27+
import java.util.concurrent.CompletableFuture;
28+
import java.util.function.Consumer;
29+
import java.util.stream.Stream;
30+
31+
/**
32+
* Base class for handling streaming API responses.
33+
* This class provides generic streaming functionality that can be reused across
34+
* different streaming endpoints by handling the common streaming parsing and error handling logic.
35+
*
36+
* @param <T> The type of response objects in the stream
37+
*/
38+
public abstract class BaseStreamingApi<T> {
39+
protected final Configuration configuration;
40+
protected final ApiClient apiClient;
41+
protected final ObjectMapper objectMapper;
42+
protected final TypeReference<StreamResult<T>> streamResultTypeRef;
43+
44+
/**
45+
* Constructor for BaseStreamingApi
46+
*
47+
* @param configuration The API configuration
48+
* @param apiClient The API client for making HTTP requests
49+
* @param streamResultTypeRef TypeReference for deserializing StreamResult<T>
50+
*/
51+
protected BaseStreamingApi(
52+
Configuration configuration, ApiClient apiClient, TypeReference<StreamResult<T>> streamResultTypeRef) {
53+
this.configuration = configuration;
54+
this.apiClient = apiClient;
55+
this.objectMapper = apiClient.getObjectMapper();
56+
this.streamResultTypeRef = streamResultTypeRef;
57+
}
58+
59+
/**
60+
* Process a streaming response asynchronously.
61+
* Each line in the response is parsed and delivered to the consumer callback.
62+
*
63+
* @param request The HTTP request to execute
64+
* @param consumer Callback to handle each response object (invoked asynchronously)
65+
* @param errorConsumer Optional callback to handle errors during streaming
66+
* @return CompletableFuture<Void> that completes when streaming finishes
67+
*/
68+
protected CompletableFuture<Void> processStreamingResponse(
69+
HttpRequest request, Consumer<T> consumer, Consumer<Throwable> errorConsumer) {
70+
71+
// Use async HTTP client with streaming body handler
72+
// ofLines() provides line-by-line streaming
73+
return apiClient
74+
.getHttpClient()
75+
.sendAsync(request, HttpResponse.BodyHandlers.ofLines())
76+
.thenCompose(response -> {
77+
// Check response status
78+
int statusCode = response.statusCode();
79+
if (statusCode < 200 || statusCode >= 300) {
80+
ApiException apiException =
81+
new ApiException(statusCode, "API error: " + statusCode, response.headers(), null);
82+
return CompletableFuture.failedFuture(apiException);
83+
}
84+
85+
// Process the stream - this runs on HttpClient's executor thread
86+
try (Stream<String> lines = response.body()) {
87+
lines.forEach(line -> {
88+
if (!isNullOrWhitespace(line)) {
89+
processLine(line, consumer, errorConsumer);
90+
}
91+
});
92+
return CompletableFuture.completedFuture((Void) null);
93+
} catch (Exception e) {
94+
return CompletableFuture.failedFuture(e);
95+
}
96+
})
97+
.handle((result, throwable) -> {
98+
if (throwable != null) {
99+
// Unwrap CompletionException to get the original exception
100+
Throwable actualException = throwable;
101+
if (throwable instanceof java.util.concurrent.CompletionException
102+
&& throwable.getCause() != null) {
103+
actualException = throwable.getCause();
104+
}
105+
106+
if (errorConsumer != null) {
107+
errorConsumer.accept(actualException);
108+
}
109+
// Re-throw to keep the CompletableFuture in failed state
110+
if (actualException instanceof RuntimeException) {
111+
throw (RuntimeException) actualException;
112+
}
113+
throw new RuntimeException(actualException);
114+
}
115+
return result;
116+
});
117+
}
118+
119+
/**
120+
* Process a single line from the stream
121+
*
122+
* @param line The JSON line to process
123+
* @param consumer Callback to handle the parsed result
124+
* @param errorConsumer Optional callback to handle errors
125+
*/
126+
private void processLine(String line, Consumer<T> consumer, Consumer<Throwable> errorConsumer) {
127+
try {
128+
// Parse the JSON line to extract the object
129+
StreamResult<T> streamResult = objectMapper.readValue(line, streamResultTypeRef);
130+
131+
if (streamResult.getError() != null) {
132+
// Handle error in stream
133+
if (errorConsumer != null) {
134+
Status error = streamResult.getError();
135+
String errorMessage = error.getMessage() != null
136+
? "Stream error: " + error.getMessage()
137+
: "Stream error: " + (error.getCode() != null ? "code " + error.getCode() : "unknown");
138+
errorConsumer.accept(new ApiException(errorMessage));
139+
}
140+
} else if (streamResult.getResult() != null) {
141+
// Deliver the response object to the consumer
142+
T result = streamResult.getResult();
143+
if (result != null) {
144+
consumer.accept(result);
145+
}
146+
}
147+
} catch (Exception e) {
148+
if (errorConsumer != null) {
149+
errorConsumer.accept(e);
150+
}
151+
}
152+
}
153+
154+
/**
155+
* Build an HTTP request for the streaming endpoint
156+
*
157+
* @param method HTTP method (e.g., "POST")
158+
* @param path The API path
159+
* @param body The request body
160+
* @param configuration The configuration to use
161+
* @return HttpRequest ready to execute
162+
* @throws ApiException if request building fails
163+
* @throws FgaInvalidParameterException if parameters are invalid
164+
*/
165+
protected HttpRequest buildHttpRequest(String method, String path, Object body, Configuration configuration)
166+
throws ApiException, FgaInvalidParameterException {
167+
try {
168+
byte[] bodyBytes = objectMapper.writeValueAsBytes(body);
169+
HttpRequest.Builder requestBuilder = ApiClient.requestBuilder(method, path, bodyBytes, configuration);
170+
171+
// Apply request interceptors if any
172+
var interceptor = apiClient.getRequestInterceptor();
173+
if (interceptor != null) {
174+
interceptor.accept(requestBuilder);
175+
}
176+
177+
return requestBuilder.build();
178+
} catch (Exception e) {
179+
throw new ApiException(e);
180+
}
181+
}
182+
}

0 commit comments

Comments
 (0)