using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Net; using System.Net.Sockets; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; using Ultron.Proxy.Models; using Ultron.Proxy.Utils; namespace Ultron.Proxy { /// /// 反向连接处理类 /// public class ClientConnectionManager { /// /// 当app增加时触发 /// public event EventHandler AppTcpClientMapReverseConnected = delegate { }; public event EventHandler AppTcpClientMapConfigConnected = delegate { }; //public event EventHandler AppRemoved = delegate { }; //端口和app的映射关系,需定时清理 public Dictionary PortAppMap = new Dictionary(); //app和代理客户端socket之间的映射关系 public ConcurrentDictionary> AppTcpClientMap = new ConcurrentDictionary>(); //已注册的clientID,和appid之间的关系,appid序号=元素下标序号+1 public Dictionary> RegisteredClient = new Dictionary>(); private ClientConnectionManager() { ServerHost.Logger.Debug("ClientManager initialized"); Task.Run(ListenServiceClient); } private object _lockObject = new Object(); private object _lockObject2 = new Object(); private Random _rand = new Random(); private async Task ListenServiceClient() { //侦听,并且构造连接池 ServerHost.Logger.Debug("Listening client on port " + ServerHost.ClientServicePort + "..."); TcpListener listenter = new TcpListener(IPAddress.Any, ServerHost.ClientServicePort); listenter.Start(1000); while (true) { TcpClient incomeClient = await listenter.AcceptTcpClientAsync(); ServerHost.Logger.Debug("已建立一个空连接"); ProcessReverseRequest(incomeClient); } } /// /// 处理反向连接请求 /// /// /// private async Task ProcessReverseRequest(TcpClient incomeClient) { try { //读取头四个字节 byte[] bytes = new byte[4]; await incomeClient.GetStream().ReadAsync(bytes); var clientIdAppId = GetAppFromBytes(bytes); ServerHost.Logger.Debug("已获取到消息ClientID:" + clientIdAppId.ClientID.ToString() + "AppID:" + clientIdAppId.AppID.ToString() ); //分配 lock (_lockObject) { AppTcpClientMap.GetOrAdd(clientIdAppId, new BufferBlock()).Post(incomeClient); } //var arg = new AppChangedEventArgs(); //arg.App = clientIdAppId; //AppTcpClientMapReverseConnected(this, arg); } catch (Exception e) { ServerHost.Logger.Debug(e); } } private static ClientConnectionManager Instance = new Lazy(() => new ClientConnectionManager()).Value; public static ClientConnectionManager GetInstance() { return Instance; } public async Task GetClient(int consumerPort) { //从字典的list中取出tcpclient,并将其移除 ClientIDAppID clientappid = PortAppMap[consumerPort].ClientIdAppId; TcpClient client = await AppTcpClientMap[clientappid].ReceiveAsync(); PortAppMap[consumerPort].ReverseClients.Add(client); // AppTcpClientMap[clientappid].Remove(client); //AppRemoved(this, new AppChangedEventArgs { App = clientappid }); return client; } //通过客户端的id请求,分配好服务端端口和appid交给客户端 //arrange ConfigId from top 4 bytes which received from client. //response: // 2 1 1 1 1 ...N // clientid appid port appid2 port2 //request: // 2 2 // clientid count // methodType value = 0 public byte[] ArrageConfigIds(byte[] appRequestBytes, byte[] consumerPortBytes) { // byte[] arrangedBytes = new byte[256]; ClientModel clientModel = new ClientModel(); int clientId = (appRequestBytes[0] << 8) + appRequestBytes[1]; int appCount = (int)appRequestBytes[2]; if (clientId == 0) { lock (_lockObject) { byte[] tempClientIdBytes = new byte[2]; //分配clientid for (int i = 0; i < 10000; i++) { _rand.NextBytes(tempClientIdBytes); int tempClientId = (tempClientIdBytes[0] << 8) + tempClientIdBytes[1]; if (!RegisteredClient.ContainsKey(tempClientId)) { clientModel.ClientId = tempClientId; clientId = tempClientId; //注册客户端 RegisteredClient.Add(tempClientId, new List()); break; } } } } else { clientModel.ClientId = clientId; } lock (_lockObject2) { //循环获取appid,appid是元素下标+1 int maxAppCount = RegisteredClient[clientId].Count; //增加请求的客户端 //int[] ports = NetworkUtil.FindAvailableTCPPorts(20000, appCount); //foreach (var oneport in ports) Logger.Info(oneport + " "); clientModel.AppList = new List(appCount); for (int i = 0; i < appCount; i++) { int startPort = StringUtil.DoubleBytesToInt(consumerPortBytes[2 * i], consumerPortBytes[2 * i + 1]); int arrangedAppid = maxAppCount + i + 1; if (arrangedAppid > 255) throw new Exception("Stack overflow."); //查找port的起始端口如果未指定,则设置为20000 if (startPort == 0) startPort = 20000; int port = NetworkUtil.FindOneAvailableTCPPort(startPort); RegisteredClient[clientId].Add(new ClientIDAppID { ClientID = clientId, AppID = arrangedAppid }); clientModel.AppList.Add(new App { AppId = arrangedAppid, Port = port }); var appClient = PortAppMap[port] = new AppModel() { ClientIdAppId = new ClientIDAppID() { ClientID = clientId, AppID = arrangedAppid }, Tunnels = new List(), ReverseClients = new List() }; ServerHost.Logger.Info(port); //配置时触发 AppTcpClientMapConfigConnected(this, new AppChangedEventArgs() { App = appClient.ClientIdAppId }); } ServerHost.Logger.Debug(" <=端口已分配。"); } return clientModel.ToBytes(); } private ClientIDAppID GetAppFromBytes(byte[] bytes) { return new ClientIDAppID() { ClientID = (bytes[0] << 8) + bytes[1], AppID = bytes[2] }; } } }