Ultron.Proxy/Ultron.Proxy.Server/ClientConnectionManager.cs

208 lines
8.2 KiB
C#
Raw Permalink Normal View History

2019-04-19 11:04:11 +08:00
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Ultron.Proxy.Models;
using Ultron.Proxy.Utils;
namespace Ultron.Proxy
{
/// <summary>
/// 反向连接处理类
/// </summary>
public class ClientConnectionManager
{
/// <summary>
/// 当app增加时触发
/// </summary>
public event EventHandler<AppChangedEventArgs> AppTcpClientMapReverseConnected = delegate { };
public event EventHandler<AppChangedEventArgs> AppTcpClientMapConfigConnected = delegate { };
//public event EventHandler<AppChangedEventArgs> AppRemoved = delegate { };
//端口和app的映射关系需定时清理
public Dictionary<int, AppModel> PortAppMap = new Dictionary<int, AppModel>();
//app和代理客户端socket之间的映射关系
public ConcurrentDictionary<ClientIDAppID, BufferBlock<TcpClient>> AppTcpClientMap = new ConcurrentDictionary<ClientIDAppID, BufferBlock<TcpClient>>();
//已注册的clientID,和appid之间的关系,appid序号=元素下标序号+1
public Dictionary<int, List<ClientIDAppID>> RegisteredClient = new Dictionary<int, List<ClientIDAppID>>();
private ClientConnectionManager()
{
ServerHost.Logger.Debug("ClientManager initialized");
Task.Run(ListenServiceClient);
}
private object _lockObject = new Object();
private object _lockObject2 = new Object();
private Random _rand = new Random();
private async Task ListenServiceClient()
{
//侦听,并且构造连接池
ServerHost.Logger.Debug("Listening client on port " + ServerHost.ClientServicePort + "...");
TcpListener listenter = new TcpListener(IPAddress.Any, ServerHost.ClientServicePort);
listenter.Start(1000);
while (true)
{
TcpClient incomeClient = await listenter.AcceptTcpClientAsync();
ServerHost.Logger.Debug("已建立一个空连接");
ProcessReverseRequest(incomeClient);
}
}
/// <summary>
/// 处理反向连接请求
/// </summary>
/// <param name="incomeClient"></param>
/// <returns></returns>
private async Task ProcessReverseRequest(TcpClient incomeClient)
{
try
{
//读取头四个字节
byte[] bytes = new byte[4];
await incomeClient.GetStream().ReadAsync(bytes);
var clientIdAppId = GetAppFromBytes(bytes);
ServerHost.Logger.Debug("已获取到消息ClientID:" + clientIdAppId.ClientID.ToString()
+ "AppID:" + clientIdAppId.AppID.ToString()
);
//分配
lock (_lockObject)
{
AppTcpClientMap.GetOrAdd(clientIdAppId, new BufferBlock<TcpClient>()).Post(incomeClient);
}
//var arg = new AppChangedEventArgs();
//arg.App = clientIdAppId;
//AppTcpClientMapReverseConnected(this, arg);
}
catch (Exception e)
{
ServerHost.Logger.Debug(e);
}
}
private static ClientConnectionManager Instance = new Lazy<ClientConnectionManager>(() => new ClientConnectionManager()).Value;
public static ClientConnectionManager GetInstance()
{
return Instance;
}
public async Task<TcpClient> GetClient(int consumerPort)
{
//从字典的list中取出tcpclient并将其移除
ClientIDAppID clientappid = PortAppMap[consumerPort].ClientIdAppId;
TcpClient client = await AppTcpClientMap[clientappid].ReceiveAsync();
PortAppMap[consumerPort].ReverseClients.Add(client);
// AppTcpClientMap[clientappid].Remove(client);
//AppRemoved(this, new AppChangedEventArgs { App = clientappid });
return client;
}
//通过客户端的id请求分配好服务端端口和appid交给客户端
//arrange ConfigId from top 4 bytes which received from client.
//response:
// 2 1 1 1 1 ...N
// clientid appid port appid2 port2
//request:
// 2 2
// clientid count
// methodType value = 0
public byte[] ArrageConfigIds(byte[] appRequestBytes, byte[] consumerPortBytes)
{
// byte[] arrangedBytes = new byte[256];
ClientModel clientModel = new ClientModel();
int clientId = (appRequestBytes[0] << 8) + appRequestBytes[1];
int appCount = (int)appRequestBytes[2];
if (clientId == 0)
{
lock (_lockObject)
{
byte[] tempClientIdBytes = new byte[2];
//分配clientid
for (int i = 0; i < 10000; i++)
{
_rand.NextBytes(tempClientIdBytes);
int tempClientId = (tempClientIdBytes[0] << 8) + tempClientIdBytes[1];
if (!RegisteredClient.ContainsKey(tempClientId))
{
clientModel.ClientId = tempClientId;
clientId = tempClientId;
//注册客户端
RegisteredClient.Add(tempClientId, new List<ClientIDAppID>());
break;
}
}
}
}
else
{
clientModel.ClientId = clientId;
}
lock (_lockObject2)
{
//循环获取appidappid是元素下标+1
int maxAppCount = RegisteredClient[clientId].Count;
//增加请求的客户端
//int[] ports = NetworkUtil.FindAvailableTCPPorts(20000, appCount);
//foreach (var oneport in ports) Logger.Info(oneport + " ");
clientModel.AppList = new List<App>(appCount);
for (int i = 0; i < appCount; i++)
{
int startPort = StringUtil.DoubleBytesToInt(consumerPortBytes[2 * i], consumerPortBytes[2 * i + 1]);
int arrangedAppid = maxAppCount + i + 1;
if (arrangedAppid > 255) throw new Exception("Stack overflow.");
//查找port的起始端口如果未指定则设置为20000
if (startPort == 0) startPort = 20000;
int port = NetworkUtil.FindOneAvailableTCPPort(startPort);
RegisteredClient[clientId].Add(new ClientIDAppID
{
ClientID = clientId,
AppID = arrangedAppid
});
clientModel.AppList.Add(new App
{
AppId = arrangedAppid,
Port = port
});
var appClient = PortAppMap[port] = new AppModel()
{
ClientIdAppId = new ClientIDAppID()
{
ClientID = clientId,
AppID = arrangedAppid
},
Tunnels = new List<TcpTunnel>(),
ReverseClients = new List<TcpClient>()
};
ServerHost.Logger.Info(port);
//配置时触发
AppTcpClientMapConfigConnected(this, new AppChangedEventArgs() { App = appClient.ClientIdAppId });
}
ServerHost.Logger.Debug(" <=端口已分配。");
}
return clientModel.ToBytes();
}
private ClientIDAppID GetAppFromBytes(byte[] bytes)
{
return new ClientIDAppID()
{
ClientID = (bytes[0] << 8) + bytes[1],
AppID = bytes[2]
};
}
}
}