using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Ultron.Proxy.Utils;
using Ultron.Proxy.Interfaces;
namespace Ultron.Proxy
{
//+------------------------+
//| NAT |
//| |
//| |
//| +----------+ | +-----------+
//| | | | | |
//| | client |------------> provider |
//| | | | | |
//| +----+-----+ | +------^----+
//| | | |
//| | | |
//| | | |
//| +----V-----+ | |
//| | | | |
//| | IIS | | |
//| | | | |
//| +----------+ | +------+-------+
//| | | |
//| | | consumer |
//| | | |
//+------------------------+ +--------------+
public class ServerHost
{
//服务端代理转发端口
public static int ClientServicePort = 9973;
//服务端配置通讯端口
public static int ConfigServicePort = 12307;
//远端管理端口
public static int WebManagementPort = 0;
public ClientConnectionManager ConnectionManager = null;
//inject
internal static ILogger Logger;
public ServerHost(ILogger logger)
{
Logger = logger;
}
//必须设置远程端口才可以通信
public ServerHost SetWebPort(int port)
{
WebManagementPort = port;
return this;
}
public async Task Start()
{
TaskScheduler.UnobservedTaskException += TaskScheduler_UnobservedTaskException;
CancellationTokenSource ctsConfig = new CancellationTokenSource();
CancellationTokenSource ctsHttp = new CancellationTokenSource();
CancellationTokenSource ctsConsumer = new CancellationTokenSource();
//1.反向连接池配置
ConnectionManager = ClientConnectionManager.GetInstance();
//注册客户端发生连接时的事件
ConnectionManager.AppTcpClientMapConfigConnected += ConnectionManager_AppAdded;
Logger.Debug("Ultron.Proxy server started");
//2.开启http服务
if (WebManagementPort > 0)
StartHttpService(ctsHttp);
//3.开启配置服务
try
{
await StartConfigService(ctsConfig);
}
catch (Exception ex)
{
Logger.Debug(ex.Message);
}
finally
{
Logger.Debug("all closed");
ctsConfig.Cancel();
//listenerConsumer.Stop();
}
////4.通过已配置的端口集合开启侦听
//foreach (var kv in ConnectionManager.PortAppMap)
//{
// ListenConsumeAsync(kv.Key, ctsConsumer.Token);
//}
}
private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e)
{
Logger.Error(e.Exception.ToString(), e.Exception);
}
#region HTTPServer
private async Task StartHttpService(CancellationTokenSource ctsHttp)
{
try
{
HttpListener listener = new HttpListener();
listener.Prefixes.Add($"http://127.0.0.1:{WebManagementPort}/");
//TcpListener listenerConfigService = new TcpListener(IPAddress.Any, WebManagementPort);
Logger.Debug("Listening HTTP request on port " + WebManagementPort.ToString() + "...");
await AcceptHttpRequest(listener, ctsHttp);
}
catch (HttpListenerException ex)
{
Logger.Debug("Please run this program in administrator mode." + ex);
ServerHost.Logger.Error(ex.ToString(), ex);
}
catch (Exception ex)
{
Logger.Debug(ex);
ServerHost.Logger.Error(ex.ToString(), ex);
}
}
private async Task AcceptHttpRequest(HttpListener httpService, CancellationTokenSource ctsHttp)
{
httpService.Start();
while (true)
{
var client = await httpService.GetContextAsync();
ProcessHttpRequestAsync(client);
}
}
private async Task ProcessHttpRequestAsync(HttpListenerContext context)
{
try
{
var request = context.Request;
var response = context.Response;
response.ContentEncoding = Encoding.UTF8;
response.ContentType = "text/html;charset=utf-8";
//getJson
StringBuilder json = new StringBuilder("[ ");
foreach (var app in this.ConnectionManager.PortAppMap)
{
json.Append("{ ");
json.Append(KV2Json("port", app.Key)).C();
json.Append(KV2Json("clientId", app.Value.ClientIdAppId.ClientID)).C();
json.Append(KV2Json("appId", app.Value.ClientIdAppId.AppID)).C();
//反向连接
json.Append(KV2Json("revconns"));
json.Append("[ ");
foreach (var reverseClient in app.Value.ReverseClients)
{
json.Append("{ ");
if (reverseClient.Connected)
{
json.Append(KV2Json("lEndPoint", reverseClient.Client.LocalEndPoint.ToString())).C();
json.Append(KV2Json("rEndPoint", reverseClient.Client.RemoteEndPoint.ToString()));
}
//json.Append(KV2Json("p", c)).C();
//json.Append(KV2Json("port", ca.Key));
json.Append("}");
json.C();
}
json.D();
json.Append("]").C(); ;
//隧道状态
json.Append(KV2Json("tunnels"));
json.Append("[ ");
foreach (var tunnel in app.Value.Tunnels)
{
json.Append("{ ");
if (tunnel.ClientServerClient.Connected)
json.Append(KV2Json("clientServerClient", tunnel.ClientServerClient?.Client.LocalEndPoint.ToString())).C();
if (tunnel.ConsumerClient.Connected)
json.Append(KV2Json("consumerClient", tunnel.ConsumerClient?.Client.LocalEndPoint.ToString())).C();
json.D();
//json.Append(KV2Json("p", c)).C();
//json.Append(KV2Json("port", ca.Key));
json.Append("}");
json.C();
}
json.D();
json.Append("]");
json.Append("}").C();
}
json.D();
json.Append("]");
await response.OutputStream.WriteAsync(HtmlUtil.GetContent(json.ToString()));
//await response.OutputStream.WriteAsync(HtmlUtil.GetContent(request.RawUrl));
response.OutputStream.Close();
}
catch (Exception e)
{
Logger.Error(e.Message, e);
throw;
}
}
private string KV2Json(string key)
{
return "\"" + key + "\":";
}
private string KV2Json(string key, object value)
{
return "\"" + key + "\":\"" + value.ToString() + "\"";
}
#endregion
private async Task StartConfigService(CancellationTokenSource accepting)
{
TcpListener listenerConfigService = new TcpListener(IPAddress.Any, ConfigServicePort);
Logger.Debug("Listening config request on port " + ConfigServicePort.ToString() + "...");
var taskResultConfig = AcceptConfigRequest(listenerConfigService);
await taskResultConfig; //block here to hold open the server
}
///
/// 有连接连上则开始侦听新的端口
///
///
///
private void ConnectionManager_AppAdded(object sender, AppChangedEventArgs e)
{
Logger.Debug("AppTcpClientMapReverseConnected事件已触发");
int port = 0;
foreach (var kv in ConnectionManager.PortAppMap)
{
if (kv.Value.ClientIdAppId.AppID == e.App.AppID &&
kv.Value.ClientIdAppId.ClientID == e.App.ClientID) port = kv.Key;
}
if (port == 0) throw new Exception("app未注册");
var ct = new CancellationToken();
ListenConsumeAsync(port, ct);
}
#region 配置
//配置服务,客户端可以通过这个服务接收现有的空闲端口
//accept a config request.
//request:
// 2 1 1
// clientid appid nouse
//
//response:
// 2 1 1 ...N
// clientid appid port
private async Task AcceptConfigRequest(TcpListener listenerConfigService)
{
listenerConfigService.Start(100);
while (true)
{
var client = await listenerConfigService.AcceptTcpClientAsync();
ProcessConfigRequestAsync(client);
}
}
private async Task ProcessConfigRequestAsync(TcpClient client)
{
try
{
//长度固定4个字节
int configRequestLength = 3;
byte[] appRequestBytes = new byte[configRequestLength];
Logger.Debug("config request received.");
var nstream = client.GetStream();
//1.读取配置请求1
int resultByte = await nstream.ReadAsync(appRequestBytes);
Logger.Debug("appRequestBytes received.");
if (resultByte == 0)
{
CloseClient(client);
return;
}
//2.根据配置请求1获取更多配置信息
int appCount = (int)appRequestBytes[2];
byte[] consumerPortBytes = new byte[appCount * 2];
int resultByte2 = await nstream.ReadAsync(consumerPortBytes);
Logger.Debug("consumerPortBytes received.");
if (resultByte2 == 0)
{
CloseClient(client);
return;
}
//3.分配配置ID,并且写回给客户端
try
{
byte[] arrangedIds = ConnectionManager.ArrageConfigIds(appRequestBytes, consumerPortBytes);
Logger.Debug("apprequest arranged");
await nstream.WriteAsync(arrangedIds);
}
catch (Exception ex)
{ Logger.Debug(ex.ToString()); }
Logger.Debug("arrangedIds written.");
}
catch (Exception e)
{
Logger.Debug(e);
throw;
}
}
#endregion
///
/// 同时侦听来自consumer的链接和到provider的链接
///
///
///
///
async Task ListenConsumeAsync(int consumerPort, CancellationToken ct)
{
try
{
var consumerlistener = new TcpListener(IPAddress.Any, consumerPort);
consumerlistener.Start(1000);
//给两个listen,同时监听3端
var clientCounter = 0;
while (!ct.IsCancellationRequested)
{
//目标的代理服务联通了,才去处理consumer端的请求。
Logger.Debug("listening serviceClient....Port:" + consumerPort);
TcpClient consumerClient = await consumerlistener.AcceptTcpClientAsync();
//记录tcp隧道,消费端
TcpTunnel tunnel = new TcpTunnel();
tunnel.ConsumerClient = consumerClient;
ClientConnectionManager.GetInstance().PortAppMap[consumerPort].Tunnels.Add(tunnel);
Logger.Debug("consumer已连接:" + consumerClient.Client.RemoteEndPoint.ToString());
//消费端连接成功,连接
//需要端口
TcpClient s2pClient = await ConnectionManager.GetClient(consumerPort);
//记录tcp隧道,客户端
tunnel.ClientServerClient = s2pClient;
//✳关键过程✳
//连接完之后发送一个字节过去促使客户端建立转发隧道
await s2pClient.GetStream().WriteAsync(new byte[] { 1 }, 0, 1);
clientCounter++;
TcpTransferAsync(consumerlistener, consumerClient, s2pClient, clientCounter, ct);
}
}
catch (Exception e)
{
Logger.Debug(e);
}
}
#region datatransfer
//3端互相传输数据
async Task TcpTransferAsync(TcpListener consumerlistener, TcpClient consumerClient, TcpClient providerClient,
int clientIndex,
CancellationToken ct)
{
try
{
ServerHost.Logger.Debug($"New client ({clientIndex}) connected");
CancellationTokenSource transfering = new CancellationTokenSource();
var providerStream = providerClient.GetStream();
var consumerStream = consumerClient.GetStream();
Task taskC2PLooping = ToStaticTransfer(transfering.Token, consumerStream, providerStream);
Task taskP2CLooping = StreamTransfer(transfering.Token, providerStream, consumerStream);
//任何一端传输中断或者故障,则关闭所有连接
var comletedTask = await Task.WhenAny(taskC2PLooping, taskP2CLooping);
//comletedTask.
Logger.Debug($"Transferring ({clientIndex}) STOPPED");
consumerClient.Close();
providerClient.Close();
transfering.Cancel();
}
catch (Exception e)
{
Logger.Debug(e);
throw;
}
}
private async Task StreamTransfer(CancellationToken ct, NetworkStream fromStream, NetworkStream toStream)
{
await fromStream.CopyToAsync(toStream, ct);
}
private async Task ToStaticTransfer(CancellationToken ct, NetworkStream fromStream, NetworkStream toStream, Func> beforeTransferHandle = null)
{
await fromStream.CopyToAsync(toStream, ct);
}
private void CloseClient(TcpClient client)
{
Logger.Debug("invalid request,Closing client:" + client.Client.RemoteEndPoint.ToString());
client.Close();
Logger.Debug("Closed client:" + client.Client.RemoteEndPoint.ToString());
}
#endregion
}
}