
A deep dive into Spring AI: source code analysis of streaming answers

2024-10-11 09:39:09

In the previous section, we analyzed Spring AI's blocking request and response mechanism in depth and explored how to enhance its memory capabilities. Today, we will focus on the concept and implementation of streaming responses. After all, AI's streaming response feature is closely related to its interactive experience and is an important part of enhancing user satisfaction.

Basic usage

The basic usage is very simple: add a call to the stream method and you get the desired behavior. The following code sample shows how this looks in a real application:

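@GetMapping(value = "/ai-stream", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE + ";charset=UTF-8")
Flux<String> generationByStream(@RequestParam("userInput") String userInput) {
    // chatClient is assumed to be the ChatClient instance configured in the previous section
    Flux<String> output = chatClient.prompt()
            .user(userInput)
            .stream()   // streaming variant instead of the blocking call()
            .content();
    return output;
}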

After we add the stream method, the returned object is no longer the original blocking CallResponseSpec but a non-blocking StreamResponseSpec. At the same time, the type of the returned data changes from String to Flux<String>.

Before delving into its specific applications, let me first introduce the concept and characteristics of Flux.

Handler implementation in Spring WebFlux

First of all, request handlers in WebFlux are already implemented in a non-blocking way. This means that as long as our code returns a Flux object, the streaming response works without extra effort. Applications can then handle concurrent requests efficiently, without blocking operations dragging down overall performance.

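    // DispatcherHandler: the central entry point for WebFlux request handling
    @Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        if (this.handlerMappings == null) {
            return createNotFoundError();
        }
        if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
            return handlePreFlight(exchange);
        }
        // Try each handler mapping in order and use the first handler found
        return Flux.fromIterable(this.handlerMappings)
                .concatMap(mapping -> mapping.getHandler(exchange))
                .next()
                .switchIfEmpty(createNotFoundError())
                .onErrorResume(ex -> handleResultMono(exchange, Mono.error(ex)))
                .flatMap(handler -> handleRequestWith(exchange, handler));
    }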

Here's a brief introduction to Spring WebFlux. It isn't our focus, but it's helpful to understand the basic concepts. Spring WebFlux is part of the Spring framework and is designed for building reactive applications. It supports an asynchronous, non-blocking programming model, which makes it more efficient at handling highly concurrent requests. Here are a few key features of WebFlux:

  1. Reactive programming: WebFlux is based on a reactive programming model that uses the Mono and Flux types to process data streams. Mono represents zero or one element, while Flux represents zero or more elements. This model makes asynchronous data streams easier to handle, which improves code readability and maintainability.
  2. Non-blocking I/O: WebFlux achieves efficient resource utilization through non-blocking I/O on runtimes such as Netty or Servlet 3.1+ containers. Unlike traditional blocking I/O, WebFlux can release threads while waiting for a response, which significantly improves the concurrency of an application and supports more simultaneous requests without increasing threading overhead.

Understanding these features lays the foundation for the non-blocking, reactive design discussed next and helps us make better use of WebFlux to improve application performance.
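To make the publisher/subscriber idea behind Flux more concrete, here is a minimal sketch that uses only the JDK. It does not use Reactor at all: Flux implements the Reactive Streams Publisher contract, and java.util.concurrent.Flow in the JDK mirrors the same interfaces, so the subscribe / request / onNext / onComplete handshake can be shown without extra dependencies. The class and method names (FlowSketch, CollectingSubscriber, publishAndCollect) are made up for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// JDK-only illustration of the Reactive Streams handshake; not Reactor code.
final class FlowSketch {

    /** Collects every item it receives and signals when the stream terminates. */
    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for one item at a time (backpressure)
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ready for the next chunk
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes the chunks asynchronously and returns what the subscriber saw. */
    static List<String> publishAndCollect(List<String> chunks) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            chunks.forEach(publisher::submit); // the caller never waits for the consumer
        } // close() signals onComplete once buffered items have been delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stream did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of("Hel", "lo", "!")));
    }
}
```

The point of the sketch is that the producer hands items over and moves on, while the subscriber processes them as they arrive on another thread. A Flux returned from a controller is consumed by WebFlux in the same spirit.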

Source code analysis

Now let's take a closer look at how the content method works. The next code example shows its concrete implementation, which helps us understand how the data stream and the response are handled.

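public Flux<String> content() {
    return doGetFluxChatResponse(this.request).map(r -> {
        // Chunks without a result, output, or content are mapped to an empty string
        if (r.getResult() == null || r.getResult().getOutput() == null
                || r.getResult().getOutput().getContent() == null) {
            return "";
        }
        return r.getResult().getOutput().getContent();
    }).filter(StringUtils::hasLength); // drop the empty strings
}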

The implementation here is relatively simple: it passes a mapping function that extracts the text of each response chunk and then filters out empty strings. Next, we will analyze the implementation of doGetFluxChatResponse in depth to better understand its logic and how it works:
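The map-then-filter step in content() can be tried out in isolation. The following is a minimal sketch with plain Java streams instead of Flux. The records Response, Result and Output are hypothetical stand-ins for the real Spring AI response types, and the Optional chain is just one way to express the same null checks.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Illustration only: hypothetical stand-ins for the Spring AI response types.
final class ContentSketch {

    record Output(String content) {}
    record Result(Output output) {}
    record Response(Result result) {}

    /** Returns the text of one chunk, or "" when any level of the chain is null. */
    static String contentOrEmpty(Response response) {
        return Optional.ofNullable(response.result())
                .map(Result::output)
                .map(Output::content)
                .orElse("");
    }

    /** Same idea as content(): extract the text, then drop empty strings. */
    static List<String> contents(List<Response> responses) {
        return responses.stream()
                .map(ContentSketch::contentOrEmpty)
                .filter(text -> !text.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Response> chunks = Arrays.asList(
                new Response(new Result(new Output("Hel"))),
                new Response(null),                          // no result at all
                new Response(new Result(new Output(""))),    // empty content
                new Response(new Result(new Output("lo"))));
        System.out.println(String.join("", contents(chunks)));
    }
}
```

Only the chunks that actually carry text survive, which is why the caller of content() never has to deal with nulls or empty fragments.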

private Flux<ChatResponse> doGetFluxChatResponse2(DefaultChatClientRequestSpec inputRequest) {
    // Omitted: code shared with the blocking path
    var fluxChatResponse = this.chatModel.stream(prompt);
    // Omitted: code shared with the blocking path
    return advisedResponse;
}

The logic here is essentially the same as for the blocking answer. The only difference is that it calls the chatModel.stream(prompt) method. Next, we'll dive into the implementation of stream(prompt) and the design thinking behind it:

public Flux<ChatResponse> stream(Prompt prompt) {
    return Flux.deferContextual(contextView -> {
        // Omitted: code shared with the blocking path
        Flux<OpenAiApi.ChatCompletionChunk> completionChunks = this.openAiApi.chatCompletionStream(request,
                getAdditionalHttpHeaders(prompt));
        // Omitted: code shared with the blocking path
        Flux<ChatResponse> chatResponse = completionChunks.map(this::chunkToChatCompletion)
            .switchMap(chatCompletion -> Mono.just(chatCompletion).map(chatCompletion2 -> {
                // Omitted: code shared with the blocking path
                return new ChatResponse(generations, from(chatCompletion2, null));
            }));
        // Omitted: code shared with the blocking path
        return new MessageAggregator().aggregate(flux, observationContext::setResponse);
    });
}

The shared logic will not be repeated here; we will focus on the differences. This method calls chatCompletionStream, and unlike the blocking path it no longer uses retryTemplate. Instead, it uses webClient, a client that can receive event streams.

public Flux<ChatCompletionChunk> chatCompletionStream(ChatCompletionRequest chatRequest,
        MultiValueMap<String, String> additionalHttpHeader) {

    Assert.notNull(chatRequest, "The request body can not be null.");
    Assert.isTrue(chatRequest.stream(), "Request must set the stream property to true.");

    AtomicBoolean isInsideTool = new AtomicBoolean(false);

    return this.webClient.post()
        .uri(this.completionsPath)
        .headers(headers -> headers.addAll(additionalHttpHeader))
        .body(Mono.just(chatRequest), ChatCompletionRequest.class)
        .retrieve()
        .bodyToFlux(String.class)
        // cancels the flux stream after the "[DONE]" is received.
        .takeUntil(SSE_DONE_PREDICATE)
        // filters out the "[DONE]" message.
        .filter(SSE_DONE_PREDICATE.negate())
        .map(content -> ModelOptionsUtils.jsonToObject(content, ChatCompletionChunk.class))
        // Remaining code omitted

The main purpose of this code is to send a POST request through webClient to the specified path, with the appropriate request headers and body. The response is received as an event stream (via the bodyToFlux method), then filtered and transformed, so that each item is finally converted into a ChatCompletionChunk object.
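The handling of the "[DONE]" marker is worth a closer look. In Reactor, takeUntil relays items up to and including the first one that matches the predicate, which is why the following filter is needed to drop the marker itself. The sketch below reproduces the combined effect on a plain list with JDK streams, where takeWhile with the negated predicate gives the same result in one step. The Chunk record and the decodeUntilDone method are hypothetical, and wrapping the raw string in Chunk merely stands in for the JSON conversion.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Illustration only: plain JDK streams instead of Flux, no real JSON decoding.
final class SseDoneSketch {

    // Matches the end-of-stream marker sent as the last event payload
    static final Predicate<String> SSE_DONE_PREDICATE = "[DONE]"::equals;

    /** Hypothetical stand-in for a decoded ChatCompletionChunk. */
    record Chunk(String rawJson) {}

    /** Keeps payloads before the first "[DONE]" and converts each one to a Chunk. */
    static List<Chunk> decodeUntilDone(List<String> payloads) {
        return payloads.stream()
                .takeWhile(SSE_DONE_PREDICATE.negate()) // stop at the marker, excluding it
                .map(Chunk::new)                        // placeholder for JSON decoding
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> payloads = List.of(
                "{\"delta\":\"Hel\"}",
                "{\"delta\":\"lo\"}",
                "[DONE]",
                "{\"delta\":\"never reached\"}");
        decodeUntilDone(payloads).forEach(chunk -> System.out.println(chunk.rawJson()));
    }
}
```

Anything that arrives after the marker is ignored, and the marker never reaches the decoding step, where it would fail to parse as JSON.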

The rest of the business logic is similar to the blocking version. The one significant difference is that the entire process is non-blocking, both in its return type and in the way it calls the OpenAI API.

Summary

Streaming responses not only improve system performance but also play a key role in user experience. With the Flux type, Spring WebFlux lets applications handle concurrent requests in a non-blocking manner, which uses resources efficiently and reduces response latency.

We have now covered the basic operation of Spring AI, including blocking answers, streaming answers, and memory enhancement. This lays the foundation for a deeper understanding of how it works. Next, we will continue to explore the source code, focusing on callback functions, entity class mapping, and other important features.

This will help us better understand the inner workings of Spring AI and provide guidance for further optimization and customization.


I'm Rain, a Java server-side coder, studying the mysteries of AI technology. I love technical communication and sharing, and I am passionate about the open source community. I am also a Tencent Cloud Creative Star, Ali Cloud Expert Blogger, Huawei Cloud Enjoyment Expert, and Nuggets Excellent Author.

💡 I won't be shy about sharing my personal explorations and experiences on the path of technology, in the hope that I can bring some inspiration and help to your learning and growth.
