Location>code7788 >text

Notes on implementing a simple UDP proxy and an SNI proxy

Popularity:513 ℃/2025-03-15 14:51:53

Since I had already borrowed many of Kestrel's abstractions and optimizations earlier, extending the project further turned out to be very convenient.

Implementing the two features, a simple UDP proxy and an SNI proxy, went much faster than expected (admittedly, laziness also played a part).

(PS: if you have time, could you go to GitHub fs7744/NZOrz and click the star? After all, borrowing code is not easy, hahahahahaha)

Simple udp proxy

The UDP proxy feature here is relatively simple: whenever the proxy receives a UDP packet, it finds the upstream through route matching and forwards the packet to that upstream.

Udp proxy usage configuration

The basic format is consistent with the previous tcp proxy.

The only differences are that `Protocols` must be set to `UDP`, and that there is an additional `UdpResponses` setting: how many UDP packets are allowed to be returned to the requester. The default is 0, that is, no packets are returned.

{
  "Logging": {
    "LogLevel": {
      "Default": "Information"
    }
  },
  "ReverseProxy": {
    "Routes": {
      "udpTest": {
        "Protocols": [ "UDP" ],
        "Match": {
          "Hosts": [ "*:5000" ]
        },
        "ClusterId": "udpTest",
        "RetryCount": 1,
        "UdpResponses": 1,
        "Timeout": "00:00:11"
      }
    },
    "Clusters": {
      "udpTest": {
        "LoadBalancingPolicy": "RoundRobin",
        "HealthCheck": {
          "Passive": {
            "Enable": true
          }
        },
        "Destinations": [
          {
            "Address": "127.0.0.1:11000"
          }
        ]
      }
    }
  }
}

Implementation

Here are some code listings to show how simple it is.

