
WebSocket communication and large-model conversations with Django Channels

2024-08-14 17:01:57

preamble

I've been working on a large-model (LLM) project lately, and I chose Django as the framework (a lot of LLM apps use FastAPI these days, but I'm already used to Django).

Back when I used AspNetCore for the back end, I tried Blazor Server, WebAPI SSE (Server-Sent Events), and other approaches for implementing large-model conversations. SSE seems to be the most common choice; ChatGPT uses it too.

Django has supported asynchronous programming since version 3.0, so SSE is possible there as well (though I found it a bit fiddly in my testing). In the end I decided to implement it with WebSocket, which in the Django ecosystem means Channels. (That said, FastAPI seems to support both SSE and WebSocket out of the box.)

Let's see the effect first

Here's the chat screen:

(Screenshot: chat interface)

I also made a conversation history page:

(Screenshot: conversation history page)

About django-channels

To quote the official documentation:

Channels wraps Django’s native asynchronous view support, allowing Django projects to handle not only HTTP, but protocols that require long-running connections too - WebSockets, MQTT, chatbots, amateur radio, and more.

OK, so Channels extends Django from a purely synchronous HTTP framework into one that can also handle asynchronous protocols such as WebSocket. Where Django was originally built on WSGI, Channels runs on the ASGI-based daphne server, which works not only with WebSocket but also supports newer technologies such as HTTP/2.

Several related concepts:

  • Channels: persistent connections (such as WebSocket) used for real-time data transfer.
  • Consumers: asynchronous handlers for incoming events — similar to Django's views, but designed for asynchronous operation.
  • Routing: similar to Django's URL routing system; Channels uses routing to decide which Consumer a given WebSocket connection or message should be dispatched to.

Compared with traditional Django request handling, Channels lets developers use an asynchronous programming model, which is especially beneficial for long-lived connections or applications that need a large number of concurrent connections. This architectural change brings higher performance and a better user experience, making Django a better fit for modern Internet applications.

With the introduction of Channels, Django is no longer just a request/response web framework, but a truly full-featured framework capable of handling multiple network protocols and long connections. This allows Django developers to develop richer and more dynamic applications without leaving their familiar environment.

Usage Scenarios

Let's start with the usage scenario.

The back end of this demo project uses StarAI and LangChain to call the LLM and get answers, then communicates with the front end via WebSocket. For the front end I chose React + Tailwind.

Installation

Take the DjangoStarter project as an example (using pdm as the package manager):

pdm add "channels[daphne]"

Then modify the settings file under src/config/settings/components/

Add daphne to the registered apps; note that it must be at the top of the list:

from typing import Tuple

# Application definition
INSTALLED_APPS: Tuple[str, ...] = (
    'daphne',
    # ... other apps follow
)

After that, runserver will launch daphne instead of Django's built-in development server.

Next, Channels needs a channel layer, which allows multiple consumer instances to communicate with each other and with the rest of Django. Redis is the usual backend choice:

pdm add channels_redis
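Conceptually, a channel layer is a cross-process pub/sub bus: consumers join named groups, and anyone can broadcast a message to everyone in a group. As a rough mental model (an in-memory toy sketch, not the channels_redis API), it behaves like this:

```python
import asyncio
from collections import defaultdict


class InMemoryChannelLayer:
    """A toy stand-in for a channel layer: named groups of queues."""

    def __init__(self):
        self.groups = defaultdict(list)  # group name -> member queues

    def group_add(self, group, queue):
        self.groups[group].append(queue)

    async def group_send(self, group, message):
        # Broadcast the message to every member of the group
        for queue in self.groups[group]:
            await queue.put(message)


async def demo():
    layer = InMemoryChannelLayer()
    alice, bob = asyncio.Queue(), asyncio.Queue()
    layer.group_add("room_1", alice)
    layer.group_add("room_1", bob)
    await layer.group_send("room_1", {"message": "hello"})
    return await alice.get(), await bob.get()


print(asyncio.run(demo()))
# both members of the group receive the broadcast
```

The real channel layer does the same thing across processes and machines, with Redis carrying the messages.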

Configuration

ASGI

OK, we need to make some changes to src/config/asgi.py

import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')

django_asgi_app = get_asgi_application()

# Imported after Django initialises so app loading works correctly
from apps.chat.routing import websocket_urlpatterns

application = ProtocolTypeRouter({
    "http": django_asgi_app,
    # Just HTTP for now. (We can add other protocols later.)
    "websocket": AllowedHostsOriginValidator(
        AuthMiddlewareStack(URLRouter(websocket_urlpatterns))
    ),
})

In addition to the configuration shown in the official documentation, I've added the chat app's routing here.

Since this is the only app in the demo project that uses WebSocket, we'll use the routing in chat as the root URLRouter.

If you have multiple WebSocket apps, you can modify them as needed.

Channel layer

Modify the settings file under src/config/settings/components/

# DOCKER is a project-specific flag (True when running inside Docker);
# the exact import path depends on your settings layout
from config.settings.components.common import DOCKER

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("redis" if DOCKER else "127.0.0.1", 6379)],
        },
    },
}

I won't go into the rest of the configuration here; it's all set up in DjangoStarter already.

Writing back-end code

Using Channels is very simple: as mentioned earlier, we just need to write the consumer logic and then configure the routing.

So let's get started.

consumer

Create the src/apps/chat/consumers.py file

Authentication, generating replies with the large model, chat history persistence — it all lives here!

Code first, explanations after.

import asyncio
import json

from asgiref.sync import async_to_sync
from channels.db import database_sync_to_async
from channels.generic.websocket import WebsocketConsumer, AsyncWebsocketConsumer

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage

from .models import Conversation, Message

class LlmConsumer(AsyncWebsocketConsumer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.chat_id = None
        self.conversation = None

    @database_sync_to_async
    def get_conversation(self, pk, user):
        obj, _ = Conversation.objects.get_or_create(id=pk, user=user)
        return obj

    @database_sync_to_async
    def add_message(self, role: str, content: str):
        return Message.objects.create(
            conversation=self.conversation,
            role=role,
            content=content,
        )

    async def connect(self):
        self.chat_id = self.scope["url_route"]["kwargs"]["chat_id"]
        await self.accept()

        # Check if the user is logged in
        user = self.scope["user"]
        if not user.is_authenticated:
            await self.close(code=4001, reason='Authentication required. Please log in again.')
            return
        else:
            self.conversation = await self.get_conversation(self.chat_id, user)

    async def disconnect(self, close_code):
        ...

    async def receive(self, text_data=None, bytes_data=None):
        text_data_json: dict = json.loads(text_data)
        history: list = text_data_json.get("history")

        user = self.scope["user"]
        if not user.is_authenticated:
            reason = 'Authentication required. Please log in again.'
            await self.send(text_data=json.dumps({"message": reason}))
            await asyncio.sleep(0.1)
            await self.close(code=4001, reason=reason)
            return

        await self.add_message(**history[-1])

        llm = ChatOpenAI(model="gpt-4")
        resp = llm.stream(
            [
                HumanMessage(content=e['content']) if e['role'] == 'user' else AIMessage(content=e['content'])
                for e in history
            ]
        )

        message_chunks = []
        for chunk in resp:
            message_chunks.append(chunk.content)
            await self.send(text_data=json.dumps({"message": ''.join(message_chunks)}))
            await asyncio.sleep(0.1)

        await self.add_message('ai', ''.join(message_chunks))
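For reference, the frames exchanged over the socket are plain JSON text. The client sends the whole conversation history (the shape `receive` expects), and the server replies with `{"message": ...}` frames. A quick round-trip of that wire format:

```python
import json

# Frame sent by the client: the full conversation so far
client_frame = json.dumps({
    "history": [
        {"role": "user", "content": "Hi"},
        {"role": "ai", "content": "Hello! How can I help?"},
        {"role": "user", "content": "Tell me about Django Channels"},
    ]
})

# What the consumer does on receive
payload = json.loads(client_frame)
history = payload.get("history")
latest = history[-1]  # the newest message, which gets persisted
assert latest["role"] == "user"

# Frame the server streams back to the client
server_frame = json.dumps({"message": "Django Channels extends Django..."})
print(latest["content"], "->", json.loads(server_frame)["message"])
```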

Workflow

  • After the front end opens the ws connection, the connect method runs; if authentication passes, it calls accept() to accept the connection.
  • Each message from the front end automatically triggers the receive method, which processes it and then calls send() to push data back.
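Note that the streaming loop in `receive` resends the entire accumulated text on every chunk rather than sending deltas, so the client can simply replace its last AI message. A self-contained sketch of that accumulation:

```python
def cumulative_frames(chunks):
    """Mimic the consumer's streaming loop: collect chunks and
    emit the full text accumulated so far after each one."""
    message_chunks = []
    for chunk in chunks:
        message_chunks.append(chunk)
        yield ''.join(message_chunks)


frames = list(cumulative_frames(["Dja", "ngo", " Cha", "nnels"]))
print(frames)
# each frame is the complete text so far; the last one is the full reply
```

Resending the full prefix costs a little bandwidth but makes the client logic trivial — no delta reassembly needed.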

Authentication

Although AuthMiddlewareStack is already configured in the ASGI application,

you still need to handle the authentication logic in the consumer itself.

What this middleware does is resolve the auth information carried by the connection (session/cookies) and expose it inside the consumer as scope["user"].
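Conceptually, the middleware looks up the session from the handshake's cookies and attaches the resolved user to the connection scope. A simplified illustration of that idea (not the real channels implementation — the session store here is just a dict):

```python
class AnonymousUser:
    is_authenticated = False


class User:
    is_authenticated = True

    def __init__(self, username):
        self.username = username


def auth_middleware(scope, session_store):
    """Populate scope['user'] from the session id in the cookies."""
    session_id = scope.get("cookies", {}).get("sessionid")
    scope["user"] = session_store.get(session_id, AnonymousUser())
    return scope


sessions = {"abc123": User("code")}
scope = auth_middleware({"cookies": {"sessionid": "abc123"}}, sessions)
print(scope["user"].is_authenticated)                           # True: valid session
print(auth_middleware({}, sessions)["user"].is_authenticated)   # False: no session
```

Everything downstream of the middleware just reads `scope["user"]`, which is exactly what the consumer does in `connect` and `receive`.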

Key points

  • Consumers come in synchronous and asynchronous flavours; I'm using the asynchronous one here, AsyncWebsocketConsumer.
  • To access the Django ORM from an async consumer, wrap the calls with the database_sync_to_async decorator.
  • In connect, an authentication failure calls close(). The code parameter must not collide with the reserved WebSocket close codes: I used 401 at first, but the front end received a different code; only after switching to a custom 4001 did it come through. The reason parameter never arrives on the client though — I don't know why; maybe it's down to the browser's WebSocket implementation?
  • In receive, when streaming the model's output to the client, be sure to await asyncio.sleep(...) briefly after each send. This gives the WebSocket time to actually flush messages to the client; otherwise everything is generated first and arrives in one burst, losing the streaming effect.
  • Also in receive, after a message comes in we first check whether the user is still logged in (an expired login behaves the same as not being logged in). If not, we send "Authentication required. Please log in again." to the client and then close the connection.
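On the close-code point: RFC 6455 reserves codes below 3000 for the protocol and registered uses, 3000–3999 for libraries and frameworks, and 4000–4999 for private application use — which is why a 401 gets mangled on the way to the browser while 4001 survives intact. A tiny checker capturing that rule:

```python
def is_valid_app_close_code(code: int) -> bool:
    """Per RFC 6455 section 7.4.2, 4000-4999 is the private-use range
    that applications may pick freely; everything lower is reserved."""
    return 4000 <= code <= 4999


print(is_valid_app_close_code(401))   # False: reserved range, browsers mangle it
print(is_valid_app_close_code(4001))  # True: safe custom application code
```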

Routing

After writing the consumer, configure the route.

Edit the src/apps/chat/routing.py file

from django.urls import re_path

from . import consumers

websocket_urlpatterns = [
    # ChatConsumer is the demo-room consumer (not shown in this post)
    re_path(r"ws/chat/demo/(?P<room_name>\w+)/$", consumers.ChatConsumer.as_asgi()),
    re_path(r"ws/chat/llm/(?P<chat_id>\w+)/$", consumers.LlmConsumer.as_asgi()),
]

Note that you should use re_path here rather than path (as the official documentation advises).
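The re_path patterns are ordinary regexes, and the named group becomes the url_route kwarg the consumer reads in connect. You can sanity-check a pattern with plain `re` (the path below is copied from the routing table above):

```python
import re

pattern = r"ws/chat/llm/(?P<chat_id>\w+)/$"

match = re.match(pattern, "ws/chat/llm/42/")
print(match.groupdict())                    # {'chat_id': '42'} - what the consumer sees
print(re.match(pattern, "ws/chat/llm//"))   # None: an empty chat id doesn't match
```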

Client development

OK, that's the back end done; next up, the client code.

I chose React to implement the client.

I'll skip the irrelevant parts and post the key code directly:

// `Modal` and `options` come from the UI library used for the re-login dialog
import { useState, useEffect, useRef } from 'react'

function App() {
  const [prompt, setPrompt] = useState('')
  const [messages, setMessages] = useState([])
  const [connectState, setConnectState] = useState(3)  // WebSocket readyState; 3 = CLOSED
  const [conversation, setConversation] = useState()

  const chatSocket = useRef(null)
  const messagesRef = useRef(null)
  const reLoginModalRef = useRef(null)

  useEffect(() => {
    openWebSocket()
    getConversation()

    return () => {
      chatSocket.current.close()
    }
  }, [])

  useEffect(() => {
    // Automatically scroll to the bottom of the message container
    messagesRef.current.scrollIntoView({behavior: 'smooth'})
  }, [messages])

  const getConversation = () => {
    // ... chat-history fetching code omitted
  }

  const openWebSocket = () => {
    setConnectState(0)
    chatSocket.current = new WebSocket(`ws://${window.location.host}/ws/chat/llm/${conversationId}/`)
    chatSocket.current.onopen = function (e) {
      console.log('WebSocket connection established', e)
      setConnectState(chatSocket.current.readyState)
    }

    chatSocket.current.onmessage = function (e) {
      const data = JSON.parse(e.data)
      setMessages(prevMessages => {
        // The server resends the full text each time, so replace the last AI message
        if (prevMessages[prevMessages.length - 1].role === 'ai')
          return [
            ...prevMessages.slice(0, -1), {role: 'ai', content: data.message}
          ]
        else
          return [
            ...prevMessages, {role: 'ai', content: data.message}
          ]
      })
    }

    chatSocket.current.onclose = function (e) {
      console.log('WebSocket closed. Chat socket closed unexpectedly.', e)
      setConnectState(chatSocket.current.readyState)
      if (e.code === 4001) {
        // Show the re-login dialog
        new Modal(reLoginModalRef.current, options).show()
      }
    }
  }

  const sendMessage = () => {
    if (prompt.length === 0) return  // ignore empty input

    const newMessages = [...messages, {
      role: 'user', content: prompt
    }]
    setMessages(newMessages)
    setPrompt('')
    chatSocket.current.send(JSON.stringify({
      'history': newMessages
    }))
  }
}

The front-end code is nothing special — simple and straightforward.

I used scrollIntoView({behavior: 'smooth'}) to auto-scroll messages to the bottom. I think I used some other approach back when I was developing AIHub with Blazor, but I don't remember it anymore.

This method is quite smooth, though.

Hard-coding the WebSocket address here doesn't feel very elegant, and after deployment it will have to change to wss://, which is another hassle. I'll look into whether there's a cleaner way.

Wrap-up

This is just a simple demo; production use has many more issues to consider. This article is only the start of my exploration of Channels — I'll keep updating the blog with new findings~