Consuming non-standard SSE streaming responses in SpringBoot: a practical guide 🚀

Popularity:41 ℃/2025-02-25 00:54:01

Recently, domestic large models such as DeepSeek have kept gaining popularity, and the attention they receive has even surpassed that of OpenAI (jokingly known as CloseAI). On a stack that Spring AI supports, you can integrate easily with the official Spring AI, but enterprise applications still running on JDK8 and SpringBoot2.7.3 often need a custom implementation. This is especially true when the data returned by the large-model team does not follow the standard SSE specification and has to be handled flexibly. This article shares our practical solution.


📦 Adding the Gradle dependencies

Core dependencies:

  • spring-boot-starter-web: basic web support
  • spring-boot-starter-webflux: reactive programming support (the module that contains WebClient)
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-webflux'

🌐 WebClient configuration points

Pay special attention to the header configuration when initializing the client:

@Bean
 public WebClient init() {
     return WebClient.builder()
             .baseUrl(baseUrl)
             .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAiApiKey)
             // ⚠️ The request body is JSON, so Content-Type must be application/json
             .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
             .build();
 }

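🚨 Key pitfall: setting Content-Type to MediaType.TEXT_EVENT_STREAM_VALUE here causes the request to fail; it must be APPLICATION_JSON_VALUE. Content-Type describes the request body, which is JSON. The desired response format belongs in the Accept header instead.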


🧠 Core processing logic

Streaming request entry point

@GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
 public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) {
     // Build the request body.
     // Text blocks need Java 15+; on JDK 8, use a concatenated string literal instead.
     // Note: prompt is inserted as-is, so quotes or newlines in it will break the JSON.
     String requestBody = String.format("""
         {
             "model": "%s",
             "messages": [{"role": "user", "content": "%s"}],
             "stream": true
         }
         """, model, prompt);

     return webClient.post()
             // Request configuration
             .uri("/v1/chat/completions")
             .bodyValue(requestBody)
             .accept(MediaType.TEXT_EVENT_STREAM)
             .retrieve()
             .bodyToFlux(DataBuffer.class) // 🔑 Key configuration point
             .transform(this::processStream)
             // Retry and timeout configuration
             .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
             .timeout(Duration.ofSeconds(180))
             // Error handling
             .doOnError(e -> log.error("Stream error", e))
             .doFinally(signal -> log.info("Stream completed: {}", signal));
 }
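The request body above places prompt directly into a JSON template, so a prompt containing a double quote, a backslash, or a line break would yield invalid JSON. A minimal sketch of escaping the value with the JDK alone follows. The names JsonStrings, escape, and chatRequestBody are hypothetical and not part of the original code. In a real project a JSON library such as Jackson (already pulled in by spring-boot-starter-web) would be the more usual choice.

```java
/** Hypothetical helper, not from the original article: JDK-only JSON string escaping. */
final class JsonStrings {
    private JsonStrings() {}

    /** Escapes a value so it can sit between double quotes in a JSON document. */
    static String escape(String value) {
        StringBuilder out = new StringBuilder(value.length() + 16);
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '"':  out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n"); break;
                case '\r': out.append("\\r"); break;
                case '\t': out.append("\\t"); break;
                case '\b': out.append("\\b"); break;
                case '\f': out.append("\\f"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters must use the \\uXXXX form
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.toString();
    }

    /** Builds the chat request body; the field names follow the snippet above. */
    static String chatRequestBody(String model, String prompt) {
        return "{\"model\": \"" + escape(model) + "\", "
                + "\"messages\": [{\"role\": \"user\", \"content\": \"" + escape(prompt) + "\"}], "
                + "\"stream\": true}";
    }
}
```

With this in place, the body could be built as JsonStrings.chatRequestBody(model, prompt), which also runs on JDK 8 because it avoids text blocks.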

Technical principles

When using bodyToFlux(DataBuffer.class):

  • ✅ You gain control over the raw byte stream
  • ❌ Spring's automatic SSE parsing is bypassed (which is what a non-standard response needs)
  • 📡 The data is processed as a live stream: similar to a Java Stream, but elements keep arriving over time
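Working with raw chunks means that a chunk boundary need not coincide with an event boundary: one chunk may carry several events, or stop in the middle of one. The sketch below shows one way to emit only complete events and carry the unfinished tail over to the next chunk. EventAccumulator is a hypothetical helper, not part of the original code. It assumes chunks have already been decoded to text, whereas real code working on bytes would also have to cope with multi-byte UTF-8 characters split across buffers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: accumulates text chunks and emits only complete events. */
final class EventAccumulator {
    // A blank line (LF LF or CRLF CRLF) terminates an event
    private static final Pattern BOUNDARY = Pattern.compile("\\r?\\n\\r?\\n");

    // Text received so far that does not yet end in a boundary
    private final StringBuilder pending = new StringBuilder();

    /** Appends one chunk and returns every event that this chunk completed. */
    List<String> accept(String chunk) {
        pending.append(chunk);
        List<String> events = new ArrayList<>();
        Matcher matcher = BOUNDARY.matcher(pending);
        int start = 0;
        while (matcher.find()) {
            String event = pending.substring(start, matcher.start());
            if (!event.trim().isEmpty()) {
                events.add(event);
            }
            start = matcher.end();
        }
        // Drop what was emitted and keep the incomplete tail for the next call
        pending.delete(0, start);
        return events;
    }
}
```

In a reactive pipeline, a stateful helper like this would typically be created once per subscription and called from a flatMapIterable-style step, so that events reach the client as they arrive.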

🔧 Non-standard SSE data processing

Core processing flow

private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) {
     return dataBufferFlux
             // Merge the byte stream; join emits one combined buffer once the upstream completes
             .transform(DataBufferUtils::join)
             .map(buffer -> { // Bytes to string
                 String content = buffer.toString(StandardCharsets.UTF_8);
                 DataBufferUtils.release(buffer); // Release the buffer to avoid a memory leak
                 return content;
             })
             .flatMap(content -> // Handle the sticky-packet problem
                 Flux.fromArray(content.split("\\r?\\n\\r?\\n")))
             .filter(event -> !event.trim().isEmpty()) // Filter out empty events
             .map(event -> { // Normalize the format
                 String trimmed = event.trim();
                 if (trimmed.startsWith("data:")) {
                     String substring = trimmed.substring(5);
                     return substring.startsWith(" ") ? substring.substring(1) : substring;
                 }
                 return trimmed;
             })
             .filter(event -> !event.equals("data:")); // Second filter
 }

Three key technical points

  1. Sticky-packet handling
    split("\\r?\\n\\r?\\n") restores the message boundaries when several events arrive glued together in one network read. Example raw data:

    data:{response1}\n\ndata:{response2}\n\n
    
  2. Format compatibility
    The data: prefix that the server may return is removed automatically, which leaves Spring free to add its own SSE prefix

  3. Double filtering
    Ensures that the final output contains no leftover SSE format markers
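The three points above can be tried on a plain String without Reactor. The sketch below is a minimal JDK-only version. SsePayloads and its methods are hypothetical names, and the final filter drops payloads left empty after prefix removal, which is a slight variation on the second filter in the article's code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: extracts payloads from a block of SSE-like text. */
final class SsePayloads {
    private SsePayloads() {}

    static List<String> extract(String raw) {
        return Arrays.stream(raw.split("\\r?\\n\\r?\\n")) // 1. split on event boundaries
                .map(String::trim)
                .filter(event -> !event.isEmpty())         // first filter: empty events
                .map(SsePayloads::stripDataPrefix)         // 2. remove the data: prefix
                .filter(payload -> !payload.isEmpty())     // 3. second filter: empty payloads
                .collect(Collectors.toList());
    }

    /** Removes a leading "data:" and at most one space after it. */
    static String stripDataPrefix(String event) {
        if (!event.startsWith("data:")) {
            return event;
        }
        String rest = event.substring(5);
        return rest.startsWith(" ") ? rest.substring(1) : rest;
    }
}
```

For the example raw data shown above, extract returns the two payloads {response1} and {response2} with no data: markers left.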


⚠️ Pay special attention

When the endpoint sets produces = MediaType.TEXT_EVENT_STREAM_VALUE:

  • Spring WebFlux automatically adds the data: prefix

  • Example of the format the front end receives:

    data: {actual content}
  • If you also add the data: prefix manually, the prefix is duplicated:

    data: data: {error content} // ❌ malformed
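The duplication is easy to reproduce with a simplified stand-in for the framing step. SseFraming.frame below is a hypothetical function that only imitates prefixing each emitted element. It is not Spring's implementation, and the exact whitespace after the colon is not modeled.

```java
/** Hypothetical stand-in for the server-side framing of each emitted element. */
final class SseFraming {
    private SseFraming() {}

    /** Prefixes the element with "data:" and terminates the event with a blank line. */
    static String frame(String element) {
        return "data:" + element + "\n\n";
    }

    public static void main(String[] args) {
        // Element already cleaned by the processing step: a single prefix on the wire
        System.out.print(frame("{\"id\":1}"));
        // Element that still carries the upstream prefix: the prefix appears twice
        System.out.print(frame("data: {\"id\":1}"));
    }
}
```

This is why the processing step removes the upstream prefix before the elements are returned from the endpoint.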

🛠️ Complete implementation code

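// Package declaration...

 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import javax.annotation.PostConstruct;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.core.io.buffer.DataBuffer;
 import org.springframework.core.io.buffer.DataBufferUtils;
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.reactive.function.client.WebClient;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.util.retry.Retry;

 // @RestController (rather than @Service) so that the @GetMapping below is registered
 @RestController
 @Slf4j
 public class OpenAiService {
     // Configuration items and initialization
     private String openAiApiKey = "sk-xxxxxx";

     private String baseUrl = "/xxxx";

     private String model = "gpt-4o";

     private WebClient webClient;

     @PostConstruct
     public void init() {
         webClient = WebClient.builder()
                 .baseUrl(baseUrl)
                 .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAiApiKey)
                 .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                 .build();
     }

     @GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
     public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) {
         // Build the request body (text blocks need Java 15+; on JDK 8 use a concatenated string)
         String requestBody = String.format("""
                 {
                     "model": "%s",
                     "messages": [{"role": "user", "content": "%s"}],
                     "stream": true
                 }
                 """, model, prompt);

         // Send the streaming request
         return webClient.post()
             .uri("/v1/chat/completions")
             .bodyValue(requestBody)
             .retrieve()
             // HttpStatus::isError fits SpringBoot 2.7.x; Spring 6 uses HttpStatusCode::isError
             .onStatus(HttpStatus::isError, response ->
                     response.bodyToMono(String.class)
                             .flatMap(error -> Mono.error(new RuntimeException("API Error: " + error)))
             )
             .bodyToFlux(DataBuffer.class)
             .transform(this::processStream)
             .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
             .timeout(Duration.ofSeconds(180))
             .doOnError(e -> log.error("Stream error", e))
             .doFinally(signal -> log.info("Stream completed: {}", signal));
     }

     private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) {
         return dataBufferFlux
                 // Work on the raw byte stream; join emits one combined buffer once the upstream completes
                 .transform(DataBufferUtils::join)
                 .map(buffer -> {
                     String content = buffer.toString(StandardCharsets.UTF_8);
                     DataBufferUtils.release(buffer); // Release the buffer to avoid a memory leak
                     return content;
                 })
                 // Split on SSE event boundaries to handle the sticky-packet problem
                 .flatMap(content -> Flux.fromArray(content.split("\\r?\\n\\r?\\n")))
                 // Filter out empty events
                 .filter(event -> !event.trim().isEmpty())
                 // Normalize the SSE event format
                 .map(event -> {
                     String trimmed = event.trim();

                     // Because the endpoint declares "produces = MediaType.TEXT_EVENT_STREAM_VALUE",
                     // WebFlux adds "data:" automatically, so an upstream "data:" prefix must be removed here
                     if (trimmed.startsWith("data:")) {
                         // replaceFirst removes only the leading prefix, not later occurrences in the payload
                         trimmed = trimmed.replaceFirst("data:", "").trim();
                     }
                     return trimmed;
                 })
                 .filter(event -> !event.equals("data:"));
     }
 }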