PS: because this is a very simple UDP proxy, it is built on `IConnectionListener` rather than on `IMultiplexedConnectionListener` (yes, I'm lazy).

1. Implement `UdpConnectionContext`

To keep things simple, the UDP packet data is stored directly on the context rather than in `Parameters`, which avoids extra dictionary instances and reduces memory usage.

/// <summary>
/// Connection context used by the UDP proxy. The data of a received datagram (socket, byte
/// count and buffer owner) is kept directly on the context.
/// </summary>
/// <remarks>
/// NOTE(review): several expressions in this listing are blank (for example the right-hand
/// sides in the constructors and the call inside <c>DisposeAsync</c>). They cannot be
/// recovered from the listing itself and must be restored from the project source.
/// </remarks>
public sealed class UdpConnectionContext : TransportConnection
{
    // Buffer owner for the datagram data; the assignment targets in the constructors are blank in this listing.
    private readonly IMemoryOwner<byte> memory;
    /// <summary>Socket handed to the constructor.</summary>
    public Socket Socket { get; }
    /// <summary>Number of bytes received for this datagram.</summary>
    public int ReceivedBytesCount { get; }

    /// <summary>
    /// The received payload. NOTE(review): the receiver of the <c>(0, ReceivedBytesCount)</c> call is
    /// missing here; presumably a slice of the owned buffer, to be confirmed.
    /// </summary>
    public Memory<byte> ReceivedBytes => (0, ReceivedBytesCount);

    /// <summary>
    /// Creates the context from a receive result.
    /// NOTE(review): every value read from <paramref name="result"/> is blank in this listing.
    /// </summary>
    /// <param name="socket">Socket stored in <see cref="Socket"/>.</param>
    /// <param name="result">Result of the UDP receive operation.</param>
    public UdpConnectionContext(Socket socket, UdpReceiveFromResult result)
    {
        Socket = socket;
        ReceivedBytesCount = ;
         = ;
        LocalEndPoint = ;
        RemoteEndPoint = ;
    }

    /// <summary>Creates the context from already separated receive data.</summary>
    /// <param name="socket">Socket stored in <see cref="Socket"/>.</param>
    /// <param name="remoteEndPoint">Stored as the remote end point of the connection.</param>
    /// <param name="receivedBytes">Stored in <see cref="ReceivedBytesCount"/>.</param>
    /// <param name="memory">Buffer owner holding the received data.</param>
    public UdpConnectionContext(Socket socket, EndPoint remoteEndPoint, int receivedBytes, IMemoryOwner<byte> memory)
    {
        Socket = socket;
        ReceivedBytesCount = receivedBytes;
         = memory;
        // NOTE(review): the source of the local end point is blank in this listing.
        LocalEndPoint = ;
        RemoteEndPoint = remoteEndPoint;
    }

    /// <summary>
    /// Completes synchronously after one call whose target is blank in this listing.
    /// NOTE(review): presumably that call releases the buffer owner; confirm.
    /// </summary>
    public override ValueTask DisposeAsync()
    {
        ();
        return default;
    }
}
2. Implement `IConnectionListener`
/// <summary>
/// <see cref="IConnectionListener"/> for UDP: binds a listen socket and returns one
/// <see cref="UdpConnectionContext"/> per completed receive.
/// </summary>
/// <remarks>
/// NOTE(review): several expressions in this listing are blank (assignment targets in the
/// constructor, exception filter conditions, the receive call). They must be restored from
/// the project source.
/// </remarks>
internal sealed class UdpConnectionListener : IConnectionListener
{
    private EndPoint? udpEndPoint;
    private readonly GatewayProtocols protocols;
    private OrzLogger _logger;
    private readonly IUdpConnectionFactory connectionFactory;
    // Delegate that creates the already-bound listen socket; invoked from Bind().
    private readonly Func<EndPoint, GatewayProtocols, Socket> createBoundListenSocket;
    // Null until Bind() succeeds.
    private Socket? _listenSocket;

    /// <summary>Stores the dependencies and resolves the socket-creation delegate.</summary>
    public UdpConnectionListener(EndPoint? udpEndPoint, GatewayProtocols protocols, IRouteContractor contractor, OrzLogger logger, IUdpConnectionFactory connectionFactory)
    {
         = udpEndPoint;
         = protocols;
        _logger = logger;
         = connectionFactory;
        // NOTE(review): the expression that yields CreateBoundListenSocket is blank; presumably obtained from `contractor`, to be confirmed.
        createBoundListenSocket = ().CreateBoundListenSocket;
    }

    /// <summary>The end point this listener was created for.</summary>
    public EndPoint EndPoint => udpEndPoint;

    /// <summary>Creates the bound listen socket.</summary>
    /// <exception cref="InvalidOperationException">The listener is already bound.</exception>
    /// <exception cref="AddressInUseException">
    /// Wraps a <see cref="SocketException"/> that matches the filter (filter condition blank in this listing).
    /// </exception>
    internal void Bind()
    {
        if (_listenSocket != null)
        {
            throw new InvalidOperationException("Transport is already bound.");
        }

        Socket listenSocket;
        try
        {
            listenSocket = createBoundListenSocket(EndPoint, protocols);
        }
        catch (SocketException e) when ( == )
        {
            throw new AddressInUseException(, e);
        }

        // NOTE(review): call target blank; from the argument this looks like a non-null assertion.
        ( != null);

        _listenSocket = listenSocket;
    }

    /// <summary>
    /// Waits for the next receive and wraps it in a <see cref="UdpConnectionContext"/>.
    /// </summary>
    /// <returns>
    /// The new context, or <c>null</c> once the listen socket has been disposed
    /// (<see cref="ObjectDisposedException"/> or a filtered <see cref="SocketException"/>).
    /// </returns>
    public async ValueTask<ConnectionContext?> AcceptAsync(CancellationToken cancellationToken = default)
    {
        // Loop so that a non-fatal SocketException leads to another receive attempt.
        while (true)
        {
            try
            {
                (_listenSocket != null, "Bind must be called first.");
                // NOTE(review): the receive call target is blank in this listing.
                var r = await (_listenSocket, cancellationToken);
                return new UdpConnectionContext(_listenSocket, r);
            }
            catch (ObjectDisposedException)
            {
                // A call was made to UnbindAsync/DisposeAsync just return null which signals we're done
                return null;
            }
            catch (SocketException e) when ( == )
            {
                // A call was made to UnbindAsync/DisposeAsync just return null which signals we're done
                return null;
            }
            catch (SocketException)
            {
                // The connection got reset while it was in the backlog, so we try again.
                _logger.ConnectionReset("(null)");
            }
        }
    }

    /// <summary>Disposes the listen socket, if any, and completes synchronously.</summary>
    public ValueTask DisposeAsync()
    {
        _listenSocket?.Dispose();

        return default;
    }

    /// <summary>Disposes the listen socket, if any; same effect as <see cref="DisposeAsync"/>.</summary>
    public ValueTask UnbindAsync(CancellationToken cancellationToken = default)
    {
        _listenSocket?.Dispose();
        return default;
    }
}
3. Implement `IConnectionListenerFactory`
/// <summary>
/// Listener factory that creates <see cref="UdpConnectionListener"/> instances.
/// </summary>
/// <remarks>
/// NOTE(review): several expressions in this listing are blank (guard calls and assignment
/// targets in the constructor, one constructor argument and one call in <c>BindAsync</c>,
/// the protocol check in <c>CanBind</c>). They must be restored from the project source.
/// </remarks>
public sealed class UdpTransportFactory : IConnectionListenerFactory, IConnectionListenerFactorySelector
{
    private readonly IRouteContractor contractor;
    private readonly OrzLogger logger;
    private readonly IUdpConnectionFactory connectionFactory;

    /// <summary>Stores the dependencies that are passed on to every created listener.</summary>
    public UdpTransportFactory(
        IRouteContractor contractor,
        OrzLogger logger,
        IUdpConnectionFactory connectionFactory)
    {
        // NOTE(review): call targets blank; presumably argument null checks, to be confirmed.
        (contractor);
        (logger);

         = contractor;
         = logger;
         = connectionFactory;
    }

    /// <summary>Creates a <see cref="UdpConnectionListener"/> for <paramref name="endpoint"/> and returns it synchronously.</summary>
    /// <remarks>
    /// NOTE(review): the second constructor argument and the call on the next line are blank;
    /// presumably <paramref name="protocols"/> and the listener's bind call, to be confirmed.
    /// </remarks>
    public ValueTask<IConnectionListener> BindAsync(EndPoint endpoint, GatewayProtocols protocols, CancellationToken cancellationToken = default)
    {
        var transport = new UdpConnectionListener(endpoint, , contractor, logger, connectionFactory);
        ();
        return new ValueTask<IConnectionListener>(transport);
    }

    /// <summary>
    /// Returns <c>false</c> when the (blank) first check fails; otherwise returns <c>true</c>
    /// only for an <see cref="IPEndPoint"/>.
    /// </summary>
    public bool CanBind(EndPoint endpoint, GatewayProtocols protocols)
    {
        if (!()) return false;
        return endpoint switch
        {
            IPEndPoint _ => true,
            _ => false
        };
    }
}
4. Implement the UDP-specific proxy logic in `L4ProxyMiddleware`

The routing is the same as in the previous TCP post, so it is not listed here.

public class L4ProxyMiddleware : IOrderMiddleware
{    
    /// <summary>
    /// Middleware entry point: dispatches the connection to the SNI, TCP or UDP proxy routine
    /// and always invokes <paramref name="next"/> afterwards.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the branch conditions, the route lookup target and the calls around the
    /// proxy invocation are blank in this listing and must be restored from the project source.
    /// </remarks>
    /// <param name="context">The accepted connection. It is cast to <see cref="UdpConnectionContext"/> in the UDP branch.</param>
    /// <param name="next">Next delegate in the pipeline; awaited in <c>finally</c>.</param>
    public async Task Invoke(ConnectionContext context, ConnectionDelegate next)
    {
        try
        {
            // First branch: SNI proxying does its own route matching inside SNIProxyAsync.
            if ( == )
            {
                await SNIProxyAsync(context);
            }
            else
            {
                var route = await (context);
                if (route is null)
                {
                    logger.NotFoundRouteL4();
                }
                else
                {
                     = route;
                    ();
                    if ( == )
                    {
                        await TcpProxyAsync(context, route);
                    }
                    else
                    {
                        // Any connection that did not take the TCP branch is treated as UDP.
                        await UdpProxyAsync((UdpConnectionContext)context, route);
                    }
                    ();
                }
            }
        }
        catch (Exception ex)
        {
            // NOTE(review): call target and first argument blank; the exception is passed on rather than rethrown.
            (, ex);
        }
        finally
        {
            // Runs on both the success and the failure path.
            await next(context);
        }
    }

    /// <summary>
    /// Forwards the received datagram upstream through a newly created socket and then relays
    /// a bounded number of responses back.
    /// </summary>
    /// <remarks>
    /// NOTE(review): most operands in this listing are blank (socket constructor arguments,
    /// token source, response count, receive/send targets). They must be restored from the
    /// project source.
    /// </remarks>
    /// <param name="context">The UDP connection context of the incoming datagram.</param>
    /// <param name="route">The matched route.</param>
    private async Task UdpProxyAsync(UdpConnectionContext context, RouteConfig route)
    {
        try
        {
            var socket = new Socket(, , );
            // NOTE(review): presumably a pooled cancellation token source taken from cancellationTokenSourcePool; confirm.
            var cts = (cancellationTokenSourcePool);
            var token = ;
            if (await DoUdpSendToAsync(socket, context, route, , await reqUdp(context, , token), token))
            {
                // c counts how many responses are still relayed; its initial value is blank in this listing.
                var c = ;
                while (c > 0)
                {
                    var r = await (socket, token);
                    c--;
                    await (, , await respUdp(context, (), token), token);
                }
            }
            else
            {
                // Taken when DoUdpSendToAsync returns false; the call made here is blank.
                ();
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is handled separately from other failures; the call made here is blank.
            ();
        }
        catch (Exception ex)
        {
            (nameof(UdpProxyAsync), ex);
        }
        finally
        {
            // NOTE(review): null-conditional call with blank receiver and method; presumably cleanup, to be confirmed.
            ?.();
        }
    }

So it really is simple, isn't it? In theory, doing the same thing on top of Kestrel would work the same way.

optimization

Of course, following Kestrel's TCP socket handling, a few simple optimizations were also made, for example:

  • `UdpClient` is not used (PS: not because its implementation is bad, but because it is more general-purpose and leaves no room to change what happens inside it)
  • Implementations based on `SocketAsyncEventArgs, IValueTaskSource<SocketReceiveFromResult>` and `SocketAsyncEventArgs, IValueTaskSource<int>`: asynchronous reads and writes are submitted to the `PipeScheduler`
  • A simple pool of UDP sender objects based on `ConcurrentQueue<UdpSender>`, which increases object reuse and slightly reduces memory usage
  • A simple `CancellationTokenSource` object pool based on `ConcurrentQueue<PooledCancellationTokenSource>`, which increases object reuse and slightly reduces memory usage

SNI proxy

In addition to the basic TCP and UDP proxies, we also tried implementing an SNI proxy for TCP (for example HTTPS over HTTP/1 and HTTP/2).

However, at present the proxy only forwards traffic: it does not perform SSL encryption or decryption, which is left to the upstream. If the proxy were to perform SSL encryption and decryption itself, in theory this could be built on the off-the-shelf `SslStream`.

sni proxy usage configuration

Just configure the shared SNI listening port under `Listen`.

Then configure a separate route and upstream for each SNI.

In addition, each route can restrict the allowed TLS versions through `SupportSslProtocols`.

For example:

{
  "Logging": {
    "LogLevel": {
      "Default": "Information"
    }
  },
  "ReverseProxy": {
    "Listen": {
      "snitest": {
        "Protocols": "SNI",
        "Address": [ "*:444" ]
      }
    },
    "Routes": {
      "snitestroute": {
        "Protocols": "SNI",
        "SupportSslProtocols": [ "Tls13", "Tls12" ],
        "Match": {
          "Hosts": [ "*" ]
        },
        "ClusterId": "apidemo"
      },
      "snitestroute2": {
        "Protocols": "Tcp",
        "Match": {
          "Hosts": [ "*:448" ]
        },
        "ClusterId": "apidemo"
      }
    },
    "Clusters": {
      "apidemo": {
        "LoadBalancingPolicy": "RoundRobin",
        "HealthCheck": {
          "Active": {
            "Enable": true,
            "Policy": "Connect"
          }
        },
        "Destinations": [
          {
            "Address": ""
          }
        ]
      }
    }
  }
}

Implementation

The core of the implementation is really just the route handling; the proxying itself is exactly the same as for the TCP proxy (it simply moves TCP data between the requester and the upstream).

Routing processing

The domain name being accessed is read from the `ClientHello`, the route is matched by that domain name to find the upstream, and finally the TCP data is forwarded.

The `ClientHello` parsing is borrowed directly from `TlsFrameHelper`.

/// <summary>Route matching: reads the ClientHello from the connection and looks up the matching SNI route.</summary>
     /// <remarks>
     /// NOTE(review): the success condition, the host value, the match call target and the
     /// logging calls are blank in this listing and must be restored from the project source.
     /// </remarks>
     /// <returns>
     /// The matched route (or <c>null</c>) together with the read result returned by
     /// <c>TryGetClientHelloAsync</c>; <c>(null, default)</c> when no SNI route is configured.
     /// </returns>
     public async ValueTask<(RouteConfig, ReadResult)> MatchSNIAsync(ConnectionContext context, CancellationToken token)
     {
         // No SNI routes configured: return without reading from the connection.
         if (sniRoute is null) return (null, default);
         var (hello, rr) = await TryGetClientHelloAsync(context, token);
         if ()
         {
             var h = ;
             // MatchSNI is passed along as the per-route predicate (TLS version check).
             var r = await ((), h, MatchSNI);
             if (r is null)
             {
                 ();
             }
             return (r, rr);
         }
         else
         {
             ("client hello failed");
             return (null, rr);
         }
     }

     /// <summary>
     /// Match the TLS version: decides whether <paramref name="config"/> accepts the connection
     /// described by <paramref name="info"/>.
     /// </summary>
     /// <remarks>
     /// NOTE(review): the operands of every condition are blank in this listing; only the
     /// <c>SslProtocols</c> constants survive. Restore them from the project source.
     /// </remarks>
     /// <returns>
     /// <c>true</c> when one of the two early checks passes or when any of the protocol pair
     /// checks below succeeds; otherwise <c>false</c>.
     /// </returns>
     private bool MatchSNI(RouteConfig config, TlsFrameInfo info)
     {
         if (!) return true;
         var v = ;
         if (v == ) return true;
         var t = ;
         // Pair checks in the order written: Tls13, Tls12, Tls11, (blank), Ssl3, Ssl2, (blank).
         if ((SslProtocols.Tls13) && (SslProtocols.Tls13)) return true;
         else if ((SslProtocols.Tls12) && (SslProtocols.Tls12)) return true;
         else if ((SslProtocols.Tls11) && (SslProtocols.Tls11)) return true;
         else if (() && ()) return true;
         else if ((SslProtocols.Ssl3) && (SslProtocols.Ssl3)) return true;
         else if ((SslProtocols.Ssl2) && (SslProtocols.Ssl2)) return true;
         else if (() && ()) return true;
         else return false;
     }

     /// <summary>
     /// Parse the ClientHello: keeps reading from the connection until TLS frame info has been
     /// extracted or the read result fails the completion check.
     /// </summary>
     /// <remarks>
     /// NOTE(review): the input source, the read call, the completion check, the buffer access
     /// and the parse/advance call targets are blank in this listing and must be restored from
     /// the project source. The only code change made here is the loop keyword: C# keywords
     /// are case-sensitive, so <c>While</c> was corrected to <c>while</c>.
     /// </remarks>
     /// <param name="context">Connection whose input is read.</param>
     /// <param name="token">Cancellation token passed to each read.</param>
     /// <returns>
     /// The parsed frame info and the read result it was parsed from, or <c>(null, readResult)</c>
     /// when the completion check on the read result is hit first.
     /// </returns>
     private static async ValueTask<(TlsFrameInfo?, ReadResult)> TryGetClientHelloAsync(ConnectionContext context, CancellationToken token)
     {
         var input = ;
         TlsFrameInfo info = default;
         while (true)
         {
             var f = await (token).ConfigureAwait(false);
             if ()
             {
                 return (null, f);
             }
             var buffer = ;
             // Nothing buffered yet: read again.
             if ( == 0)
             {
                 continue;
             }

             var data = ? : ();
             if ((data, ref info))
             {
                 return (info, f);
             }
             else
             {
                 // Not enough data to parse yet; NOTE(review): presumably marks the buffer as examined before the next read; confirm.
                 (, );
                 continue;
             }
         }
     }
Forwarding the TCP data
/// <summary>
/// SNI proxy: matches the route from the ClientHello, connects to the upstream, replays the
/// already-read ClientHello data and then relays data in both directions.
/// </summary>
/// <remarks>
/// NOTE(review): most call targets and operands are blank in this listing and must be
/// restored from the project source. Two code changes were made here: <c>Finally</c> was
/// corrected to the case-sensitive keyword <c>finally</c>, and the error log now names this
/// method instead of <c>TcpProxyAsync</c>, matching how <c>UdpProxyAsync</c> logs its own name.
/// </remarks>
/// <param name="context">The accepted client connection.</param>
private async Task SNIProxyAsync(ConnectionContext context)
 {
     var c = ();
     ();
     var (route, r) = await (context, );
     if (route is not null)
     {
          = route;
         ();
         ConnectionContext upstream = null;
         try
         {
             upstream = await DoConnectionAsync(context, route, );
             if (upstream is null)
             {
                 ();
             }
             else
             {
                 ?.();
                 var cts = (cancellationTokenSourcePool);
                 var t = ;
                 // The only difference from the plain TCP proxy: the ClientHello data has already been read by us, so it must be sent upstream first.
                 await (, t);
                 ();
                 var task = hasMiddlewareTcp?
                         await (
                         (new MiddlewarePipeWriter(, context, reqTcp), t)
                         , (new MiddlewarePipeWriter(, context, respTcp), t))
                         : await (
                         (, t)
                         , (, t));
                 if ()
                 {
                     (, );
                 }
             }
         }
         catch (OperationCanceledException)
         {
             ();
         }
         catch (Exception ex)
         {
             (nameof(SNIProxyAsync), ex);
         }
         finally
         {
             ?.();
             // Always tear down the upstream connection, if one was established.
             upstream?.Abort();
         }
         ();
     }
 }

Every component can be replaced or extended

Because everything is built on IoC (dependency injection), every component can be replaced or supplemented, so the project remains highly customizable.

The list of what is currently exposed can be seen in the code:

/// <summary>
/// Registers the default service implementations on <paramref name="builder"/> and returns the
/// same builder.
/// </summary>
/// <remarks>
/// NOTE(review): the receiver and method name of every registration call are blank in this
/// listing; only the generic type arguments (service type, implementation type) survive.
/// The registration lifetimes therefore cannot be read from here.
/// </remarks>
internal static HostApplicationBuilder UseOrzDefaults(this HostApplicationBuilder builder)
{
    var services = ;
    <IHostedService, HostedService>();
    ();
    <IMeterFactory, DummyMeterFactory>();
    <IServer, OrzServer>();
    <OrzLogger>();
    <OrzMetrics>();
    // Two IConnectionListenerFactory implementations are registered: socket and UDP.
    <IConnectionListenerFactory, SocketTransportFactory>();
    <IConnectionListenerFactory, UdpTransportFactory>();
    <IUdpConnectionFactory, UdpConnectionFactory>();
    <IConnectionFactory, SocketConnectionFactory>();
    <IRouteContractorValidator, RouteContractorValidator>();
    <IEndPointConvertor, CommonEndPointConvertor>();
    <IL4Router, L4Router>();
    <IOrderMiddleware, L4ProxyMiddleware>();
    <ILoadBalancingPolicyFactory, LoadBalancingPolicy>();
    <IClusterConfigValidator, ClusterConfigValidator>();
    <IDestinationResolver, DnsDestinationResolver>();

    // Built-in load-balancing policies.
    <ILoadBalancingPolicy, RandomLoadBalancingPolicy>();
    <ILoadBalancingPolicy, RoundRobinLoadBalancingPolicy>();
    <ILoadBalancingPolicy, LeastRequestsLoadBalancingPolicy>();
    <ILoadBalancingPolicy, PowerOfTwoChoicesLoadBalancingPolicy>();

    // Health reporting and active health checks.
    <IHealthReporter, PassiveHealthReporter>();
    <IHealthUpdater, HealthyAndUnknownDestinationsUpdater>();
    <IActiveHealthCheckMonitor, ActiveHealthCheckMonitor>();
    <IActiveHealthChecker, ConnectionActiveHealthChecker>();

    return builder;
}

For example, to add a load-balancing policy, you only need to implement:

/// <summary>
/// A single load-balancing policy. Implement this interface to add a new policy.
/// </summary>
public interface ILoadBalancingPolicy
{
    /// <summary>
    /// Name of the policy.
    /// NOTE(review): presumably the value referenced by a cluster's <c>LoadBalancingPolicy</c>
    /// setting (for example "RoundRobin"); confirm against the factory.
    /// </summary>
    string Name { get; }

    /// <summary>Picks one destination for the given connection.</summary>
    /// <param name="context">The connection being proxied.</param>
    /// <param name="availableDestinations">The candidate destinations to choose from.</param>
    /// <returns>The chosen destination; the return type allows <c>null</c>.</returns>
    DestinationState? PickDestination(ConnectionContext context, IReadOnlyList<DestinationState> availableDestinations);
}

If none of the existing load-balancing policies suits you, you can replace `ILoadBalancingPolicyFactory` directly.

/// <summary>
/// Entry point for destination selection at route level. Replace the registered
/// implementation to take over load balancing entirely.
/// </summary>
public interface ILoadBalancingPolicyFactory
{
    /// <summary>Picks one destination for the given connection and route.</summary>
    /// <param name="context">The connection being proxied.</param>
    /// <param name="route">The route that was matched for the connection.</param>
    /// <returns>The chosen destination; the return type allows <c>null</c>.</returns>
    DestinationState? PickDestination(ConnectionContext context, RouteConfig route);
}

For example, through SNI you can forward requests for a development environment (or another environment) that is not directly reachable by way of a machine that does have access to it.

That's about it. Reinventing wheels is quite fun, and of course it would be even more fun if you clicked the star on GitHub fs7744/NZOrz.