Recently, Chinese large models such as DeepSeek have kept gaining popularity, to the point that they now draw more attention than OpenAI (jokingly nicknamed "CloseAI"). On newer stacks you can integrate easily through the official Spring AI, which targets Spring Boot 3.x and Java 17+. Enterprise applications still running JDK 8 and Spring Boot 2.7.3, however, usually need a custom implementation. This is especially true when a model provider's streaming responses do not follow the standard SSE format and must be handled flexibly. This article shares our practical solution.
📦 Adding Gradle dependencies
Core dependencies:
- spring-boot-starter-web: basic web support
- spring-boot-starter-webflux: reactive programming support (the module that provides WebClient)
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
🌐 WebClient configuration points
Pay special attention to the Header configuration when initializing:
@Bean
public WebClient init() {
    return WebClient.builder()
            .baseUrl(baseUrl)
            .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAiApiKey)
            // ⚠️ Must be JSON: Content-Type describes the request body we send
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .build();
}
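🚨 Key pitfall: setting Content-Type to MediaType.TEXT_EVENT_STREAM_VALUE during initialization causes the request to fail. It must be APPLICATION_JSON_VALUE. Content-Type describes the request body, which is JSON. The streaming response format is requested separately through the Accept header.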
🧠 Core processing logic
Streaming request entry point
@GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) {
    // Request body construction (JDK 8 has no text blocks, so use String.format)
    // Note: the prompt is inserted unescaped; quotes or newlines in it break the JSON
    String requestBody = String.format(
            "{\"model\": \"%s\", "
            + "\"messages\": [{\"role\": \"user\", \"content\": \"%s\"}], "
            + "\"stream\": true}",
            model, prompt);
    return webClient.post()
            // Request configuration
            .uri("/v1/chat/completions")
            .bodyValue(requestBody)
            .accept(MediaType.TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(DataBuffer.class) // 🔑 Key point: read the raw bytes
            .transform(this::processStream)
            // Retry and timeout configuration
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
            .timeout(Duration.ofSeconds(180))
            // Error handling
            .doOnError(e -> log.error("Stream error", e))
            .doFinally(signal -> log.info("Stream completed: {}", signal));
}
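The request body above is assembled with String.format, which inserts the prompt verbatim. A prompt containing a double quote, backslash, or newline therefore produces invalid JSON. Below is a minimal JDK 8 sketch that escapes the values first. The ChatRequestBody class and its methods are hypothetical names for illustration and are not part of the original code. In a Spring application, letting Jackson serialize a Map or DTO is usually the more robust option, since Jackson is already on the classpath through spring-boot-starter-web.

```java
import java.util.Locale;

// Hypothetical helper (not from the original article): builds the chat request
// body with every value JSON-escaped, using only JDK 8 APIs.
final class ChatRequestBody {

    private ChatRequestBody() {
    }

    /** Escapes a string for use inside a JSON string literal. */
    static String jsonEscape(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 16);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Other control characters need the six-character hex escape form
                        sb.append(String.format(Locale.ROOT, "\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** JDK 8 replacement for the text block: String.format over escaped values. */
    static String build(String model, String prompt) {
        return String.format(
                "{\"model\":\"%s\",\"messages\":[{\"role\":\"user\",\"content\":\"%s\"}],\"stream\":true}",
                jsonEscape(model), jsonEscape(prompt));
    }

    public static void main(String[] args) {
        System.out.println(build("gpt-4o", "Say \"hi\"\nthen stop"));
    }
}
```

The escaped string can then be passed to bodyValue exactly as in the snippet above.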
Technical principles
When using bodyToFlux(DataBuffer.class):
- ✅ You get direct control over the raw byte stream
- ❌ Spring's automatic SSE parsing is bypassed, which is what we want for non-standard responses
- 📡 The data is processed as a dynamic stream: similar to a Java Stream, except that elements keep arriving over time
🔧 Non-standard SSE data processing
Core processing flow
private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) {
    return dataBufferFlux
            // Byte stream merging: join() emits ONE buffer, only after the upstream completes
            .transform(DataBufferUtils::join)
            .map(buffer -> { // Bytes to string
                String content = buffer.toString(StandardCharsets.UTF_8);
                DataBufferUtils.release(buffer); // release the pooled buffer
                return content;
            })
            // Handle sticky packets: split on the blank line that ends each SSE event
            .flatMap(content -> Flux.fromArray(content.split("\\r?\\n\\r?\\n")))
            .filter(event -> !event.trim().isEmpty()) // Filter empty events
            .map(event -> { // Normalize the format
                String trimmed = event.trim();
                if (trimmed.startsWith("data:")) {
                    String substring = trimmed.substring(5);
                    return substring.startsWith(" ") ? substring.substring(1) : substring;
                }
                return trimmed;
            })
            .filter(event -> !event.startsWith("data:")); // Secondary filtering
}
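Once the bytes are joined, the rest of processStream is plain string handling, so it can be checked without Spring or Reactor. The sketch below mirrors those steps with java.util.stream over a sample payload. SseNormalizer is a hypothetical name. The sketch also drops events that become empty after the prefix is stripped, which the pipeline above does not do.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, framework-free mirror of processStream's string steps,
// applied to an already-joined response body.
final class SseNormalizer {

    private SseNormalizer() {
    }

    /** Strips a leading "data:" field name and at most one following space. */
    static String stripDataPrefix(String event) {
        String trimmed = event.trim();
        if (trimmed.startsWith("data:")) {
            String rest = trimmed.substring("data:".length());
            return rest.startsWith(" ") ? rest.substring(1) : rest;
        }
        return trimmed;
    }

    /** Splits on blank lines, drops empty events, strips prefixes, filters leftovers. */
    static List<String> normalize(String joinedBody) {
        return Arrays.stream(joinedBody.split("\\r?\\n\\r?\\n"))
                .filter(event -> !event.trim().isEmpty())
                .map(SseNormalizer::stripDataPrefix)
                // Addition in this sketch: also drop events left empty by stripping
                .filter(event -> !event.isEmpty() && !event.startsWith("data:"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // LF and CRLF boundaries, an extra blank line, and a bare "data:" event
        String raw = "data:{\"id\":1}\n\ndata: {\"id\":2}\r\n\r\n\n\ndata:\n\n";
        normalize(raw).forEach(System.out::println);
    }
}
```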
Three key technical points
- Sticky-packet handling: split("\\r?\\n\\r?\\n") splits the text on the blank line that terminates each SSE event, which resolves message-boundary problems in network transmission. Example raw data: data:{response1}\n\ndata:{response2}\n\n
- Format compatibility: any data: prefix returned by the server is stripped, leaving Spring free to add its own SSE prefix.
- Dual filtering: ensures the final output contains no leftover SSE field markers.
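One caveat: DataBufferUtils::join collects every buffer and emits a single one only after the upstream completes. The client therefore receives nothing until the model has finished answering. If token-by-token output matters, a common alternative is to keep state across chunks. Bytes are decoded with a stateful UTF-8 decoder, because a multi-byte character such as a Chinese character can straddle two network chunks. Any incomplete trailing event is held back until the next chunk arrives. The JDK-only sketch below assumes it is fed the bytes of each DataBuffer in order, with one instance per stream. IncrementalSseSplitter, feed, and finish are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical incremental SSE event splitter (not from the original article).
// Stateful and not thread-safe: use one instance per response stream.
final class IncrementalSseSplitter {

    private static final Pattern EVENT_BOUNDARY = Pattern.compile("\\r?\\n\\r?\\n");

    private final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
            .onMalformedInput(CodingErrorAction.REPLACE)
            .onUnmappableCharacter(CodingErrorAction.REPLACE);
    private ByteBuffer pendingBytes = ByteBuffer.allocate(0);      // split multi-byte character
    private final StringBuilder pendingText = new StringBuilder(); // unfinished event

    /** Feeds one network chunk and returns every event it completes. */
    List<String> feed(byte[] chunk) {
        // Prepend bytes left over from the previous chunk
        ByteBuffer in = ByteBuffer.allocate(pendingBytes.remaining() + chunk.length);
        in.put(pendingBytes).put(chunk);
        in.flip();

        // UTF-8 never decodes to more chars than bytes, so this buffer is large enough
        CharBuffer out = CharBuffer.allocate(in.remaining());
        decoder.decode(in, out, false); // false: more input may follow
        out.flip();
        pendingText.append(out);

        // Keep undecoded trailing bytes (an incomplete character) for the next chunk
        pendingBytes = ByteBuffer.allocate(in.remaining());
        pendingBytes.put(in);
        pendingBytes.flip();

        // Emit complete events; re-scanning the whole tail also catches boundaries split across chunks
        List<String> events = new ArrayList<>();
        Matcher m = EVENT_BOUNDARY.matcher(pendingText);
        int start = 0;
        while (m.find()) {
            events.add(pendingText.substring(start, m.start()));
            start = m.end();
        }
        pendingText.delete(0, start);
        return events;
    }

    /** Call once when the upstream completes; returns a final unterminated event, if any. */
    List<String> finish() {
        CharBuffer out = CharBuffer.allocate(pendingBytes.remaining() + 1);
        decoder.decode(pendingBytes, out, true); // truncated bytes become U+FFFD
        decoder.flush(out);
        out.flip();
        pendingText.append(out);
        List<String> rest = new ArrayList<>();
        if (pendingText.length() > 0) {
            rest.add(pendingText.toString());
            pendingText.setLength(0);
        }
        return rest;
    }

    public static void main(String[] args) {
        byte[] all = "data: \u4f60\u597d\n\ndata: x\n\n".getBytes(StandardCharsets.UTF_8);
        IncrementalSseSplitter splitter = new IncrementalSseSplitter();
        // Cut inside the first Chinese character to simulate an unlucky chunk boundary
        System.out.println(splitter.feed(Arrays.copyOfRange(all, 0, 7)));
        System.out.println(splitter.feed(Arrays.copyOfRange(all, 7, all.length)));
        System.out.println(splitter.finish());
    }
}
```

The emitted events still carry their data: prefix, so the normalization and filtering steps above apply unchanged. Wiring the splitter into Reactor is left as an adaptation. One possible approach: create it inside Flux.defer, flatMapIterable over feed, and release each DataBuffer after copying its bytes.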
⚠️ Pay special attention
When the endpoint declares produces = MediaType.TEXT_EVENT_STREAM_VALUE:
- Spring automatically adds the data: prefix to each element it writes. With both starters on the classpath, Spring Boot auto-configures Spring MVC rather than WebFlux, and Spring MVC applies this SSE framing to the returned Flux.
- The front end receives:
data: {actual content}
- If you also add the data: prefix manually, it is duplicated:
data: data: {error content} // ❌ malformed
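The duplication follows from how a text/event-stream frame is built. In the SSE format defined by the HTML Living Standard, each line of an event's data goes in its own data: field, and a blank line ends the event. The sketch below uses a simplified, hypothetical framing function. It is not Spring's actual implementation and only splits on LF. It shows why an element that already starts with data: comes out double-prefixed.

```java
// Hypothetical illustration of text/event-stream framing (not Spring's code):
// each payload line becomes a "data:" field and a blank line ends the event.
final class SseFraming {

    private SseFraming() {
    }

    static String frame(String payload) {
        StringBuilder sb = new StringBuilder();
        // Simplification: only LF is treated as a line break here
        for (String line : payload.split("\n", -1)) {
            sb.append("data:").append(line).append('\n');
        }
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(frame("{\"content\":\"hi\"}"));       // upstream prefix already stripped
        System.out.print(frame("data: {\"content\":\"hi\"}")); // prefix forwarded as-is
    }
}
```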
🛠️ Complete implementation code
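// Package declaration omitted
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import javax.annotation.PostConstruct;

import lombok.extern.slf4j.Slf4j;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

// @RestController instead of @Service: Spring MVC only maps @GetMapping methods on controller beans
@RestController
@Slf4j
public class OpenAiService {
    // Configuration items (placeholder values) and initialization
    private String openAiApiKey = "sk-xxxxxx";
    private String baseUrl = "/xxxx";
    private String model = "gpt-4o";
    private WebClient webClient;

    @PostConstruct
    public void init() {
        webClient = WebClient.builder()
                .baseUrl(baseUrl)
                .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAiApiKey)
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .build();
    }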
    @GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) {
        // Build the request body (String.format instead of a text block, for JDK 8);
        // note that the prompt is not JSON-escaped here
        String requestBody = String.format(
                "{\"model\": \"%s\", "
                + "\"messages\": [{\"role\": \"user\", \"content\": \"%s\"}], "
                + "\"stream\": true}",
                model, prompt);
        // Send the streaming request
        return webClient.post()
                .uri("/v1/chat/completions")
                .bodyValue(requestBody)
                .retrieve()
                // Turn HTTP error responses into an exception carrying the response body
                .onStatus(HttpStatus::isError, response ->
                        response.bodyToMono(String.class)
                                .flatMap(error -> Mono.error(new RuntimeException("API Error: " + error))))
                .bodyToFlux(DataBuffer.class)
                .transform(this::processStream)
                .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
                .timeout(Duration.ofSeconds(180))
                .doOnError(e -> log.error("Stream error", e))
                .doFinally(signal -> log.info("Stream completed: {}", signal));
    }
    private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) {
        return dataBufferFlux
                // Merge the byte stream (join() emits one buffer once the upstream completes)
                .transform(DataBufferUtils::join)
                .map(buffer -> {
                    String content = buffer.toString(StandardCharsets.UTF_8);
                    DataBufferUtils.release(buffer);
                    return content;
                })
                // Split on SSE event boundaries to handle sticky packets
                .flatMap(content -> Flux.fromArray(content.split("\\r?\\n\\r?\\n")))
                // Filter empty events
                .filter(event -> !event.trim().isEmpty())
                // Standardize the SSE event format
                .map(event -> {
                    String trimmed = event.trim();
                    // With produces = MediaType.TEXT_EVENT_STREAM_VALUE, Spring adds "data:" to every
                    // element it writes, so a "data:" prefix from upstream must be removed here
                    if (trimmed.startsWith("data:")) {
                        trimmed = trimmed.substring("data:".length()).trim();
                    }
                    return trimmed;
                })
                .filter(event -> !event.startsWith("data:"));
    }
}