Because I had already learned from Kestrel's many abstractions and optimizations, extending the proxy afterwards turned out to be very convenient.
Implementing the two features, a simple UDP proxy and an SNI proxy, went much faster than expected (laziness also played a part, of course).
(PS: If you have time, could you give GitHub/fs7744/NZOrz a star? After all, borrowing code is not easy, hahahahahaha)
Simple UDP proxy
The UDP proxy here is deliberately simple: whenever the proxy receives a UDP packet, it finds the upstream through route matching and forwards the packet to that upstream.
UDP proxy configuration
The basic format is the same as for the TCP proxy described previously.
The only differences are that `Protocols` must be set to `UDP`, and that there is an additional `UdpResponses` setting:
the number of UDP packets that may be returned to the requester. The default is 0, meaning no packets are returned.
{
"Logging": {
"LogLevel": {
"Default": "Information"
}
},
"ReverseProxy": {
"Routes": {
"udpTest": {
"Protocols": [ "UDP" ],
"Match": {
"Hosts": [ "*:5000" ]
},
"ClusterId": "udpTest",
"RetryCount": 1,
"UdpResponses": 1,
"Timeout": "00:00:11"
}
},
"Clusters": {
"udpTest": {
"LoadBalancingPolicy": "RoundRobin",
"HealthCheck": {
"Passive": {
"Enable": true
}
},
"Destinations": [
{
"Address": "127.0.0.1:11000"
}
]
}
}
}
}
Implementation
Here are some code listings to show how simple it is.
PS: Because this is a very simple UDP proxy, it is not built on `IMultiplexedConnectionListener` but on `IConnectionListener` (yes, I'm lazy).
1. Implement `UdpConnectionContext`
To keep things lazy, the UDP packet data is stored directly on the context rather than in Parameters, which avoids extra dictionary instances and reduces memory usage.
/// <summary>
/// Connection context for received UDP data: it carries the socket and the number of
/// received bytes directly on the context.
/// </summary>
/// <remarks>
/// NOTE(review): several expressions in this listing are blank (assignment targets and
/// right-hand sides, call receivers). They appear to have been elided from the original
/// source and must be restored before this compiles.
/// </remarks>
public sealed class UdpConnectionContext : TransportConnection
{
// Owner of a pooled byte buffer; presumably the buffer holding the received data — confirm.
private readonly IMemoryOwner<byte> memory;
/// <summary>The socket passed to the constructor.</summary>
public Socket Socket { get; }
/// <summary>Number of bytes received.</summary>
public int ReceivedBytesCount { get; }
/// <summary>
/// The first <see cref="ReceivedBytesCount"/> bytes of a buffer. The receiver of the call is
/// elided here; presumably it slices the buffer owned by <c>memory</c> — confirm.
/// </summary>
public Memory<byte> ReceivedBytes => (0, ReceivedBytesCount);
/// <summary>
/// Creates the context from a receive result. The values copied out of
/// <paramref name="result"/> are elided in this listing.
/// </summary>
public UdpConnectionContext(Socket socket, UdpReceiveFromResult result)
{
Socket = socket;
ReceivedBytesCount = ;
= ;
LocalEndPoint = ;
RemoteEndPoint = ;
}
/// <summary>
/// Creates the context from an explicit remote endpoint, byte count and buffer owner.
/// </summary>
public UdpConnectionContext(Socket socket, EndPoint remoteEndPoint, int receivedBytes, IMemoryOwner<byte> memory)
{
Socket = socket;
ReceivedBytesCount = receivedBytes;
// Elided assignment target — presumably the memory field; confirm.
= memory;
LocalEndPoint = ;
RemoteEndPoint = remoteEndPoint;
}
/// <summary>Runs an (elided) cleanup call and returns an already-completed ValueTask.</summary>
public override ValueTask DisposeAsync()
{
// Elided call — presumably releases the buffer owner; confirm.
();
return default;
}
}
2. Implement `IConnectionListener`
/// <summary>
/// <see cref="IConnectionListener"/> for UDP: binds a listen socket and returns each receive
/// result as a <see cref="UdpConnectionContext"/>.
/// </summary>
/// <remarks>
/// NOTE(review): several expressions in this listing are blank (assignment targets, call
/// receivers, exception-filter operands). They appear to have been elided from the original
/// source and must be restored before this compiles.
/// </remarks>
internal sealed class UdpConnectionListener : IConnectionListener
{
private EndPoint? udpEndPoint;
private readonly GatewayProtocols protocols;
private OrzLogger _logger;
private readonly IUdpConnectionFactory connectionFactory;
// Delegate that creates the bound listen socket for an endpoint and protocol set.
private readonly Func<EndPoint, GatewayProtocols, Socket> createBoundListenSocket;
// Null until Bind() succeeds.
private Socket? _listenSocket;
/// <summary>Stores the dependencies; the assignment targets are elided in this listing.</summary>
public UdpConnectionListener(EndPoint? udpEndPoint, GatewayProtocols protocols, IRouteContractor contractor, OrzLogger logger, IUdpConnectionFactory connectionFactory)
{
= udpEndPoint;
= protocols;
_logger = logger;
= connectionFactory;
// Receiver elided — presumably obtained from the contractor parameter; confirm.
createBoundListenSocket = ().CreateBoundListenSocket;
}
/// <summary>The endpoint this listener was created for.</summary>
public EndPoint EndPoint => udpEndPoint;
/// <summary>Creates the listen socket for <see cref="EndPoint"/>.</summary>
/// <exception cref="InvalidOperationException">The listener is already bound.</exception>
/// <exception cref="AddressInUseException">
/// Socket creation failed with a SocketException matching the (elided) filter.
/// </exception>
internal void Bind()
{
if (_listenSocket != null)
{
throw new InvalidOperationException("Transport is already bound.");
}
Socket listenSocket;
try
{
listenSocket = createBoundListenSocket(EndPoint, protocols);
}
// Filter operands elided — presumably an address-already-in-use error code check; confirm.
catch (SocketException e) when ( == )
{
throw new AddressInUseException(, e);
}
// Elided call — looks like an assertion that the created socket is not null.
( != null);
_listenSocket = listenSocket;
}
/// <summary>
/// Waits for the next receive on the listen socket and wraps it in a
/// <see cref="UdpConnectionContext"/>.
/// </summary>
/// <returns>
/// The new context, or null when the socket was disposed or the (elided) SocketException
/// filter matches.
/// </returns>
public async ValueTask<ConnectionContext?> AcceptAsync(CancellationToken cancellationToken = default)
{
while (true)
{
try
{
// Elided call — looks like an assertion with the message below.
(_listenSocket != null, "Bind must be called first.");
// Receiver elided — presumably the connectionFactory receive call; confirm.
var r = await (_listenSocket, cancellationToken);
return new UdpConnectionContext(_listenSocket, r);
}
catch (ObjectDisposedException)
{
// A call was made to UnbindAsync/DisposeAsync just return null which signals we're done
return null;
}
catch (SocketException e) when ( == )
{
// A call was made to UnbindAsync/DisposeAsync just return null which signals we're done
return null;
}
catch (SocketException)
{
// Any other socket error is logged and the loop tries the receive again.
_logger.ConnectionReset("(null)");
}
}
}
/// <summary>Disposes the listen socket, if any, and completes synchronously.</summary>
public ValueTask DisposeAsync()
{
_listenSocket?.Dispose();
return default;
}
/// <summary>
/// Disposes the listen socket, if any, and completes synchronously; the cancellation token
/// is not used.
/// </summary>
public ValueTask UnbindAsync(CancellationToken cancellationToken = default)
{
_listenSocket?.Dispose();
return default;
}
}
3. Implement `IConnectionListenerFactory`
/// <summary>
/// Listener factory for UDP: creates a <see cref="UdpConnectionListener"/> for an endpoint
/// and reports which endpoints it can bind.
/// </summary>
/// <remarks>
/// NOTE(review): several expressions in this listing are blank (call receivers, assignment
/// targets, one constructor argument). They appear to have been elided from the original
/// source and must be restored before this compiles.
/// </remarks>
public sealed class UdpTransportFactory : IConnectionListenerFactory, IConnectionListenerFactorySelector
{
private readonly IRouteContractor contractor;
private readonly OrzLogger logger;
private readonly IUdpConnectionFactory connectionFactory;
/// <summary>Stores the dependencies that are later handed to each listener.</summary>
public UdpTransportFactory(
IRouteContractor contractor,
OrzLogger logger,
IUdpConnectionFactory connectionFactory)
{
// Elided calls — presumably null-argument guards; confirm.
(contractor);
(logger);
// Elided assignment targets — presumably the three fields above; confirm.
= contractor;
= logger;
= connectionFactory;
}
/// <summary>
/// Creates a listener for <paramref name="endpoint"/> and returns it in an already-completed
/// ValueTask. The cancellation token is not used in this listing.
/// </summary>
public ValueTask<IConnectionListener> BindAsync(EndPoint endpoint, GatewayProtocols protocols, CancellationToken cancellationToken = default)
{
// Second argument elided — presumably the protocols parameter; confirm.
var transport = new UdpConnectionListener(endpoint, , contractor, logger, connectionFactory);
// Elided call — presumably binds the new listener; confirm.
();
return new ValueTask<IConnectionListener>(transport);
}
/// <summary>
/// Returns true only when the (elided) protocol check passes and the endpoint is an
/// <see cref="IPEndPoint"/>.
/// </summary>
public bool CanBind(EndPoint endpoint, GatewayProtocols protocols)
{
// Condition elided — presumably tests whether protocols includes UDP; confirm.
if (!()) return false;
return endpoint switch
{
IPEndPoint _ => true,
_ => false
};
}
}
4. Implement the UDP-specific proxy logic in `L4ProxyMiddleware`
The routing was covered in the previous TCP article, so it is not listed here.
public class L4ProxyMiddleware : IOrderMiddleware
{
/// <summary>
/// Middleware entry point: sends the connection down the SNI, TCP or UDP proxy path and then
/// always invokes <paramref name="next"/>.
/// </summary>
/// <remarks>
/// Exceptions from the proxy paths are caught here and passed to an (elided) handler, so they
/// do not propagate to the caller.
/// NOTE(review): several expressions in this listing are blank; they appear to have been
/// elided from the original source.
/// </remarks>
public async Task Invoke(ConnectionContext context, ConnectionDelegate next)
{
try
{
// Operands elided — presumably checks whether the connection's protocol is SNI; confirm.
if ( == )
{
await SNIProxyAsync(context);
}
else
{
// Receiver elided — presumably the L4 router's route match; confirm.
var route = await (context);
if (route is null)
{
logger.NotFoundRouteL4();
}
else
{
// Elided target — the matched route is stored somewhere before proxying.
= route;
();
// Operands elided — selects the TCP path, otherwise the connection is treated as UDP.
if ( == )
{
await TcpProxyAsync(context, route);
}
else
{
// The cast assumes every non-TCP connection here is a UdpConnectionContext.
await UdpProxyAsync((UdpConnectionContext)context, route);
}
();
}
}
}
catch (Exception ex)
{
// Elided call — presumably logs the unexpected exception; confirm.
(, ex);
}
finally
{
// The next middleware runs whether or not proxying succeeded.
await next(context);
}
}
/// <summary>
/// Forwards the received UDP data to an upstream for <paramref name="route"/> and then relays
/// a bounded number of upstream responses back.
/// </summary>
/// <remarks>
/// Cancellation and other exceptions are handled inside the method and are not rethrown.
/// NOTE(review): several expressions in this listing are blank (socket constructor arguments,
/// the response count, call receivers); they appear to have been elided from the original
/// source.
/// </remarks>
private async Task UdpProxyAsync(UdpConnectionContext context, RouteConfig route)
{
try
{
var socket = new Socket(, , );
// Receiver elided — obtains a cancellation source using cancellationTokenSourcePool.
var cts = (cancellationTokenSourcePool);
var token = ;
// Sends the request data (after the reqUdp step) upstream; a false result skips the relay loop.
if (await DoUdpSendToAsync(socket, context, route, , await reqUdp(context, , token), token))
{
// Initial value elided — presumably the route's UdpResponses setting; confirm.
var c = ;
while (c > 0)
{
var r = await (socket, token);
c--;
// Sends the response (after the respUdp step); presumably back to the requester — confirm.
await (, , await respUdp(context, (), token), token);
}
}
else
{
();
}
}
catch (OperationCanceledException)
{
// Elided call — presumably reports a timeout or cancellation; confirm.
();
}
catch (Exception ex)
{
(nameof(UdpProxyAsync), ex);
}
finally
{
// Elided cleanup call. NOTE(review): socket and cts are declared inside the try block, so
// confirm what is released here and that the socket is disposed.
?.();
}
}
So is it really that simple? In principle, anything built on Kestrel's abstractions works the same way.
Optimization
Of course, by following Kestrel's TCP socket handling, some optimizations were also fairly easy to make, for example:
- `UdpClient` is not used (PS: not because its implementation is bad, but because it is general-purpose and leaves no opportunity to change its internals)
- The logic that submits asynchronous reads and writes to a `PipeScheduler` is implemented on top of `SocketAsyncEventArgs, IValueTaskSource<SocketReceiveFromResult>` and `SocketAsyncEventArgs, IValueTaskSource<int>`
- A simple pool of UDP sender objects based on `ConcurrentQueue<UdpSender>` increases object reuse and slightly reduces memory usage
- A simple `CancellationTokenSource` object pool based on `ConcurrentQueue<PooledCancellationTokenSource>` increases object reuse and slightly reduces memory usage
SNI proxy
In addition to the basic TCP and UDP proxies, I also tried implementing an SNI proxy for TCP traffic (for example HTTPS over HTTP/1 and HTTP/2).
For now, however, the proxy only forwards traffic: it performs no SSL encryption or decryption, which is left to the upstream. If the proxy did need to handle SSL encryption and decryption, it could in theory be built on the off-the-shelf `SslStream`.
SNI proxy configuration
Just configure a shared SNI listening port under `Listen`,
then configure a separate route and upstream for each SNI host name.
Each route can also restrict the accepted TLS versions through `SupportSslProtocols`.
Here is an example:
{
"Logging": {
"LogLevel": {
"Default": "Information"
}
},
"ReverseProxy": {
"Listen": {
"snitest": {
"Protocols": "SNI",
"Address": [ "*:444" ]
}
},
"Routes": {
"snitestroute": {
"Protocols": "SNI",
"SupportSslProtocols": [ "Tls13", "Tls12" ],
"Match": {
"Hosts": [ "*" ]
},
"ClusterId": "apidemo"
},
"snitestroute2": {
"Protocols": "Tcp",
"Match": {
"Hosts": [ "*:448" ]
},
"ClusterId": "apidemo"
}
},
"Clusters": {
"apidemo": {
"LoadBalancingPolicy": "RoundRobin",
"HealthCheck": {
"Active": {
"Enable": true,
"Policy": "Connect"
}
},
"Destinations": [
{
"Address": ""
}
]
}
}
}
}
Implementation
The core of the implementation is really just the routing; the proxying itself is exactly the same as the TCP proxy (it simply moves TCP data between the requester and the upstream).
Routing
The domain name being accessed is read from the `ClientHello`, the upstream is found by matching that domain name against the routes, and finally the TCP data is forwarded.
The `ClientHello` parsing is taken directly from `TlsFrameHelper`.
/// <summary>
/// Route matching for SNI: parses the TLS ClientHello from the connection and looks up a
/// route for it, using <see cref="MatchSNI"/> as the per-route filter.
/// </summary>
/// <returns>
/// The matched route (null when nothing matched, when no SNI routes exist, or when the
/// ClientHello could not be parsed) together with the read result from the ClientHello read,
/// so the caller can still forward the bytes that were already consumed.
/// </returns>
/// <remarks>
/// NOTE(review): several expressions in this listing are blank; they appear to have been
/// elided from the original source.
/// </remarks>
public async ValueTask<(RouteConfig, ReadResult)> MatchSNIAsync(ConnectionContext context, CancellationToken token)
{
// No SNI routes: nothing is read from the connection.
if (sniRoute is null) return (null, default);
var (hello, rr) = await TryGetClientHelloAsync(context, token);
// Condition elided — presumably checks that a ClientHello was parsed; confirm.
if ()
{
// Elided — presumably the parsed ClientHello value; confirm.
var h = ;
// Receiver and first argument elided — presumably a lookup in sniRoute by host name; confirm.
var r = await ((), h, MatchSNI);
if (r is null)
{
// Elided call — presumably logs that no route was found; confirm.
();
}
return (r, rr);
}
else
{
("client hello failed");
return (null, rr);
}
}
/// <summary>
/// Matches the TLS version: decides whether <paramref name="config"/> accepts the protocol
/// versions described by <paramref name="info"/>.
/// </summary>
/// <returns>
/// True when one of the early-exit checks passes or when both sides share at least one of the
/// protocol versions tested below; otherwise false.
/// </returns>
/// <remarks>
/// NOTE(review): the receivers of the checks and two of the protocol arguments are blank in
/// this listing; they appear to have been elided from the original source.
/// </remarks>
private bool MatchSNI(RouteConfig config, TlsFrameInfo info)
{
// Condition elided — presumably "route has no SupportSslProtocols restriction"; confirm.
if (!) return true;
var v = ;
// Comparison operand elided — presumably the "no restriction" value; confirm.
if (v == ) return true;
var t = ;
// Each branch is true when both (elided) sides contain the named protocol.
if ((SslProtocols.Tls13) && (SslProtocols.Tls13)) return true;
else if ((SslProtocols.Tls12) && (SslProtocols.Tls12)) return true;
else if ((SslProtocols.Tls11) && (SslProtocols.Tls11)) return true;
// Protocol argument elided in this branch.
else if (() && ()) return true;
else if ((SslProtocols.Ssl3) && (SslProtocols.Ssl3)) return true;
else if ((SslProtocols.Ssl2) && (SslProtocols.Ssl2)) return true;
// Protocol argument elided in this branch.
else if (() && ()) return true;
else return false;
}
/// <summary>
/// Parses the TLS ClientHello: keeps reading from the connection until the frame info can be
/// parsed or the (elided) stop condition on the read result is met.
/// </summary>
/// <param name="context">Connection whose input is read.</param>
/// <param name="token">Cancellation token passed to every read.</param>
/// <returns>
/// The parsed frame info with the read result that contained it, or <c>null</c> with the last
/// read result when the stop condition was met first.
/// </returns>
/// <remarks>
/// NOTE(review): several expressions in this listing are blank (the input source, call
/// receivers, conditions). They appear to have been elided from the original source and must
/// be restored before this compiles.
/// </remarks>
private static async ValueTask<(TlsFrameInfo?, ReadResult)> TryGetClientHelloAsync(ConnectionContext context, CancellationToken token)
{
    var input = ;
    TlsFrameInfo info = default;
    // `while` must be lowercase: C# keywords are case-sensitive.
    while (true)
    {
        var f = await (token).ConfigureAwait(false);
        // Condition elided — presumably the read completed or was cancelled; confirm.
        if ()
        {
            return (null, f);
        }
        var buffer = ;
        // Nothing buffered yet: read again.
        if ( == 0)
        {
            continue;
        }
        // Operands elided — presumably chooses between a single segment and a copied array; confirm.
        var data = ? : ();
        if ((data, ref info))
        {
            return (info, f);
        }
        else
        {
            // Not enough data to parse yet. Arguments elided — presumably marks the buffer as
            // examined without consuming it, so the next read returns more data; confirm.
            (, );
            continue;
        }
    }
}
Forwarding the TCP data
/// <summary>
/// SNI proxy path: matches a route from the ClientHello, connects to the upstream, replays
/// the ClientHello bytes that were already read, and then relays data in both directions.
/// </summary>
/// <param name="context">The incoming connection.</param>
/// <remarks>
/// When no route matches, nothing is done. Cancellation and other exceptions are handled
/// inside the method and are not rethrown; the upstream connection is always aborted at the
/// end.
/// NOTE(review): several expressions in this listing are blank (call receivers, arguments,
/// assignment targets). They appear to have been elided from the original source and must be
/// restored before this compiles.
/// </remarks>
private async Task SNIProxyAsync(ConnectionContext context)
{
    var c = ();
    ();
    var (route, r) = await (context, );
    if (route is not null)
    {
        = route;
        ();
        ConnectionContext upstream = null;
        try
        {
            upstream = await DoConnectionAsync(context, route, );
            if (upstream is null)
            {
                ();
            }
            else
            {
                ?.();
                var cts = (cancellationTokenSourcePool);
                var t = ;
                await (, t); // The only difference from the TCP proxy: the ClientHello data must be sent first, because it has already been read here.
                ();
                // With TCP middleware configured, both directions are wrapped in a MiddlewarePipeWriter.
                var task = hasMiddlewareTcp ?
                    await (
                        (new MiddlewarePipeWriter(, context, reqTcp), t)
                        , (new MiddlewarePipeWriter(, context, respTcp), t))
                    : await (
                        (, t)
                        , (, t));
                if ()
                {
                    (, );
                }
            }
        }
        catch (OperationCanceledException)
        {
            ();
        }
        catch (Exception ex)
        {
            // Report this method's own name; the listing previously used nameof(TcpProxyAsync).
            (nameof(SNIProxyAsync), ex);
        }
        // `finally` must be lowercase: C# keywords are case-sensitive.
        finally
        {
            ?.();
            upstream?.Abort();
        }
        ();
    }
}
Every component can be replaced or extended
Because everything is built on IoC, every component can be replaced or supplemented, so the project remains highly customizable.
The list of currently exposed services can be seen in the code:
/// <summary>
/// Sets up the default service/implementation pairs listed below on the host builder and
/// returns the same builder.
/// </summary>
/// <remarks>
/// NOTE(review): the receiver and method name of every call are blank in this listing, so the
/// registration kind and lifetime cannot be read from it; presumably these are DI
/// registrations on the builder's service collection — confirm against the original source.
/// </remarks>
internal static HostApplicationBuilder UseOrzDefaults(this HostApplicationBuilder builder)
{
var services = ;
<IHostedService, HostedService>();
();
<IMeterFactory, DummyMeterFactory>();
<IServer, OrzServer>();
<OrzLogger>();
<OrzMetrics>();
// Two listener factories are listed: one socket-based and one for UDP.
<IConnectionListenerFactory, SocketTransportFactory>();
<IConnectionListenerFactory, UdpTransportFactory>();
<IUdpConnectionFactory, UdpConnectionFactory>();
<IConnectionFactory, SocketConnectionFactory>();
<IRouteContractorValidator, RouteContractorValidator>();
<IEndPointConvertor, CommonEndPointConvertor>();
<IL4Router, L4Router>();
<IOrderMiddleware, L4ProxyMiddleware>();
<ILoadBalancingPolicyFactory, LoadBalancingPolicy>();
<IClusterConfigValidator, ClusterConfigValidator>();
<IDestinationResolver, DnsDestinationResolver>();
// Four ILoadBalancingPolicy implementations are listed side by side.
<ILoadBalancingPolicy, RandomLoadBalancingPolicy>();
<ILoadBalancingPolicy, RoundRobinLoadBalancingPolicy>();
<ILoadBalancingPolicy, LeastRequestsLoadBalancingPolicy>();
<ILoadBalancingPolicy, PowerOfTwoChoicesLoadBalancingPolicy>();
<IHealthReporter, PassiveHealthReporter>();
<IHealthUpdater, HealthyAndUnknownDestinationsUpdater>();
<IActiveHealthCheckMonitor, ActiveHealthCheckMonitor>();
<IActiveHealthChecker, ConnectionActiveHealthChecker>();
return builder;
}
For example, if you want to add a load-balancing strategy, you can implement
/// <summary>
/// A load-balancing strategy that picks one destination from the available ones.
/// </summary>
public interface ILoadBalancingPolicy
{
/// <summary>
/// Name of the policy. Presumably this is the value matched against a cluster's
/// LoadBalancingPolicy setting — confirm against the factory implementation.
/// </summary>
string Name { get; }
/// <summary>
/// Picks a destination for <paramref name="context"/> from
/// <paramref name="availableDestinations"/>.
/// </summary>
/// <returns>The chosen destination; the return type allows null.</returns>
DestinationState? PickDestination(ConnectionContext context, IReadOnlyList<DestinationState> availableDestinations);
}
If none of the existing load-balancing strategies suit you, you can replace `ILoadBalancingPolicyFactory` directly
/// <summary>
/// Picks the destination for a connection at the route level; replacing this service replaces
/// destination selection as a whole.
/// </summary>
public interface ILoadBalancingPolicyFactory
{
/// <summary>
/// Picks a destination for <paramref name="context"/> using <paramref name="route"/>.
/// </summary>
/// <returns>The chosen destination; the return type allows null.</returns>
DestinationState? PickDestination(ConnectionContext context, RouteConfig route);
}
For example, requests that cannot reach the development environment (or some other environment) directly can be forwarded through SNI via a machine that does have access.
That's about it. Building wheels is quite fun, and of course it would be even more fun if you gave GitHub/fs7744/NZOrz a star.